温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

python如何实现对kafka的基本操作

发布时间:2021-10-18 15:25:40 来源:亿速云 阅读:219 作者:小新 栏目:编程语言

这篇文章主要为大家展示了“python如何实现对kafka的基本操作”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“python如何实现对kafka的基本操作”这篇文章吧。

-- coding:utf-8 --

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生产者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一个消费者消费一个topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,从第1个偏移量消费
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一个消费者订阅多个topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))
"""消费者(手动拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #从kafka获取消息
        if message:
        print message
        time.sleep(1)

def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

以上是“python如何实现对kafka的基本操作”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI