title: 异步日志监控:FastAPI与MongoDB的高效整合之道
date: 2025/05/27 17:49:39
updated: 2025/05/27 17:49:39
author: cmdragon
excerpt:
FastAPI与MongoDB整合实现日志监控系统的实战指南。首先配置MongoDB异步连接,定义日志数据模型。核心功能包括日志写入接口、聚合管道查询和索引优化。性能优化技巧涵盖批量写入和查询分页。常见报错解决方案涉及422 Validation Error和MongoClient连接超时。生产环境建议包括连接池配置、读写分离、慢查询监控和TTL索引。通过该方案,可构建日均千万级日志处理系统,建议配合Prometheus和Grafana进行监控和可视化。
categories:
tags:
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意:https://tools.cmdragon.cn/
# 安装核心库
pip install fastapi==0.103.1
pip install motor==3.3.2
pip install pydantic==1.10.7
pip install uvicorn==0.23.2
from fastapi import FastAPI, Depends
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from datetime import datetime
app = FastAPI()
# MongoDB连接配置
async def get_db():
client = AsyncIOMotorClient(
"mongodb://admin:password@localhost:27017",
maxPoolSize=10,
minPoolSize=5
)
return client.log_db
# 日志数据模型
class LogEntry(BaseModel):
timestamp: datetime
level: str # DEBUG/INFO/WARNING/ERROR
service: str
message: str
metadata: dict = None
@app.post("/logs")
async def create_log(log: LogEntry, db=Depends(get_db)):
"""异步写入日志到MongoDB"""
log_dict = log.dict()
result = await db.logs.insert_one(log_dict)
return {"inserted_id": str(result.inserted_id)}
@app.get("/logs/stats")
async def get_log_stats(service: str, db=Depends(get_db)):
"""按服务统计错误日志数量"""
pipeline = [
{"$match": {
"service": service,
"level": "ERROR",
"timestamp": {"$gte": datetime(2023, 1, 1)}
}},
{"$group": {
"_id": "$service",
"error_count": {"$sum": 1},
"latest_error": {"$last": "$timestamp"}
}}
]
cursor = db.logs.aggregate(pipeline)
results = await cursor.to_list(length=100)
return results
# 启动时创建索引
@app.on_event("startup")
async def create_indexes():
db = await get_db()
await db.logs.create_index([("timestamp", 1)], name="timestamp_asc")
await db.logs.create_index(
[("service", 1), ("level", 1)],
name="service_level_compound"
)
@app.post("/logs/bulk")
async def bulk_insert(logs: list[LogEntry], db=Depends(get_db)):
"""批量插入日志提升写入性能"""
documents = [log.dict() for log in logs]
result = await db.logs.insert_many(documents)
return {"inserted_count": len(result.inserted_ids)}
@app.get("/logs")
async def query_logs(
page: int = 1,
page_size: int = 50,
db=Depends(get_db)
):
"""带分页的日志查询接口"""
skip = (page - 1) * page_size
cursor = db.logs.find().sort("timestamp", -1).skip(skip).limit(page_size)
return await cursor.to_list(length=page_size)
现象:请求体字段类型不匹配
解决方案:
from fastapi.exceptions import RequestValidationError
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
return JSONResponse(
status_code=400,
content={"detail": "请求数据格式错误"}
)
现象:ServerSelectionTimeoutError
排查步骤:
systemctl status mongod
问题1:当使用$match
进行时间范围查询时,如何确保查询性能?
A) 使用内存缓存
B) 在timestamp字段创建索引
C) 增加数据库连接池
正确答案:B
解析:创建索引可以显著提升字段的查询效率,特别是对时间戳这种常用于范围查询的字段
问题2:在批量插入日志时,如何保证数据完整性?
A) 使用事务操作
B) 启用写入确认机制
C) 增加重试逻辑
正确答案:B
解析:MongoDB的写入确认(write concern)机制可以确保数据成功写入磁盘
db.currentOp()
的输出# 创建7天过期的TTL索引
await db.logs.create_index(
[("timestamp", 1)],
name="logs_ttl",
expireAfterSeconds=604800 # 7天
)
通过本文的完整实现方案,开发者可以快速构建日均千万级日志处理系统。实际部署时建议配合Prometheus进行性能监控,并使用Grafana实现可视化看板。
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:异步日志监控:FastAPI与MongoDB的高效整合之道 | cmdragon's Blog
参与评论
手机查看
返回顶部