导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • Runnable
  • PromptEngineering
  • dataclasses
  • LaTeX
  • rank_bm25
  • TF-IDF
  • asyncio
  • sqlalchemy
  • fastapi
  • Starlette
  • uvicorn
  • argparse
  • Generic
  • ssl
  • urllib
  • python-dotenv
  • RRF
  • CrossEncoder
  • Lost-in-the-middle
  • Jinja2
  • logger
  • io
  • venv
  • concurrent
  • parameter
  • SSE
  • 1. SQLAlchemy 是什么?
    • 1.1 为什么需要 SQLAlchemy?
    • 1.2 SQLAlchemy 的核心优势
  • 2. 前置知识
    • 2.1 什么是 ORM?
    • 2.2 数据库基础术语
  • 3. 安装与环境准备
    • 3.1 安装 SQLAlchemy
    • 3.2 选择数据库
  • 4. 核心概念:Engine、Base、Session
    • 4.1 Engine(引擎)
    • 4.2 Base(声明式基类)
    • 4.3 Session(会话)
  • 5. 定义数据模型
    • 5.1 基本模型定义
    • 5.2 常用列类型
    • 5.3 列参数
  • 6. 基本 CRUD 操作
    • 6.1 创建(Create):添加数据
    • 6.2 读取(Read):查询数据
    • 6.3 更新(Update):修改数据
    • 6.4 删除(Delete):删除数据
  • 7. 一对多和多对一
  • 8. 多对多关系
  • 9. 复杂条件与聚合
    • 9.1 复杂条件查询
    • 9.2 聚合查询
  • 10. 事务管理
    • 10.1 基本事务操作
    • 10.2 高级事务操作
  • 11. 常见错误与最佳实践
    • 11.1 错误 1:忘记提交事务
    • 11.2 错误 2:忘记关闭会话
    • 11.3 错误 3:循环中频繁提交
    • 11.4 最佳实践

1. SQLAlchemy 是什么? #

SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)框架。简单来说,它让你用 Python 类和对象来操作数据库,而不用直接写 SQL。

1.1 为什么需要 SQLAlchemy? #

  1. 简化代码,提升开发效率
    用 SQL 写复杂业务时,代码通常“胶水味”重,维护难。SQLAlchemy 可以让你用 Python 代码优雅地描述数据操作,逻辑更清晰,代码可重用性也更高。

  2. 更易维护与重构
    数据表结构变动时,只需调整 Python 类定义,无需大规模修改 SQL 语句。

  3. 跨数据库兼容
    项目迁移数据库,比如从 SQLite 换成 MySQL,只需调整一行配置,大部分业务代码可以原封不动。

  4. 数据安全 & 注入防护
    ORM 自动处理参数绑定,天然防止 SQL 注入漏洞。

  5. 丰富的生态和扩展能力
    支持事务、关系映射、迁移工具、异步等高级特性。

一句话:ORM 能让数据操作变得更“像写 Python”,而不是“搬砖 SQL”。

1.2 SQLAlchemy 的核心优势 #

  1. 代码简洁直观:用对象而不是SQL字符串操作数据
  2. 类型安全:更早发现问题
  3. 数据库无关:可无缝切换不同数据库
  4. 自动管理连接与事务:无需手动管理细节

2. 前置知识 #

学习 SQLAlchemy 之前,需要了解:

2.1 什么是 ORM? #

ORM(Object-Relational Mapping,对象关系映射):把数据库表映射为 Python 类,行映射为对象,列映射为对象属性。

比如:

  • 表 users → Python类 User
  • 一行数据 → User实例
  • 列(如 name、age)→ 对象属性

2.2 数据库基础术语 #

  • 表(Table):存储结构,类似 Excel 表
  • 行(Row):一条记录
  • 列(Column):字段
  • 主键(Primary Key):唯一标识
  • 外键(Foreign Key):关联其他表

3. 安装与环境准备 #

3.1 安装 SQLAlchemy #

使用pip:

pip install sqlalchemy

3.2 选择数据库 #

这里用SQLite示例,因为:

  • 免服务器、易上手
  • Python自带支持
  • 学习/开发首选

其它数据库(如MySQL、PostgreSQL)需安装驱动:

  • MySQL: pip install pymysql
  • PostgreSQL: pip install psycopg2

4. 核心概念:Engine、Base、Session #

4.1 Engine(引擎) #

Engine 负责数据库连接和方言处理。

# 导入SQLAlchemy中的create_engine用于创建数据库引擎
from sqlalchemy import create_engine

# 创建一个连接到SQLite数据库的引擎,"example.db"为数据库文件名,echo=True表示输出SQL日志
engine = create_engine("sqlite:///example.db", echo=True)
  • SQLite: sqlite:///文件名.db
  • MySQL: mysql+pymysql://用户名:密码@主机:端口/数据库名
  • PostgreSQL: postgresql://用户名:密码@主机:端口/数据库名

4.2 Base(声明式基类) #

Base 是所有ORM模型的基类。

# 从SQLAlchemy ORM模块导入声明式基类
from sqlalchemy.orm import DeclarativeBase

# 定义基础类Base,继承自DeclarativeBase,供所有模型类继承
class Base(DeclarativeBase):
    # 占位语句,不实现任何内容
    pass

4.3 Session(会话) #

Session 负责ORM操作,推荐用 with 上下文:

# 从 SQLAlchemy 的 ORM 模块导入 Session 类
from sqlalchemy.orm import Session

# 使用 with 语句自动管理会话资源,连接到指定的 engine(数据库引擎)
with Session(engine) as session:
    # 在此处编写数据库操作,如增删改查等

5. 定义数据模型 #

在定义数据模型(Model)时,我们实际上是在用 Python 类描述数据库中的表结构。每个类对应数据库中的一张表,类的每个属性(带有类型注解的字段)对应表中的一列。

通常,每个数据模型都应:

  • 继承自 Base,获得 SQLAlchemy ORM 的所有功能。
  • 设置 __tablename__ 指定映射到的表名。
  • 使用类型注解(Mapped[类型])配合 mapped_column 定义表字段的类型、主键、是否可为空、默认值等。
  • 可以实现如 __repr__ 的魔法方法方便调试和展示。

定义好数据模型后,调用 Base.metadata.create_all(engine) 可以根据模型类自动在数据库中创建表。这种声明式的方式大大提升了代码可读性和开发效率。此外,还可以通过继承和关系映射(如一对多、多对多等)来描述更复杂的数据结构。

5.1 基本模型定义 #

# 导入SQLAlchemy的相关模块和类型
from sqlalchemy import create_engine, String, Integer, Float, Boolean, DateTime
# 导入SQLAlchemy的声明式基类和类型映射工具
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# 导入datetime模块用于时间类型
from datetime import datetime

# 定义基础类,用作所有模型类的基类
class Base(DeclarativeBase):
    pass

# 定义Product类,映射到数据库的"products"表
class Product(Base):
    # 指定表名为"products"
    __tablename__ = "products"
    # 定义主键id列,类型为整数
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name列,类型为字符串,最大长度100,不能为空
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    # 定义price列,类型为浮点数
    price: Mapped[float]
    # 定义is_available列,类型为布尔,默认值为True
    is_available: Mapped[bool] = mapped_column(default=True)
    # 定义created_at列,类型为日期时间,默认值为当前时间
    created_at: Mapped[datetime] = mapped_column(default=datetime.now)

    # 定义对象的字符串表示形式
    def __repr__(self):
        return f"<Product(id={self.id}, name='{self.name}', price={self.price})>"

# 创建数据库引擎,使用sqlite数据库,文件名为model_example.db,开启SQL语句回显
engine = create_engine("sqlite:///model_example.db", echo=True)
# 创建所有定义的数据表
Base.metadata.create_all(engine)
# 打印表创建成功的信息
print("表创建成功!")
# 隐式开启一个事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询主数据库中"products"表的结构信息
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("products")
# 执行原生SQL,没有查询参数
INFO sqlalchemy.engine.Engine [raw sql] ()
# 查询临时数据库中"products"表的结构信息
INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("products")
# 执行原生SQL,没有查询参数
INFO sqlalchemy.engine.Engine [raw sql] ()
# 执行创建products表的SQL语句
INFO sqlalchemy.engine.Engine
CREATE TABLE products (
        id INTEGER NOT NULL,         # id列,整数类型,非空
        name VARCHAR(100) NOT NULL,  # name列,长度最多100的字符串,非空
        price FLOAT NOT NULL,        # price列,浮点数,非空
        is_available BOOLEAN NOT NULL, # is_available列,布尔型,非空
        created_at DATETIME NOT NULL,  # created_at列,日期时间类型,非空
        PRIMARY KEY (id)               # id设置为主键
)
# 执行原生SQL,没有返回结果(表示DDL成功)
INFO sqlalchemy.engine.Engine [no key 0.00021s] ()
# 提交事务,表创建操作完成
INFO sqlalchemy.engine.Engine COMMIT

5.2 常用列类型 #

