Python 中的 Kombu 类库

Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis

一.主要功能

1.消息队列

提供可靠的消息传递和队列机制,允许将消息从生产者发送到消费者。

2.消息代理支持

支持多种消息代理,如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。

3.异步任务

可以用来实现异步任务处理,配合 Celery 使用时,可以构建分布式任务队列。

4.消息格式

支持多种消息格式,包括 JSON、YAML、pickle 等。

5.路由和交换

提供了高级的消息路由和交换功能,可以实现复杂的消息分发逻辑。

二.基本使用

1. 创建消息生产者

生产者负责向消息队列发送消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")

2. 创建消息消费者

消费者从消息队列中接收和处理消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'def callback(body, message):print(f"Received message: {body}")message.ack()  # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'def callback(body, message):print(f"Received message: {body}")message.ack()  # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()

3. 高级用法:消息路由

Kombu 支持复杂的消息路由配置,以下示例展示了如何使用路由功能将消息发送到不同的队列。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")

4. 结合 Celery 使用

Kombu 通常与 Celery 一起使用来处理异步任务。简单理解,Kombu 是 Celery 的依赖库,Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能,提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。

(1)Redis 消息代理

from celery import Celery# 配置 Celery 使用 Redis 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y

在 Dify 中默认消息代理使用 Redis,如下所示:

(2)RabbitMQ 消息代理

from celery import Celery# 配置 Celery 使用 RabbitMQ 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='amqp://guest:guest@localhost//')@app.task
def add(x, y):return x + y

Kombu 是一个强大的消息传递库,提供了多种消息代理的支持,并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能,如交换机、队列、路由等。基础用法 包括创建生产者和消费者,通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等,用于构建分布式系统和异步任务处理。

参考文献

[1] https://github.com/celery/kombu

[2] https://docs.celeryq.dev/projects/kombu/en/stable/

[3] 消息队列 Kombu 之 基本架构:https://www.cnblogs.com/rossiXYZ/p/14454761.html

[4] Kombu 库用法详解(连接、连接池、生产者、消费者):https://blog.csdn.net/weixin_44799217/article/details/128490325

NLP工程化(星球号)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145453.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

计算机的错误计算(九十九)

摘要 讨论 的计算精度问题。 计算机的错误计算(五十五)、(七十八)以及(九十六)分别列出了 IEEE 754-2019[1]中的一些函数与运算。下面再截图给出其另外几个运算。 另外,计算机的错误计算&…

风力发电机各部位边缘识别检测数据集 yolo数据集 共7300张

风力发电机各部位边缘识别检测数据集 yolo数据集 共7300张 风力发电机各部位边缘识别检测数据集 数据集描述 该数据集是一个专门用于风力发电机各部位边缘识别和检测的数据集,旨在帮助研究人员和开发者训练和评估基于深度学习的目标检测模型。数据集中的图像涵盖了…

卡牌抽卡机小程序:市场发展下的创新

今年以来,卡牌成为了行业中的黑马,在国内迅速流行,成为消费者的心头好。小小的卡牌创下了百亿的市场规模,发展前景巨大! 不过,随着卡牌市场的不断增长,市场发展也需要进行创新。线上抽卡机小程…

系统架构设计师|数据库基础-006

📫 作者简介:「六月暴雪飞梨花」,专注于研究Java,就职于科技型公司后端工程师 🏆 近期荣誉:华为云云享专家、阿里云专家博主、 腾讯云优秀创作者、腾讯云TDP-KOL、墨天轮技术专家博主、ACDU成员 &#x1f3…

Linux top命令详解与重点内容说明

文章目录 重点说明基本信息进程(任务)信息cpu占用信息%Cpu(s)内存信息交换内存信息每列含义说明交互命令多窗口模式颜色配置命令参数 重点说明 top命令非常强大,也非常复杂,很难面面俱到,也没有必要,这篇文章的目的是介绍重点&am…

面试题给图例举测试用例或测试点

目录 从功能测试的角度考虑: 从性能角度考虑: 从兼容性的角度考虑: 从自动化角度考虑: 从安全性角度考虑: 用户体验的角度测试: 面试通常会有技术和人事两种,侧重点不一样。 今天聊一下测…

前端univer创建、编辑excel

前端univer创建、编辑excel 源码在线demo:https://codesandbox.io/p/sandbox/univer-q87kqg?file/src/Demo.jsx univer官网地址:https://univer.ai/zh-CN/guides/sheet/introduction 安装univer npm install univerjs/core univerjs/design univerjs…

