65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 SQLAlchemy 和 Python 的通用存储库

2022 年 7 月 5 日

CPOL

1分钟阅读

viewsIcon

15665

downloadIcon

183

一个使用 Python 和 SQLAlchemy 实现的通用仓库(Repository)类示例。

引言

仓库模式以及工作单元模式允许在应用程序的数据访问层和业务逻辑层之间创建一个抽象层。创建此层的目的是隔离数据访问层,以便我们可能进行的更改不会直接影响业务逻辑层。大多数情况下,使用通用仓库类,以避免重复代码。

本文将重点介绍如何使用 Python 为 SQLAlchemy 创建一个通用仓库类,并在 FastAPI 项目中使用它。

通用仓库

表格

  • AppBaseModelOrm 基类,用于通用属性或列
  • TaskQueue 数据库表模型
  • GroupQueue 数据库表模型

models.py:

import datetime
from sqlalchemy import Boolean, Column, Integer, String,  \
     DateTime, PickleType, Enum as EnumType, JSON
from sqlalchemy.dialects.postgresql import UUID
from db.database import Base

# common fields for all entities
class AppBaseModelOrm:
    """Mixin declaring the bookkeeping columns shared by every table.

    It is combined with the declarative ``Base`` by the table classes in
    this module, so each mapped table receives these columns.
    """

    # Surrogate integer primary key.
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    is_active = Column(Boolean, default=True)  # soft delete
    # Ids of the users who created / last updated the row.
    created_by = Column(Integer)
    updated_by = Column(Integer, default=None)
    # Both columns are declared timezone-aware, so the defaults must yield
    # aware UTC datetimes: a naive ``utcnow()`` value may be reinterpreted in
    # the database session's time zone, and ``utcnow`` is deprecated since
    # Python 3.12.
    created_datetime = Column(
        DateTime(timezone=True),
        default=lambda: datetime.datetime.now(datetime.timezone.utc),
    )
    # Stays NULL on insert and is stamped on every UPDATE.
    updated_datetime = Column(
        DateTime(timezone=True),
        default=None,
        onupdate=lambda: datetime.datetime.now(datetime.timezone.utc),
    )

    # Account the row belongs to.
    account_id = Column(Integer)

# tables
class TaskQueue(AppBaseModelOrm, Base):
    """ORM model mapped to the ``task_queues`` table.

    The shared columns are inherited from ``AppBaseModelOrm``.
    """
    __tablename__ = "task_queues"
    # Indexed name column.
    name = Column(String, index=True)

class GroupQueue(AppBaseModelOrm, Base):
    """ORM model mapped to the ``group_queues`` table.

    The shared columns are inherited from ``AppBaseModelOrm``.
    """
    __tablename__ = "group_queues"
    # Indexed name column.
    name = Column(String, index=True)

Repository 类

TableRepository 是基础仓库类,包含一些基本操作/方法,例如

  • 从表中读取数据
  • 从/向表中添加/更新/删除行

仓库类的构造函数需要以下参数:

  • db:Session 数据库会话对象
  • entity:object 表实体

table_repo.py:

from sqlalchemy import and_
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from datetime import datetime