常用的列类型(Column Types)用于定义数据库表中每个字段的数据类型。下面介绍几个 SQLAlchemy 中经常使用的列类型:

  • Integer: 整数类型,适合用于自增主键或计数数字等。
  • String(length): 字符串类型,需要指定最大长度,如 String(100) 表示最长100个字符。
  • Float: 浮点数类型,常用于价格、分数等有小数的字段。
  • Boolean: 布尔类型,只能存储 True 或 False。
  • DateTime: 日期时间类型,适合记录创建、更新时间等。
  • Text: 长文本类型,可以存储大量文本数据,比如文章内容等。
  • Date: 单独的日期类型(不包括时间),适合如生日、纪念日等只需日期的场景。

在定义模型类时,通常使用 mapped_column(ColumnType, 配置参数...) 的方式声明字段类型。例如:

# 导入常用的列类型
from sqlalchemy import String, Integer, Float, Boolean, DateTime, Text, Date

# 定义Example表对应的模型类
class Example(Base):
    # 设置表名为examples
    __tablename__ = "examples"
    # id列,整型,主键
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # title列,长度不超过255的字符串,不能为空
    title: Mapped[str] = mapped_column(String(255), nullable=False)
    # price列,浮点数类型
    price: Mapped[float] = mapped_column(Float)
    # is_active列,布尔类型,默认值为True
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    # created_at列,日期时间类型
    created_at: Mapped[datetime] = mapped_column(DateTime)
    # content列,长文本类型
    content: Mapped[str] = mapped_column(Text)
    # birthday列,日期类型
    birthday: Mapped[date] = mapped_column(Date)

5.3 列参数 #

列参数用于进一步描述数据库表中每个字段的属性。这些参数在使用mapped_column()时传递,可以控制字段的主键、是否允许为空、是否唯一、默认值等。常用列参数说明如下:

  • primary_key=True:指定该列为表的主键(如自增id)。
  • nullable=False:该列不能为空,必须有值。
  • unique=True:该列值必须唯一,不能重复。
  • default=值:设置列的默认值,当插入时没有提供值会使用该默认值。
  • index=True:为该列创建数据库索引,加快查询速度(常见于经常筛选的字段)。
  • server_default=文本:为该列设置数据库层面的默认值,常用SQL表达式(如server_default=text("CURRENT_TIMESTAMP"))。

例如,增加列参数的模型定义如下:

# 导入SQLAlchemy常用列类型及text函数
from sqlalchemy import String, Integer, Float, Boolean, DateTime, Text, Date, text

# 定义Product模型类,对应数据库中的products表
class Product(Base):
    # 指定表名为products
    __tablename__ = "products"
    # id列:整型,主键,自增
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # name列:字符串类型,最大长度255,唯一且不能为空
    name: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    # price列:浮点数类型,不能为空,默认值为0.0
    price: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
    # in_stock列:布尔类型,默认值为True
    in_stock: Mapped[bool] = mapped_column(Boolean, default=True)
    # created_at列:日期时间类型,默认值为当前时间(数据库层面)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=text("CURRENT_TIMESTAMP")
    )

6. 基本 CRUD 操作 #

在进行数据库开发时,除了定义模型类和字段类型,还需要掌握对数据表的基本增删改查(CRUD)操作。SQLAlchemy通过ORM方式让我们可以像操作Python对象一样操作数据库中的数据。下面将依次介绍如何使用SQLAlchemy完成创建(Create)、查询(Read)、更新(Update)、删除(Delete)等常见操作。

每个操作通常都在数据库会话(Session)中进行,只有提交(commit)之后,操作才会实际反映到数据库。后续小节将通过实例演示每种基本操作的具体用法。

6.1 创建(Create):添加数据 #

在进行创建(Create)操作时,我们通过构造模型类的实例,将其添加到数据库会话(Session)中,然后通过commit()方法提交事务,即可将新数据实际保存到数据库表格里。可以单独添加一条记录,也可以批量添加多条记录。通常推荐使用上下文管理器(with Session(engine) as session:)来自动管理数据库会话,保证资源正确释放和自动回滚未提交的事务。

当创建新对象时,未提交前对象的主键(如自增id)通常为None,只有调用commit()后,数据库会自动生成主键并回填到Python对象中。通过add()方法可添加单个对象,使用add_all()方法则可以一次性批量添加多个对象。

# 导入SQLAlchemy库中的create_engine、String、Integer等类
from sqlalchemy import create_engine, String, Integer
# 从sqlalchemy.orm导入声明基类、会话、类型映射等工具
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义基础模型类,所有模型类都应该继承自Base
class Base(DeclarativeBase):
    pass

# 定义User用户表模型,继承自Base
class User(Base):
    # 指定数据库中的表名为users
    __tablename__ = "users"
    # 定义id字段,主键,自增
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name字段,字符串类型,最大长度50
    name: Mapped[str] = mapped_column(String(50))
    # 定义age字段,整型
    age: Mapped[int]

    # 定义实例的字符串表现形式,便于打印用户对象信息
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

# 创建sqlite数据库引擎,数据库文件名为example.db,echo为True会输出SQL日志
engine = create_engine("sqlite:///example.db", echo=True)
# 根据所有模型定义在数据库中创建表
Base.metadata.create_all(engine)

# 使用上下文管理器打开数据库会话
with Session(engine) as session:
    # 单个添加用户
    user1 = User(name="张三", age=25)
    # 将user1对象添加到session中,处于待添加状态
    session.add(user1)
    # 提交事务,将user1信息写入数据库
    session.commit()
    # 输出添加成功的信息
    print("添加用户1成功")

    # 批量添加多个用户
    users = [
        User(name="李四", age=30),
        User(name="王五", age=28),
        User(name="赵六", age=35),
    ]
    # 将多个User对象一次性添加到session
    session.add_all(users)
    # 提交事务,将users中的用户信息写入数据库
    session.commit()
    # 输出批量添加成功的信息
    print("批量添加用户成功")
# 输出SQLAlchemy自动生成建表过程的调试信息
print("INFO sqlalchemy.engine.Engine BEGIN (implicit)")
# 显示主库中users表的结构信息
print("INFO sqlalchemy.engine.Engine PRAGMA main.table_info(\"users\")")
# 执行原生SQL,获取表信息
print("INFO sqlalchemy.engine.Engine [raw sql] ()")
# 显示临时库中users表的结构信息
print("INFO sqlalchemy.engine.Engine PRAGMA temp.table_info(\"users\")")
# 执行原生SQL,获取临时表信息
print("INFO sqlalchemy.engine.Engine [raw sql] ()")
# 显示生成的CREATE TABLE users建表SQL语句
print("INFO sqlalchemy.engine.Engine\nCREATE TABLE users (\n        id INTEGER NOT NULL,\n        name VARCHAR(50) NOT NULL,\n        age INTEGER NOT NULL,\n        PRIMARY KEY (id)\n)\n")
# 建表完成,无主键约束ID消耗时间信息
print("INFO sqlalchemy.engine.Engine [no key 0.00015s] ()")
# 提交建表事务
print("INFO sqlalchemy.engine.Engine COMMIT")

# 单条添加用户时输出的SQLAlchemy日志信息
print("INFO sqlalchemy.engine.Engine BEGIN (implicit)")
# 执行插入一条用户数据的SQL语句
print("INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?)")
# 显示插入操作所用时间和参数
print("INFO sqlalchemy.engine.Engine [generated in 0.00015s] ('张三', 25)")
# 提交插入事务
print("INFO sqlalchemy.engine.Engine COMMIT")
# 控制台输出添加用户1成功的提示
print("添加用户1成功")

# 批量添加多条用户时输出的SQLAlchemy日志信息
print("INFO sqlalchemy.engine.Engine BEGIN (implicit)")
# 执行批量插入用户数据并返回id
print("INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?) RETURNING id")
# 插入第一条用户记录,显示参数与性能信息
print("INFO sqlalchemy.engine.Engine [generated in 0.00006s (insertmanyvalues) 1/3 (ordered; batch not supported)] ('李四', 30)")
# 插入第二条用户记录
print("INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?) RETURNING id")
# 第二条记录参数与性能信息
print("INFO sqlalchemy.engine.Engine [insertmanyvalues 2/3 (ordered; batch not supported)] ('王五', 28)")
# 插入第三条用户记录
print("INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?) RETURNING id")
# 第三条记录参数与性能信息
print("INFO sqlalchemy.engine.Engine [insertmanyvalues 3/3 (ordered; batch not supported)] ('赵六', 35)")
# 批量插入事务提交
print("INFO sqlalchemy.engine.Engine COMMIT")
# 控制台输出批量添加用户成功的提示
print("批量添加用户成功")

6.2 读取(Read):查询数据 #

本节介绍了 SQLAlchemy ORM 中常见的查询(Read)操作。查询是数据库操作中最核心的环节之一,通过不同方式可以灵活地检索、筛选和排序表中的数据。

SQLAlchemy 的 ORM 查询主要依靠 select() 语句结合会话(Session)进行,通常分为如下几个步骤:

  1. 创建会话(Session): 用于操作数据库的入口,生命周期内可执行多次查询。

    with Session(engine) as session:
        # 查询操作写在这里
  2. 构造查询(select): 使用 select(Model) 来指定要查询的数据表和模型。

    stmt = select(User)
  3. 添加条件(where): 可通过 .where() 添加筛选条件,实现类似 SQL 的 WHERE 子句。

    stmt = select(User).where(User.age > 25)
  4. 排序(order_by): 使用 .order_by() 进行排序,可升序(默认)或降序(desc)。

    stmt = select(User).order_by(User.age.desc())
  5. 限制、分页(limit/offset): 可通过 .limit() 限制结果数量,.offset() 跳过指定行数,常用于分页查询。

    stmt = select(User).limit(10).offset(20)
  6. 执行查询:

    • session.scalars(stmt).all():返回所有结果,列表形式。
    • session.scalars(stmt).first():返回首条结果或 None。
    • session.execute(stmt):更通用,返回 Result 对象。

    例子:

    # 查询所有用户
    users = session.scalars(select(User)).all()
    # 查询指定条件第一个用户
    user = session.scalars(select(User).where(User.name == "张三")).first()
  7. 结果处理: 查询结果为 ORM 对象实例,可直接访问其属性或参与后续操作。

