celery简单使用 安装celery 安装celery: pip3 install celery 安装redis 创建tasks.py文件进行验证 from celery import Celery import time app = Celery('TASK', broker='redis://localhost', backend='redis://localhost') @app.task def add(x, y): print("running..add.", x, y) return x + y @app.task Celery 是一个分布式任务队列,它专注于实时操作,但也支持调度。在Python环境中,Celery常用于异步处理和后台任务,以提高应用程序的性能和响应速度。本篇文章将介绍如何简单使用Celery以及在项目中如何进行有效集成。 安装Celery非常简单,只需通过pip3命令进行: ```bash pip3 install celery ``` 接着,为了使用Celery,我们需要安装一个消息中间件,这里选择Redis: ```bash pip3 install redis ``` 安装完成后,创建`tasks.py`文件,这是Celery的任务定义文件。在文件中导入Celery库,并配置连接到Redis作为broker和backend: ```python from celery import Celery import time app = Celery('TASK', broker='redis://localhost', # Redis作为消息代理 backend='redis://localhost') # Redis作为结果存储 @app.task def add(x, y): print("running..add.", x, y) return x + y @app.task def minus(x, y): time.sleep(60) # 假设这是一个耗时操作 print("running..minus.", x, y) return x - y ``` 启动Celery Worker来监听和执行任务,确保你已经在`tasks.py`所在的目录下运行以下命令: ```bash celery -A tasks worker --loglevel=info ``` 在不同终端中,你可以调用这些任务,例如: ```python import tasks t2 = tasks.minus.delay(9, 11) # 发布一个任务 t1 = tasks.add.delay(3, 4) # 发布另一个任务 t1.get() # 获取任务结果,如果任务还未完成,会阻塞等待 ``` Celery 提供了其他命令来管理任务,例如检查任务是否准备就绪: ```python t.ready() # 如果为True,表示任务可以获取结果 t.get(timeout=1) # 如果1秒内任务未完成,就会抛出TimeoutError t.get(propagate=False) # 出错时只打印错误信息,不会引发异常 t.traceback # 获取任务执行时的异常堆栈信息 ``` 在实际项目中,通常需要更复杂的结构来组织Celery。例如,创建一个名为 `celery_pro` 的目录,其中包含`celery.py`和多个任务文件,如`tasks.py`和`tasks2.py`。`celery.py`文件是Celery的主配置文件: ```python # celery.py from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='redis://localhost', backend='redis://localhost', include=['celery_pro.tasks', 'celery_pro.tasks2']) # 其他配置,例如结果过期时间 app.conf.result_expires = 3600 # 添加定时任务 app.conf.beat_schedule = { 'add-every-5-seconds': { 'task': 'celery_pro.tasks.add', 'schedule': 5.0, 'args': (16, 16), } } app.conf.timezone = 'UTC' if __name__ == '__main__': app.start() ``` 任务文件`tasks.py`和`tasks2.py`中,我们定义了具体的任务函数,例如: ```python # tasks.py from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y # tasks2.py from __future__ import absolute_import, unicode_literals from .celery import app import time, random @app.task def randnum(start, end): time.sleep(3) return random.randint(start, end) ``` 这样,你的项目就具备了使用Celery进行异步任务处理和调度的能力。可以根据需求添加更多任务,调整配置,或者集成到现有的Web框架中,例如Django或Flask,以实现更复杂的应用场景。
- 粉丝: 7
- 资源: 888
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
评论0