FastAPI
快速上手
- 创建
python -m venv venv
venv/Scripts/activate,bat
pip install "fastapi[all]"
uvicorn main:app --reload
- 依赖
- fastapi 本体
- uvicorn ASGI
- typing 类型扩展 标准库
- pydantic 接口类型定义
- Starlette web相关功能
- 自动生成文档
/docs
/redoc
/openapi.json
- 类型
- 基本 str int float bool bytes
- 嵌套
from typing import List
List[str]
Tuple[int, int, str]
Set[bytes]
Dict[str, float]
前者表键 后者表值
- 对象
Person
- 枚举
from enum import Enum
class ModelName(str, Enum):
def get_model(model_name: ModelName):
- Pydantic
py
# main.py
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
# /items/5?q=theQuery
@app.get("/items/{item_id}")
def read_item(item_id: int, q= None):
return {"item_id": item_id, "query": q}
py
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
is_offer: bool | None = None
# 请求体
@app.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
return {"item_name": item.name, "item_id": item_id}
请求
- 请求中的参数
- 路径参数同时写在path中即生效
- g大于 l小于 than equal
- 参数为单类型时取为query
- 每个参数分开
- 没有默认值为必须
Query(不写default或者=...)
都表示必须- 多值
q: list[str]
?q=foo&q=bar
- alias title description deprecated min_length max_length regex
- 参数使用pydantic声明取为body
- 可多参数
Field
Pydantic 模型内部声明校验- 可以嵌套模型
- 路径参数同时写在path中即生效
- 响应
- 通过 response_model 参数来定义响应模型
- 使用 status_code 参数声明状态码
- 多模型间可用继承
py
from enum import Enum
from fastapi import FastAPI, Path
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
app = FastAPI()
@app.get("/items/{item_id}")
async def read_items(
*,
item_id: int = Path(title="The ID of the item to get", gt=0, le=1000),
q: str,
):
results = {"item_id": item_id}
if q:
results.update({"q": q})
return results
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
if model_name is ModelName.alexnet:
return {"model_name": model_name}
if model_name.value == "lenet":
return {"model_name": model_name}
return {"model_name": model_name}
py
from fastapi import FastAPI, Query
app = FastAPI()
@app.get("/items/{item_id}")
async def read_item(
item_id: str,
q: str | None = Query(default=None, max_length=5,regex="^reg")
):
if q:
return {"item_id": item_id, "q": q}
return {"item_id": item_id}
py
from fastapi import FastAPI
from pydantic import BaseModel
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
app = FastAPI()
@app.put("/items/{item_id}")
async def create_item(item_id: int, item: Item, q: str | None = None):
result = {"item_id": item_id, **item.dict()}
if q:
result.update({"q": q})
if item.tax:
price_with_tax = item.price + item.tax
result.update({"price_with_tax": price_with_tax})
return result
py
from fastapi import Cookie, FastAPI
app = FastAPI()
@app.get("/items/")
async def read_items(ads_id: str | None = Cookie(default=None)):
return {"ads_id": ads_id}
py
from fastapi import FastAPI, Header
app = FastAPI()
@app.get("/items/")
async def read_items(user_agent: str | None = Header(default=None)):
return {"User-Agent": user_agent}
校验
前面的部分应该拆分成请求和校验
等有空了改下
表单和文件
- 当接受的并非JSON而是表单时使用
Form
pip install python-multipart
- UploadFile
- filename content_type file
- write(data) read(?size) seek(offset) close()
py
from fastapi import FastAPI, Form
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
return {"username": username}
py
from fastapi import FastAPI, File, UploadFile
app = FastAPI()
@app.post("/files/")
async def create_file(file: bytes = File()):
return {"file_size": len(file)}
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
return {"filename": file.filename}
py
from fastapi import FastAPI, File, Form, UploadFile
app = FastAPI()
@app.post("/files/")
async def create_file(
file: bytes = File(), fileb: UploadFile = File(), token: str = Form()
):
return {
"file_size": len(file),
"token": token,
"fileb_content_type": fileb.content_type,
}
杂项
- 错误处理
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Item not found")
- 路径操作配置
- 通过传递参数给路径操作装饰器 ,即可轻松地配置路径操作、添加元数据
- JSON 兼容编码器
jsonable_encoder(item)
- 额外数据类型
UUID
数据库做IDdatetime.datetime
时间datetime.date
日期datetime.time
时间间隔datetime.timedelta
秒差frozenset
setbytes
字节Decimal
小数
- PUT & PATCH
依赖注入
要造汽车,可以买来所有的原材料属性丢进构造函数里,汽车类里面又分成引擎,轮胎等多个类,要是其中一个类要加一个属性,那么一层层的直到汽车类的构造函数里都要引入新的原材料,这样的话就比较麻烦了.因此使用依赖注入,造汽车我不关心你轮胎,引擎怎么造,我只要直接去买轮胎引擎就好了.在构造函数里我直接就输入的是轮胎对象,引擎对象,我自己只要再加个标牌就完事了,这样的方法就叫依赖注入,其目的叫控制反转.
py
from fastapi import Depends, FastAPI
app = FastAPI()
async def common_parameters(q: str | None = None, skip: int = 0, limit: int = 100):
return {"q": q, "skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
return commons
@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
return commons
py
from fastapi import Depends, FastAPI
app = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]
class CommonQueryParams:
def __init__(self, q: str | None = None, skip: int = 0, limit: int = 100):
self.q = q
self.skip = skip
self.limit = limit
@app.get("/items/")
async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
response = {}
if commons.q:
response.update({"q": commons.q})
items = fake_items_db[commons.skip : commons.skip + commons.limit]
response.update({"items": items})
return response
安全
- 登录验证
- 手机号和密码发给后端
- 密码转哈希存入数据库完成注册
- 登录时发送手机号和密码
- 查询数据库
- 验证哈希后生成token
- 后端返回token
- 前端保存token到LocalStorage
- 登录保持
- 前端发送请求时把token放在请求头中
- 后端获取token并检验有效性
- 得出手机号并去数据库读取请求内容
- 返回响应
py
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from pydantic import BaseModel
from passlib.context import CryptContext
from jose import jwt
from datetime import datetime, timedelta
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from orm import UserAll, hasUser, c_user, u_user, engine, SessionLocal, Base
# 连接数据库
Base.metadata.create_all(bind=engine)
app = FastAPI()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
theHash = CryptContext(schemes=["bcrypt"], deprecated="auto")
KEY = "09d25e094fa"
def c_token(phone: str):
expire = datetime.utcnow()+timedelta(days=7)
info = {"phone": phone, "exp": expire}
return jwt.encode(info, KEY)
def r_token(token: str):
info = jwt.decode(token, KEY, "HS256")
return info.get("phone"), info.get('exp')
@app.post("/user/sign")
# 注册
def sign(user: UserAll, db: Session = Depends(get_db)):
# 数据校验pass
if (hasUser(db, user.phone)):
return "用户已存在"
user.password = theHash.hash(user.password)
return c_user(db=db, user=user)
@app.get("/user/login")
# 登录
def login(user: UserAll, db: Session = Depends(get_db)):
pwd = hasUser(db, user.phone)
if (not pwd):
return '无用户'
if (theHash.verify(user.password, pwd)):
return c_token(user.phone)
else:
return "密码错误"
@app.post("/user/update")
# 改
def update(user: UserAll, db: Session = Depends(get_db)):
# 验证权限
return u_user(db, user)
@app.get("/user/me")
def test(token: str = Depends(OAuth2PasswordBearer(tokenUrl="token"))):
print(token)
return r_token(token)
py
from sqlalchemy import create_engine, Boolean, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
engine = create_engine(
"sqlite:///./sql_app.db",
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 表类
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
phone = Column(String, unique=True, index=True)
password = Column(String)
username = Column(String)
status = Column(Boolean, default=True)
class UserBase(BaseModel):
phone: str
class UserCreate(BaseModel):
password: str
username: str | None = None
class UserAll(UserBase):
password: str | None = None
username: str | None = None
is_active: bool | None = None
def hasUser(db: Session, phone: str):
u = db.query(User).filter(User.phone == phone).first()
if (u):
return u.password
else:
return False
# curd create update read delete
def c_user(db: Session, user: UserCreate):
username = user.username
if (username == None):
username = user.phone[-4:]
db_user = User(
phone=user.phone,
password=user.password,
username=username
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return True
def u_user(db: Session, user: UserAll):
arr = ['password', 'username', 'is_active']
dic = {}
for i in arr:
print(i)
if (user.__dict__[i] == None):
continue
dic[i] = user.__dict__[i]
db.query(User).filter(User.phone == user.phone).update(dic)
db.commit()
return True
中间件
py
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def theTime(request: Request, call_next):
start_time = time.time()
# 处理前
response = await call_next(request)
# 处理后
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
CORS
py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def main():
return {"message": "Hello World"}
ORM
pip install sqlalchemy
__init__.py
表示模块 实为空curd.py
增删改查database.py
创建连接main.py
models.py
ORM模型schemas.py
python
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
url = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine( url )
# connect_args={"check_same_thread": False} 只用于sqlite
# 会话类
SessionLocal = sessionmaker(bind=engine)
# autocommit=False, autoflush=False 可加
# ORM基类
Base = declarative_base()
python
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
# 一个类一个表
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
# 表关系
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
py
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
py
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
py
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
文件组织
后台任务
py
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
# 具体任务
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
静态文件
py
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")