通过以上结构,SQLAlchemy ORM 查询能灵活实现各种数据检索、筛选和排序需求,查询语法与原生 SQL 高度对应,又具有 Pythonic 的优势。

在 SQLAlchemy ORM 查询中,session.execute() 和 session.scalars() 都可用于执行 SQL 语句,但有不同的用法和返回类型:

  • session.execute(stmt):

    • 用于执行各种 SQL 语句(查询、更新、删除等),返回一个通用的 Result 对象。
    • 查询时返回的是“元组形式的行”,即一行中的所有字段会以 tuple 形式返回。
    • 适合自定义字段选择、聚合查询、原生 SQL 等复杂/特殊场景。
    • 例子:
      # 查询所有用户的id和name字段(返回元组)
      result = session.execute(select(User.id, User.name))
      for row in result:
          print(row)  # 如:(1, '张三')
  • session.scalars(stmt):

    • 适用于查询“单一实体”或“单列字段”。
    • 返回指定字段/对象的直接列表,而不是元组。
    • 最常见于直接查询 ORM 模型或者单列时的高效提取。
    • 例子:

      # 查询所有用户对象,直接得到User实例列表
      users = session.scalars(select(User)).all()
      for user in users:
          print(user)  # <User(id=1, name='张三', age=26)>
      
      # 查询所有用户的name
      names = session.scalars(select(User.name)).all()
      print(names)  # ['张三', '李四', ...]

总结:

  • execute 适用范围广,返回元组结构,适合复杂/多字段/多表查询。
  • scalars 聚焦单列或单实体,返回直接的元素,更加简洁与便捷。

实际开发时,可根据查询目标及结果形式选择合适方法。

# 导入SQLAlchemy库中的create_engine、String、Integer、select、desc、func
from sqlalchemy import create_engine, String, Integer,select,desc,func

# 从sqlalchemy.orm导入声明基类、会话、类型映射等工具
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义基础模型类,所有模型类都应该继承自Base
class Base(DeclarativeBase):
    # 占位,用于继承
    pass

# 定义User用户表模型,继承自Base
class User(Base):
    # 指定表名为users
    __tablename__ = "users"
    # 定义id字段,类型为整型,主键,自增
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name字段,类型为字符串,最大长度50
    name: Mapped[str] = mapped_column(String(50))
    # 定义age字段,类型为整型
    age: Mapped[int]

    # 定义对象的字符串表示形式,方便打印输出
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

# 创建sqlite数据库引擎,数据库名为example.db,echo=True表示输出SQL日志
engine = create_engine("sqlite:///example.db", echo=True)

# 根据模型创建所有表结构
Base.metadata.create_all(engine)

# 使用上下文管理器打开数据库会话
with Session(engine) as session:
    # 查询所有用户
    all_users = session.scalars(select(User)).all()
    # 打印所有用户
    print("所有用户:")
    for user in all_users:
        print(user)

    # 查询第一条用户记录
    first_user = session.scalars(select(User)).first()
    # 打印第一条用户
    print(f"\n第一条记录:{first_user}")

    # 按主键查询id为1的用户
    user_by_id = session.get(User, 1)
    # 打印id为1的用户
    print(f"\nID为1的用户:{user_by_id}")

    # 根据用户名等值查询用户
    user_zhang = session.scalars(select(User).where(User.name == "张三")).first()
    # 打印姓名为"张三"的用户
    print(f"\n姓名为'张三'的用户:{user_zhang}")

    # 查询年龄小于30岁的用户
    young_users = session.scalars(select(User).where(User.age < 30)).all()
    # 打印年龄小于30岁的用户
    print(f"\n年龄小于30的用户:")
    for user in young_users:
        print(user)

    # 按年龄倒序排列所有用户
    sorted_users = session.scalars(select(User).order_by(desc(User.age))).all()
    # 打印按年龄降序排列的用户
    print(f"\n按年龄降序排列:")
    for user in sorted_users:
        print(user)

    # 查询年龄最大(前2名)的用户
    top_2 = session.scalars(select(User).order_by(desc(User.age)).limit(2)).all()
    # 打印年龄最大的2个用户
    print(f"\n年龄最大的2个用户:")
    for user in top_2:
        print(user)

    # 查询用户总数
    user_count = session.execute(select(func.count(User.id))).scalar_one()
    # 打印用户总数
    print(f"\n用户总数:{user_count}")
# 隐式开启事务(自动事务管理)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询 users 表结构(表字段信息)
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
# 执行原始 SQL,查询表结构
INFO sqlalchemy.engine.Engine [raw sql] ()
# 提交事务
INFO sqlalchemy.engine.Engine COMMIT
# 隐式开启新事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询 users 表所有字段
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
# 显示 SQL 查询被生成和执行的耗时
INFO sqlalchemy.engine.Engine [generated in 0.00017s] ()
# 打印所有用户
所有用户:
<User(id=1, name='张三', age=25)>
<User(id=2, name='李四', age=30)>
<User(id=3, name='王五', age=28)>
<User(id=4, name='赵六', age=35)>
# 再次查询 users 表,用到了 SQL 查询缓存
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
# 显示 SQL 查询来自缓存及耗时
INFO sqlalchemy.engine.Engine [cached since 0.0008783s ago] ()
# 打印第一条记录
第一条记录:<User(id=1, name='张三', age=25)>
# 按主键查询 id=1 的用户
ID为1的用户:<User(id=1, name='张三', age=25)>
# 执行按姓名等于"张三"查询
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
WHERE users.name = ?
# 显示 SQL 及参数
INFO sqlalchemy.engine.Engine [generated in 0.00019s] ('张三',)
# 打印查询结果
姓名为'张三'的用户:<User(id=1, name='张三', age=25)>
# 执行按年龄<30查询
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
WHERE users.age < ?
# 显示 SQL 及参数
INFO sqlalchemy.engine.Engine [generated in 0.00021s] (30,)
# 打印所有年龄小于30的用户
年龄小于30的用户:
<User(id=1, name='张三', age=25)>
<User(id=3, name='王五', age=28)>
# 按年龄降序排列查询
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users ORDER BY users.age DESC
# 显示 SQL 执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00167s] ()
# 打印按年龄降序排列的用户
按年龄降序排列:
<User(id=4, name='赵六', age=35)>
<User(id=2, name='李四', age=30)>
<User(id=3, name='王五', age=28)>
<User(id=1, name='张三', age=25)>
# 查询年龄最大的前2个用户
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users ORDER BY users.age DESC
 LIMIT ? OFFSET ?
# 显示 SQL 及参数
INFO sqlalchemy.engine.Engine [generated in 0.00019s] (2, 0)
# 打印年龄最大的2个用户
年龄最大的2个用户:
<User(id=4, name='赵六', age=35)>
<User(id=2, name='李四', age=30)>
# 查询用户总数
INFO sqlalchemy.engine.Engine SELECT count(users.id) AS count_1
FROM users
# 显示 SQL 执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00017s] ()
# 打印用户总数
用户总数:4
# 回滚(一般在会话结束时自动回滚未提交事务)
INFO sqlalchemy.engine.Engine ROLLBACK

6.3 更新(Update):修改数据 #

在数据库操作中,更新(Update)用于修改已有表中的数据。SQLAlchemy 支持多种方式更新数据,既可以通过 ORM 对象方式修改单个实例的属性并提交,也可以直接使用 update() 方法进行批量更新。

下面将展示两种常用的更新方式:

  1. 修改单条记录
    先通过查询定位到目标对象,然后修改属性值并提交:

    user = session.scalars(select(User).where(User.name == "张三")).first()
    if user:
        user.age = 26
        session.commit()
  2. 批量更新多条记录
    使用 SQLAlchemy 的 update() 函数,可一次性批量更改满足条件的多条记录:

    session.execute(
        update(User)
        .where(User.age > 30)
        .values(age=31)
    )
    session.commit()

    这两种操作方式在实际开发中常用:前者适用于有业务逻辑处理的单实例修改,后者则适用于大批量的直接更新场景。

# 导入SQLAlchemy中的create_engine、String、Integer、select、desc、func、update方法
from sqlalchemy import create_engine, String, Integer,select,desc,func,update

# 从sqlalchemy.orm导入声明基类、Session会话、类型映射等类
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义基础模型类Base,所有的ORM模型都继承自此类
class Base(DeclarativeBase):
    # 占位,表示该基类无具体实现,仅供继承
    pass

# 定义用户(User)模型,继承自Base
class User(Base):
    # 指明数据库中的表名为users
    __tablename__ = "users"
    # 定义id字段,整型,主键,自动递增
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name字段,字符串类型,最大长度为50
    name: Mapped[str] = mapped_column(String(50))
    # 定义age字段,整型
    age: Mapped[int]

    # 定义实例的打印显示格式,便于调试和输出
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

