SQLAlchemy ON DUPLICATE KEY UPDATE

Developer https://www.devze.com 2023-03-18 11:53 Source: Internet
Is there an elegant way to do an INSERT ... ON DUPLICATE KEY UPDATE in SQLAlchemy? I mean something with a syntax similar to inserter.insert().execute(list_of_dictionaries)?


ON DUPLICATE KEY UPDATE in SQLAlchemy 1.2 and later (MySQL only)

This functionality is now built into SQLAlchemy for MySQL only. somada141's answer below has the best solution: https://stackoverflow.com/a/48373874/319066

ON DUPLICATE KEY UPDATE in the SQL statement

If you want the generated SQL to actually include ON DUPLICATE KEY UPDATE, the simplest way involves using a @compiles decorator.

An example, linked from a good Reddit thread on the subject, can be found on GitHub:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if 'append_string' in insert.kwargs:
        return s + " " + insert.kwargs['append_string']
    return s


my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)

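But note that with this approach you have to build the append_string value by hand. You could probably change the append_string function so that it automatically turns the insert into an INSERT ... ON DUPLICATE KEY UPDATE statement, but I'm not going to do that here due to laziness. Also note that this snippet dates from an old SQLAlchemy release; newer releases validate the keyword arguments passed to insert() and may reject an unrecognized one such as append_string.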
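The clause passed as append_string does not have to be typed out by hand. Here is a minimal sketch, assuming you want every listed column overwritten with the incoming value through MySQL's VALUES() function. build_on_duplicate_clause is a hypothetical helper name, not part of SQLAlchemy.

```python
from typing import Iterable


def build_on_duplicate_clause(columns: Iterable[str]) -> str:
    """Build an ON DUPLICATE KEY UPDATE clause that overwrites each column.

    Hypothetical helper: column names are interpolated as-is, so they must
    come from trusted code (for example a table definition), never user input.
    """
    assignments = ["{0}=VALUES({0})".format(name) for name in columns]
    if not assignments:
        raise ValueError("at least one column is required")
    return "ON DUPLICATE KEY UPDATE " + ", ".join(assignments)


clause = build_on_duplicate_clause(["foo", "bar"])
print(clause)
```

The resulting string could then be passed as append_string in the execute call above.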

ON DUPLICATE KEY UPDATE functionality within the ORM

SQLAlchemy does not provide an interface to ON DUPLICATE KEY UPDATE or MERGE or any other similar functionality in its ORM layer. Nevertheless, it has the session.merge() function, which can replicate the functionality, but only if the key in question is a primary key.

session.merge(ModelObject) first checks if a row with the same primary key value exists by sending a SELECT query (or by looking it up locally). If it does, it sets a flag somewhere indicating that ModelObject is in the database already, and that SQLAlchemy should use an UPDATE query. Note that merge is quite a bit more complicated than this, but it replicates the functionality well with primary keys.
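To make that flow concrete, here is a rough imitation of "SELECT by primary key, then UPDATE or INSERT". It uses the standard-library sqlite3 module, not SQLAlchemy, so it runs without a server. It is not how session.merge() is implemented. The users table and the merge_by_primary_key function are made up for illustration.

```python
import sqlite3


def merge_by_primary_key(conn: sqlite3.Connection, user_id: int, name: str) -> str:
    """Imitate the flow described above: SELECT by primary key, then UPDATE or INSERT."""
    row = conn.execute("SELECT 1 FROM users WHERE id = ?", (user_id,)).fetchone()
    if row is not None:
        conn.execute("UPDATE users SET name = ? WHERE id = ?", (name, user_id))
        action = "update"
    else:
        conn.execute("INSERT INTO users (id, name) VALUES (?, ?)", (user_id, name))
        action = "insert"
    conn.commit()
    return action


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
first = merge_by_primary_key(conn, 1, "alice")    # no row yet, so INSERT
second = merge_by_primary_key(conn, 1, "alicia")  # row exists, so UPDATE
rows = conn.execute("SELECT id, name FROM users").fetchall()
print(first, second, rows)
```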

But what if you want ON DUPLICATE KEY UPDATE functionality with a non-primary key (for example, another unique key)? Unfortunately, SQLAlchemy doesn't have any such function. Instead, you have to create something that resembles Django's get_or_create(). Another StackOverflow answer covers it, and I'll just paste a modified, working version of it here for convenience.

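from sqlalchemy.sql.expression import ClauseElement

def get_or_create(session, model, defaults=None, **kwargs):
    # Look for an existing row matching the given column values.
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    # Not found: build a new object, skipping SQL expressions that
    # cannot be assigned as plain attribute values.
    params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
    if defaults:
        params.update(defaults)
    instance = model(**params)
    # The new instance is not added to the session here; the caller
    # still has to call session.add(instance) and commit.
    return instance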


I should mention that ever since the v1.2 release, SQLAlchemy Core has had a built-in solution to the above (snippet copied below):

from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

conn.execute(on_duplicate_key_stmt)


Based on phsource's answer, and for the specific use-case of using MySQL and completely overriding the data for the same key without performing a DELETE statement, one can use the following @compiles decorated insert expression:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if insert.kwargs.get('on_duplicate_key_update'):
        fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
        generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
        return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
    return s
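To see what the string slicing in that hook does, here it is applied to a hand-written INSERT string. The sample string is only an assumption about what compiler.visit_insert() might return; the real text depends on the table and on the driver's parameter style. The approach also assumes the first pair of parentheses in the statement is the column list.

```python
# A hand-written stand-in for what compiler.visit_insert() might return;
# the exact text depends on the table and the DBAPI's parameter style.
s = "INSERT INTO my_table (id, data) VALUES (%s, %s)"

# Text between the first "(" and the first ")" is taken as the column list.
fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
result = s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)

print(fields)
print(result)
```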


It depends on you. If you want to replace the existing row instead, pass OR REPLACE as the prefix.

def bulk_insert(self, objects, table):
    # table: your mapped table class; objects: a list of dicts, e.g. [{col1: val1, col2: val2}]
    for counter, row in enumerate(objects):
        inserter = table.__table__.insert().prefix_with('OR IGNORE').values(row)
        try:
            self.db.execute(inserter)
        except Exception as e:
            print(e)
        if counter % 100 == 0:
            self.db.commit()
    self.db.commit()

The commit interval (100 rows here) can be raised or lowered to tune speed.
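As far as I know, OR IGNORE and OR REPLACE are SQLite spellings (MySQL writes INSERT IGNORE and REPLACE INTO instead). Here is a small sketch of the difference between the two prefixes, using the standard-library sqlite3 module directly. The items table is invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO items (id, name) VALUES (1, 'original')")

# OR IGNORE: the conflicting row is skipped, the stored row is untouched.
conn.execute("INSERT OR IGNORE INTO items (id, name) VALUES (1, 'ignored')")
after_ignore = conn.execute("SELECT name FROM items WHERE id = 1").fetchone()[0]

# OR REPLACE: the stored row is removed and the new row takes its place.
conn.execute("INSERT OR REPLACE INTO items (id, name) VALUES (1, 'replaced')")
after_replace = conn.execute("SELECT name FROM items WHERE id = 1").fetchone()[0]

print(after_ignore, after_replace)
```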


My way

import typing
from datetime import datetime
from sqlalchemy.dialects import mysql

class MyRepository:

    def model(self):
        return MySqlAlchemyModel

    def upsert(self, data: typing.List[typing.Dict]):
        if not data:
            return
        model = self.model()
        if hasattr(model, 'created_at'):
            for item in data:
                item['created_at'] = datetime.now()

        stmt = mysql.insert(getattr(model, '__table__')).values(data)
        for_update = []
        for k, v in data[0].items():
            for_update.append(k)

        dup = {k: getattr(stmt.inserted, k) for k in for_update}
        stmt = stmt.on_duplicate_key_update(**dup)
        self.db.session.execute(stmt)
        self.db.session.commit()

Usage:

# Every row must use the same column names, because upsert() reads them from data[0].
myrepo.upsert([
    {
        "field1": "value11",
        "field2": "value21",
        "field3": "value31",
    },
    {
        "field1": "value12",
        "field2": "value22",
        "field3": "value32",
    },
])
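Since upsert() takes the column names to update from data[0] only, it may be worth checking that every row has the same keys before building the statement. A minimal sketch follows; check_uniform_keys is a hypothetical helper, not something from the answer or from SQLAlchemy.

```python
from typing import Dict, List


def check_uniform_keys(rows: List[Dict]) -> List[str]:
    """Return the shared column names, or raise if any row differs from the first."""
    if not rows:
        return []
    expected = list(rows[0].keys())
    for index, row in enumerate(rows):
        if set(row) != set(expected):
            raise ValueError(
                "row {0} has keys {1}, expected {2}".format(
                    index, sorted(row), sorted(expected)
                )
            )
    return expected


columns = check_uniform_keys([
    {"field1": "value11", "field2": "value21"},
    {"field2": "value22", "field1": "value12"},  # key order does not matter
])
print(columns)
```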


The other answers have this covered, but I figured I'd reference another good MySQL example I found in this gist. It also includes the use of LAST_INSERT_ID, which may be useful depending on your InnoDB auto-increment settings and whether your table has a unique key. Lifting the code here for easy reference, but please give the author a star if you find it useful.

from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert

def upsert(model, insert_dict):
    """model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
    inserted = insert(model).values(**insert_dict)
    upserted = inserted.on_duplicate_key_update(
        id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
                               for k, v in insert_dict.items()})
    res = db.engine.execute(upserted)
    return res.lastrowid


An ORM-level upsert method based on on_duplicate_key_update:

from sqlalchemy.dialects.mysql import insert
from sqlalchemy.orm import Session

# engine, ORM_Base and idWorker are assumed to be defined elsewhere in the application.

class Model():
    __input_data__ = dict()

    def __init__(self, **kwargs) -> None:
        self.__input_data__ = kwargs
        self.session = Session(engine)

    def save(self):
        self.session.add(self)
        self.session.commit()

    def upsert(self, *, ignore_keys=None):
        column_keys = self.__table__.columns.keys()

        # Keep only the input values that map to real table columns.
        update_data = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys:
                continue
            else:
                update_data[key] = self.__input_data__[key]

        insert_stmt = insert(self.__table__).values(**update_data)

        # Columns that must never be overwritten on a duplicate key.
        all_ignore_keys = ['id']
        if ignore_keys is None:
            pass
        elif isinstance(ignore_keys, list):
            all_ignore_keys = [*all_ignore_keys, *ignore_keys]
        else:
            all_ignore_keys.append(ignore_keys)

        # Map every remaining column to the value from the attempted insert.
        update_columns = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys or key in all_ignore_keys:
                continue
            else:
                update_columns[key] = insert_stmt.inserted[key]

        on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
            **update_columns
        )
        # self.session.add(self)
        self.session.execute(on_duplicate_key_stmt)
        self.session.commit()

class ManagerAssoc(ORM_Base, Model):
    def __init__(self, **kwargs):
        self.id = idWorker.get_id()
        column_keys = self.__table__.columns.keys()
        update_data = dict()
        for key in kwargs.keys():
            if key not in column_keys:
                continue
            else:
                update_data[key] = kwargs[key]
        ORM_Base.__init__(self, **update_data)
        Model.__init__(self, **kwargs, id = self.id)

   ....
# you can call it as follows:
manager_assoc.upsert()
manager.upsert(ignore_keys = ['manager_id'])


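Got a simpler solution, if REPLACE INTO semantics are acceptable (note that the hook below rewrites every INSERT it compiles, not only the ones that pass replace_string):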

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def replace_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    s = s.replace("INSERT INTO", "REPLACE INTO")
    return s

my_connection.execute(my_table.insert(replace_string=""), my_values)


I just used plain SQL:

from sqlalchemy import text

# Bind names must match the keys of the data dict.
insert_stmt = text("REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :column_2_bind)")
session.execute(insert_stmt, data)
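One thing to keep in mind with REPLACE INTO: the old row is deleted and a new one inserted, so any column not listed in the statement falls back to its default. Here is a small sketch of that effect using the standard-library sqlite3 module, which also accepts REPLACE INTO. The extra note column is invented for the example, and MySQL specifics such as auto-increment handling are not covered.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tablename ("
    "column1 INTEGER PRIMARY KEY, column2 TEXT, note TEXT)"
)
conn.execute(
    "INSERT INTO tablename (column1, column2, note) VALUES (1, 'old', 'keep me')"
)

# Same shape as the statement above: named binds, only two of three columns.
data = {"column_1_bind": 1, "column_2_bind": "new"}
conn.execute(
    "REPLACE INTO tablename (column1, column2) "
    "VALUES (:column_1_bind, :column_2_bind)",
    data,
)

# The note column was not listed, so the replacement row has no value for it.
row = conn.execute("SELECT column1, column2, note FROM tablename").fetchone()
print(row)
```

If the unlisted columns need to survive, ON DUPLICATE KEY UPDATE is the safer choice, because it updates the existing row in place.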


Update Feb 2023: SQLAlchemy version 2 was recently released and supports on_duplicate_key_update in the MySQL dialect. Many many thanks to Federico Caselli of the SQLAlchemy project who helped me develop sample code in a discussion at https://github.com/sqlalchemy/sqlalchemy/discussions/9328

Please see https://stackoverflow.com/a/75538576/1630244

If it's ok to post the same answer twice (?) here is my small self-contained code example:

import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "foo"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))


engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()

# setup step 0 - ensure the table exists
Base.metadata.create_all(bind=engine)

# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')

# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "x"},
        {"id": 2, "name": "y"},
        {"id": 3, "name": "w"},
        {"id": 4, "name": "z"},
    ]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')

# demonstrate upsert
ups_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "xx"},
        {"id": 2, "name": "yy"},
        {"id": 3, "name": "ww"},
        {"id": 5, "name": "new"},
    ]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()

users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')


None of these solutions seems all that elegant. A brute-force way is to query to see if the row exists. If it does, delete the row and then insert; otherwise just insert. Obviously there is some overhead involved, but it does not rely on modifying the raw SQL and it works on non-ORM stuff.
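A minimal sketch of that brute-force approach follows, written against the standard-library sqlite3 module so it runs without a server. The settings table and the delete_then_insert name are made up for illustration. The three statements run in one transaction so a failure does not leave the row deleted.

```python
import sqlite3


def delete_then_insert(conn: sqlite3.Connection, name: str, value: str) -> None:
    """Brute-force upsert: look the row up, delete it if present, then insert."""
    with conn:  # commits on success, rolls back if an exception is raised
        exists = conn.execute(
            "SELECT 1 FROM settings WHERE name = ?", (name,)
        ).fetchone()
        if exists is not None:
            conn.execute("DELETE FROM settings WHERE name = ?", (name,))
        conn.execute(
            "INSERT INTO settings (name, value) VALUES (?, ?)", (name, value)
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE settings (name TEXT UNIQUE, value TEXT)")
delete_then_insert(conn, "theme", "light")  # row absent: plain insert
delete_then_insert(conn, "theme", "dark")   # row present: delete, then insert
rows = conn.execute("SELECT name, value FROM settings").fetchall()
print(rows)
```

With several concurrent writers, another session could still insert the same key between the SELECT and the INSERT, so the unique constraint remains the real safeguard.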
