python—Celery异步分布式

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介:

一、Celery异步分布式

Celery  是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息

Celery  用于存储消息以及celery执行的一些消息和结果


对于brokers,官方推荐是rabbitmq和redis

对于backend,也就是指数据库,为了简单一般使用redis


clipboard.png


使用redis连接url格式:

redis://:password@hostname:port/db_number


1)定义连接脚本tasks.py


1
2
3
4
5
6
7
8
9
#!/usr/bin/env python
from  celery  import  Celery
broker  =  "redis://192.168.2.230:6379/1"
backend  =  "redis://192.168.2.230:6379/2"
app  =  Celery( "tasks" , broker = broker, backend = backend)
 
@app .task
def  add(x,y):
     return  x + y


2)安装启动celery

pip install celery

pip install redis

启动方式:celery -A huang tasks -l info  #-l 等同于 --loglevel

1.png


3)执行测试 huang.py 

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
from  tasks  import  add
 
re  =  add.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
 
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态

运行结果:

30

<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

30

SUCCESS


4)根据成功返回的key或celery界面输出的信息,查看redis存储

blob.png


说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的



二、celery多进程

1.png

1)配置文件 celeryconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  kombu  import  Exchange,Queue
 
BROKER_URL  =  "redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND  =  "redis://192.168.2.230:6379/4"
 
CELERY_QUEUES  =  (
Queue( "default" ,Exchange( "default" ),routing_key = "default" ),
Queue( "for_task_A" ,Exchange( "for_task_A" ),routing_key = "for_task_A" ),
Queue( "for_task_B" ,Exchange( "for_task_B" ),routing_key = "for_task_B" )
)
 
CELERY_ROUTES  =  {
'tasks.taskA' :{ "queue" : "for_task_A" , "routing_key" : "for_task_A" },
'tasks.taskB' :{ "queue" : "for_task_B" , "routing_key" : "for_task_B" }
}


2)tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  celery  import  Celery
 
app  =  Celery()
app.config_from_object( "celeryconfig" )
 
@app .task
     def  taskA(x,y):
     return  x + y
     
@app .task
     def  taskB(x,y,z):
     return  x + y + z


3)启动celery

celery -A tasks worker --loglevel info


4)执行脚本huang2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  tasks  import  taskA,taskB
 
re  =  taskA.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态
 
re2  =  taskB.delay( 10 , 20 , 30 )
print (re2.result)
print (re2.ready)
print (re2.get(timeout = 1 ))
print (re2.status)


5)运行结果

None

<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

30

SUCCESS

None

<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

60

SUCCESS











本文转自 huangzp168 51CTO博客,原文链接:http://blog.51cto.com/huangzp/2052713,如需转载请自行联系原作者
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
22天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
11天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
1月前
|
数据采集 数据挖掘 调度
异步爬虫实践攻略:利用Python Aiohttp框架实现高效数据抓取
本文介绍了如何使用Python的Aiohttp框架构建异步爬虫,以提升数据抓取效率。异步爬虫利用异步IO和协程技术,在等待响应时执行其他任务,提高效率。Aiohttp是一个高效的异步HTTP客户端/服务器框架,适合构建此类爬虫。文中还展示了如何通过代理访问HTTPS网页的示例代码,并以爬取微信公众号文章为例,说明了实际应用中的步骤。
|
1月前
|
API 数据库 Python
python中的异步操作
在Python中,`async`和`await`是用于构建异步程序的关键字。它们是Python 3.5版本引入的,使得异步编程变得更加直观和容易理解。这两个关键字一起使用,可以帮助提高应用程序的性能,特别是在涉及到I/O操作(如网络请求、文件读写等)时,可以显著提升效率。【2月更文挑战第15天】
22 5
|
1月前
|
调度 数据库 Python
Python中的并发编程:使用asyncio库实现异步IO
传统的Python程序在面对IO密集型任务时,往往会遇到性能瓶颈。本文将介绍如何利用Python中的asyncio库,通过异步IO的方式来提升程序的效率和性能,让你的Python程序能够更好地处理并发任务。
|
1月前
|
消息中间件 监控 NoSQL
在Windows下设置分布式队列Celery的心跳轮询
在Windows下设置分布式队列Celery的心跳轮询
28 0
|
1月前
|
消息中间件 监控 NoSQL
一文读懂python分布式任务队列-celery
celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行【2月更文挑战第11天】
84 5
|
2月前
|
Python
Python 列表解析式竟然支持异步?
Python 列表解析式竟然支持异步?
31 1
|
2月前
|
安全 调度 Python
什么是Python中的事件驱动编程?如何使用`asyncio`模块实现异步事件处理?
【2月更文挑战第4天】【2月更文挑战第9篇】什么是Python中的事件驱动编程?如何使用`asyncio`模块实现异步事件处理?
|
2月前
|
程序员 调度 云计算
Python并发编程的未来趋势:协程、异步IO与多进程的融合
Python并发编程的未来趋势:协程、异步IO与多进程的融合