# 创建SQLite数据库引擎,数据库文件名为crud_example.db,echo=True用于打印SQL日志
engine = create_engine("sqlite:///crud_example.db", echo=True)

# 创建所有数据表(如果表不存在则创建)
Base.metadata.create_all(engine)

# 使用上下文管理器开启数据库会话
with Session(engine) as session:
    # 查询名字为“张三”的用户
    user = session.scalars(select(User).where(User.name == "张三")).first()
    # 如果用户存在则修改age字段为26
    if user:
        user.age = 26
        # 提交事务保存更改
        session.commit()
        print(f"更新成功:{user}")
    # 如果用户不存在则输出提示
    else:
        print("用户不存在")

    # 批量更新所有age大于30用户的年龄为31
    session.execute(
        update(User).where(User.age > 30).values(age=31)
    )
    # 提交事务保存更改
    session.commit()
    print("批量更新成功")

    # 查询所有用户,验证更新后的结果
    updated_users = session.scalars(select(User)).all()
    print("\n更新后的所有用户:")
    # 按顺序打印所有用户信息
    for user in updated_users:
        print(user)
# 执行隐式事务的开始
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询表结构信息
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
# 执行原始SQL语句
INFO sqlalchemy.engine.Engine [raw sql] ()
# 提交事务
INFO sqlalchemy.engine.Engine COMMIT

# 再次开启隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表中名字为“张三”的所有字段
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
WHERE users.name = ?
# SQL参数为('张三',)
INFO sqlalchemy.engine.Engine [generated in 0.00018s] ('张三',)
# 更新users表中id=1的用户的age为26
INFO sqlalchemy.engine.Engine UPDATE users SET age=? WHERE users.id = ?
# SQL参数为(26, 1)
INFO sqlalchemy.engine.Engine [generated in 0.00014s] (26, 1)
# 提交事务
INFO sqlalchemy.engine.Engine COMMIT

# 再次开启隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表中id=1的所有字段并起别名
INFO sqlalchemy.engine.Engine SELECT users.id AS users_id, users.name AS users_name, users.age AS users_age
FROM users
WHERE users.id = ?
# SQL参数为(1,)
INFO sqlalchemy.engine.Engine [generated in 0.00045s] (1,)
# 输出更新成功后的用户信息
更新成功:<User(id=1, name='张三', age=26)>

# 批量更新users表中所有age大于30的用户的age为31
INFO sqlalchemy.engine.Engine UPDATE users SET age=? WHERE users.age > ?
# SQL参数为(31, 30)
INFO sqlalchemy.engine.Engine [generated in 0.00013s] (31, 30)
# 提交事务
INFO sqlalchemy.engine.Engine COMMIT
# 打印批量更新成功
批量更新成功

# 开启新的隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表中所有用户信息
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
# 执行无参数SQL
INFO sqlalchemy.engine.Engine [generated in 0.00014s] ()
# 打印更新后的所有用户信息
更新后的所有用户:
<User(id=1, name='张三', age=26)>
<User(id=2, name='李四', age=30)>
<User(id=3, name='王五', age=28)>
<User(id=4, name='赵六', age=31)>
# 回滚事务或结束会话
INFO sqlalchemy.engine.Engine ROLLBACK

6.4 删除(Delete):删除数据 #

删除操作用于从数据库中移除一条或多条数据。在SQLAlchemy中,可以通过两种方式进行删除:

  1. 删除单个对象

    通过session.delete(obj)可以删除已经查询出来的某个对象。删除后记得commit提交事务。

    示例(删除名字为“张三”的用户):

    # 查询name为'张三'的用户对象
    user = session.scalars(select(User).where(User.name == "张三")).first()
    # 如果查到该用户,则删除
    if user:
        session.delete(user)
        session.commit()
        print("删除成功")
    else:
        print("用户不存在")

    控制台输出如下:

    INFO sqlalchemy.engine.Engine DELETE FROM users WHERE users.id = ?
    # SQL参数为(1,)
    INFO sqlalchemy.engine.Engine [generated in 0.00021s] (1,)
    INFO sqlalchemy.engine.Engine COMMIT
    删除成功
  2. 批量删除

    使用delete()方法可以批量删除符合条件的数据行。此方法不会加载对象到内存,效率更高。建议执行后通过session.commit()提交。

    例如,删除所有age小于25的用户:

    # 批量删除所有age小于25的用户,返回删除条数
    deleted_count = session.execute(
        delete(User).where(User.age < 25)
    ).rowcount
    session.commit()
    print(f"删除了 {deleted_count} 条记录")

    控制台输出如下:

    INFO sqlalchemy.engine.Engine DELETE FROM users WHERE users.age < ?
    # SQL参数为(25,)
    INFO sqlalchemy.engine.Engine [generated in 0.00013s] (25,)
    INFO sqlalchemy.engine.Engine COMMIT
    删除了 1 条记录

通过上述方法,即可实现SQLAlchemy中常用的删除操作。

# 导入SQLAlchemy模块中的create_engine、String、Integer、select、desc、func、update、delete函数
from sqlalchemy import create_engine, String, Integer,select,desc,func,update,delete

# 从sqlalchemy.orm中导入声明基类、会话类Session、类型映射Mapped、以及列定义mapped_column
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义数据库ORM模型的基类,所有ORM类都将继承自Base
class Base(DeclarativeBase):
    # 占位符,无实际内容,仅作为基类被继承
    pass

# 定义用户模型User,继承自基类Base
class User(Base):
    # 设置数据表名称为users
    __tablename__ = "users"
    # 定义id字段,类型为整数,作为主键,自动递增
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name字段,类型为字符串,最大长度为50
    name: Mapped[str] = mapped_column(String(50))
    # 定义age字段,类型为整数
    age: Mapped[int]

    # 定义实例的打印输出格式,便于调试输出
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

# 创建SQLite数据库引擎,数据库文件名为crud_example.db,echo=True表示输出运行日志
engine = create_engine("sqlite:///crud_example.db", echo=True)

# 创建所有模型中定义的数据表(如果已存在则不会重复创建)
Base.metadata.create_all(engine)

# 使用上下文管理器开启数据库会话(自动提交和关闭)
with Session(engine) as session:
    # 查询name为'张三'的用户对象
    user = session.scalars(select(User).where(User.name == "张三")).first()
    # 如果查到该用户,则删除
    if user:
        session.delete(user)
        session.commit()
        print("删除成功")
    # 没有查到用户则输出不存在提示
    else:
        print("用户不存在")

    # 批量删除所有age小于25的用户,返回删除条数
    deleted_count = session.execute(
        delete(User).where(User.age < 25)
    ).rowcount
    # 提交批量删除操作
    session.commit()
    # 输出批量删除的记录条数
    print(f"批量删除了{deleted_count}条记录")

    # 查询所有剩余用户信息
    remaining_users = session.scalars(select(User)).all()
    # 输出剩余用户信息标题
    print("\n剩余用户:")
    # 循环遍历并输出每一个剩余用户信息
    for user in remaining_users:
        print(user)
# 开始一个隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表结构信息
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
# 执行原始SQL
INFO sqlalchemy.engine.Engine [raw sql] ()
# 提交事务(表结构查询后)
INFO sqlalchemy.engine.Engine COMMIT
# 再次开始一个隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询name为'张三'的用户
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
WHERE users.name = ?
# 绑定参数为'张三'
INFO sqlalchemy.engine.Engine [generated in 0.00017s] ('张三',)
# 删除查询到的用户,按主键id删除
INFO sqlalchemy.engine.Engine DELETE FROM users WHERE users.id = ?
# 绑定参数为1(假设张三用户id为1)
INFO sqlalchemy.engine.Engine [generated in 0.00013s] (1,)
# 提交删除操作
INFO sqlalchemy.engine.Engine COMMIT
# 输出删除成功提示
删除成功
# 开启新事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 批量删除age小于25的用户
INFO sqlalchemy.engine.Engine DELETE FROM users WHERE users.age < ?
# 绑定参数为25
INFO sqlalchemy.engine.Engine [generated in 0.00012s] (25,)
# 提交批量删除
INFO sqlalchemy.engine.Engine COMMIT
# 输出批量删除的记录条数
批量删除了0条记录
# 开启新事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询所有剩余的用户
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
# 查询实际执行详细信息
INFO sqlalchemy.engine.Engine [generated in 0.00011s] ()
# 输出剩余用户标题
剩余用户:
# 打印剩余用户信息
<User(id=2, name='李四', age=30)>
<User(id=3, name='王五', age=28)>
<User(id=4, name='赵六', age=31)>
# 回滚事务(关闭会话时自动回滚未提交的更改)
INFO sqlalchemy.engine.Engine ROLLBACK

7. 一对多和多对一 #

一对多(One-to-Many)关系是数据库设计中最常见的关系之一。例如,一个用户(User)可以拥有多个地址(Address),但每个地址只能属于一个用户。这种关系在ORM(对象关系映射)中通过“外键 + 关系属性”来实现。

在上面的模型中,User 类通过 addresses 属性(类型为列表)和 relationship() 实现对多个 Address 的管理,并设置了 back_populates 以支持双向访问。Address 类则用 user_id 字段作为外键指定所属用户,同时也通过 relationship() 实现对 User 的访问,并通过 back_populates 关联上述关系。