音频北斗定位系统有什么用?

在当今科技飞速发展的时代,定位技术已经成为我们日常生活和各行各业不可或缺的一部分。其中,音频北斗定位系统作为一种新兴的定位技术,正逐渐展现出其独特的优势和应用价值。那么,到底音频北斗定位系统有什么用呢?我们一起来了解…

计算机网络nat 映射案列

1 拓扑案列 2 配置 pc 访问外网 # interface LoopBack192 ip address 192.168.1.1 255.255.255.0 # interface Vlan-interface1 ip address 10.1.1.1 255.255.255.0 # # ip route-static 0.0.0.0 0 10.1.1.2 # local-user admin class manage password hash $h$6$0XD4lC…

微信小程序. tarojs webView的 onload 事件不触发

功能需求:想再webView加载成功后做一些逻辑操作。使用onLoad事件 现象:在taro里面webView的onload。onError 事件不触发了 版本:taro 3.6版本 分析:刚开始想着可能是版本,然后用另外一个项目(taro 3.4版…

Docker笔记-Docker Dockerfile

Docker笔记-Docker Dockerfile Dockerfile 是一个用来构建镜像的文本文件,文本内容包含了一条条构建镜像所需的指令和说明。 这里讲解如何运行 Dockerfile 文件来定制一个镜像。 DockerFile构建过程解析: 1、每条保留字指令都必须为大写字母且后面要…

元学习的简单示例

代码功能 模型结构:SimpleModel是一个简单的两层全连接神经网络。 元学习过程:在maml_train函数中,每个任务由支持集和查询集组成。模型先在支持集上进行训练,然后在查询集上进行评估,更新元模型参数。 任务生成&…

C#自定义曲线绘图面板

一、实现功能 1、显示面板绘制。 2、拖动面板,X轴、Y轴都可以拖动。 3、显示面板缩放,放大或者缩小。 4、鼠标在面板中对应的XY轴数值。 5、自动生成的数据数组,曲线显示。 6、鼠标是否在曲线上检测。 二、界面 拖动面板 鼠标在曲线上…

Linux —— 多线程

一、本篇重点 1.了解线程概念,理解线程与进程区别与联系 2.理解和学会线程控制相关的接口和操作 3.了解线程分离与线程安全的概念 4.学会线程同步。 5.学会互斥量,条件变量,posix信号量,以及读写锁 6.理解基于读写锁的读者写…

《JKTECH柔性振动盘:原理与多行业应用》东莞市江坤自动化科技有限公司

一、柔性振动盘的原理 柔性振动盘是一种新型的自动化上料设备,它采用先进的音圈电机技术和柔性振动技术,实现了对各种不规则形状、微小尺寸、易损伤零部件的高效上料和分拣。 其工作原理主要包括以下几个方面: 1. 音圈电机驱动 柔性振动盘内部…

分布式系统的概念与设计模式

概念 定义:分布式系统是指将数据和计算任务分散到多个独立的计算机上,这些计算机通过网络进行通信和协作,共同对外提供服务。分布式系统不仅提高了系统的可靠性和可扩展性,还增强了系统的并发处理能力和数据管理能力。 特点&…

运维开发之堡垒机(Fortress Machine for Operation and Development)

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…

mysql通过binlog做数据恢复

1 介绍 binlog(二进制日志)在 MySQL 中具有非常重要的作用。它记录了数据库的所有更改操作,主要用于数据恢复、复制和审计等方面。以下是 binlog 的主要作用: 1.数据恢复 binlog 可以用于恢复数据库中的数据。当数据库发生故障时…

分布式框架 - ZooKeeper

一、什么是微服务架构 1、单体架构 顾名思义一个软件系统只部署在一台服务器上。 ​ 在高并发场景中,比如电商项目,单台服务器往往难以支撑短时间内的大量请求,聪明的架构师想出了一个办法提高并发量:一台服务器不够就加一台&am…

微信小程序拨打电话点取消报错“errMsg“:“makePhoneCall:fail cancel“

问题:微信小程序中拨打电话点取消,控制台报错"errMsg":"makePhoneCall:fail cancel" 解决方法:在后面加上catch就可以解决这个报错 wx.makePhoneCall({phoneNumber: 181********}).catch((e) > {console.log(e) //用…