title: 异步之舞:FastAPI与MongoDB的深度协奏
date: 2025/05/18 19:09:08
updated: 2025/05/18 19:09:08
author: cmdragon
excerpt:
MongoDB与FastAPI的基础集成方法。首先,环境要求包括Python 3.8+、MongoDB 4.4+、FastAPI 0.95+和Motor 3.1+,并提供了依赖安装命令。其次,通过Motor驱动配置异步数据库连接,使用Pydantic进行数据验证,并实现异步CRUD操作。此外,还展示了聚合管道实践和索引优化策略,如创建单字段索引、复合索引和文本索引。最后,提供了常见报错的解决方案,如ServerSelectionTimeoutError、ValidationError和查询性能低下的处理方法。
categories:
tags:
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意:https://tools.cmdragon.cn/
安装所需依赖:
pip install fastapi==0.95.0
pip install motor==3.1.2
pip install pydantic==1.10.7
pip install python-multipart==0.0.6
pip install uvicorn==0.21.1
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, Field
import os
app = FastAPI()
# MongoDB配置模型
class MongoDBConfig:
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "fastapi_demo"
COLLECTION = "users"
# 异步数据库客户端
@app.on_event("startup")
async def startup_db_client():
app.mongodb_client = AsyncIOMotorClient(MongoDBConfig.MONGO_URI)
app.mongodb = app.mongodb_client[MongoDBConfig.DB_NAME]
@app.on_event("shutdown")
async def shutdown_db_client():
app.mongodb_client.close()
from bson import ObjectId
from typing import Optional
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
class UserCreate(BaseModel):
name: str = Field(..., min_length=3)
age: int = Field(..., gt=0)
tags: list[str] = []
class UserResponse(UserCreate):
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
class Config:
json_encoders = {ObjectId: str}
@app.post("/users/")
async def create_user(user: User):
result = await db.users.insert_one(user.dict())
return {"id": str(result.inserted_id)}
@app.get("/users/{user_id}")
async def read_user(user_id: str):
if not ObjectId.is_valid(user_id):
raise HTTPException(400, "Invalid ID format")
user = await db.users.find_one({"_id": ObjectId(user_id)})
if not user:
raise HTTPException(404, "User not found")
# 转换 MongoDB 的 ObjectId 为字符串
user["id"] = str(user.pop("_id"))
return user
@app.patch("/users/{user_id}")
async def update_user(user_id: str, update_data: dict):
# 过滤无效字段
valid_fields = User.__annotations__.keys()
filtered_data = {k: v for k, v in update_data.items() if k in valid_fields}
result = await db.users.update_one(
{"_id": ObjectId(user_id)},
{"$set": filtered_data}
)
return {"modified_count": result.modified_count}
from fastapi import APIRouter
router = APIRouter()
@router.get("/users/stats/age-distribution")
async def get_age_distribution():
pipeline = [
{"$group": {
"_id": "$age",
"count": {"$sum": 1}
}},
{"$sort": {"_id": 1}}
]
results = []
async for doc in app.mongodb.users.aggregate(pipeline):
results.append({
"age": doc["_id"],
"count": doc["count"]
})
return results
async def create_indexes():
# 单字段索引
await app.mongodb.users.create_index("name", unique=True)
# 复合索引
await app.mongodb.users.create_index([("age", 1), ("tags", 1)])
# 文本索引
await app.mongodb.users.create_index([("name", "text")])
为什么在FastAPI中推荐使用Motor驱动而不是同步的PyMongo?
答案:FastAPI基于异步架构,Motor作为异步驱动可以避免阻塞事件循环,提升系统吞吐量。PyMongo的同步操作会阻塞整个事件循环,导致性能下降。
配置 Motor 驱动时,为什么要传入 io_loop=app.state.loop
参数?
处理 MongoDB 的日期字段时,Pydantic 模型为什么推荐使用 datetime.utcnow()
?
当收到"422 Unprocessable Entity"错误时,应该如何快速定位问题?
答案:检查请求数据是否符合Pydantic模型定义,使用Swagger文档验证请求格式。错误响应体包含详细的字段验证信息。
如何验证用户年龄字段必须是正整数?
age: int = Field(..., gt=0)
该定义表示age必须大于0(gt=greater than)
错误1:ServerSelectionTimeoutError
sudo systemctl status mongod
mongodb://username:password@host:port
错误2:ValidationError
from fastapi.exceptions import RequestValidationError
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
return JSONResponse(status_code=400, content={"detail": exc.errors()})
错误3:查询性能低下
explain()
分析查询计划余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:异步之舞:FastAPI与MongoDB的深度协奏 | cmdragon's Blog
参与评论
手机查看
返回顶部