본문 바로가기

Language/Python

Python으로 100만건 집어넣기

728x90
반응형

 

개요 

돌이켜 보면 Python을 써오면서 실 서비스에서 한 번에 많은 데이터를 다뤄야 할 일은 비교적 적었던 듯싶다. 그러다 보니  Python으로 대량의 데이터를 다뤄야 하는 경우를 맞닥뜨리면 어떤 경험치를 가지고 판단할 수 있을까 못내 궁금해하고 있었다.

 

그런 생각에 힘입어 이번 기회에 지금 사용하는 기술 수준에서 이런저런 잔머리를 굴려보며 대량의 데이터를 삽입(INSERT)할 경우 어느정도 시간이 소요되는지 측정해 보기로 했다.

 


대충 100만건 정도 가정하자. 

사실 "대량"이라고 한다면 그 기준을 정하기는 애매한 듯싶다.

 

따라서 단순하게 MySQL 5.7 에 100만 건의 데이터를 삽입해 보는 걸 목표로 잡았다. Column의 수나 데이터의 형식에 따라 100만건을 삽입하는데 변수로 작용할 수 있겠지만 이런 부분은 차치하고 다음과 같은 테이블을 하나 생성하고 이를 기준으로 했다.

CREATE TABLE `member` (
  `pk` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `nanoid` char(24) NOT NULL,
  `name` varchar(32) DEFAULT NULL,
  `age` tinyint(3) unsigned DEFAULT NULL,
  `address1` varchar(1024) DEFAULT NULL,
  `address2` varchar(1024) DEFAULT NULL,
  PRIMARY KEY (`pk`),
  KEY `member_name_IDX` (`name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

1.  자주 사용하는 SQLAlchemy로 100만건 INSERT 해보기

SQLAlchemy  설정

SQLAlchemy는 최근 2.x 대 까지의 나왔으나 나는 1.4 대를 주로 사용했으니 이 버전을 이용해서 테스트를 진행했다. 또한 SQLAlchemy의 특징인 ORM을 활용했다. ORM을 활용하기 위해 앞서 언급한 DDL에 다음과 같은 SQLAlchemy Model을 생성하자.

from sqlalchemy import CHAR, Column, String
from sqlalchemy.dialects.mysql import INTEGER, TINYINT
from sqlalchemy.orm import declarative_base

Base = declarative_base()
metadata = Base.metadata

class Member(Base):
    __tablename__ = 'member'

    pk = Column(INTEGER(10), primary_key=True)
    nanoid = Column(CHAR(24), nullable=False)
    name = Column(String(32))
    age = Column(TINYINT(3))
    address1 = Column(String(1024))
    address2 = Column(String(1024))

 

또한 Driver는 mysqlclient를 사용했으며 Engine과 Session의 설정은 다음과 같다.

DB_URL = URL.create(
    drivername="mysql+mysqldb",
    username='root',
    password="1234",
    database="demo",
    host="127.0.0.1",
    port=19501
)

def get_engine() -> engine.Engine:
    return engine.create_engine(
        DB_URL,
        pool_pre_ping=False,
        pool_recycle=3600,
        pool_size=50,
        max_overflow=100,
        pool_timeout=10,
        echo=False
    )

def get_session(engine) -> Session:
    return = scoped_session(sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autocommit=False,
        autoflush=False
    ))

 

SQLAlchemy Model 초기화

개요에서 언급했듯 100만건 삽입이 목표다. 바로 위에서 생성한 Member Class를 100만 건 정도 생성하는데 다음 코드를 사용했다.

def generate_member_object():
    start_time = time.time()

    names = [get_random_with_urandom() for _ in range(MILLION)]
    ages = [random.randint(1, 100) for _ in range(MILLION)]
    address1s = [get_random_with_urandom() for _ in range(MILLION)]
    address2s = [get_random_with_urandom() for _ in range(MILLION)]
    ids = [nanoid.generate(size=12) for _ in range(MILLION)]

    grouping = []

    for i in range(MILLION):
        grouping.append(Member(
            nanoid=ids[i],
            name=names[i],
            age=ages[i],
            address1=address1s[i],
            address2=address2s[i]
        ))


    print("MEMBER OBJECT SPENT TIME", time.time() - start_time)

    return grouping

 

단순 삽입이 목표라 컬럼별 최대 데이터 크기를 목표로 생성하진 않았다.

 

Member Class 생성 시, 처음에는 uuid를 사용해 100만건100만 건 을 생성했다. 결과는 17~18초 정도가 나왔는데 os.urandom을 사용하니 13~14초 정도로 단축할 수 있었다.

 

uuid를 통해 100만 건을 생성하는 것은 생각보다 느리다는 점이 새로 알게 부분이다.

 

위 코드에서 os.urandom을 이용해서 100만건 생성에 걸리는 시간을 줄였지만 실제로 랜덤값 생성으로 쓰기에는 보안성이 약한 함수이니 참고하도록 하자.

 

 

add() 로 100만 건 삽입하기

어느 정도 구색은 갖춰졌으니 이제 100만 건 삽입을 시작해 보자.

 

처음은 단일 레코드를 집어넣는 방식인 add() 함수를 사용하자. 딱히 테스트를 안 해도 느리다는 건 예측할 수 있지만 그래도 얼마나 걸릴까 싶어 측정해 봤다.

 

100만 건 삽입에 사용된 코드는 다음과 같다.

def save_member(engine, members):
    s = get_session(engine)

    for member in members:
        s.add(member)

    s.commit()
    s.close()

 

코드를 보면 알 수 있다시피 for 문을 돌면서 일일이 add()를 호출해 삽입한다. 또한 add_all()이라는 함수가 존재하는데 결국 add()를 호출하기 때문에 add_all()은 측정에 사용하지 않았다.

 

위 코드를 통해 100만 건을 데이터를 집어넣는데 걸리는 시간을 측정한 결과는 다음과 같다.

598.400171995163 (sec)

 

9분 58.4초 정도이다.

 

bulk_save_objects()로 100만건 삽입하기

bulk_save_objects()는 MySQL의 경우 execute many를 사용해서 데이터를 집어넣는 방식이다. 이 함수를 사용하는 경우 SQL은 다음과 같이 생성된다.

INSERT INTO ... VALUES ...

 

bulk_save_objects()의 장점은 SQLAlchmey의 Model을 Python List로 바인딩시켜 그대로 INSERT 할 수 있다는 점이다. "execute many" 메커니즘을 사용하기 때문에 단순 레코드 삽입보다는 빠를 것으로 예상되지만 얼마나 빠를까?

 

bulk_save_objects를 사용해 100만 건 삽입에 사용된 코드는 다음과 같다.

def save_member(engine, members:List[Member]):
    s = get_session(engine)

    s.bulk_save_objects(members)

    s.commit()
    s.close()

 

삽입할 데이터를 Python 객체로 다루기 때문에 비교적 편한 방식이었다.  위 코드를  실행한 결과 측정된 수행 시간은 다음과 같다.

24.56 (sec)

 

대략 “24.56” 초이다. MySQL Driver를 pymysql로 변경한 경우 “18~19” 초가 걸렸다.

 

bulk_insert_mappings로 100만건 집어넣기

SQLAlchemy는 데이터를 집어넣기 위한 다른 방식이 또 있는데  "bulk_insert_mappings()"라는 함수이다. 이 함수는 Python의 Dictionary로 초기화된 데이터를 "execute many" 메커니즘으로 데이터를 삽입한다.

 

bulk_insert_mappings()를 사용해 100만 건 삽입에 사용된 코드는 다음과 같다.

def save_member(engine, members):
    s = get_session(engine)

    s.bulk_insert_mappings(Member, [
        {
            'nanoid': member.nanoid,
            'name': member.name,
            'age': member.age,
            'address1': member.address1,
            'address2': member.address2
        } for member in members
    ])
    s.commit()
    s.close()

 

Member Class를 다시 Dictionary로 풀어야 했기에 조금은 번거롭다. 위 코드를  실행한 결과 측정된 수행 시간은 다음과 같다.

23.82 sec

 

여러 번 실행해본 결과 25초 까지 나오는 걸로봐선 100만건 삽입에 대략 23 ~ 25초 정도가 걸리는 듯 싶다. MySQL Driver를 pymysql로 변경한 경우 “19.29” 초가 걸렸다. 이 또한 여러번 반복해 본 결과 “18~19” 초가 걸렸다. 

 

core 방식을 이용해 100만 건 집어넣기

이 방법은 SQLAlchemy에서 가장 빠르다고 알려진 core를 이용한 방법이다. core 방식은 engine 레벨에서 Raw SQL을 날리는 방식이다.

 

이 방식을 사용해 100만 건을 집어넣는 데 사용한 코드는 다음과 같다.

def insert_member():
    engine = get_engine()

    members = generate_member_object()

    start_time = time.time()

    engine.execute(
        Member.__table__.insert(),
        [
            {
                'nanoid': member.nanoid,
                'name': member.name,
                'age': member.age,
                'address1': member.address1,
                'address2': member.address2
            } for member in members
        ]
    )

    print(time.time() - start_time)

    engine.dispose()

이 방식의 수행 결과 측정된 시간은 다음과 같다.

21.05 sec

여러 번 수행한 결과 대략 "20~22초" 사이의 결과를 내는 듯하다. 이 또한 SQLALchemy의 engine을 pymysql로 변경한 경우 16 ~ 17 sec 가 걸렸다.

 

 

종합해 보면?

지금까지의 결과를  정리해 보면 다음과 같다.

METHOD mysqlclient pymysql
add 9분 58.4 초  
bulk_save_object 24.56 sec 18 ~19 sec
bulk_insert_mappings 23~25 sec 18 ~ 19sec
core 20~22sec 16 ~ 17 sec

 

즉 SQLAlchemy의 데이터 삽입 시, 성능 우위는 다음 순서이다.

  • add -> bulk_save_objects -> bulk_insert_mappings -> core

 

그런데 한 가지 의아한 점이 존재한다. SQLAlchemy에서 사용된 DB Driver이다. 

 

Python에는 MySQL과 연결하기 위한 Driver에 mysqlclient와 pymysql이 존재한다. 검색하거나 알려진 사실에 의해서는 mysqlclient가 pymysql 보다 빠르다고 하는데 위 결과에서는 실제로 pymysql이 데이터를 삽입하는데 좀 더 빠른 결과를 냈다.

 

왜 이런 결과가 나온 걸까?

 

이와 관련해서는 출처가 명확한 근거가 있는 공식 문서나 해답을 찾진 못했다. ChatGPT에 물어보니 SQLAlchemy가 DB Driver에 따라 쿼리를 생성하는데 다른 전략을 쓴다는 사실이고 mysqlclient는 C API를 사용하기 때문에 이와 관련된 작업 시에 오버헤드 들어간다는 사실 정도다.

 

이런저런 사실 끝에 정리할 수 있었던 사실은, 시스템의 신뢰성과 안정성을 고려할 때는 mysqlclient를, 데이터를 삽입하는 작업에는 pymysql을 사용하는 게 어떨지 생각을 남겨본다.

 


2. 더 개선할 수 없을까?

SQLAlchemy를 사용한 경우 10초 중후반에서 20초 중반까지 나온 게 측정된 사실이다. 이게 최선일까?

 

병렬 처리로 집어넣기

아직 삽입 속도를 더 줄일 수 있는 여지가 있다. Multi Thread를 이용한 방식이다. Python의 Multi Thread는 GIL 때문에 성능이 안 좋다는 사실이 존재하지만 이 작업에서는 CPU 바운드가 아니기 때문에 영향이 없을 듯하다.

 

SQLAlchemy의 bulk_save_objects를 multi thread로 처리하기

앞서 활용했던 bulk_save_objects()를 multi thread로 처리하는 경우다.

 

이 방식을 사용해 100만 건을 집어넣는 데 사용한 코드는 다음과 같다.

def insert_member():
    engine = get_engine()

    members = generate_member_object()

    start_time = time.time()

    ths = []

    chunk_size = 10_000
    for i in range(0, len(members), chunk_size):
        member_chnuk = members[i:i + chunk_size]
        th = threading.Thread(target=save_member, args=(engine, member_chnuk))
        ths.append(th)
        th.start()

    for th in ths:
        th.join()

    print(time.time() - start_time)

    engine.dispose()

위 실행 결과는 8.5초 정도가 나왔다. 또한 각각의 thread에 처리를 위임하기 위해 chunk로 구분했다.

 

한 가지 주목할만했던 사실은 chunk size를 늘려가면서 테스트해도 어느 순간부터는 일정한 측정 시간이 나왔다는 부분이다.

 

chunk_size가 8000일 때부터 결과가 8.5초로 계속 일정했다.

 

SQLAlchemy의 bulk_insert_objects를 multi process로 처리하기

앞서 활용했던 bulk_insert_mappings()를 multi process로 처리해 봤다.

def insert_member():
    members = generate_member_object()
    chunk_size = 10_000

    start_time = time.time()

    processes = []

    for i in range(0, len(members), chunk_size):
        member_dict = generate_member_dict(members[i:i + chunk_size])
        p = Process(target=save_member, args=(member_dict,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print("Time taken:", time.time() - start_time)

 

이 코드를 사용해 측정한 수행 시간은 “35.6”초이다. 더 느려진 걸 알 수 있는ㄷ, Process 간 object 전달 시, 데이터 복사 비용 때문에 소모된 시간과 각각의 Process가 engine을 설정하고 connection 맺고 끊는데 들이는 비용, 새로운 process를 실행하는 데 걸리는 시간 등이 원인으로 생각된다.

 

 

MySQLClient를 사용해 Raw Query를 Multi Thread로 처리하기

튜닝의 끝은 순정이라고 했던가.. 

 

MySQLClient를 사용해서 Raw Query를 만들고 이를 Multi Thread로 처리했다. MySQLClient는 connection pool 설정을 지원하지 않아 thread 개별에서 connection을 생성하고 닫아줬다.

def save_member(members):
    connection = Connection(
        host='127.0.0.1',
        port=19501,
        user='root',
        passwd='1234',
        db='demo',
        charset='utf8'
    )
    cursor = connection.cursor()

    sql = "INSERT INTO demo.`member` (nanoid, name, age, address1, address2) VALUES  (%(nanoid)s, %(name)s, %(age)s, %(address1)s, %(address2)s)"
    cursor.executemany(sql, [
        {
            'nanoid': member.nanoid,
            'name': member.name,
            'age': member.age,
            'address1': member.address1,
            'address2': member.address2
        } for member in members
    ])
    connection.commit()
    connection.close()

 

위 결과로 측정된 수행 시간은 12.4초 정도로 측정되었다.

 


3. 비동기로 집어넣기

중간에 병렬 처리로 돌렸어도 본질은 "동기식" 작업이었다.  모던한 파이썬 개발에서 강세를 보이고 있는 비동기 방식을 사용해서도 측정했다.

async def insert_data(pool, members):
    async with pool.acquire() as connection:
        async with connection.cursor() as cur:
            _sql = """
            INSERT INTO demo.`member` (nanoid, name, age, address1, address2)
             VALUES (%(nanoid)s, %(name)s, %(age)s, %(address1)s, %(address2)s);
            """

            await cur.executemany(
                _sql,
                [{
                    'nanoid': member.nanoid,
                    'name': member.name,
                    'age': member.age,
                    'address1': member.address1,
                    'address2': member.address2
                } for member in members])

        await connection.commit()


async def main():
    members = generate_member_object()

    pool = await aiomysql.create_pool(
        ...
        minsize=10,
        maxsize=100
    )

    import time
    start_time = time.time()

    tasks = []
    chunk_size = 10_000
    for i in range(0, len(members), chunk_size):
        member_dict = members[i:i + chunk_size]

        task = asyncio.create_task(insert_data(pool, member_dict))
        tasks.append(task)

    if tasks:
        await asyncio.gather(*tasks)

    pool.close()
    await pool.wait_closed()

    end_time = time.time()
    print(end_time - start_time)


asyncio.run(main())

 

chunk의 크기를 1만 건 그리고 connection pool을 고려한 상황에서의 측정 결과 6.7초대 측정되었다. 

 

확실히 비동기가 빠르긴 빠르다..


 

4. LOAD DATA LOCAL INFILE 방식으로 집어넣기

지금까지의 작업은 연결하는 방식을 차치하면 본질적으로 “쿼리”를 생성해 날리는 방식이었다.

 

MySQL에서 대용량 데이터를 집어넣기 위한 방법 중 csv 파일을 생성해 넣는 방식도 존재한다. 이는 “LOAD DATA LOCAL INFILE”이라는 구문을 통해서 가능하다.

 

이 방식으로 집어넣는다면 얼마나 걸릴까?

def insert_member():
    members = generate_member_object()

    member_df = pd.DataFrame([member.to_dict() for member in members])
    member_df.to_csv("./member.csv", index=False)

    import time
    start_time = time.time()
    connection = Connection(
        host='127.0.0.1',
        port=19501,
        user='root',
        passwd='1234',
        db='demo',
        charset='utf8',
        local_infile=1  # 중요
    )
    cursor = connection.cursor()

    sql = "LOAD DATA LOCAL INFILE './member.csv' INTO TABLE member FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' IGNORE 1 LINES;"
    cursor.execute(sql)
    connection.commit()

    cursor.close()
    connection.close()

    print(time.time() - start_time)

Member 객체를 csv로 저장하기 위해 중간에 pandas를 사용했다.  이후 생성된 csv 파일을 “LOAD DATA LOCAL INFILE”을 통해 DB에 넣는 코드이다.

 

이 케이스의 경우 9~10초 정도가 소모됐다.

 

측정 시간만 봤을 땐 이 방법도 나쁘진 않다. 그러나  실제 API에서 위와 같은 코드를 다룬다고 가정했을 때는 선호하는 방식이 되진 않을 것 같다.


 

마치며

지금까지 데이터 삽입에는 DB 테이블에 데이터가 없는 초기 상태를 전제로 측정된 시간을 기록했다.

 

만약 이미 데이터가 존재하는 경우이고 이에 더해 index가 적용된 경우에는 삽입할 때마다 시간이 증가될 것이다.

 

처음 100만 건을 삽입할 때는 20초대의 측정시간이 index 추가 후  100만건을 집어넣을 때는 38~40초 정도로 늘어나는 걸 확인했다.

 

단순 측정된 시간이 더 빠른 걸 사용하기보다 여러 상황과 그에 맞는 기술을 선택해서  장점을 취하는 방식이 옳을 듯하다. 끝으로 측정된 시간을 표로 정리하며 이만 글을 마친다.

 

SQLAlchemy

종류 사용 방식 mysqlclient pymysql 
SQLAlchemy add 9분 58.4 가 걸렸다.  
SQLAlchemy bulk_save_object 24.56 sec 18 ~19 sec
  bulk_insert_mappings 23~25 sec 18 ~ 19sec
  core 20~22sec 16 ~ 17 sec

 

병렬 처리

종류 사용 방식  pymysql
SQLAlchemy bulk_save_object (multi thread) 8.5 sec
SQLAlchemy bulk_insert_mappings (multi process) 35.6 sec
MySQLClient Raw Query (multi thread) 12.4 sec

 

비동기

종류 사용방식 aiomysql
aiomysql aioymsql + connection pool  6.7 esc

 

LOAD DATA LOCAL INFILE

종류 사용 방식  
Raw Query LOAD DATA LOCAL INFILE 9~10 sec
728x90
반응형