Python Celery + Redis
安装
pip install -U celery[redis]
配置redis参考
配置所使用的redis服务器
redis://:password@hostname:port/db_number
Example:
BROKER_URL = 'redis://localhost:6379/0'
配置redis的可见性超时
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600} # 1 hour.
redis中存储状态和返回值
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
只让消息对活动的主机接受
BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}
创建一个task.py
新建一个文件,比如叫task.py。来建立我们的hello world。
from celery import Celery
# 声明任务列表的来源
app = Celery('tasks',broker='redis://localhost:6379/0')
# 定义任务
@app.task
def say_hello(name, words):
return '%s says %s'%(words, name)
运行一个task
运行task服务,需要指定服务的源文件
celery -A 文件名,不要后缀 worker --loglevel=info -f 日志文件路径
celery -A task worker --loglevel=info
celery -A XcTask.MailTask worker --loglevel=info -f /Volumes/data/Python/chichit/project/log/celery.log
celery -A XcTask.MailTask:app_celery worker --loglevel=info -f /Volumes/data/Python/chichit/project/log/celery.log
停止一个celery
停止Celery
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
在命令行中调用
在Python命令行中执行
Python 3.4.3 (default, Jul 31 2015, 14:07:15)
[GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
>>> sys.path.extend(['/Volumes/data/Python/chichit/project/src'])
>>> from XcTask.MailTask import send_ValidateMail
>>> send_ValidateMail.delay('jesse@chichit.com','Ryszard Xiao','https://www.xiaocan.me',33)
<AsyncResult: 883f0b78-fc69-418c-9d9d-04297af152a7>
在程序中调用
from XcTask.MailTask import send_ResetMail,send_ValidateMail
cRet = send_ValidateMail.delay(n.ml_email, n.ml_uname, n.ml_link, n.ml_uid);