Python: SQLAlchemy 处理 PostgreSQL on conflict

news/2024/7/9 21:16:00 标签: python, postgresql

目录

一、数据模型定义

二、ON CONFLICT DO NOTHING - 忽略本条数据的插入

三、ON CONFLICT DO UPDATE - 更新旧数据


一、数据模型定义

models.py 文件内是数据模型定义。

python">import datetime
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Dog(Base):
    __tablename__ = "dog"

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
    name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True)
    uuid = sqlalchemy.Column(sqlalchemy.String(64), unique=True)
    addtime = sqlalchemy.Column(sqlalchemy.DateTime())
    data = sqlalchemy.Column(sqlalchemy.Text(), default='{}')

    def __init__(self, name, uuid, data='{}', addtime=None):
        self.name = name
        self.uuid = uuid
        self.data = data
        self.addtime = addtime if None != addtime else datetime.datetime.now()

其中 uuid 字段是 unique 的,也就是不允许重复,当要插入的数据的 uuid 和表中的某个 uuid 重复时,就会产生冲突,数据库会返回错误

可以使用 PostgreSQL 提供的 ON CONFLICT 特性在数据冲突时做出处理。

处理方式有两种:

  • 遇到冲突时,忽略本条数据的插入
  • 遇到冲突时,更新旧的数据

二、ON CONFLICT DO NOTHING - 忽略本条数据的插入

在 SQLAlchemy 中使用 on_conflict_do_nothing 功能,需要手动写 insert 参数。

完整的 on_conflict_do_nothing 代码如下:

python">from sqlalchemy.dialects.postgresql import insert
import models
import dbopt
import uuid
import sqlalchemy.orm as orm
import sqlalchemy
import models

url = 'postgresql://postgres:dbpassword@127.0.0.1:5432/zoodb'
engine = sqlalchemy.create_engine(url)
metadata = sqlalchemy.schema.MetaData(bind=engine)
models.Base.metadata.create_all(engine)

Sess = orm.sessionmaker(bind=engine)

def add_random_dog(sess, n=3):
    doges = list()
    for i in range(n):
        dog = models.Dog(
                'dog-{0}'.format(i),
                str(uuid.uuid4()))
        sess.add(dog)
        doges.append(dog)
    sess.commit()

    return doges

def dog_on_conflict_do_nothing(sess, d):
    dog = models.Dog('Big' + d.name, d.uuid)

    insert_stmt = insert(models.Dog).values(
            name = dog.name,
            uuid = dog.uuid,
            addtime = dog.addtime,
            data = dog.data)

    do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
            index_elements=['uuid']
            )
    sess.execute(do_nothing_stmt)
    sess.commit()

def show_dog(sess, doges, msg=''):
    print("{0} =======================================".format(msg))
    uuids = [d.uuid for d in doges]
    for d in sess.query(models.Dog).filter(models.Dog.uuid.in_(uuids)):
        print(d.name, d.uuid, d.addtime, d.data, sep=' | ')

def main():
    sess = Sess()
    doges = add_random_dog(sess, 2)
    show_dog(sess, doges, "after insert")

    # new 一个 dog 对象,但是数据库中此时没有 'new dog'
    doges.append(models.Dog('new dog', str(uuid.uuid4())))

    for d in doges:
        dog_on_conflict_do_nothing(sess, d)
    show_dog(sess, doges, "after update")

if "__main__" == __name__:
    main()

运行结果如下,可以看到 uuid 字段冲突数据的 name 没有发生变化,而没有 uuid 冲突的新数据插入成功:

after insert =======================================
dog-0 | 19d89e94-29da-43bb-8a9b-0acbc7c9d332 | 2021-12-08 18:05:54.522041 | {}
dog-1 | befc7010-2939-4bca-aeb8-bf9dec025266 | 2021-12-08 18:05:54.522369 | {}
after update =======================================
dog-0 | 19d89e94-29da-43bb-8a9b-0acbc7c9d332 | 2021-12-08 18:05:54.522041 | {}
dog-1 | befc7010-2939-4bca-aeb8-bf9dec025266 | 2021-12-08 18:05:54.522369 | {}
Bignew dog | 0f3c0d8f-9560-4308-a7f5-8b101ecb9524 | 2021-12-08 18:05:54.545064 | {}

三、ON CONFLICT DO UPDATE - 更新旧数据

完整的 on_conflict_do_update 代码如下:

python">from sqlalchemy.dialects.postgresql import insert
import models
import dbopt
import uuid
import sqlalchemy.orm as orm
import sqlalchemy
import models

url = 'postgresql://postgres:dbpassword@127.0.0.1:5432/zoodb'
engine = sqlalchemy.create_engine(url)
metadata = sqlalchemy.schema.MetaData(bind=engine)
models.Base.metadata.create_all(engine)

Sess = orm.sessionmaker(bind=engine)

def add_random_dog(sess, n=3):
    doges = list()
    for i in range(n):
        dog = models.Dog(
                'dog-{0}'.format(i),
                str(uuid.uuid4()))
        sess.add(dog)
        doges.append(dog)
    sess.commit()

    return doges