class TableRepository:
    """Generic repository offering basic read/write operations for one entity.

    The repository never commits; callers own the session's transaction.
    Methods that return a query hand it back unexecuted, so callers can
    refine it or materialize it (for example with ``.all()``).
    """

    entity:object = NotImplementedError
    db:Session = NotImplementedError

    def __init__(self, db:Session, entity:object):
        """Bind the repository to a session and to the entity it manages."""
        self.entity = entity
        self.db = db

    def _entity_query(self):
        """Start a fresh query over the repository's entity."""
        return self.db.query(self.entity)

    def get_all(self):
        """Return a query over every row of the entity."""
        return self._entity_query()

    def get_by_id(self, id:int):
        """Return the row with the given id.

        Uses ``Query.one()``, so the lookup raises unless exactly one row
        matches.
        """
        query = self._entity_query()
        return query.filter(self.entity.id == id).one()

    def find_by_id(self, id:int):
        """Return the first row with the given id (``Query.first()``)."""
        query = self._entity_query()
        return query.filter(self.entity.id == id).first()

    def get_actives(self):
        """Return a query over the rows whose ``is_active`` flag is true."""
        # The comparison must stay ``== True`` so it builds a filter criterion.
        only_active = self.entity.is_active == True  # noqa: E712
        return self._entity_query().filter(only_active)

    def get_by_account_id(self, account_id:int):
        """Return a query over the rows belonging to ``account_id``."""
        query = self._entity_query()
        return query.filter(self.entity.account_id == account_id)

    def get_actives_by_account_id(self, account_id:int):
        """Return a query over the active rows belonging to ``account_id``."""
        query = self._entity_query()
        return query.filter(
            self.entity.is_active == True,  # noqa: E712
            self.entity.account_id == account_id,
        )

    def get_by_create_datetime_range(self, from_datetime:datetime, to_datetime:datetime):
        """Return a query over rows created within the inclusive range."""
        query = self._entity_query()
        created = self.entity.created_datetime
        return query.filter(created >= from_datetime, created <= to_datetime)

    def add(self, entity, created_by_user_id:int = None):
        """Stamp ``created_by`` on the entity and add it to the session."""
        entity.created_by = created_by_user_id
        self.db.add(entity)

    def update(self, entity, updated_by_user_id:int = None):
        """Stamp ``updated_by`` on the entity; the session is not touched."""
        entity.updated_by = updated_by_user_id

    def delete(self, entity, deleted_by_user_id:int = None):
        """Soft-delete: clear ``is_active`` and record who did it via ``update``."""
        entity.is_active = False
        self.update(entity, updated_by_user_id=deleted_by_user_id)

    def permanent_delete(self, entity):
        """Mark the entity for deletion in the session."""
        self.db.delete(entity)

使用仓库类

这里我们直接使用 TableRepository 仓库类(不做任何继承)来操作 TaskQueue 实体,对表 task_queues 进行读写。

task_queue.py:

from typing import Optional, List
from fastapi import FastAPI, Request, Depends
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session

from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from db import models
from db.table_repo import TableRepository

# Router on which the task-queue endpoints below are registered.
router = InferringRouter()

