使用 SQLAlchemy 和 Python 的通用存储库






4.27/5 (5投票s)
使用 SQLAlchemy 和 Python 的通用存储库类的示例。
引言
仓库模式以及工作单元模式允许在应用程序的数据访问层和业务逻辑层之间创建一个抽象层。创建此层的目的是隔离数据访问层,以便我们可能进行的更改不会直接影响业务逻辑层。大多数情况下,使用通用仓库类,以避免重复代码。
在这里,我们将重点创建用于 SQLAlchemy
的通用仓库类,使用 Python
,并在 FastAPI
项目中使用它。
通用仓库
表格
AppBaseModelOrm
基类,用于通用属性或列TaskQueue
数据库表模型GroupQueue
数据库表模型
models.py:
import datetime
from sqlalchemy import Boolean, Column, Integer, String, \
DateTime, PickleType, Enum as EnumType, JSON
from sqlalchemy.dialects.postgresql import UUID
from db.database import Base
# common fields for all entities
class AppBaseModelOrm:
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
is_active = Column(Boolean, default=True) # soft delete
created_by = Column(Integer)
updated_by = Column(Integer, default=None)
created_datetime = Column(DateTime(timezone=True), default=datetime.datetime.utcnow)
updated_datetime = Column(DateTime(timezone=True), \
default=None, onupdate=datetime.datetime.utcnow)
account_id = Column(Integer)
# tables
class TaskQueue(AppBaseModelOrm, Base):
__tablename__ = "task_queues"
name = Column(String, index=True)
class GroupQueue(AppBaseModelOrm, Base):
__tablename__ = "group_queues"
name = Column(String, index=True)
Repository 类
TableRepository
是基础仓库类,包含一些基本操作/方法,例如
- 从表中读取数据
- 从/向表中添加/更新/删除行
在构造函数级别,仓库类期望
db:Session
数据库会话对象entity:object
表实体
table_repo.py:
from sqlalchemy import and_
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from datetime import datetime
class TableRepository:
entity:object = NotImplementedError
db:Session = NotImplementedError
def __init__(self, db:Session, entity:object):
self.db = db
self.entity = entity
def get_all(self):
return self.db.query(self.entity)
def get_by_id(self, id:int):
return self.db.query(self.entity).filter(self.entity.id==id).one()
def find_by_id(self, id:int):
return self.db.query(self.entity).filter(self.entity.id==id).first()
def get_actives(self):
return self.db.query(self.entity).filter(self.entity.is_active==True)
def get_by_account_id(self, account_id:int):
return self.db.query(self.entity).filter(self.entity.account_id==account_id)
def get_actives_by_account_id(self, account_id:int):
return self.db.query(self.entity).filter\
(self.entity.is_active==True, self.entity.account_id==account_id)
def get_by_create_datetime_range(self, from_datetime:datetime, to_datetime:datetime):
data = self.db.query(self.entity).filter\
(self.entity.created_datetime >= from_datetime, \
self.entity.created_datetime <= to_datetime)
return data
def add(self, entity, created_by_user_id:int = None):
entity.created_by = created_by_user_id
self.db.add(entity)
def update(self, entity, updated_by_user_id:int = None):
entity.updated_by = updated_by_user_id
def delete(self, entity, deleted_by_user_id:int = None):
entity.is_active = False
self.update(entity, updated_by_user_id=deleted_by_user_id)
def permanent_delete(self, entity):
self.db.delete(entity)
使用仓库类
在这里,我们正在使用 TableRepository
仓库类,不进行任何继承,用于 TaskQueue
实体,并读/写表 task_queues
。
task_queue.py:
from typing import Optional, List
from fastapi import FastAPI, Request, Depends
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session
from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from db import models
from db.table_repo import TableRepository
router = InferringRouter()
@cbv(router)
class TaskQueue:
db: Session = Depends(get_db)
current_user:CurrentUser = Depends(get_current_user)
@router.get("/", response_model=List[schemas.TaskQueueSchema])
def get_all(self):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_all().all()
return items
@router.get("/actives", response_model=List[schemas.TaskQueueSchema])
def get_actives(self):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_actives().all()
return items
@router.get("/account/{account_id}", response_model=List[schemas.TaskQueueSchema])
def get_by_account(self, account_id: int):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_by_account_id(account_id).all()
return items
@router.get("/account/{account_id}/actives", \
response_model=List[schemas.TaskQueueSchema])
def get_actives_by_account(self, account_id: int):
repo = TableRepository(self.db, models.TaskQueue)
items = repo.get_actives_by_account_id(account_id).all()
return items
@router.get("/{id}", response_model=schemas.TaskQueueSchema)
def get_by_id(self, id: int):
repo = TableRepository(self.db, models.TaskQueue)
item = repo.get_by_id(id)
return item
@router.get("/find/{id}", response_model=schemas.TaskQueueSchema)
def find_by_id(self, id: int):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
return item
@router.post("/", response_model=schemas.TaskQueueSchema)
def post_item(self, model: schemas.TaskQueueCreate):
item = models.TaskQueue(name=model.name, account_id=model.account_id)
repo = TableRepository(self.db, models.TaskQueue)
repo.add(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.put("/{id}", response_model=schemas.TaskQueueSchema)
def put_item(self, id:int, model: schemas.TaskQueueUpdate):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
if item:
item.name = model.name
repo.update(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.delete("/{id}", response_model=schemas.TaskQueueSchema)
def delete_item(self, id: int):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
if item:
repo.delete(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.delete("/permanent/{id}", response_model=schemas.TaskQueueSchema)
def permanent_delete_item(self, id: int):
'''can be null'''
repo = TableRepository(self.db, models.TaskQueue)
item = repo.find_by_id(id)
if item:
repo.permanent_delete(item)
self.db.commit()
return item
将仓库类作为基础使用
GroupQueueCrud
继承了仓库类 TableRepository
,用于 GroupQueue
实体,并读/写表 group_queues
继承仓库类
group_queue_crud.py:
from sqlalchemy.orm import Session
from app import schemas
from db import models
from db.table_repo import TableRepository
class GroupQueueCrud(TableRepository):
def __init__(self, db:Session):
super().__init__(db=db, entity=models.GroupQueue)
使用 CRUD 类
group_queue.py:
from datetime import datetime
from typing import Optional, List
from fastapi import FastAPI, Request, Depends, Query
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session
from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from app.cruds.group_queue_crud import GroupQueueCrud
from db import models
router = InferringRouter()
@cbv(router)
class GroupQueue:
db: Session = Depends(get_db)
current_user:CurrentUser = Depends(get_current_user)
@router.get("/{id}", response_model=schemas.GroupQueueSchema)
def get_by_id(self, id: int):
repo = GroupQueueCrud(self.db)
item = repo.find_by_id(id)
return item
@router.post("/", response_model=schemas.GroupQueueSchema)
def post_item(self, model: schemas.GroupQueueCreate):
item = models.GroupQueue(name=model.name, account_id=model.account_id)
repo = GroupQueueCrud(self.db)
repo.add(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.put("/{id}", response_model=schemas.GroupQueueSchema)
def put_item(self, id:int, model: schemas.GroupQueueUpdate):
'''can be null'''
repo = GroupQueueCrud(self.db)
item = repo.find_by_id(id)
if item:
item.name = model.name
repo.update(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
@router.delete("/{id}", response_model=schemas.GroupQueueSchema)
def delete_item(self, id: int):
'''can be null'''
repo = GroupQueueCrud(self.db)
item = repo.find_by_id(id)
if item:
repo.delete(item, self.current_user.id)
self.db.commit()
self.db.refresh(item)
return item
Using the Code
Go to backend folder
Open cmd
Type docker-compose up -d
\backend> docker-compose up -d
project will run https://:4003
Go to Api Doc
https://:4003/docs#/
参考文献
历史
- 2022 年 7 月 5 日:初始版本