SQLAlchemy Bulk Insert & Update 알아보기

오늘은 PostgreSQL 데이터베이스에 SQLAlchemy Bulk Insert & Update 작업을 해보려고 합니다.

PostgreSQL 데이터베이스는 지난 번과 마찬가지로 로컬에 도커 컨테이너를 따로 띄어놓고 작업해보겠습니다.

개요

다량의 데이터를 추가/갱신할 때 레거시 코드처럼 Session.add() 메소드는 적절하지 못해보여서 벌크 작업으로 알아보았습니다.

이 상황에서 사용할 수 있는 메소드는 add_all, bulk_save_objects, bulk_insert_mappings / bulk_update_mappings 정도로 나열할 수 있습니다.

또한 SQLAlchemy Core도 포함할 수 있습니다.

PostgreSQL docker-compose 구성

작업하기 전, PostgreSQL 컨테이너로 실행해서 테스트할 데이터베이스를 준비합니다.

version: '3.8'
services:
  postgres:
    image: postgres:14.1
    restart: always
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_USER: admin
      POSTGRES_DB: content
      POSTGRES_INITDB_ARGS: --encoding=UTF-8
    volumes:
    - ./pg_data:/var/lib/postgresql/data
    - ./init-user-db.sh:/docker-entrypoint-initdb.d/init-user-db.sh
    ports:
    - "5432:5432"

postgres 이미지로 시작하며, 볼륨과 init-user-db.sh 파일을 지정했습니다.

#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
    CREATE TABLE article (
    id SERIAL NOT NULL,
    description VARCHAR(256) NOT NULL,
    CONSTRAINT id_pk PRIMARY KEY (id)
);
EOSQL
chmod +x init-user-db.sh

위 init-user-db.sh 스크립트로 테이블을 미리 만들고 스크립트로 실행 가능하도록 변경합니다.

패키지 준비

SQLAlchemy==1.4.29
psycopg2-binary==2.9.3

PostgreSQL 데이터베이스를 조작할 예정이므로 SQLAlchemy, psycopg2-binary 패키지를 준비합니다.

코딩

  • 엔진
@contextlib.contextmanager
def session_manager():
    engine = create_engine(DB_URL, echo=True)
    session = Session(bind=engine)
    Base.metadata.create_all(engine)
    yield session
    session.close()

내장되어있는 contextlib 라이브러리로 컨텍스트 매니저를 만들어 세션에 사용합니다.

세션을 사용하고 닫습니다.

  • 테이블
class Article(Base):
    __tablename__ = "article"
    id = Column(Integer, primary_key=True)
    description = Column(String(255))

스펙대로 테이블 클래스를 만들어서 준비합니다.

  • add_all
with session_manager() as session:
    session.add_all(
        [Article(description="test 1"), Article(description="test 2"), Article(description="test 3")]
    )
    session.commit()

먼저 add_all 구현은 add를 반복 호출하게 랩핑되었으므로 실제 동작은 아래와 같이 구성됩니다.

def add_all(self, instances):
    for instance in [Article(description="test 1"), Article(description="test 2"), Article(description="test 3")]:
      self.add(instance, _warn=False)
    self.commit()

그러므로 실제 벌크 동작보다 여러 객체를 편리하게 넣는다는 점에 초점을 맞출 수 있습니다.

  • bulk_save_objects
with session_manager() as session:
    session.bulk_save_objects(
        [Article(description="test 1"), Article(description="test 2"), Article(description="test 3")],
        return_defaults=return_defaults
    )
    session.commit()

bulk_save_objects는 이름처럼 객체들을 벌크 작업으로 저장하는 메소드로서, executemany 동작을 수행하게 됩니다.

if return_defaults and isstates:
    identity_cls = mapper._identity_class
    identity_props = [p.key for p in mapper._identity_key_props]
    for state, dict_ in states:
        state.key = (
            identity_cls,
            tuple([dict_[key] for key in identity_props]),
        )

return_defaults 파라미터로 ID 값을 포함해서 반환할 수 있지만, 위 코드처럼 내부 구현을 보면 속도에서 불리하다는 점을 알 수 있습니다.

for i in save_objs:
  i.description = i.description.upper()

session.bulk_save_objects(save_objs)
session.commit()

가져온 객체를 조작해서 bulk_save_objects 동일하게 수행해서 다시 업데이트할 수 있습니다.

  • bulk_insert_mappings / bulk_update_mappings
with session_manager() as session:
    session.bulk_insert_mappings(
        Article, [dict(description="test 1"), dict(description="test 2"), dict(description="test 3")],
        return_defaults=return_defaults
    )
    session.commit()

    session.bulk_update_mappings(
        Article, [dict(id=118, description="TEST 1"), dict(id=119, description="TEST 2")]
    )
    session.commit()

bulk_insert_mappings / bulk_update_mappings는 객체 리스트가 아닌 dictionary 리스트와 맵핑할 테이블 모델로 executemany 동작을 수행합니다.

새롭게 추가할 떄는 bulk_insert_mappings, 업데이트할 때는 동일 ROW 파악하기 위해서 PK 값도 명시하고 bulk_update_mappings를 사용합니다.

bulk_save_objects 메소드보다 객체를 만드는 순간 오버헤드를 줄일 수 있다고 합니다.

if return_defaults and isstates:
    identity_cls = mapper._identity_class
    identity_props = [p.key for p in mapper._identity_key_props]
    for state, dict_ in states:
        state.key = (
            identity_cls,
            tuple([dict_[key] for key in identity_props]),
        )

bulk_save_objects처럼 return_defaults 파라미터로 ID 값을 포함해서 반환할 수 있지만, 위 코드처럼 내부 구현을 보면 속도에서 불리하다는 점을 알 수 있습니다.

  • core
with session_manager() as session:
    with session.bind.begin() as conn:
        conn.execute(
            insert(Article.__table__),
            [dict(description="test 1"), dict(description="test 2"), dict(description="test 3")]
        )

        conn.execute(update(Article.__table__).where(Article.id == bindparam('id')).values(
                {'id': bindparam('id'), 'description': bindparam('description')}
            ), [dict(id=1, description="TEST 1"), dict(id=2, description="TEST 2")])

SQLAlchemy Core도 벌크 insert 또는 update 동작으로 진행할 수 있습니다.

ORM은 기본적으로 고성능을 위한 것이 아니므로 core insert, update를 세션에서 받아서 처리합니다.

퍼포먼스

https://docs.sqlalchemy.org/en/14/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

해당 문서에 따르면 core 로직이 가장 빠르고, bulk_insert_mappings, bulk_save_objects, ORM 순서로 느려지는 것을 볼 수 있습니다.

이처럼 같은 결과를 보여줘도 요구하는 데이터 타입과 처리 방식이 다르므로, 구현하고 싶은 로직에 따라 차이점을 파악해서 고려할 필요가 있다고 생각합니다.

감사합니다.

Written on March 15, 2022