Distributed Message Service (Kafka)

Python

2024-05-09 09:27:50

Preparing the Environment

1.         Install Python 2.7 or 3.X.

2.         Install the dependency library. (Install confluent-kafka 1.9.2 or later versions for connection over a public network)


pip install confluent-kafka==1.9.2 


3.         Download the demo package kafka-confluent-python-demo.zip.

Modifying Configurations

1.         If it is an SSL connection, download the certificate from the console. Unzip the compressed package to get ssl.client.truststore.jks, and execute the following command to generate the caRoot.pem file.

 

keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12

openssl pkcs12 -in caRoot.p12 -out caRoot.pem


2.         Modify the setting.py file. (configure ca_location only for SSL connections)

 

kafka_setting = {

    'bootstrap_servers': 'XXX',

    'topic_name': 'XXX',

    'group_name': 'XXX'

}


Producing Messages

Send the following command to produce messages.


python kafka_producer.py



Examples:

 

from confluent_kafka import Producer

import setting

conf = setting.kafka_setting

"""Initializing a Producer Object"""

p = Producer({'bootstrap.servers': conf['bootstrap_servers']})

def delivery_report(err, msg):

    """ Called once for each message produced to indicate delivery result.

        Triggered by poll() or flush(). """

    if err is not None:

        print('Message delivery failed: {}'.format(err))

    else:

        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

"""Sending Messages Asynchronously"""

p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)

p.poll(0)

"""At the end of the program, call flush"""

p.flush()




Consuming Messages

Send the following command to consume messages.


python kafka_consumer.py

 


Examples:

 

from confluent_kafka import Consumer, KafkaError

import setting

conf = setting.kafka_setting

c = Consumer({

    'bootstrap.servers': conf['bootstrap_servers'],

    'group.id': conf['group_name'],

    'auto.offset.reset': 'latest'

})

c.subscribe([conf['topic_name']])

while True:

    msg = c.poll(1.0)

    if msg is None:

        continue

    if msg.error():

            if msg.error().code() == KafkaError._PARTITION_EOF:

                     continue

            else:

             print("Consumer error: {}".format(msg.error()))

             continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()




dLq__.Mlom_G