美文网首页
使用pykafka库测试kafka-180504-[github

使用pykafka库测试kafka-180504-[github

作者: Lacia | 来源:发表于2020-04-29 16:19 被阅读0次

使用pykafka库测试kafka

生产者生产数据
from pykafka import KafkaClient

client = KafkaClient(hosts ="10.10.30.66:9092") #可接受多个client
#查看所有的topic
client.topics
print(client.topics)

#选择一个topic
topic = client.topics[b'thresholdAlarm']
message ='{"alarmOid":"1.1.1.4","deviceId":"d92c66617162477da0137e139712e665","deviceName":"乐平水表001","deviceType":"edcd7a32-a744-41db-af2e-b5e95cf249b4","org":"1"}'

producer=topic.get_producer()
producer.produce(bytes(message, encoding = "utf8"))
#关闭链接,否则报错
producer.stop()   

在kafka目录下执行./kafka-console-consumer.sh --zookeeper 10.10.30.66:2181 --topic normalAlarm --from-beginning,即可看见刚刚发出的消息。

消费数据
from pykafka import KafkaClient

client = KafkaClient(hosts ="10.10.30.66:9092")
#查看所有的topic
#client.topics
print(client.topics)

topic = client.topics[b'thresholdAlarm']

consumer = topic.get_simple_consumer(consumer_group=b'test', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id=b'test')  
for message in consumer:  
    if message is not None:  
        print(message.offset, message.value)

print("+++++++++++++++++++++++++++++")

相关文章

网友评论

      本文标题:使用pykafka库测试kafka-180504-[github

      本文链接:https://www.haomeiwen.com/subject/upknwhtx.html