[UPDATE: Check out the Kafka Web Console to more easily administer your Kafka topics]
This week I’ve been working with the Kafka messaging system in a project.
Basic C# Methods for Kafka Producer
To publish to Kafka I built a C# app that uses the Kafka4n libraries – it doesn’t get much simpler than this:
using Kafka.Client;

Connector connector = new Connector(serverAddress, serverPort);
connector.Produce(correlationId, hostName, timeOut, topicName, partitionId, message);
I was reading from various event and performance monitoring logs and pushing them through just fine.
Basic Python Kafka Consumer
For watching the realtime feed, I created a consumer on a Linux machine using the Python kafka-python package:
from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("10.0.1.223:6667")
consumer = SimpleConsumer(kafka, "my-group", "winlogs")
for message in consumer:
    print(message)
This worked great until I started pushing in a lot of data both in size and quantity. Eventually I started getting an error that seems to relate to the max size my consumer could request:
No handlers could be found for logger "kafka"
Traceback (most recent call last):
  File "consume_winlogs.py", line 5, in <module>
    for message in consumer:
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 406, in __iter__
    message = self.get_message(True, timeout)
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 365, in get_message
    return self._get_message(block, timeout, get_partition_info)
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 378, in _get_message
    self._fetch()
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 438, in _fetch
    for message in resp.messages:
  File "/usr/lib/python2.6/site-packages/kafka/protocol.py", line 134, in _decode_message_set_iter
    raise ConsumerFetchSizeTooSmall()
kafka.common.ConsumerFetchSizeTooSmall
So I tweaked my publisher to make sure it wasn’t putting in really large messages (which wasn’t needed for my application anyway) and then cleared the Kafka topic.
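My publisher was the C# app, but the same idea can be sketched in Python: cap each message at a byte limit before handing it to the producer. The function name cap_message and the 100000-byte default are hypothetical, picked only for illustration; use whatever limit suits your consumer's fetch size.

```python
def cap_message(text, max_bytes=100000):
    """Return text as UTF-8 bytes, truncated to at most max_bytes.

    max_bytes is a hypothetical limit chosen for illustration only.
    """
    data = text.encode("utf-8")
    if len(data) <= max_bytes:
        return data
    # Cut at the byte limit, then drop any partial multi-byte character
    # left dangling at the end so the result is still valid UTF-8.
    return data[:max_bytes].decode("utf-8", "ignore").encode("utf-8")


if __name__ == "__main__":
    print(len(cap_message("x" * 250000)))
```

Truncating on the publisher side keeps the consumer simple, at the cost of losing the tail of oversized log entries, which was fine for my application.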
Clearing Kafka Topics with Python
After trying a few different approaches to clearing the topic, I found this Python approach to be simplest, using the zc.zk module. First I listed the contents from ZooKeeper:
import zc.zk

zk = zc.zk.ZooKeeper('10.0.1.223:2181')
zk.print_tree()
This showed the topics and consumers that were of interest to me. I found that deleting the topic was not enough; I also had to reset the consumer data here so my script would not try to pick up where it left off:
/brokers
  /topics
    /winlogs
      ...
/consumers
  /my-group
    /offsets
      /winlogs
        /0
          string_value = '1400'
Deleting the topic and the consumer group only takes a couple more commands, one for each of those two paths.
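A minimal sketch of that cleanup, assuming the zc.zk client's delete_recursive method and the node paths from the tree listing above. The helper name clear_topic is hypothetical, and the leading slashes on the paths are my assumption.

```python
def clear_topic(zk, topic, group):
    """Remove a topic's nodes and a consumer group's saved offsets.

    zk is expected to be a zc.zk.ZooKeeper connection; clear_topic is a
    hypothetical helper name, not part of zc.zk.
    """
    paths = ["/brokers/topics/" + topic, "/consumers/" + group]
    for path in paths:
        # delete_recursive removes the node and everything beneath it
        zk.delete_recursive(path)
    return paths


# Usage against the ZooKeeper instance from this post:
#   import zc.zk
#   zk = zc.zk.ZooKeeper('10.0.1.223:2181')
#   clear_topic(zk, 'winlogs', 'my-group')
```

Removing the whole consumer group node, rather than just the offset, is the blunt option: it means the next run of the consumer starts with no saved position at all.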
Then I started streaming new data back into my topic, which is auto-created.
Increasing Buffer Size
The default max buffer size for my Python consumer script was small enough that a large message could not be fetched, which is what raised the error above.
To fix this I added one more line to my Python consumer script, setting the max buffer size. You can see what the default is by reading the value of consumer.max_buffer_size before changing it.
Here I set it to 1 MB and it streams on happily for now; setting it to zero seems to let it grow without limit:
from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("10.0.1.223:6667")
consumer = SimpleConsumer(kafka, "my-group2", "winlogs")
# max_buffer_size is a number, so format it rather than concatenating it to a string
print("Default max buffer size: %s" % consumer.max_buffer_size)
consumer.max_buffer_size = 1000000
for message in consumer:
    print(message)
- Kafka messaging system: https://kafka.apache.org/
- Kafka4n .NET client: https://github.com/miknil/Kafka4n
- Kafka-python module: https://pypi.python.org/pypi/kafka-python/0.9.2
- Python Zookeeper module: https://pypi.python.org/pypi/zc.zk