典型的使用场景如下:

  1. 添加用户及其多个地址

    • 先创建 User 实例,再为其 addresses 属性赋值一个包含多个 Address 对象的列表。
    • 只需一次提交,该用户及其所有地址就会自动写入数据库。
  2. 通过用户查找其所有地址

    • 数据库会自动根据外键查找、组装该User拥有的全部Address对象。
  3. 通过地址查找其所属用户

    • 通过 Address 对象的 user 属性,可以直接访问到所属的用户实例。
关键词 说明
ForeignKey 声明外键
relationship() 定义模型之间的关系
back_populates 让关系可双向访问
# 引入SQLAlchemy中需要用到的模块和类
from sqlalchemy import create_engine, String, Integer, ForeignKey, select
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column, relationship

# 定义ORM基类,用于所有模型类的继承
class Base(DeclarativeBase):
    pass

# 定义用户(User)模型,继承自Base
class User(Base):
    # 指定映射到数据库的表名为'users'
    __tablename__ = "users"
    # 定义主键id字段,类型为int
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name字段,类型为str,最大长度为50
    name: Mapped[str] = mapped_column(String(50))
    # 定义age字段,类型为int
    age: Mapped[int]
    # 定义与Address的一对多关系
    addresses: Mapped[list["Address"]] = relationship(
        "Address", back_populates="user", cascade="all, delete-orphan"
    )

    # 定义对象的字符串表示格式
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}')>"

# 定义地址(Address)模型,继承自Base
class Address(Base):
    # 指定映射到数据库的表名为'addresses'
    __tablename__ = "addresses"
    # 定义主键id字段,类型为int
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义外键user_id,关联到users表的id
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    # 定义email字段,类型为str,最大长度为100
    email: Mapped[str] = mapped_column(String(100))
    # 定义与User的多对一关系
    user: Mapped[User] = relationship("User", back_populates="addresses")

    # 定义对象的字符串表示格式
    def __repr__(self):
        return f"<Address(id={self.id}, email='{self.email}')>"

# 创建SQLite数据库连接,数据库文件名为relationship_example.db,echo为True用于输出SQL日志
engine = create_engine("sqlite:///relationship_example.db", echo=True)
# 创建所有表结构
Base.metadata.create_all(engine)

# 开启Session会话操作数据库
with Session(engine) as session:
    # 新建用户和多个地址
    user = User(name="赵六", age=35, addresses=[
        Address(email="zhaoliu@example.com"),
        Address(email="zl@company.com")
    ])
    # 添加用户对象到session
    session.add(user)
    # 提交事务,将数据写入数据库
    session.commit()
    # 打印创建成功信息
    print("创建用户和地址成功")

    # 查询用户及其地址
    user = session.scalars(select(User).where(User.name == "赵六")).first()
    # 打印查询到的用户名
    print(f"\n用户:{user.name}")
    # 打印用户的所有地址
    print("地址列表:")
    for address in user.addresses:
        print(f"  - {address.email}")

    # 通过地址查用户
    address = session.scalars(select(Address).where(Address.email == "zhaoliu@example.com")).first()
    # 打印查询到的地址
    print(f"\n地址:{address.email}")
    # 打印该地址所属的用户名
    print(f"所属用户:{address.user.name}")
# 隐式开启事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询主表users的表结构
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
# 发送原始SQL命令(空参数)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 查询临时表users的表结构
INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("users")
# 发送原始SQL命令(空参数)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 查询主表addresses的表结构
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("addresses")
# 发送原始SQL命令(空参数)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 查询临时表addresses的表结构
INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("addresses")
# 发送原始SQL命令(空参数)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 创建users表
INFO sqlalchemy.engine.Engine
CREATE TABLE users (
        id INTEGER NOT NULL,
        name VARCHAR(50) NOT NULL,
        age INTEGER NOT NULL,
        PRIMARY KEY (id)
)

# 建表反馈,无主键
INFO sqlalchemy.engine.Engine [no key 0.00017s] ()
# 创建addresses表
INFO sqlalchemy.engine.Engine
CREATE TABLE addresses (
        id INTEGER NOT NULL,
        user_id INTEGER NOT NULL,
        email VARCHAR(100) NOT NULL,
        PRIMARY KEY (id),
        FOREIGN KEY(user_id) REFERENCES users (id)
)

