南安市网站建设,怎么自己做网站空间,域名申请的理由和用途,湖北省工程建设信息官方网站postgresql 实践 pydantic 实践
1. SQLAlchemy 介绍
SQLAlchemy 是一个 ORM 框架。SQLAlchemy 是一个用于 Python 的 SQL 工具和对象关系映射#xff08;ORM#xff09;库。它允许你通过 Python 代码来与关系型数据库交互#xff0c;而不必直接编写SQL语句。 简单介绍一下…postgresql 实践 pydantic 实践
1. SQLAlchemy 介绍
SQLAlchemy 是一个 ORM 框架。SQLAlchemy 是一个用于 Python 的 SQL 工具和对象关系映射ORM库。它允许你通过 Python 代码来与关系型数据库交互而不必直接编写SQL语句。 简单介绍一下对象关系映射吧对象关系映射英语Object Relational Mapping简称 ORM或O/RM或O/R mapping是一种程序设计技术 用于实现面向对象编程语言里不同类型系统的数据之间的转换。 从效果上说它其实是创建了一个可在编程语言里使用的“虚拟对象数据库”。大白话对象模型与数据库表的映射。
1.1. SQLAlchemy 使用场景
SQLAlchemy 是一个强大的 Python ORM 框架主要应用于以下场景 数据库访问和操作 SQLAlchemy 提供了高层抽象来操作数据库可以避免写原生 SQL 语句。支持多种数据库后端MySQL、MongoDB、SQLite、PostgreSQL。 ORM映射 建立 Python 类与数据库表的映射关系简化数据模型的操作支持声明式操作。 复杂查询 SQLAlchemy 提供丰富的查询方式如过滤、分组、联结等可以构建复杂查询。 异步查询 基于Greenlet 等实现异步查询提高查询效率。 事务控制 通过 Session 管理数据库会话和事务。 工具集成 如数据迁移工具 Alembic可以实现 Schema 版本控制和迁移。 大数据集查询 基于 Pagination 实现数据分页避免大量数据查询内存溢出。 多数据库支持 支持 Postgres、MySQL、Oracle 等主流数据库。 Web框架集成 框架如 Flask 可以集成 SQLAlchemy便于 Web 应用开发。
2. SQLAlchemy 基本用法
参考 https://www.jb51.net/python/325524ud6.htm https://blog.51cto.com/u_13019/12307379
2.1. 安装 SQLAlchemy
在使用 SQLAlchemy 之前首先需要安装它。可以使用以下命令使用 pip 安装
pip install sqlalchemy
pip install pymysql # 安装 MySQL2.2. 连接数据库
使用 SQLAlchemy 连接到数据库需要提供数据库的连接字符串其中包含有关数据库类型、用户名、密码、主机和数据库名称的信息。
from sqlalchemy import create_engine# 例如连接到 SQLite 数据库
engine create_engine(sqlite:///example.db)# 例如连接到 MySQL 数据库
username your_mysql_username
password your_mysql_password
host your_mysql_host # 例如localhost 或 127.0.0.1
port your_mysql_port # 通常是 3306
database your_database_name
# 创建连接引擎
engine create_engine(fmysqlpymysql://{username}:{password}{host}:{port}/{database})2.3. 定义数据模型
使用 SQLAlchemy 的 ORM 功能可以定义 Python 类来映射数据库中的表。每个类对应数据库中的一张表类的属性对应表中的列。
# 导入必要的模块
from sqlalchemy import Column, Integer, String, Sequence
from sqlalchemy.ext.declarative import declarative_base# 创建一个基类用于定义数据模型的基本结构
Base declarative_base()# 定义一个数据模型类对应数据库中的 users 表
class User(Base):# 定义表名__tablename__ users# 定义列id是整数类型主键primary_keyTrue并使用 Sequence 生成唯一标识id Column(Integer, Sequence(user_id_seq), primary_keyTrue)# 定义列name是字符串类型最大长度为50name Column(String(50))# 定义列age是整数类型age Column(Integer)2.4. 创建表
通过在代码中调用 create_all 方法可以根据定义的模型创建数据库表。
Base.metadata.create_all(engine)2.5. 插入数据
使用 SQLAlchemy 进行插入数据的操作首先需要创建一个会话Session对象然后使用该对象添加数据并提交。
# 导入创建会话的模块
from sqlalchemy.orm import sessionmaker# 使用 sessionmaker 创建一个会话类 Session并绑定到数据库引擎bindengine
Session sessionmaker(bindengine)# 创建一个实例化的会话对象 session
session Session()# 创建一个新的 User 实例即要插入到数据库中的新用户
new_user User(nameJohn Doe, age30)# 将新用户添加到会话中即将其添加到数据库操作队列中
session.add(new_user)# 提交会话将所有在此会话中的数据库操作提交到数据库
session.commit()2.6. 使用事务添加数据
如果添加过程中发生任何错误我们将回滚事务确保数据库的一致性。
try:# 开始一个新的事务session.begin()# 创建新用户对象user1 User(nameAlice, emailaliceexample.com)user2 User(nameBob, emailbobexample.com)# 添加到会话中session.add(user1)session.add(user2)# 提交事务将所有更改保存到数据库session.commit()print(Users added successfully.)
except Exception as e:# 如果在添加用户过程中发生错误则回滚事务session.rollback()print(fAn error occurred: {e})
finally:# 关闭会话session.close()在这个示例中我们使用 session.begin() 显式地开始了一个新的事务。然后我们尝试添加两个新用户到会话中。如果在这个过程中没有发生任何错误我们使用 session.commit() 提交事务将所有更改保存到数据库中。但是如果在添加用户的过程中发生了任何异常例如由于重复的电子邮件地址或数据库连接问题我们将使用 session.rollback() 回滚事务确保数据库的一致性。
2.7. 查询数据
使用 SQLAlchemy 进行查询数据的操作可以通过查询语句或使用 ORM 查询接口。
# 使用查询语句
result engine.execute(SELECT * FROM users)# 使用 ORM 查询接口
users session.query(User).all()3. 复杂查询条件
3.1. 连接查询Join
假设我们有两个模型 User 和 Order 并且一个用户可以有多个订单。
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
Base declarative_base()
class TUsers(Base):__tablename__ usersid Column(Integer, primary_keyTrue, autoincrementTrue)name Column(String(32))orders relationship(TOrders, back_populatesuser) # 必须关联class TOrders(Base):__tablename__ ordersid Column(Integer, primary_keyTrue, autoincrementTrue)user_id Column(Integer, ForeignKey(users.id)) # 必须使用外键product Column(String(32))quantity Column(Integer)user relationship(TUsers, back_populatesorders) # 必须关联现在如果我们想要查询所有下过订单的用户及其订单信息我们可以进行连接查询
from sqlalchemy.orm import joinedload
# 加载所有用户的订单信息
users_with_orders session.query(TUsers).options(joinedload(TUsers.orders)).all()
for user in users_with_orders:print(fUser: {user.name})for order in user.orders:print(fOrder: {order.product}, Quantity: {order.quantity}, Id: {order.id})3.1.1. 双向关系relationship
更多 https://blog.csdn.net/JKQ8525350/article/details/139568447
3.2. 分组和聚合Grouping and Aggregation
假设我们想要统计每个用户下的订单总数。
from sqlalchemy import func
# 按用户分组并计算每个用户的订单数量
order_count_by_user session.query(User.id, User.name, func.count(Order.id).label(order_count)).\join(Order).group_by(User.id, User.name).all()
for user_id, user_name, order_count in order_count_by_user:print(fUser ID: {user_id}, Name: {user_name}, Order Count: {order_count})3.3. 子查询Subquery
如果我们想要找出订单数量超过平均订单数量的用户我们可以使用子查询。
from sqlalchemy import func, select
# 计算平均订单数量作为子查询
avg_order_quantity select([func.avg(Order.quantity).label(avg_quantity)]).select_from(Order).alias()
# 查询订单数量超过平均值的用户及其订单信息
users_above_avg session.query(User, Order.product, Order.quantity).\join(Order).filter(Order.quantity avg_order_quantity.c.avg_quantity).all()
for user, product, quantity in users_above_avg:print(fUser: {user.name}, Product: {product}, Quantity: {quantity})3.4. 复杂筛选条件Complex Filtering
假设我们想要找到名字以 “A” 开头的用户并且他们的订单中包含 “apple” 这个产品。
# 查询名字以“A”开头的用户且订单中包含“apple”产品的用户信息
users_with_apple session.query(User).join(Order).\filter(User.name.startswith(A)).\filter(Order.product.contains(apple)).\distinct().all() # 使用distinct() 确保结果中的用户不重复
for user in users_with_apple:print(fUser: {user.name})3.5. 分页查询
3.5.1. limit
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine# 假设已经有了一个定义好的模型和数据库引擎
engine create_engine(sqlite:///example.db)
Session sessionmaker(bindengine)
session Session()# 分页查询函数
def paginate_query(page, page_size):# 计算跳过的记录数offset (page - 1) * page_size# 执行分页查询results session.query(YourModel).order_by(YourModel.id).offset(offset).limit(page_size).all()return results# 使用分页查询
page 1
page_size 10
records paginate_query(page, page_size)3.5.2. slice
total session.query(YourModel).with_entities(func.count(YourModel.id)).scalar()
db_query session.query(YourModel).slice(page_start, page_end)4. SQLAlchemy 实践1
4.1. 定义数据模型类
我们定义三个数据模型类User用户、Post文章和Comment评论。这些类之间通过外键和关系进行关联。
# 导入 SQLAlchemy 中所需的模块
from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, relationship# 创建一个基类用于定义数据模型的基本结构
Base declarative_base()# 定义数据模型类 User对应数据库中的 users 表
class User(Base):__tablename__ users# 定义列id是整数类型作为主键id Column(Integer, primary_keyTrue)# 定义列username是字符串类型最大长度为50唯一且不可为空username Column(String(50), uniqueTrue, nullableFalse)# 定义列email是字符串类型最大长度为100唯一且不可为空email Column(String(100), uniqueTrue, nullableFalse)# 定义关系与 Post 类的关系为一对多关系通过 back_populates 指定反向关系属性名posts relationship(Post, back_populatesauthor)# 定义数据模型类 Post对应数据库中的 posts 表
class Post(Base):__tablename__ posts# 定义列id是整数类型作为主键id Column(Integer, primary_keyTrue)# 定义列title是字符串类型最大长度为100不可为空title Column(String(100), nullableFalse)# 定义列content是文本类型不可为空content Column(Text, nullableFalse)# 定义列user_id是整数类型外键关联到 users 表的 id 列user_id Column(Integer, ForeignKey(users.id))# 定义关系与 User 类的关系为多对一关系通过 back_populates 指定反向关系属性名author relationship(User, back_populatesposts)# 定义关系与 Comment 类的关系为一对多关系通过 back_populates 指定反向关系属性名comments relationship(Comment, back_populatespost)# 定义数据模型类 Comment对应数据库中的 comments 表
class Comment(Base):__tablename__ comments# 定义列id是整数类型作为主键id Column(Integer, primary_keyTrue)# 定义列text是文本类型不可为空text Column(Text, nullableFalse)# 定义列user_id是整数类型外键关联到 users 表的 id 列user_id Column(Integer, ForeignKey(users.id))# 定义列post_id是整数类型外键关联到 posts 表的 id 列post_id Column(Integer, ForeignKey(posts.id))# 定义关系与 User 类的关系为多对一关系author relationship(User)# 定义关系与 Post 类的关系为多对一关系通过 back_populates 指定反向关系属性名post relationship(Post, back_populatescomments)4.2. 创建数据库引擎和会话
这里我们选择了 SQLite 数据库并使用 create_all 创建相应的表。
# 导入 SQLAlchemy 中所需的模块
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker# 创建一个 SQLite 数据库引擎连接到名为 blog.db 的数据库文件
engine create_engine(sqlite:///blog.db)# 使用 Base 对象的 metadata 属性创建数据库中定义的所有表
Base.metadata.create_all(engine)# 使用 sessionmaker 创建一个会话类 Session并将其绑定到上面创建的数据库引擎
Session sessionmaker(bindengine)# 创建一个实例化的会话对象 session用于进行数据库操作
session Session()4.3. 进行数据库操作
这段代码演示了如何使用 SQLAlchemy 对数据库进行插入和查询操作。首先创建了一个用户、一篇文章和一条评论然后通过查询用户的方式打印出该用户的所有文章及评论。
# 创建一个新用户对象并设置其属性
user1 User(usernamejohn_doe, emailjohnexample.com)# 将新用户对象添加到会话表示要进行数据库插入操作
session.add(user1)# 提交会话将所有在此会话中的数据库操作提交到数据库
session.commit()# 创建一篇新文章对象并设置其属性
post1 Post(titleIntroduction to SQLAlchemy, contentSQLAlchemy is a powerful ORM for Python.)# 将文章的作者关联到之前创建的用户
post1.author user1# 将新文章对象添加到会话表示要进行数据库插入操作
session.add(post1)# 提交会话将所有在此会话中的数据库操作提交到数据库
session.commit()# 创建一条新评论对象并设置其属性
comment1 Comment(textGreat article!, authoruser1, postpost1)# 将评论对象添加到会话表示要进行数据库插入操作
session.add(comment1)# 提交会话将所有在此会话中的数据库操作提交到数据库
session.commit()# 查询用户名为 john_doe 的用户并打印其所有文章及评论
user session.query(User).filter_by(usernamejohn_doe).first()
print(fUser: {user.username})# 遍历用户的所有文章
for post in user.posts:print(fPost: {post.title})# 遍历文章的所有评论for comment in post.comments:print(fComment: {comment.text})5. SQLAlchemy 实践2 数据库初始化操作
入口 main.py
from sqlalchemy import create_engine
from sqlalchemy_utils import create_database, database_existsfrom database import Base
from schemas import UserBase, OrderBasedef is_db_exist(db_url: str):engine create_engine(db_url, max_overflow0, pool_size16, pool_timeout5, pool_recycle-1)if not database_exists(engine.url):create_database(engine.url)return Falseelse: return Truedef init_database(db_url: str):# 设置连接池的大小engine create_engine(db_url, max_overflow0, # 超过连接池大小外最多创建的连接pool_size16, # 连接池大小pool_timeout5, # 池中没有线程最多等待的时间否则报错pool_recycle-1 # 多久之后对线程池中的线程进行一次连接的回收重置)# 创建数据库Base.metadata.create_all(engine)if __name__ __main__:# 数据库参考db_host 127.0.0.1db_user rootdb_password lianapdb_url %s%s:%s%s/%s % (mysqlpymysql://, db_user, db_password, db_host, local_db)if not is_db_exist(db_url):init_database(db_url)from alchemy import AlchemyToolalchemytool AlchemyTool(db_urldb_url)db_user alchemytool.create_user(test_user)alchemytool.create_order(db_user.id)alchemytool.select()数据表结构 database.py
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship# sqlalchemy
Base declarative_base()class TUsers(Base):__tablename__ usersid Column(Integer, primary_keyTrue, autoincrementTrue)name Column(String(32))orders relationship(TOrders, back_populatesuser) # 必须使用class TOrders(Base):__tablename__ ordersid Column(Integer, primary_keyTrue, autoincrementTrue)user_id Column(Integer, ForeignKey(users.id)) # 必须使用外键product Column(String(32))quantity Column(Integer)user relationship(TUsers, back_populatesorders) # 必须使用数据结构体 schemas.py
from pydantic import BaseModel, Field
from typing import Union, Optional, Literal, List, Dict## 任务基础结构体
class OrderBase(BaseModel):id: intuser_id: intproduct: strquantity: intclass Config:from_attributesTrueclass UserBase(BaseModel):id: int name: strorders: List[OrderBase]class Config:from_attributesTrue数据库操作 alchemy.py
from sqlalchemy import create_engine, and_, or_, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy.pool import SingletonThreadPoolfrom database import Base
from database import TUsers, TOrders
from schemas import UserBase, OrderBaseclass AlchemyTool(object):def __init__(self, db_url: str):print(AlchemyTool init)# 设置连接池的大小db_config {pool_size: 10}#engine create_engine(SQLALCHEMY_DATABASE_URL, **db_config)# engine create_engine(db_url, # #poolclass SingletonThreadPool,# connect_args {check_same_thread: False}# )engine create_engine(db_url, max_overflow0, pool_size16, pool_timeout5, pool_recycle-1)# # 创建数据库# Base.metadata.create_all(engine)# 创建数据库连接self.__session sessionmaker(autocommitFalse, autoflushFalse, bindengine)()# ############################### User ###############################def create_user(self, name:str):db_location Nonetry:db_location TUsers(namename)self.__session.add(db_location)self.__session.commit()except SQLAlchemyError as e:print(fError.AlchemyTool.create_user SQLAlchemyError:{str(e)})self.__session.rollback()finally:passreturn db_locationdef create_order(self, user_id: int1):db_location Nonetry:# self.__session.begin()for index in range(5):db_location TOrders(user_iduser_id, productfproduct{index}, quantityindex)self.__session.add(db_location)self.__session.commit()except SQLAlchemyError as e:print(fError.AlchemyTool.create_order SQLAlchemyError:{str(e)})self.__session.rollback()finally:passreturn db_locationdef select(self):users_with_orders self.__session.query(TUsers).options(joinedload(TUsers.orders)).all()for user in users_with_orders:user_base UserBase.from_orm(user) # 一次性转换成 UserBase包含List[OrderBase]print(fUserBase: {user_base})for order in user.orders:# print(fOrder: {order.product}, Quantity: {order.quantity}, Id: {order.id})pass运行结果 python.exe .\main.py
AlchemyTool init
UserBase: id1 nametest_user orders[OrderBase(id1, user_id1, productproduct0, quantity0), OrderBase(id2, user_id1, productproduct1, quantity1), OrderBase(id3, user_id1, productproduct2, quantity2), OrderBase(id4, user_id1, productproduct3, quantity3), OrderBase(id5, user_id1, productproduct4, quantity4)]