Best Practices For Kafka in Python

Best Practices For Kafka in Python

Santhosh Thomas
·Mar 14, 2022·

Subscribe to my newsletter and never miss my upcoming articles

Table of contents

  • What is an Event Driven Architecture, and how does it work?
  • What is Apache Kafka?
  • How to integrate kafka with python

Specifically, this blog will assist you in introducing tiny terminology and providing a brief introduction of technology. But, based on my experience, my major motivation here is to give a better configuration that aids in the efficient use of Kafka.

What is an Event Driven Architecture, and how does it work?

Events are used to trigger and communicate across disconnected services in an event-driven architecture, which is prevalent in today's microservices applications. A state change or update, such as when an item is purchased on an e-commerce website, is referred to as an event. It produces items like productCreated, cartCreated, and cartUpdated. The main advantage of using an event-driven architecture is that you don't have to wait for a database response or data processing. It's decoupled, so we can quickly respond to the user interface without causing too much delay. As a result, it will provide the best possible user experience. Every architecture has benefits and drawbacks. There are some drawbacks to event-driven architecture in this case.

What is Apache Kafka?

Apache Kafka is an open-source stream-processing software platform created by LinkedIn in 2011 to handle throughput, low latency transmission, and processing of the stream of records in real-time.

It has the following three significant capabilities, which makes it ideal for users:

  1. A high-throughput system. Kafka can handle high-velocity and high-volume data even if it doesn't have a lot of hardware.

  2. Short Latency

  3. Fault-Tolerant

  4. Longevity

  5. Scalability is a term used to describe the ability of a system to scale up or down

How to integrate kafka with python

There are numerous Python libraries for Apache Kafka, including kafka-python, confluent-kafka, and pykafka. Kafka-python and confluent-kafka were two of the tools I utilised.

Kafka-python, in my opinion, was simple to set up, and there are numerous tutorials and blogs available. But the worst thing was that once I utilised it, it would occasionally lose connection with the cluster when the producer wrote a message to it; I believe this was due to a broker compatibility issue. Another issue I had was that I was getting a lot of duplicate messages, so I tried to adjust the setup on the python side. They did not, however, resolve the issue. The first reason is because the kafka-python library does not support idempotence, which was the only feature that required the library to provide ids for each message.

Confluent Python is one of the best Python libraries for managing Kafka. It also supports idempotence, and I was able to lower the maximum number of duplicate messages produced by the producer by using this library. There are numerous blogs on how to connect to Kafka, but none on how to use this library effectively. As a result, I'll give a few samples.

Producer Sample Code

from confluent_kafka import Producer
import socket

 config= {
            'bootstrap.servers':  '**********',
            'security.protocol':  '**********',
            'ssl.ca.location': '**********',
            'client.id': socket.gethostname(),
            'enable.idempotence': True,
            'acks': 'all',
            'retries': 10,
            'compression.type': 'gzip',
            'max.in.flight.requests.per.connection': 5,
            'compression.codec': 'gzip'
            }

payload={
"message": "hello"
}
producer = Producer(config)
producer.produce(topic="sample_topic", key=str(uuid.uuid4().hex), value=json.dumps(payload).encode('utf-8'))
producer.poll(1)
producer.flush(1)

Here

  1. topic (str) – Topic to produce message to.
  2. key (object, optional) – Message key.
  3. value (object, optional) – Message payload.
  4. bootstrap.servers - Initial list of brokers as a CSV list of broker host
  5. client.id - Client identifier
  6. enable.idempotence - When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
  7. acks - This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, -1 or all=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than min.insync.replicas (broker configuration) in the ISR set the produce request will fail.
  8. retries - How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true.
  9. compression.type - compression codec to use for compressing message sets.
  10. compression.codec -Compression codec to use for compressing message sets. inherit = inherit global compression.codec
  11. max.in.flight.requests.per.connection - Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.

Consumer Sample Code

def create_consumer():

    def commit_callback(kafka_error, topic_partition):
        response = {
            "kafka_error": kafka_error,
            "topic_partition": topic_partition
        }
        logging.info("Commit info: "+ str(response))

    """Tries to establing the Kafka consumer connection"""
    try:
        brokers = '*******************'
        logger.info(f"Creating new kafka consumer using brokers: str(brokers) + ' and topic  is {configure.get_topic_name()}" )
        config= {
        'bootstrap.servers': brokers,
        'group.id': 'sample-consumer-group',
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest',
        'group.instance.id': socket.gethostname(),
        'security.protocol': 'SSL', 
        'on_commit': commit_callback,
        'ssl.ca.location': '*******************',
        'client.id': socket.gethostname()
        }
        return Consumer(config)  

    except Exception as e:
        logger.error("Error when connecting with kafka consumer: " + str(e))


consumer.subscribe(['sample_topic'])
try: 
     while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    logger.error('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
                raise KafkaException(msg.error())
            else:
                consumer.commit(message=msg, asynchronous=False)
                in_kafka_message = msg.value().decode('utf-8')
except Exception as e:
        logger.error(str(e))
finally:
        consumer.close()
  1. group.id - Client group id string. All clients sharing the same group.id belong to the same group.
  2. group.instance.id - Enable static group membership. Static group members are able to leave and rejoin a group within the configured session.timeout.ms without prompting a group rebalance. This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0
  3. enable.auto.commit - Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().
  4. auto.offset.reset - Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.

Did you find this article valuable?

Support Santhosh Thomas by becoming a sponsor. Any amount is appreciated!

See recent sponsors Learn more about Hashnode Sponsors
 
Share this