分布式消息服务Kafka

Python

2024-05-09 02:50:33

环境安装

1.         安装Python。(Python版本为2.7或3.X。)

2.         安装依赖库。(使用公网连接需要安装confluent-kafka 1.9.2及以下版本的依赖库)

 


pip install     confluent-kafka==1.9.2


3.         下载Demo包kafka-confluent-python-demo.zip

配置修改

1.         如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。

 


keytool     -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore     caRoot.p12 -deststoretype pkcs12

openssl pkcs12 -in     caRoot.p12 -out caRoot.pem


2.         修改setting.py文件。(ca_location仅在ssl连接时需要配置)

 


kafka_setting = {

    'bootstrap_servers': 'XXX',

    'topic_name': 'XXX',

    'group_name': 'XXX'

}


生产消息

发送以下命令发送消息。

 


python     kafka_producer.py


生产消息示例代码如下:

 


from confluent_kafka     import Producer

import setting

conf =     setting.kafka_setting

"""初始化一个 Producer 对象"""

p =     Producer({'bootstrap.servers': conf['bootstrap_servers']})

def     delivery_report(err, msg):

    """ Called once for each     message produced to indicate delivery result.

        Triggered by poll() or flush().     """

    if err is not None:

        print('Message delivery failed:     {}'.format(err))

    else:

        print('Message delivered to {}     [{}]'.format(msg.topic(), msg.partition()))

"""异步发送消息"""

p.produce(conf['topic_name'],     "Hello".encode('utf-8'), callback=delivery_report)

p.poll(0)

"""在程序结束时,调用flush"""

p.flush()


消费消息

发送以下命令消费消息。

 


python     kafka_consumer.py


消费消息示例代码如下:

 


from confluent_kafka     import Consumer, KafkaError

import setting

conf =     setting.kafka_setting

c = Consumer({

    'bootstrap.servers':     conf['bootstrap_servers'],

    'group.id': conf['group_name'],

    'auto.offset.reset': 'latest'

})

c.subscribe([conf['topic_name']])

while True:

    msg = c.poll(1.0)

    if msg is None:

        continue

    if msg.error():

                if msg.error().code() == KafkaError._PARTITION_EOF:

                     continue

                else:

             print("Consumer     error: {}".format(msg.error()))

             continue

    print('Received message:     {}'.format(msg.value().decode('utf-8')))

c.close()



mvYn60ugnAob