# 建表反馈,无主键
INFO sqlalchemy.engine.Engine [no key 0.00024s] ()
# 提交表结构更改
INFO sqlalchemy.engine.Engine COMMIT
# 开始新事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 向users表插入数据
INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?)
# users表插入详细信息(name, age)
INFO sqlalchemy.engine.Engine [generated in 0.00016s] ('赵六', 35)
# 向addresses表插入数据,同时返回自增id
INFO sqlalchemy.engine.Engine INSERT INTO addresses (user_id, email) VALUES (?, ?) RETURNING id
# addresses表插入详细信息(user_id, email)- 第1条
INFO sqlalchemy.engine.Engine [generated in 0.00005s (insertmanyvalues) 1/2 (ordered; batch not supported)] (1, 'zhaoliu@example.com')
# addresses表插入详细信息(user_id, email)- 第2条
INFO sqlalchemy.engine.Engine INSERT INTO addresses (user_id, email) VALUES (?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [insertmanyvalues 2/2 (ordered; batch not supported)] (1, 'zl@company.com')
# 提交插入
INFO sqlalchemy.engine.Engine COMMIT
# 打印“创建用户和地址成功”
创建用户和地址成功
# 开始隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表中name为“赵六”的用户
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age
FROM users
WHERE users.name = ?
# 查询参数(name)
INFO sqlalchemy.engine.Engine [generated in 0.00062s] ('赵六',)

# 打印查到的用户名
用户:赵六
# 打印“地址列表:”
地址列表:
# 查询赵六的所有地址(通过user_id)
INFO sqlalchemy.engine.Engine SELECT addresses.id AS addresses_id, addresses.user_id AS addresses_user_id, addresses.email AS addresses_email
FROM addresses
WHERE ? = addresses.user_id
# 查询参数(user_id=1)
INFO sqlalchemy.engine.Engine [generated in 0.00020s] (1,)
  - zhaoliu@example.com
  - zl@company.com
# 查询email为zhaoliu@example.com的地址
INFO sqlalchemy.engine.Engine SELECT addresses.id, addresses.user_id, addresses.email
FROM addresses
WHERE addresses.email = ?
# 查询参数(email)
INFO sqlalchemy.engine.Engine [generated in 0.00018s] ('zhaoliu@example.com',)

# 打印该地址
地址:zhaoliu@example.com
# 打印地址所属的用户
所属用户:赵六
# 回滚事务(结束或关闭session时自动回滚未提交的更改)
INFO sqlalchemy.engine.Engine ROLLBACK

8. 多对多关系 #

多对多(Many-to-Many)关系在实际开发中同样非常常见。例如,一个学生(Student)可以选修多门课程(Course),而一门课程也可以被多个学生选修。多对多关系通常通过“关联表 + relationship()”来实现。

在ORM模型设计时,我们需要单独定义一张关联表(不对应ORM类,只是Table对象),通过relationship()和secondary参数表明多对多的中介关系。同时可以通过back_populates让关系双向访问。

典型的使用场景如下:

  1. 添加学生及其所选课程

    • 先创建Student和Course对象,通过Student.courses或Course.students关联对方。
    • 一次提交,可以自动写入学生、课程及两者的关联关系。
  2. 通过学生查询其所有课程

    • 数据库会自动组装该学生关联的全部课程对象。
  3. 通过课程查询所有选修的学生

    • 通过Course.students属性可直接访问所有选该课的学生。
关键词 说明
Table 定义中间关联表(不对应ORM类)
relationship() 定义多对多关系,需指定secondary参数
back_populates 让关系可双向访问
# 导入SQLAlchemy需要用到的函数和类
from sqlalchemy import create_engine, String, Integer, Table, ForeignKey, select, Column
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column, relationship

# 定义ORM基类
class Base(DeclarativeBase):
    pass

# 定义用于实现学生与课程多对多关系的关联表,不需要创建独立模型类
student_course = Table(
    # 表名为student_course
    "student_course",
    # 绑定到Base的元数据信息
    Base.metadata,
    # 定义学生id列,外键关联到students表的id,设置为主键
    Column("student_id", Integer, ForeignKey("students.id"), primary_key=True),
    # 定义课程id列,外键关联到courses表的id,设置为主键
    Column("course_id", Integer, ForeignKey("courses.id"), primary_key=True),
)

# 定义学生(Student)模型类
class Student(Base):
    # 设置表名为students
    __tablename__ = "students"
    # 定义主键id字段
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义学生姓名字段,最大长度50
    name: Mapped[str] = mapped_column(String(50))
    # 与Course模型建立多对多关系,通过student_course中间表,反向引用为students
    courses: Mapped[list["Course"]] = relationship(
        "Course", secondary=student_course, back_populates="students"
    )

    # 定义Student对象的字符串表示形式
    def __repr__(self):
        return f"<Student(id={self.id}, name='{self.name}')>"

# 定义课程(Course)模型类
class Course(Base):
    # 设置表名为courses
    __tablename__ = "courses"
    # 定义主键id字段
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义课程名称字段,最大长度100
    title: Mapped[str] = mapped_column(String(100))
    # 与Student模型建立多对多关系,通过student_course中间表,反向引用为courses
    students: Mapped[list["Student"]] = relationship(
        "Student", secondary=student_course, back_populates="courses"
    )

    # 定义Course对象的字符串表示形式
    def __repr__(self):
        return f"<Course(id={self.id}, title='{self.title}')>"

# 创建SQLite数据库连接,指定文件名为many2many_example.db,echo=True用于打印SQL日志
engine = create_engine("sqlite:///many2many_example.db", echo=True)
# 创建所有数据库表结构
Base.metadata.create_all(engine)

# 使用Session上下文开启数据库会话
with Session(engine) as session:
    # 创建两门课程对象
    course1 = Course(title="高等数学")
    course2 = Course(title="计算机基础")
    # 创建两个学生对象,分别选修不同的课程
    student1 = Student(name="小明", courses=[course1, course2])
    student2 = Student(name="小红", courses=[course2])
    # 将学生对象添加到会话中
    session.add_all([student1, student2])
    # 提交事务到数据库
    session.commit()
    # 打印操作成功的提示
    print("创建学生和课程成功")

    # 查询姓名为“小明”的学生及其所选课程
    stu = session.scalars(select(Student).where(Student.name=="小明")).first()
    # 打印学生姓名
    print(f"\n学生:{stu.name}")
    # 打印学生所选的所有课程名称
    print("所选课程:")
    for c in stu.courses:
        print(f"  - {c.title}")

    # 查询课程名称为“计算机基础”的课程及所有选修该课程的学生
    cour = session.scalars(select(Course).where(Course.title=="计算机基础")).first()
    # 打印课程名称
    print(f"\n课程:{cour.title}")
    # 打印所有选修该课程的学生姓名
    print("选修学生:")
    for s in cour.students:
        print(f"  - {s.name}")
# 开始隐式事务(自动开启,用于数据库操作)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询student_course中间表的表结构
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("student_course")
# 执行原生SQL(没有参数返回)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 查询students表的表结构
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("students")
# 执行原生SQL(没有参数返回)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 查询courses表的表结构
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("courses")
# 执行原生SQL(没有参数返回)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 提交事务(表结构创建完成)
INFO sqlalchemy.engine.Engine COMMIT
# 开始新的隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 插入课程“高等数学”,返回该课程的id
INFO sqlalchemy.engine.Engine INSERT INTO courses (title) VALUES (?) RETURNING id
# 数据库已生成id=3,插入的参数为('高等数学',)
INFO sqlalchemy.engine.Engine [generated in 0.00006s (insertmanyvalues) 1/2 (ordered; batch not supported)] ('高等数学',)
# 插入课程“计算机基础”,返回该课程的id
INFO sqlalchemy.engine.Engine INSERT INTO courses (title) VALUES (?) RETURNING id
# 数据库已生成id=4,插入的参数为('计算机基础',)
INFO sqlalchemy.engine.Engine [insertmanyvalues 2/2 (ordered; batch not supported)] ('计算机基础',)
# 插入学生“小明”,返回该学生的id
INFO sqlalchemy.engine.Engine INSERT INTO students (name) VALUES (?) RETURNING id
# 数据库已生成id=3,插入的参数为('小明',)
INFO sqlalchemy.engine.Engine [generated in 0.00005s (insertmanyvalues) 1/2 (ordered; batch not supported)] ('小明',)
# 插入学生“小红”,返回该学生的id
INFO sqlalchemy.engine.Engine INSERT INTO students (name) VALUES (?) RETURNING id
# 数据库已生成id=4,插入的参数为('小红',)
INFO sqlalchemy.engine.Engine [insertmanyvalues 2/2 (ordered; batch not supported)] ('小红',)
# 向student_course关系表插入多对多的关联记录(学生和课程的对应关系)
INFO sqlalchemy.engine.Engine INSERT INTO student_course (student_id, course_id) VALUES (?, ?)
# 插入关联记录,参数分别为(3, 4), (4, 4), (3, 3)
INFO sqlalchemy.engine.Engine [generated in 0.00015s] [(3, 4), (4, 4), (3, 3)]
# 提交插入操作
INFO sqlalchemy.engine.Engine COMMIT
# 打印提示:表示学生和课程创建成功
创建学生和课程成功
# 开始新的隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询students表,查找name为“小明”的学生
INFO sqlalchemy.engine.Engine SELECT students.id, students.name
FROM students
WHERE students.name = ?
# 真正的参数为('小明',),查出来
INFO sqlalchemy.engine.Engine [generated in 0.00024s] ('小明',)
# 打印学生姓名
学生:小明
# 打印学生所选课程提示
所选课程:
# 查询courses表及student_course表,查找该学生(主键=1)对应的所有课程
INFO sqlalchemy.engine.Engine SELECT courses.id AS courses_id, courses.title AS courses_title
FROM courses, student_course
WHERE ? = student_course.student_id AND courses.id = student_course.course_id
# 参数为(1,),查出来结果
INFO sqlalchemy.engine.Engine [generated in 0.00043s] (1,)
# 打印学生选课结果
  - 高等数学
  - 计算机基础
# 查询courses表,查找title为“计算机基础”的课程
INFO sqlalchemy.engine.Engine SELECT courses.id, courses.title
FROM courses
WHERE courses.title = ?
# 参数为('计算机基础',),查出课程
INFO sqlalchemy.engine.Engine [generated in 0.00036s] ('计算机基础',)
# 打印课程名称
课程:计算机基础
# 打印选修该课程的学生提示
选修学生:
# 查询students表及student_course表,查找所有选修该课程(主键=2)的学生
INFO sqlalchemy.engine.Engine SELECT students.id AS students_id, students.name AS students_name
FROM students, student_course
WHERE ? = student_course.course_id AND students.id = student_course.student_id
# 参数为(2,),查出所有学生
INFO sqlalchemy.engine.Engine [generated in 0.00036s] (2,)
# 打印选修学生姓名
  - 小明
  - 小红
# 回滚事务(一般是在session关闭后自动回滚未提交的更改)
INFO sqlalchemy.engine.Engine ROLLBACK

9. 复杂条件与聚合 #

9.1 复杂条件查询 #

在实际业务中,我们经常会遇到需要根据多个条件综合查询的需求,比如“查找年龄大于某个值且城市为某地的用户”,或者“查找满足多个不同条件之一的记录”等。这时可以使用SQLAlchemy中的and_、or_等函数来拼接复杂的查询条件,实现灵活的多条件过滤。例如,可以查询“年龄大于26且城市为北京”,或者“年龄小于30或所在城市为上海”的所有用户。

此外,SQLAlchemy还可以方便地与LIKE模糊匹配、IN列表查询等结合,极大地增强了查询数据的能力。这些复杂条件的查询方式与直接书写SQL相比,ORM风格更优雅、更易维护,同时也更安全(能有效防止SQL注入)。

# 导入SQLAlchemy必要模块和函数
from sqlalchemy import create_engine, String, and_, or_, select
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义ORM模型基类
class Base(DeclarativeBase):
    pass

# 定义用户表对应的User类
class User(Base):
    # 指定表名为users
    __tablename__ = "users"
    # id字段,主键,自增长
    id: Mapped[int] = mapped_column(primary_key=True)
    # name字段,最大长度50
    name: Mapped[str] = mapped_column(String(50))
    # age字段
    age: Mapped[int]
    # city字段,最大长度50
    city: Mapped[str] = mapped_column(String(50))

    # 定义User实例的字符串显示格式
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age}, city='{self.city}')>"

# 创建sqlite引擎,echo=True用于打印生成的SQL
engine = create_engine("sqlite:///query_example.db", echo=True)
# 根据ORM模型生成数据库表
Base.metadata.create_all(engine)

# 创建会话
with Session(engine) as session:
    # 构建用户数据列表
    users = [
        User(name="张三", age=25, city="北京"),
        User(name="李四", age=30, city="上海"),
        User(name="王五", age=28, city="北京"),
        User(name="赵六", age=35, city="广州"),
    ]
    # 添加所有用户数据到会话
    session.add_all(users)
    # 提交会话到数据库持久化数据
    session.commit()

    # 使用AND条件查询:年龄大于25并且城市为北京的用户
    result1 = session.scalars(select(User).where(and_(User.age > 25, User.city == "北京"))).all()
    # 输出查询结果
    print("年龄大于25且城市为北京的用户:")
    for user in result1:
        # 打印用户信息
        print(user)

    # 使用OR条件查询:年龄小于25或者城市为上海的用户
    result2 = session.scalars(select(User).where(or_(User.age < 25, User.city == "上海"))).all()
    # 输出查询结果
    print("\n年龄小于25或城市为上海的用户:")
    for user in result2:
        # 打印用户信息
        print(user)

    # 使用LIKE查询:姓名以'张'开头的用户
    result3 = session.scalars(select(User).where(User.name.like("张%"))).all()
    # 输出查询结果
    print("\n姓名以'张'开头的用户:")
    for user in result3:
        # 打印用户信息
        print(user)

    # 使用IN查询:城市为北京或上海的用户
    result4 = session.scalars(select(User).where(User.city.in_(["北京", "上海"]))).all()
    # 输出查询结果
    print("\n城市为北京或上海的用户:")
    for user in result4:
        # 打印用户信息
        print(user)

    # 使用BETWEEN范围查询:年龄在28到32之间的用户(包含28和32)
    result5 = session.scalars(select(User).where(User.age.between(28, 32))).all()
    # 输出查询结果
    print("\n年龄在28到32之间的用户:")
    for user in result5:
        # 打印用户信息
        print(user)
