扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
Celery基础概念
Celery架构由三部分组成:客户端(发送任务)、消息代理(存储任务队列)、工作者(执行任务)。典型的消息代理选择包括Redis(支持6379端口)和RabbitMQ(默认使用5672端口)。任务结果存储建议使用Redis或关系型数据库,需在配置中明确指定backend参数。
FastAPI与Celery集成步骤
pip install fastapi==0.103.2 celery==5.3.4 redis==4.5.5 uvicorn==0.23.2 pydantic==2.5.2
项目结构:
project/
├── main.py
├── celery_app.py
└── tasks.py
核心代码示例:
# celery_app.py
from celery import Celery
celery_app = Celery(
'worker',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['project.tasks']
)
celery_app.conf.update(task_track_started=True)
# tasks.py
from .celery_app import celery_app
@celery_app.task
def process_data(data: dict) -> dict:
"""模拟耗时数据处理任务"""
import time
time.sleep(5)
return {"result": f"Processed {data['input']}"}
# main.py
from fastapi import FastAPI
from pydantic import BaseModel
from .tasks import process_data
app = FastAPI()
class RequestData(BaseModel):
input: str
priority: int = 1
@app.post("/submit-task")
async def submit_task(data: RequestData):
"""提交异步任务接口"""
task = process_data.apply_async(
kwargs={"data": data.dict()},
priority=data.priority
)
return {"task_id": task.id}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""任务状态查询接口"""
from celery.result import AsyncResult
result = AsyncResult(task_id, app=celery_app)
return {
"status": result.status,
"result": result.result if result.ready() else None
}
Q2:如何处理长时间运行的任务超时问题?
A2:在任务装饰器中设置soft_time_limit参数,例如@task(soft_time_limit=300),同时配置worker的--maxtasksperchild参数限制最大任务数。
redis-cli ping
应返回PONG错误现象:任务结果无法获取
解决方案:
检查backend配置是否正确
确认任务执行完成(状态为SUCCESS)
检查Redis存储空间是否充足
任务优先级配置
在启动worker时指定队列:
celery -A project.celery_app worker -Q high_priority,default -l INFO
接口调用时指定优先级:
process_data.apply_async(
kwargs={"data": data},
queue='high_priority' if data.priority > 5 else 'default'
)
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何让FastAPI与Celery完美联姻,打造高效异步任务处理系统?
参与评论
手机查看
返回顶部