python语言,一般适用于快速实现业务需求的用途,在大型架构方面其应用范围并没有Java、Golang以及C++那么丰富,因此相对来讲还没有形成非常统一的技术体系。在某些需求中可能需要开发多个服务,服务之间需要实现异步通信,甚至是调用对方的函数。celery就是一个典型的例子,它提供了以将python函数注册到消息队列的方式暴露服务的方法,并且支持RabbitMQ、Redis等多种Broker中间代理形式。
celery同时也支持多个python的web框架,在其介绍页中,也明确说明了Tornado、Django、web2py等框架都有自己的celery实现方法,当然要知道其他框架的实践,可以上github上探查一番。
celery简便易用,本文笔者以自己整理的start-fastapi为例,讲述接入celery的方法。
首先克隆start-fastapi,是笔者稍微加料的一个fastapi版本,主要在fastapi原本基础上对配置以及目录进行了整理,形成了一个开箱即用的Web后端框架。以Redis作为broker为例,我们首先需要pip install celery[redis]
,然后可以在service.celery
中新建一个app.py
文件,作为celery的app实例。
1 | from celery import Celery |
对于整个消息队列,broker代理的信息缓存在DB0中,而backend存储每一个消息trace info,缓存在DB1中。
之后在service.celery
新建worker.py
文件,里面import app后,注册待暴露的函数。这里的例子是通过item_id延迟3s获取item的price,get_item
方法获取了item basemodel的实例,通过这个实例可以拿到price
1 | from .app import celery_app |
为了让这个函数能暴露出去,我们需要应用这个worker文件的内容。进入到项目根目录,如果用venv的话,先source ./venv/bin/activate
,然后执行celery -A service.celery.worker worker -l info
,就能够启动celery的worker,并且可以看到tasks中有了service.celery.worker.get_item_price
通过ps -ef | grep python
,我们看到如下信息,可以看到fastapi应用跟celery worker处在不同的进程
1 | root 786 1 0 20:01 ? 00:00:00 /usr/bin/python3 /usr/bin/networkd-dispatcher --run-startup-triggers |
于是乎,我们增加一个controller,用来测试celery worker是否生效。controller也引用service.celery.app
,通过send_task
执行service.celery.worker.get_item_price
任务,传参为1,然后再打印返回值。
1 |
|
我们通过curl http://127.0.0.1:8000/v1/test/celery
来测试这个接口。结果也很显然,约3s之后,fastapi端打印了返回值到stdout中,然后curl接口返回了success。
例子已经上传到fastapi-celery-test上,基本操作就系介样,后面再慢慢探索把~