# 隐式开启事务(自动事务)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表的主表结构信息
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
# 执行原始SQL(查询主表结构)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 查询users表的临时表结构信息
INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("users")
# 执行原始SQL(查询临时表结构)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 创建users表,包含id、name、age、city字段,id为主键
INFO sqlalchemy.engine.Engine
CREATE TABLE users (
        id INTEGER NOT NULL,
        name VARCHAR(50) NOT NULL,
        age INTEGER NOT NULL,
        city VARCHAR(50) NOT NULL,
        PRIMARY KEY (id)
)

# 创建表操作执行完成,无返回键
INFO sqlalchemy.engine.Engine [no key 0.00017s] ()
# 提交创建表事务
INFO sqlalchemy.engine.Engine COMMIT
# 隐式开启新的事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 插入第一条用户记录,返回插入的id
INFO sqlalchemy.engine.Engine INSERT INTO users (name, age, city) VALUES (?, ?, ?) RETURNING id
# 第一条数据已生成并绑定参数(张三, 25, 北京)
INFO sqlalchemy.engine.Engine [generated in 0.00008s (insertmanyvalues) 1/4 (ordered; batch not supported)] ('张三', 25, '北京')
# 插入第二条用户记录,返回插入的id
INFO sqlalchemy.engine.Engine INSERT INTO users (name, age, city) VALUES (?, ?, ?) RETURNING id
# 第二条数据已生成并绑定参数(李四, 30, 上海)
INFO sqlalchemy.engine.Engine [insertmanyvalues 2/4 (ordered; batch not supported)] ('李四', 30, '上海')
# 插入第三条用户记录,返回插入的id
INFO sqlalchemy.engine.Engine INSERT INTO users (name, age, city) VALUES (?, ?, ?) RETURNING id
# 第三条数据已生成并绑定参数(王五, 28, 北京)
INFO sqlalchemy.engine.Engine [insertmanyvalues 3/4 (ordered; batch not supported)] ('王五', 28, '北京')
# 插入第四条用户记录,返回插入的id
INFO sqlalchemy.engine.Engine INSERT INTO users (name, age, city) VALUES (?, ?, ?) RETURNING id
# 第四条数据已生成并绑定参数(赵六, 35, 广州)
INFO sqlalchemy.engine.Engine [insertmanyvalues 4/4 (ordered; batch not supported)] ('赵六', 35, '广州')
# 插入数据事务提交
INFO sqlalchemy.engine.Engine COMMIT
# 隐式开启新的事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表,条件:age大于25且city为北京
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age, users.city
FROM users
WHERE users.age > ? AND users.city = ?
# 记录SQL参数(25, '北京')
INFO sqlalchemy.engine.Engine [generated in 0.00020s] (25, '北京')
# 打印筛选结果标题
年龄大于25且城市为北京的用户:
# 打印查询结果对象
<User(id=3, name='王五', age=28, city='北京')>
# 查询users表,条件:age小于25或city为上海
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age, users.city
FROM users
WHERE users.age < ? OR users.city = ?
# 记录SQL参数(25, '上海')
INFO sqlalchemy.engine.Engine [generated in 0.00018s] (25, '上海')
# 打印筛选结果标题
年龄小于25或城市为上海的用户:
# 打印查询结果对象
<User(id=2, name='李四', age=30, city='上海')>
# 查询users表,条件:name以'张'开头
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age, users.city
FROM users
WHERE users.name LIKE ?
# 记录SQL参数('张%',)
INFO sqlalchemy.engine.Engine [generated in 0.00018s] ('张%',)
# 打印筛选结果标题
姓名以'张'开头的用户:
# 打印查询结果对象
<User(id=1, name='张三', age=25, city='北京')>
# 查询users表,条件:city为北京或上海
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age, users.city
FROM users
WHERE users.city IN (?, ?)
# 记录SQL参数('北京', '上海')
INFO sqlalchemy.engine.Engine [generated in 0.00020s] ('北京', '上海')
# 打印筛选结果标题
城市为北京或上海的用户:
# 打印所有查询结果对象
<User(id=1, name='张三', age=25, city='北京')>
<User(id=2, name='李四', age=30, city='上海')>
<User(id=3, name='王五', age=28, city='北京')>
# 查询users表,条件:age在28到32之间(含边界)
INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.age, users.city
FROM users
WHERE users.age BETWEEN ? AND ?
# 记录SQL参数(28, 32)
INFO sqlalchemy.engine.Engine [generated in 0.00019s] (28, 32)
# 打印筛选结果标题
年龄在28到32之间的用户:
# 打印所有查询结果对象
<User(id=2, name='李四', age=30, city='上海')>
<User(id=3, name='王五', age=28, city='北京')>
# 结束会话(事务回滚)
INFO sqlalchemy.engine.Engine ROLLBACK

9.2 聚合查询 #

在实际数据库开发中,我们经常需要对数据进行统计和汇总分析,这就是“聚合查询”。聚合操作可以快速返回某些字段的统计值,比如总数(count)、平均值(avg)、最大值(max)、最小值(min)、总和(sum)等。SQLAlchemy通过func模块调用这些数据库原生聚合函数,非常简单直观。

常见的聚合用途包括:

  • 统计记录总数:如统计用户总人数。
  • 计算平均值:如所有用户的平均年龄。
  • 查找最大/最小值:如用户的最大和最小年龄。
  • 计算字段总和:如所有用户年龄的合计。

同时,配合group_by语句,可以实现按照某个字段分组后分别统计每组的相关汇总数据。例如,可以统计每个城市的用户数量、每个城市的平均年龄等。

# 导入SQLAlchemy的数据库引擎、类型定义和ORM相关类
from sqlalchemy import create_engine, String, Integer, select, func
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义基础模型类,所有ORM模型都需继承此类
class Base(DeclarativeBase):
    pass

# 定义用户表对应的ORM模型
class User(Base):
    # 指定数据库表名为users
    __tablename__ = "users"
    # 定义主键id字段,类型为int
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义姓名字段,类型为字符串,最大长度50
    name: Mapped[str] = mapped_column(String(50))
    # 定义年龄字段,类型为int
    age: Mapped[int]
    # 定义城市字段,类型为字符串,最大长度50
    city: Mapped[str] = mapped_column(String(50))

    # 定义对象的字符串表示,用于打印调试
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

# 创建sqlite数据库引擎,数据库文件名为query_example.db,echo=True会输出SQL日志
engine = create_engine("sqlite:///query_example.db", echo=True)
# 根据ORM模型创建数据表(如果不存在则创建)
Base.metadata.create_all(engine)

# 创建数据库会话,并在会话范围内进行数据操作
with Session(engine) as session:
    # 查询所有用户的平均年龄
    avg_age = session.execute(select(func.avg(User.age))).scalar_one()
    # 打印平均年龄,保留两位小数
    print(f"平均年龄:{avg_age:.2f}")

    # 查询所有用户中的最大年龄
    max_age = session.execute(select(func.max(User.age))).scalar_one()
    # 打印最大年龄
    print(f"最大年龄:{max_age}")

    # 查询所有用户中的最小年龄
    min_age = session.execute(select(func.min(User.age))).scalar_one()
    # 打印最小年龄
    print(f"最小年龄:{min_age}")

    # 查询所有用户年龄总和
    total_age = session.execute(select(func.sum(User.age))).scalar_one()
    # 打印年龄总和
    print(f"年龄总和:{total_age}")

    # 查询所有用户的数量
    user_count = session.execute(select(func.count(User.id))).scalar_one()
    # 打印用户总数
    print(f"用户总数:{user_count}")

    # 按城市对用户进行分组统计,每个城市对应用户数量和平均年龄
    city_stats = session.execute(
        select(
            User.city,
            func.count(User.id).label("count"),
            func.avg(User.age).label("avg_age")
        ).group_by(User.city)
    ).all()
    # 打印分组统计结果
    print("\n按城市分组统计:")
    # 遍历每个城市的分组统计结果,依次输出
    for city, count, avg_age in city_stats:
        print(f"  {city}: {count}人, 平均年龄{avg_age:.2f}")
# 开始隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表结构信息
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
# 执行原始SQL
INFO sqlalchemy.engine.Engine [raw sql] ()
# 提交事务(表结构查询后)
INFO sqlalchemy.engine.Engine COMMIT
# 再次开始一个隐式事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询所有用户的平均年龄
INFO sqlalchemy.engine.Engine SELECT avg(users.age) AS avg_1
FROM users
# 查询执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00014s] ()
# 打印平均年龄
平均年龄:29.50
# 查询所有用户中的最大年龄
INFO sqlalchemy.engine.Engine SELECT max(users.age) AS max_1
FROM users
# 查询执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00013s] ()
# 打印最大年龄
最大年龄:35
# 查询所有用户中的最小年龄
INFO sqlalchemy.engine.Engine SELECT min(users.age) AS min_1
FROM users
# 查询执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00011s] ()
# 打印最小年龄
最小年龄:25
# 查询所有用户年龄总和
INFO sqlalchemy.engine.Engine SELECT sum(users.age) AS sum_1
FROM users
# 查询执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00012s] ()
# 打印年龄总和
年龄总和:118
# 查询所有用户的数量
INFO sqlalchemy.engine.Engine SELECT count(users.id) AS count_1
FROM users
# 查询执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00016s] ()
# 打印用户总数
用户总数:4
# 按城市对用户分组统计数量与平均年龄
INFO sqlalchemy.engine.Engine SELECT users.city, count(users.id) AS count, avg(users.age) AS avg_age
FROM users GROUP BY users.city
# 查询执行耗时
INFO sqlalchemy.engine.Engine [generated in 0.00019s] ()

