博客 / 詳情

返回

Python + Sqlalchemy 對數據庫的批量插入或更新(Upsert)

本篇承接上一篇《Mysql 數據庫的批量插入或更新(Upsert)》的內容,來看看在 Python 中,怎麼實現大量數據的 upsert(記錄存在就更新,不存在則插入)。

由於不同數據庫對這種 upsert 的實現機制不同,Sqlalchemy 也就不再試圖做一致性的封裝了,而是提供了各自的方言 API,具體到 Mysql,就是給 insert statement ,增加了 on_duplicate_key_update 方法。

基本用法

假設表數據模型如下:

class TableA(db.Model):
    __tablename__ = 'table_a'
    __table_args__ = (db.UniqueConstraint('a', 'b', name='table_a_a_b_unique'))

    id = db.Column(db.Integer, primary_key=True)
    a = db.Column(db.Integer)
    b = db.Column(db.Integer)
    c = db.Column(db.Integer)

其中 id 是自增主鍵,a, b 組成了唯一索引。那麼對應的 upsert 語句如下:
from sqlalchemy.dialects.mysql import insert

insert(TableA).values(a=1, b=2, c=3).on_duplicate_key_update(c=3)

複用數值

跟 SQL 語句類似,我們可以不用每次都重複填寫 insert 和 update 的數值:

update_keys = ['c']
insert_stmt = insert(table_cls).values(a=1, b=2, c=3)
update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}
upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)
db.session.execute(upsert_stmt)

注意,最後一句 on_duplicate_key_update 的參數是需要展開的,不接受 dict 作為參數

批量處理

同樣,insert 語句是支持傳一組數據作為參數的:

records = [{
    'a':1,
    'b':2,
    'c':3
},{
    'a':10,
    'b':20,
    'c':4
},{
    'a':20,
    'b':30,
    'c':5
}]

update_keys = ['c']
insert_stmt = insert(table_cls).values(records)
update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}
upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)
db.session.execute(upsert_stmt)

就可以實現整體的 upsert。

封裝

觀察上面的代碼,實際上 upsert 的部分是業務無關的,那麼就可以封裝一個更方便調用的通用函數了:

from sqlalchemy.dialects.mysql import insert

def upsert(table_cls, records, except_cols_on_update=[]):
    update_keys = [key for key in records[0].keys() if
                   key not in except_cols_on_update]
    insert_stmt = insert(table_cls).values(chunk)
    update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}
    upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)
    db.session.execute(upsert_stmt)

分批次生成

以上的封裝,還可以做一些改進:為避免records 數據集過大,可以分批執行 sql 語句,並通過參數決定是否要提交:

from sqlalchemy.dialects.mysql import insert

def upsert(table_cls, records, chunk_size=10000, commit_on_chunk=True, except_cols_on_update=[]):
    update_keys = [key for key in records[0].keys() if
                   key not in except_cols_on_update]
    for i in range(0, len(records), chunk_size):
        chunk = records[i:i + chunk_size]
        insert_stmt = insert(table_cls).values(chunk)
        update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}
        upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)
        db.session.execute(upsert_stmt)
        if commit_on_chunk:
            db.session.commit()

調用方式如下 :

upsert(TableA, records,
                       chunk_size=50000,
                       commit_on_chunk=True,
                       except_cols_on_update=['id', 'a', 'b'])

這時 records 可以數量很大,比如1千萬條,調用後每 5 萬條生成一條 sql 語句,並且執行後就commit(如果參數 commit_on_chunk = False,那麼函數內就一直不提交,可以結束後自行統一提交),update 語句中,避免更新 'id', 'a', 'b' 這三個字段。

我的語雀原文鏈接

user avatar u_16099302 頭像 friesonthepier 頭像 u_16175472 頭像 u_16175454 頭像 goody 頭像 fulade 頭像
6 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.