如何使用Python操作Kafka
1、安装 Kafka 的 Python 客户端库 kafka-python
pip install kafka-python
2、直接编写Kafka工具脚本
kafka_tools.py
# -*- coding: utf-8 -*-
"""Thin helpers around kafka-python's KafkaProducer and KafkaConsumer."""
import json

from kafka import KafkaConsumer, KafkaProducer

# NOTE(review): `Config` is not imported in this module. It is expected to be a
# project-provided object whose ``get(key)`` returns the bootstrap server
# address for the key ``'kafka:<username_ip>:bootstrap_servers'`` — confirm and
# add the matching import.


class KProducer:
    """Kafka producer that JSON-encodes every value it sends to one topic."""

    def __init__(self, username_ip, topic):
        """Create the underlying KafkaProducer.

        :param username_ip: key used to look up the bootstrap servers in
            ``Config`` (``'kafka:<username_ip>:bootstrap_servers'``)
        :param topic: topic that every send targets
        """
        bootstrap_servers = Config.get('kafka:{}:bootstrap_servers'.format(username_ip))
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Values are serialized as JSON. json.dumps escapes non-ASCII
            # characters by default, so the ASCII encode cannot fail.
            value_serializer=lambda m: json.dumps(m).encode('ascii'),
        )
        self.topic = topic

    def sync_producer(self, data_li):
        """Send records one at a time, waiting for each write to be acknowledged.

        :param data_li: iterable of JSON-serializable values
        :raises: whatever ``future.get`` raises (e.g. a timeout after 10 s or
            the send error), so a failed write is not silently ignored
        """
        for data in data_li:
            future = self.producer.send(self.topic, data)
            # Block until the broker acknowledges this write (this confirms the
            # write, not that anyone has consumed the record).
            record_metadata = future.get(timeout=10)
            print('save success, partition: {}, offset: {}'.format(
                record_metadata.partition,  # partition the record landed in
                record_metadata.offset,     # offset within that partition
            ))

    def asyn_producer(self, data_li):
        """Send records asynchronously, then flush once (the preferred method).

        Per-record failures are not reported; use ``asyn_producer_callback``
        for that. The producer stays open; call ``close_producer`` when done.

        :param data_li: iterable of JSON-serializable values
        """
        for data in data_li:
            self.producer.send(self.topic, data)
        # Push out everything buffered above in one batch.
        self.producer.flush()

    def asyn_producer_callback(self, data_li):
        """Send records asynchronously, reporting each record's outcome.

        :param data_li: iterable of JSON-serializable values
        """
        for data in data_li:
            (self.producer.send(self.topic, data)
             .add_callback(self.send_success)
             .add_errback(self.send_error))
        # Push out everything buffered above in one batch.
        self.producer.flush()

    def send_success(self, *args, **kwargs):
        """Callback invoked when an asynchronous send succeeds."""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """Errback invoked when an asynchronous send fails.

        kafka-python's future passes the exception as the first positional
        argument; it is printed so the failure cause is not lost.
        """
        if args:
            print('save error: {!r}'.format(args[0]))
        else:
            print('save error')
        return

    def close_producer(self):
        """Close the producer on a best-effort basis: a failure is reported, not raised."""
        try:
            self.producer.close()
        except Exception as exc:  # narrow enough to let KeyboardInterrupt/SystemExit through
            print('close producer error: {!r}'.format(exc))


class KConsumer:
    """Builds a KafkaConsumer subscribed to a single topic."""

    def __init__(self, username_ip, topic, offset_reset):
        """Store the consumer settings.

        :param username_ip: key used to look up the bootstrap servers in
            ``Config`` (``'kafka:<username_ip>:bootstrap_servers'``)
        :param topic: topic to subscribe to
        :param offset_reset: passed through as ``auto_offset_reset``
            (e.g. ``'earliest'`` or ``'latest'``)
        """
        self.bootstrap_servers = Config.get('kafka:{}:bootstrap_servers'.format(username_ip))
        self.offset_reset = offset_reset
        self.topic = topic

    def read_kafka(self):
        """Return a new KafkaConsumer for the topic.

        No value deserializer is configured, so message values are raw bytes.
        The caller owns the consumer and should close it when finished.
        """
        return KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset=self.offset_reset,
        )
3、使用此工具
demo_kafka.py
# -*- coding: utf-8 -*-
from kafka_tools import KProducer, KConsumer


def data_read_kafka(username_ip, topic, offset_reset):
    """Print every message read from a Kafka topic, decoded as UTF-8.

    Loops over the consumer and prints each message value. The consumer is
    closed when the loop ends, including when it is interrupted.

    :param username_ip: key identifying your Kafka cluster (passed to KConsumer)
    :param topic: topic to read
    :param offset_reset: where to start when there is no committed offset:
        'latest' (only messages arriving from now on) or 'earliest'
        (from the beginning of the topic)
    """
    consumer = KConsumer(username_ip, topic, offset_reset).read_kafka()
    try:
        for message in consumer:
            data_info = message.value.decode('utf-8')
            print(data_info)
    finally:
        # Release the consumer's connections when iteration stops (e.g. Ctrl-C).
        consumer.close()


def send_kafka(username_ip, topic, data_list):
    """Write data_list to a Kafka topic asynchronously, then close the producer.

    :param username_ip: key identifying your Kafka cluster (passed to KProducer)
    :param topic: topic to write to
    :param data_list: JSON-serializable values to send
    """
    producer = KProducer(username_ip, topic)
    try:
        producer.asyn_producer(data_list)
    finally:
        # Each call creates its own producer; close it so connections don't leak.
        producer.close_producer()