Create And Delete Kafka Topic Using Python

Create And Delete Kafka Topic Using Python

Santhosh Thomas
·Jan 1, 2022·

Subscribe to my newsletter and never miss my upcoming articles

Table of contents

  • Install kafka-python package

Install kafka-python package

Python client for the Apache Kafka distributed stream processing system.

pip install kafka-python
from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(bootstrap_servers=[ipaddress:port])


topic_names = ['topic1', 'topic2', 'topic3' , 'topic3']

def create_topics(topic_names):
    print(list(consumer.topics()))
    topic_list = []
    for topic in topic_names:
        if topic not in existing_topic_list:
            print('Topic : {} added '.format(topic))
            topic_list.append(NewTopic(name=topic, num_partitions=3, replication_factor=3))
        else:
            print('Topic : {topic} already exist ')
    try:
        if topic_list:
            admin_client.create_topics(new_topics=topic_list, validate_only=False)
            print("Topic Created Successfully")
        else:
            print("Topic Exist")
    except TopicAlreadyExistsError as e:
        print("Topic Already Exist")
    except  Exception as e:
        print(e)

def delete_topics(topic_names):
    try:
        admin_client.delete_topics(topics=topic_names)
        print("Topic Deleted Successfully")
    except UnknownTopicOrPartitionError as e:
        print("Topic Doesn't Exist")
    except  Exception as e:
        print(e)

create_topics(topic_names)
 
Share this