环境安装
1. 安装Python。(Python版本为2.7或3.X。)
2. 安装依赖库。(使用公网连接需要安装confluent-kafka 1.9.2及以下版本的依赖库)
pip install confluent-kafka==1.9.2 |
3. 下载Demo包kafka-confluent-python-demo.zip。
配置修改
1. 如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12 openssl pkcs12 -in caRoot.p12 -out caRoot.pem |
2. 修改setting.py文件。(ca_location仅在ssl连接时需要配置)
kafka_setting = { 'bootstrap_servers': 'XXX', 'topic_name': 'XXX', 'group_name': 'XXX' } |
生产消息
发送以下命令发送消息。
python kafka_producer.py |
生产消息示例代码如下:
from confluent_kafka import Producer import setting conf = setting.kafka_setting """初始化一个 Producer 对象""" p = Producer({'bootstrap.servers': conf['bootstrap_servers']}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) """异步发送消息""" p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) """在程序结束时,调用flush""" p.flush() |
消费消息
发送以下命令消费消息。
python kafka_consumer.py |
消费消息示例代码如下:
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'group.id': conf['group_name'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close() |