# 打印按城市分组的统计结果
按城市分组统计:
  上海: 1人, 平均年龄30.00
  北京: 2人, 平均年龄26.50
  广州: 1人, 平均年龄35.00
# 回滚事务
INFO sqlalchemy.engine.Engine ROLLBACK

10. 事务管理 #

事务:一组要么全成功、要么全失败的数据库操作。SQLAlchemy 2.0+ 推荐直接使用 with Session(engine) as session 上下文管理器自动管理事务,无需手动关闭会话。

10.1 基本事务操作 #

# 导入SQLAlchemy中创建数据库引擎和字符串类型所需的模块
from sqlalchemy import create_engine, String
# 导入声明性基类、会话、类型映射和列映射相关的模块
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义基础模型类,所有ORM模型都要继承自Base
class Base(DeclarativeBase):
    pass

# 定义User模型,映射到"users"数据表
class User(Base):
    # 指定表名为"users"
    __tablename__ = "users"
    # 定义id字段,为主键,类型为整型
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name字段,最大长度为50的字符串类型
    name: Mapped[str] = mapped_column(String(50))
    # 定义age字段,类型为整型
    age: Mapped[int]

    # 定义对象的字符串表示形式,便于调试和打印
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

# 创建数据库引擎,使用SQLite文件数据库,并且开启SQL语句回显
engine = create_engine("sqlite:///transaction_example.db", echo=True)
# 根据模型类创建所有数据表
Base.metadata.create_all(engine)

# 使用上下文管理器自动管理事务(推荐方式)
try:
    # 使用with语句创建数据库会话,会自动管理打开和关闭
    with Session(engine) as session:
        # 创建第一个用户对象
        user1 = User(name="用户1", age=20)
        # 创建第二个用户对象
        user2 = User(name="用户2", age=25)
        # 将两个用户对象添加到会话中
        session.add_all([user1, user2])
        # 提交事务,将数据写入数据库
        session.commit()
        # 打印事务提交成功的消息
        print("事务提交成功")
# 捕获异常并输出错误信息
except Exception as e:
    print(f"发生错误:{e}")

说明:

  • 使用 with Session(engine) as session: 上下文管理器,退出时会自动关闭会话
  • 如果发生异常或未提交事务,上下文管理器会自动回滚
  • 推荐在生产环境中使用 try-except 进行异常处理

10.2 高级事务操作 #

# 导入SQLAlchemy中创建数据库引擎和字符串类型所需的模块
from sqlalchemy import create_engine, String
# 导入声明性基类、会话、类型映射和列映射相关的模块
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

# 定义基础模型类,所有ORM模型都要继承自Base
class Base(DeclarativeBase):
    pass

# 定义User模型,映射到"users"数据表
class User(Base):
    # 指定表名为"users"
    __tablename__ = "users"
    # 定义id字段,为主键,类型为整型
    id: Mapped[int] = mapped_column(primary_key=True)
    # 定义name字段,最大长度为50的字符串类型
    name: Mapped[str] = mapped_column(String(50))
    # 定义age字段,类型为整型
    age: Mapped[int]

    # 定义对象的字符串表示形式,便于调试和打印
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

# 创建数据库引擎,使用SQLite文件数据库,并且开启SQL语句回显
engine = create_engine("sqlite:///transaction_example.db", echo=True)
# 根据模型类创建所有数据表
Base.metadata.create_all(engine)


# 定义事务管理类,实现自动提交和自动回滚
class Transaction:
    """事务管理类,使用上下文管理器自动处理事务的提交和回滚"""

    def __init__(self, engine):
        """
        初始化事务管理器

        参数:
            engine: SQLAlchemy数据库引擎
        """
        self.engine = engine
        self.session = None

    def __enter__(self):
        """
        进入上下文时创建数据库会话

        返回:
            Session对象,用于执行数据库操作
        """
        self.session = Session(self.engine)
        return self.session

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        退出上下文时自动处理事务

        参数:
            exc_type: 异常类型,如果没有异常则为None
            exc_val: 异常值
            exc_tb: 异常追踪信息

        返回:
            False表示不抑制异常,True表示抑制异常
        """
        try:
            if exc_type is None:
                # 没有异常,自动提交事务
                self.session.commit()
                print("事务提交成功")
            else:
                # 发生异常,自动回滚事务
                self.session.rollback()
                print(f"发生错误:{exc_val},事务已回滚")
        except Exception as e:
            # 提交或回滚时发生错误
            print(f"事务处理时发生错误:{e}")
            try:
                self.session.rollback()
            except:
                pass
        finally:
            # 确保会话被关闭
            self.session.close()

        # 返回False,不抑制异常,让异常继续传播
        return False


# 使用Transaction类自动管理事务
with Transaction(engine) as session:
    # 创建第一个用户对象
    user1 = User(name="用户1", age=20)
    # 创建第二个用户对象
    user2 = User(name="用户2", age=25)
    # 将两个用户对象添加到会话中
    session.add_all([user1, user2])
    # 不需要手动调用commit(),Transaction会自动提交
# 隐式开启事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 查询users表的表结构信息
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
# 执行PRAGMA语句(原始SQL)
INFO sqlalchemy.engine.Engine [raw sql] ()
# 提交事务(表结构查询后)
INFO sqlalchemy.engine.Engine COMMIT
# 隐式开启新的事务
INFO sqlalchemy.engine.Engine BEGIN (implicit)
# 插入一条用户记录,包含name和age字段,返回自增id
INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?) RETURNING id
# 第一条用户数据已生成并绑定参数(“用户1”, 20)
INFO sqlalchemy.engine.Engine [generated in 0.00006s (insertmanyvalues) 1/2 (ordered; batch not supported)] ('用户1', 20)
# 插入第二条用户记录,返回自增id
INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?) RETURNING id
# 第二条用户数据已生成并绑定参数(“用户2”, 25)
INFO sqlalchemy.engine.Engine [insertmanyvalues 2/2 (ordered; batch not supported)] ('用户2', 25)
# 提交插入数据事务
INFO sqlalchemy.engine.Engine COMMIT
# 打印“事务提交成功”
事务提交成功

11. 常见错误与最佳实践 #

11.1 错误 1:忘记提交事务 #

# 错误示例
# 使用with语句创建数据库会话
with Session(engine) as session:
    # 创建一个User对象,姓名为"张三",年龄为25
    user = User(name="张三", age=25)
    # 将User对象添加到会话
    session.add(user)
    # 没有调用commit(),数据不会写入数据库!

# 正确示例
# 使用with语句创建数据库会话
with Session(engine) as session:
    # 创建一个User对象,姓名为"张三",年龄为25
    user = User(name="张三", age=25)
    # 将User对象添加到会话
    session.add(user)
    # 提交事务,数据才会真正写入数据库
    session.commit()

11.2 错误 2:忘记关闭会话 #

2.0+ 推荐始终用上下文,不手动close:

# 手动创建会话(容易忘记关闭)
session = Session(engine)
# 创建一个User对象
user = User(name="张三", age=25)
# 将User对象添加到会话
session.add(user)
# 提交事务
session.commit()
# 忘记调用close(),会话未关闭

# 推荐写法:使用with语句自动管理会话的开启与关闭
with Session(engine) as session:
    # 创建一个User对象
    user = User(name="张三", age=25)
    # 将User对象添加到会话
    session.add(user)
    # 提交事务
    session.commit()

11.3 错误 3:循环中频繁提交 #

# 错误示例:极低效(每添加一条数据都提交一次,效率极低)
with Session(engine) as session:
    # 循环100次,每次创建一个User对象
    for i in range(100):
        # 创建用户对象,姓名为 用户{i},年龄为20+i
        user = User(name=f"用户{i}", age=20+i)
        # 将用户添加到会话
        session.add(user)
        # 每次添加后立刻提交,极大影响性能
        session.commit()  # 每次循环都Commit

# 推荐:批量提交(提升效率,将所有数据一次性提交)
with Session(engine) as session:
    # 构建100个User对象的列表
    users = [User(name=f"用户{i}", age=20 + i) for i in range(100)]
    # 一次性将所有用户添加到会话
    session.add_all(users)
    # 只需要提交一次,显著提高插入效率
    session.commit()

11.4 最佳实践 #

  1. 始终用Session上下文管理器,自动关闭
  2. 批量操作推荐 add_all(),避免循环中多次提交
  3. 异常处理用try-except确保事务安全
  4. 模型定义分离,生产代码将模型单独放
  5. 类型提示让代码更可维护
← 上一节 sentence_transformers 下一节 SSE →

访问验证

请输入访问令牌

Token不正确,请重新输入