Skip to main content

Python Celery

Python Celery + Redis

安装

pip install -U celery[redis]  

配置redis参考

配置所使用的redis服务器

redis://:password@hostname:port/db_number  
Example:  
BROKER_URL = 'redis://localhost:6379/0'  

配置redis的可见性超时

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}  # 1 hour.  

redis中存储状态和返回值

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  

只让消息对活动的主机接受

BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}  

创建一个task.py

新建一个文件,比如叫task.py。来建立我们的hello world。

from celery import Celery  
# 声明任务列表的来源
app = Celery('tasks',broker='redis://localhost:6379/0')  
# 定义任务
@app.task
def say_hello(name, words):  
    return '%s says %s'%(words, name)

运行一个task

运行task服务,需要指定服务的源文件

celery -A 文件名,不要后缀 worker  --loglevel=info -f 日志文件路径  
celery -A task worker  --loglevel=info  
celery -A XcTask.MailTask worker --loglevel=info -f /Volumes/data/Python/chichit/project/log/celery.log  
 celery -A XcTask.MailTask:app_celery worker --loglevel=info -f /Volumes/data/Python/chichit/project/log/celery.log

停止一个celery

停止Celery

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9  

在命令行中调用

在Python命令行中执行

Python 3.4.3 (default, Jul 31 2015, 14:07:15)  
[GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
>>> sys.path.extend(['/Volumes/data/Python/chichit/project/src'])
>>> from XcTask.MailTask import send_ValidateMail
>>> send_ValidateMail.delay('jesse@chichit.com','Ryszard Xiao','https://www.xiaocan.me',33)
<AsyncResult: 883f0b78-fc69-418c-9d9d-04297af152a7>  

在程序中调用

from XcTask.MailTask import send_ResetMail,send_ValidateMail  
cRet = send_ValidateMail.delay(n.ml_email, n.ml_uname, n.ml_link, n.ml_uid);