[UPDATE: Check out the Kafka Web Console that allows you to manage topics and see traffic going through your topics – all in a browser!]
When you’re pushing data into a Kafka topic, it’s always helpful to monitor the traffic with a simple consumer script. Here’s one I’ve been using: it subscribes to a given topic and prints each message as it arrives. It depends on the kafka-python module and takes a single argument, the topic name. Modify the script to point at the right server IP.
from kafka import KafkaClient, SimpleConsumer
from sys import argv

kafka = KafkaClient("10.0.1.100:6667")
consumer = SimpleConsumer(kafka, "my-group", argv[1])
consumer.max_buffer_size = 0
consumer.seek(0, 2)
for message in consumer:
    # message is (offset, Message); message[1][3] is the message value
    print("OFFSET: " + str(message[0]) + "\t MSG: " + str(message[1][3]))
Max Buffer Size
There are two lines I wanted to focus on in particular. The first is the “max_buffer_size” setting:
consumer.max_buffer_size=0
When subscribing to a topic with a large backlog of messages it hasn’t seen before, the consumer can exceed its fetch buffer and fail. Setting an unlimited buffer size (zero) allows it to take everything that is available.
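As I understand the kafka-python internals, the consumer grows its fetch buffer by doubling until it hits max_buffer_size, and fails if a single message still doesn’t fit; a falsy cap (0 or None) removes the limit. This is a sketch of that logic under my assumptions, not the library’s actual source:

```python
# Sketch of the consumer's buffer-growth behaviour (an assumption based
# on kafka-python's SimpleConsumer, not the library's real code).
def grow_buffer(needed_size, buffer_size, max_buffer_size):
    """Double buffer_size until it can hold needed_size bytes.

    A falsy max_buffer_size (0 or None) means "no limit", which is
    why consumer.max_buffer_size = 0 avoids the failure.
    """
    while buffer_size < needed_size:
        if max_buffer_size and buffer_size * 2 > max_buffer_size:
            raise ValueError("message larger than max_buffer_size")
        buffer_size *= 2
    return buffer_size

# With the default cap a big message blows up; with cap 0 it keeps doubling.
print(grow_buffer(100_000, 4096, 0))   # 131072
```

With a cap of, say, 32768 the same call raises instead of returning, which matches the “max out and fail” behaviour described above.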
If you kill and restart the script, it will continue where it left off, at the last offset that was received. This is pretty cool, but in some environments it causes trouble, so I changed the default behaviour by adding another line.
Offset Out of Range Error
As I regularly kill the servers running Kafka and the producers feeding it (yes, just for fun), things sometimes go a bit crazy. I’m not entirely sure why, but I got this error:
kafka.common.OffsetOutOfRangeError: FetchResponse(topic='my_messages', partition=0, error=1, highwaterMark=-1, messages=)
To fix it I added the “seek” setting:
consumer.seek(0,2)
If you set it to (0,0), the consumer restarts scanning from the first message. Setting it to (0,2) starts from the most recent offset, letting you tap back into the stream at the latest moment.
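My reading is that the whence argument follows file-style seek conventions (treat the exact semantics as an assumption): 0 means absolute, 1 means relative to the current offset, and 2 means relative to the end of the log. A toy model of how the two calls above resolve:

```python
def resolve_seek(offset, whence, current, tail):
    """Toy model of SimpleConsumer.seek(offset, whence) semantics
    (an assumption based on file-style seek; not the library source)."""
    if whence == 0:      # absolute: from the beginning of the log
        return offset
    if whence == 1:      # relative to the current offset
        return current + offset
    if whence == 2:      # relative to the end (tail) of the log
        return tail + offset
    raise ValueError("whence must be 0, 1 or 2")

# seek(0, 0) -> offset 0, replay everything; seek(0, 2) -> jump to the tail.
print(resolve_seek(0, 0, current=500, tail=1000))  # 0
print(resolve_seek(0, 2, current=500, tail=1000))  # 1000
```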
Removing this line restores the behaviour mentioned earlier, where the consumer picks up from the last message it previously received. But if and when that breaks, you’ll want a line like this to save the day.
For more about Kafka on Hadoop, see Hortonworks’ excellent overview page, from which the screenshot above is taken.
Where does the client save its last offset?
The server actually tracks the client’s offset. The group name you provide is used as an identifier, so if you connect from another machine but use the same group ID, the server knows which offset to resume from. Hope that makes sense.
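A minimal sketch of that idea, with a plain dict standing in for the broker’s offset store (class and method names here are hypothetical, just for illustration): committed offsets are keyed by group, topic, and partition, so any client presenting the same group ID resumes from the same place:

```python
class OffsetStore:
    """Toy stand-in for the broker-side offset tracking described above."""

    def __init__(self):
        self._offsets = {}  # (group, topic, partition) -> last committed offset

    def commit(self, group, topic, partition, offset):
        self._offsets[(group, topic, partition)] = offset

    def resume_from(self, group, topic, partition):
        # Any connection presenting the same group ID resumes here;
        # an unknown group starts from the beginning.
        return self._offsets.get((group, topic, partition), 0)

store = OffsetStore()
store.commit("my-group", "my_messages", 0, 42)           # machine A commits
print(store.resume_from("my-group", "my_messages", 0))   # machine B sees 42
print(store.resume_from("other-group", "my_messages", 0))  # fresh group: 0
```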
Subject: problem sending topic data from one machine to another using kafka-python
My requirement is to create a topic on one machine and send its data via a Kafka producer to another Kafka server.
When we create the topic and produce to it on the same machine, we can subscribe to it on that machine without any problem.
But when we send the topic’s data from one machine to the other Kafka server, we are not able to subscribe to it: only the topic name shows up in the Lenses GUI tool, and no data arrives inside the topic.
Could you please provide some sample kafka-python code, and any other settings required to solve this?
We have been struggling with this problem since last week, and we have also asked for help on the Kafka forum without success.
We are using the CentOS operating system.