Preparing the Environment
1. Install Python 2.7 or 3.X.
2. Install the dependency library. Connections over a public network require confluent-kafka 1.9.2 or later.
pip install confluent-kafka==1.9.2
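To confirm that the installed library meets the 1.9.2 minimum, a small check along the following lines may help. This is a sketch, not part of the demo package: parse_version and meets_minimum are hypothetical helper names, and the check relies on confluent_kafka.version() returning the version string as the first element of a tuple.

```python
def parse_version(text):
    """Turn '1.9.2' into (1, 9, 2); a non-numeric suffix such as 'rc1' is ignored."""
    parts = []
    for piece in text.split('.'):
        digits = ''
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    return tuple(parts)


def meets_minimum(installed, minimum='1.9.2'):
    """Return True if the installed version is at least the minimum (hypothetical helper)."""
    return parse_version(installed) >= parse_version(minimum)


if __name__ == '__main__':
    try:
        import confluent_kafka
    except ImportError:
        print('confluent-kafka is not installed')
    else:
        # version() returns a (version_string, version_int) tuple.
        installed = confluent_kafka.version()[0]
        print(installed, 'OK' if meets_minimum(installed) else 'older than 1.9.2')
```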
3. Download the demo package kafka-confluent-python-demo.zip.
Modifying Configurations
1. For an SSL connection, download the certificate package from the console and unzip it to get ssl.client.truststore.jks. Then run the following commands to generate the caRoot.pem file.
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12
openssl pkcs12 -in caRoot.p12 -out caRoot.pem
2. Modify the setting.py file. (Configure ca_location only for SSL connections.)
kafka_setting = {
    'bootstrap_servers': 'XXX',
    'topic_name': 'XXX',
    'group_name': 'XXX'
}
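The settings above do not show how ca_location reaches the client. One possible mapping is sketched below, assuming ca_location holds the path to the generated caRoot.pem file. build_client_config is a hypothetical helper, not part of the demo package, and the sketch assumes the broker accepts plain SSL; 'security.protocol' and 'ssl.ca.location' are standard librdkafka configuration properties. An instance that also requires SASL would need further properties that are not covered here.

```python
def build_client_config(kafka_setting):
    """Build a confluent-kafka config dict from the demo's kafka_setting (hypothetical helper)."""
    config = {'bootstrap.servers': kafka_setting['bootstrap_servers']}
    # Assumption: ca_location, when present, is the path to caRoot.pem.
    ca_location = kafka_setting.get('ca_location')
    if ca_location:
        config['security.protocol'] = 'SSL'
        config['ssl.ca.location'] = ca_location
    return config


if __name__ == '__main__':
    plain = {'bootstrap_servers': 'XXX', 'topic_name': 'XXX', 'group_name': 'XXX'}
    secured = dict(plain, ca_location='caRoot.pem')
    print(build_client_config(plain))
    print(build_client_config(secured))
```

The returned dictionary can be passed to Producer(...) directly, or extended with 'group.id' before being passed to Consumer(...).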
Producing Messages
Run the following command to produce messages.
python kafka_producer.py
Example code:
from confluent_kafka import Producer
import setting

conf = setting.kafka_setting

# Initialize a Producer object.
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})

def delivery_report(err, msg):
    """Called once for each message produced to indicate the delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Send the message asynchronously.
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
# Serve delivery callbacks for previously produced messages.
p.poll(0)

# At the end of the program, call flush() to wait for outstanding deliveries.
p.flush()
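When more than one message is sent, the same produce/poll/flush pattern can be wrapped in a loop. The sketch below is one way to do that, assuming text payloads; send_all is a hypothetical helper, not part of the demo package. It takes any object with the produce(), poll() and flush() methods of confluent_kafka.Producer and retries when produce() raises BufferError, which the library does when its local queue is full.

```python
def send_all(producer, topic, payloads, on_delivery=None):
    """Send each text payload and return what flush() reports as still queued (hypothetical helper)."""
    for payload in payloads:
        data = payload.encode('utf-8')
        while True:
            try:
                producer.produce(topic, data, callback=on_delivery)
                break
            except BufferError:
                # Local queue is full: serve delivery callbacks to free space, then retry.
                producer.poll(1.0)
        # Serve any delivery callbacks that are already pending, without blocking.
        producer.poll(0)
    # Block until every outstanding message is delivered or has failed.
    return producer.flush()
```

With the objects from the example above, a call might look like send_all(p, conf['topic_name'], ['Hello', 'World'], delivery_report).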
Consuming Messages
Run the following command to consume messages.
python kafka_consumer.py
Example code:
from confluent_kafka import Consumer, KafkaError
import setting

conf = setting.kafka_setting

c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest'
})
c.subscribe([conf['topic_name']])

try:
    while True:
        msg = c.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            # Reaching the end of a partition is not a failure.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Ctrl+C stops the loop so that the consumer can be closed cleanly.
    pass
finally:
    # Leave the consumer group and release resources.
    c.close()
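For scripts or tests that should stop by themselves instead of polling forever, a bounded variant of the loop can be useful. The following is a sketch under stated assumptions: consume_batch and its parameters are hypothetical, not part of the demo package, and it assumes UTF-8 text messages. It accepts any object whose poll() behaves like confluent_kafka.Consumer.poll(), collects errors instead of printing them, and gives up after several consecutive polls that return no usable message.

```python
def consume_batch(consumer, max_messages, timeout=1.0, max_idle_polls=3):
    """Collect up to max_messages decoded values; return (values, errors) (hypothetical helper)."""
    values, errors = [], []
    idle = 0
    while len(values) < max_messages and idle < max_idle_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            # Nothing arrived within the timeout.
            idle += 1
            continue
        if msg.error():
            # Keep the error for the caller and count the poll as idle.
            errors.append(msg.error())
            idle += 1
            continue
        idle = 0
        values.append(msg.value().decode('utf-8'))
    return values, errors
```

With the consumer from the example above, consume_batch(c, 10) would return at most ten message values; the caller is still responsible for calling c.close() afterwards.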