[UPDATE: Check out the Kafka Web Console to more easily administer your Kafka topics]


 

This week I’ve been working with the Kafka messaging system in a project.

Basic C# Methods for Kafka Producer

To publish to Kafka I built a C# app that uses the Kafka4n libraries – it doesn’t get much simpler than this:

using Kafka.Client;

// Connect to the Kafka broker (serverAddress/serverPort point at your broker)
Connector connector = new Connector(serverAddress, serverPort);
// Publish one message to the given topic and partition
connector.Produce(correlationId, hostName, timeOut, topicName, partitionId, message);

I was reading from various event and performance monitoring logs and pushing them through just fine.

Basic Python Kafka Consumer

For watching the real-time feed, I created a consumer on a Linux machine using the Python kafka-python package:

from kafka import KafkaClient, SimpleConsumer

# Connect to the broker and consume the "winlogs" topic as consumer group "my-group"
kafka = KafkaClient("10.0.1.223:6667")
consumer = SimpleConsumer(kafka, "my-group", "winlogs")

for message in consumer:
    print(message)

This worked great until I started pushing in a lot of data, both larger messages and more of them. Eventually I started getting an error that seems to relate to the maximum message size my consumer could request:

No handlers could be found for logger "kafka"
Traceback (most recent call last):
  File "consume_winlogs.py", line 5, in <module>
    for message in consumer:
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 406, in __iter__
    message = self.get_message(True, timeout)
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 365, in get_message
    return self._get_message(block, timeout, get_partition_info)
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 378, in _get_message
    self._fetch()
  File "/usr/lib/python2.6/site-packages/kafka/consumer.py", line 438, in _fetch
    for message in resp.messages:
  File "/usr/lib/python2.6/site-packages/kafka/protocol.py", line 134, in _decode_message_set_iter
    raise ConsumerFetchSizeTooSmall()
kafka.common.ConsumerFetchSizeTooSmall

So I tweaked my publisher to make sure it wasn't sending really large messages (which my application didn't need anyway) and then cleared the Kafka topic.
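My publisher is in C#, but a rough Python sketch of the same idea, using kafka-python's SimpleProducer, might look like this (the 900000-byte cap and the publish() helper are just illustrative, not part of my actual app):

from kafka import KafkaClient, SimpleProducer

MAX_MESSAGE_BYTES = 900000  # arbitrary cap, kept well under the consumer's fetch buffer

kafka = KafkaClient("10.0.1.223:6667")
producer = SimpleProducer(kafka)

def publish(line):
    # Truncate anything oversized instead of letting it outgrow the consumer's buffer
    producer.send_messages("winlogs", line[:MAX_MESSAGE_BYTES])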

Clearing Kafka Topics with Python

After trying a few different approaches to clearing the topic, I found this Python approach, using the zc.zk module, to be simplest. First I listed the contents from ZooKeeper:

import zc.zk

# Connect to ZooKeeper (port 2181, not the Kafka broker port) and dump the node tree
zk = zc.zk.ZooKeeper('10.0.1.223:2181')
zk.print_tree()

This showed the topics and consumers that were of interest to me. I found that deleting the topic was not enough; I also had to reset the consumer data here so my script would not try to pick up where it left off:

/brokers
 /topics
  /winlogs
...
/consumers
 /my-group
  /offsets
   /winlogs
    /0
     string_value = '1400'

To delete the topic and consumers, it only takes a couple more commands:

# Remove the topic metadata and the consumer group's stored offsets from ZooKeeper
zk.delete_recursive('brokers/topics/winlogs')
zk.delete_recursive('consumers/my-group')

Then I started streaming new data back into my topic, which was auto-created for me.
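Topic auto-creation only works if the broker allows it; that behavior is controlled by this setting in the broker's server.properties (shown here with Kafka's default value):

# server.properties on the broker: create topics on first use
auto.create.topics.enable=true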

Increasing Buffer Size

The Python consumer's fetch buffer defaults to a fairly small size, so any message larger than the maximum buffer can't be fetched and triggers the error above.

To fix this I added one more line to my Python consumer script, setting the max buffer size. You can see what the default is by reading the value of:

consumer.max_buffer_size

Here I set it to 1 MB and it streams along happily for now; setting it to zero seems to remove the limit entirely:

from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("10.0.1.223:6667")
consumer = SimpleConsumer(kafka, "my-group2", "winlogs")

# Raise the maximum fetch buffer from its small default to 1 MB (0 means no limit)
print("Default max buffer size: %d" % consumer.max_buffer_size)
consumer.max_buffer_size = 1000000

for message in consumer:
    print(message)
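If I recall the kafka-python 0.9.x signature correctly, SimpleConsumer also accepts the buffer sizes as constructor arguments, so the same thing can likely be done in one step (treat the keyword name as an assumption to check against your installed version):

# Assumes SimpleConsumer exposes max_buffer_size as a constructor keyword argument
consumer = SimpleConsumer(kafka, "my-group2", "winlogs", max_buffer_size=1000000)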

References

  1. Kafka messaging system: https://kafka.apache.org/
  2. Kafka4n .NET client: https://github.com/miknil/Kafka4n
  3. Kafka-python module: https://pypi.python.org/pypi/kafka-python/0.9.2
  4. Python Zookeeper module: https://pypi.python.org/pypi/zc.zk