def dog_on_conflict_do_update(sess, d):
    dog = models.Dog('Big' + d.name, d.uuid)
    insert_stmt = insert(models.Dog).values(
            name=dog.name,
            uuid=dog.uuid,
            addtime=dog.addtime,
            data=dog.data)

    do_update_stmt = insert_stmt.on_conflict_do_update(
            index_elements=['uuid'],
            set_=dict(
                name=dog.name,
                addtime=dog.addtime,
                data=dog.data))
    sess.execute(do_update_stmt)

    sess.commit()

def show_dog(sess, doges, msg=''):
    print("{0} =======================================".format(msg))
    uuids = [d.uuid for d in doges]
    for d in sess.query(models.Dog).filter(models.Dog.uuid.in_(uuids)):
        print(d.name, d.uuid, d.addtime, d.data, sep=' | ')

def main():
    sess = Sess()
    doges = add_random_dog(sess, 2)
    show_dog(sess, doges, "after insert")

    # new 一个 dog 对象,但是数据库中此时没有 'new dog'
    doges.append(models.Dog('new dog', str(uuid.uuid4())))

    for d in doges:
        dog_on_conflict_do_update(sess, d)
    show_dog(sess, doges, "after on_conflict_do_update")

if "__main__" == __name__:
    main()

执行结果如下,可以看到 uuid 冲突的数据,其 name 字段被更新了,而没有 uuid 冲突的数据,添加成功了。

after insert =======================================
dog-0 | 39127e0c-5286-4ce6-a12c-3a967907be0c | 2021-12-08 18:11:42.997584 | {}
dog-1 | 7d3aea63-62be-4643-a99b-04e942c1d80c | 2021-12-08 18:11:42.997909 | {}
after on_conflict_do_update =======================================
Bigdog-0 | 39127e0c-5286-4ce6-a12c-3a967907be0c | 2021-12-08 18:11:43.055750 | {}
Bigdog-1 | 7d3aea63-62be-4643-a99b-04e942c1d80c | 2021-12-08 18:11:43.062493 | {}
Bignew dog | 0988df82-c484-4a22-9418-0fa18f4758ce | 2021-12-08 18:11:43.066149 | {}

 


http://www.niftyadmin.cn/n/788489.html

相关文章

Python网络通信--socket编程

原文链接:http://www.juzicode.com/archives/819 在前面的系列文章中涉及的内容都是在本地进行的,接下来介绍的内容会更有趣,会进入到一个更广阔的世界。Python中内置的socket模块可以实现多台电脑间通过网口进行通信,互相收发消息&#xff…

leetcode-121 Path Sum II

题目: Given a binary tree and a sum, find all root-to-leaf paths where each paths sum equals the given sum. Note: A leaf is a node with no children. Example: Given the below binary tree and sum 22, 5/ \4 8/ / \11 13 4/ \ / \ 7 …

如何利用Guava实现方法调用超时自动中断

在实际的开发中,我们会经常遇见一些这样的情景, (1)对于突发高并发下环境下,服务器压力很大的情况下,调用某些方法超过100ms不响应,应自动拒绝服务,而不是一直阻塞下去,…

Python网络通信–smtplib发送邮件

原文链接:http://www.juzicode.com/archives/1381 在《Python进阶教程m9–网络通信–socket通信》中介绍了在Python中利用socket模块实现tcp或者udp方式进行网络通信的方法,socket模块可以看做在TCP/IP协议中传输层提供了编程接口,应用层的协议需要自己…

数学(欧拉函数):UVAOJ 11426 GCD - Extreme (II)

额&#xff0c;由于我并不知道如何显示PDF文件&#xff0c;于是就只好截图了。 这是一道数论题&#xff0c;我觉得还是挺有意思的。 1 #include <iostream>2 #include <algorithm>3 #include <cstring>4 #include <cmath>5 #include <cstdio>6 u…

C++11: vector 初始化赋值

目录 一、std::vector 的构造函数举例 二、 std::vector 构造函数列表 一、std::vector 的构造函数举例 #include <vector> #include <string> #include <iostream>template<typename T> std::ostream& operator<<(std::ostream& s, …

leetcode_123 Flatten Binary Tree to Linked List

题目&#xff1a; 把二叉树转换为链表 Given a binary tree, flatten it to a linked list in-place. For example, given the following tree: 1/ \2 5/ \ \ 3 4 6The flattened tree should look like: 1\2\3\4\5\6 解法一&#xff1a;递归 preorder的逆序 链表…

zbar:给我来10G“打码”图片

更多精彩尽在“桔子code”&#xff0c;后台回复“pyzbar”可以获取到本文源码&#xff1a; 原文链接&#xff1a;http://www.juzicode.com/archives/1399 前几天有位朋友联系桔子哥&#xff0c;他们的发货系统故障导致货物包装上的条码都没有录入到系统中跟踪&#xff0c;积累…