@cbv(router)
class TaskQueue:
    """Class-based view exposing CRUD endpoints for the task queue table.

    Endpoints that look a row up with ``find_by_id`` may return ``None``.
    Their response models are therefore declared ``Optional``; recent FastAPI
    versions reject a ``None`` return value for a non-optional response model.
    """

    db: Session = Depends(get_db)
    current_user:CurrentUser = Depends(get_current_user)

    def _repo(self) -> TableRepository:
        # Build a repository bound to this request's session.
        return TableRepository(self.db, models.TaskQueue)

    @router.get("/", response_model=List[schemas.TaskQueueSchema])
    def get_all(self):
        """Return every task queue."""
        return self._repo().get_all().all()

    @router.get("/actives", response_model=List[schemas.TaskQueueSchema])
    def get_actives(self):
        """Return the task queues that are still active."""
        return self._repo().get_actives().all()

    @router.get("/account/{account_id}", response_model=List[schemas.TaskQueueSchema])
    def get_by_account(self, account_id: int):
        """Return the task queues of one account."""
        return self._repo().get_by_account_id(account_id).all()

    @router.get("/account/{account_id}/actives", \
                 response_model=List[schemas.TaskQueueSchema])
    def get_actives_by_account(self, account_id: int):
        """Return the active task queues of one account."""
        return self._repo().get_actives_by_account_id(account_id).all()

    @router.get("/{id}", response_model=schemas.TaskQueueSchema)
    def get_by_id(self, id: int):
        """Return the task queue with the given id."""
        # NOTE(review): the repository lookup uses ``.one()``, which raises
        # when the id does not exist -- confirm an exception handler maps
        # that to a 404 response.
        return self._repo().get_by_id(id)

    @router.get("/find/{id}", response_model=Optional[schemas.TaskQueueSchema])
    def find_by_id(self, id: int):
        """Return the task queue with the given id, or null if none exists."""
        return self._repo().find_by_id(id)

    @router.post("/", response_model=schemas.TaskQueueSchema)
    def post_item(self, model: schemas.TaskQueueCreate):
        """Create a task queue and return the stored row."""
        item = models.TaskQueue(name=model.name, account_id=model.account_id)
        self._repo().add(item, self.current_user.id)
        self.db.commit()
        # Reload so generated values (id, timestamps) are in the response.
        self.db.refresh(item)
        return item

    @router.put("/{id}", response_model=Optional[schemas.TaskQueueSchema])
    def put_item(self, id:int, model: schemas.TaskQueueUpdate):
        """Rename a task queue; returns null if the id does not exist."""
        repo = self._repo()
        item = repo.find_by_id(id)
        if item:
            item.name = model.name
            repo.update(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

    @router.delete("/{id}", response_model=Optional[schemas.TaskQueueSchema])
    def delete_item(self, id: int):
        """Soft-delete a task queue; returns null if the id does not exist."""
        repo = self._repo()
        item = repo.find_by_id(id)
        if item:
            repo.delete(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

    @router.delete("/permanent/{id}", response_model=Optional[schemas.TaskQueueSchema])
    def permanent_delete_item(self, id: int):
        """Delete the row for good; returns null if the id does not exist."""
        repo = self._repo()
        item = repo.find_by_id(id)
        if item:
            repo.permanent_delete(item)
            self.db.commit()
        return item

将仓库类作为基础使用

GroupQueueCrud 继承自仓库类 TableRepository,用于操作 GroupQueue 实体,对表 group_queues 进行读写。

继承仓库类

group_queue_crud.py:

from sqlalchemy.orm import Session
from app import schemas
from db import models
from db.table_repo import TableRepository

class GroupQueueCrud(TableRepository):
    """Repository pre-bound to the ``GroupQueue`` entity."""

    def __init__(self, db:Session):
        """Create the repository on the given session."""
        # The entity is fixed here, so callers only supply the session.
        super().__init__(db, models.GroupQueue)

使用 CRUD 类

group_queue.py:

from datetime import datetime
from typing import Optional, List
from fastapi import FastAPI, Request, Depends, Query
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session

from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from app.cruds.group_queue_crud import GroupQueueCrud
from db import models

# Router on which the group-queue endpoints below are registered.
router = InferringRouter()


@cbv(router)
class GroupQueue:
    """Class-based view exposing CRUD endpoints for the group queue table.

    Every lookup here goes through ``find_by_id`` and may yield ``None``.
    The affected response models are therefore declared ``Optional``; recent
    FastAPI versions reject a ``None`` return value for a non-optional
    response model.
    """

    db: Session = Depends(get_db)
    current_user:CurrentUser = Depends(get_current_user)

    def _repo(self) -> GroupQueueCrud:
        # Build a repository bound to this request's session.
        return GroupQueueCrud(self.db)

    @router.get("/{id}", response_model=Optional[schemas.GroupQueueSchema])
    def get_by_id(self, id: int):
        """Return the group queue with the given id, or null if none exists."""
        return self._repo().find_by_id(id)

    @router.post("/", response_model=schemas.GroupQueueSchema)
    def post_item(self, model: schemas.GroupQueueCreate):
        """Create a group queue and return the stored row."""
        item = models.GroupQueue(name=model.name, account_id=model.account_id)
        self._repo().add(item, self.current_user.id)
        self.db.commit()
        # Reload so generated values (id, timestamps) are in the response.
        self.db.refresh(item)
        return item

    @router.put("/{id}", response_model=Optional[schemas.GroupQueueSchema])
    def put_item(self, id:int, model: schemas.GroupQueueUpdate):
        """Rename a group queue; returns null if the id does not exist."""
        repo = self._repo()
        item = repo.find_by_id(id)
        if item:
            item.name = model.name
            repo.update(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

    @router.delete("/{id}", response_model=Optional[schemas.GroupQueueSchema])
    def delete_item(self, id: int):
        """Soft-delete a group queue; returns null if the id does not exist."""
        repo = self._repo()
        item = repo.find_by_id(id)
        if item:
            repo.delete(item, self.current_user.id)
            self.db.commit()
            self.db.refresh(item)
        return item

Using the Code

Go to backend folder
Open cmd 
Type docker-compose up -d

\backend> docker-compose up -d

The project will run at http://localhost:4003

Go to Api Doc
http://localhost:4003/docs#/

查看 Task Queue(任务队列)和 Group Queue(组队列)部分。

参考文献

历史

  • 2022 年 7 月 5 日:初始版本
© . All rights reserved.