Kafka Consumer Setup: Receiving Data via CLI with Java
In this guide, you will learn how to consume data from Apache Kafka with a Java consumer application.
We will use consumer groups for load balancing, commit offsets manually, and shut the consumer down gracefully.
🧠 Technical Summary
This guide shows you how to develop a Kafka consumer application in a Maven-based Java project.
The goal is to read and process messages from the java_demo topic,
sharing the load across a consumer group and preventing data loss with manual offset commits.
🧰 Prerequisites
- Server: At least 4 GB RAM / 2 CPUs (e.g. tr1-node01 on GenixNode)
- Java: JDK 8+ installed (e.g. sudo apt install openjdk-17-jdk)
- Kafka: Apache Kafka installed and configured
- Producer: A Kafka producer project that sends messages from the CLI must be ready
🛠️ Step 1 – Creating the Kafka Consumer Class
Create and open the file src/main/java/com/dokafka/ConsumerDemo.java:
package com.dokafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.*;

import java.time.Duration;
import java.util.*;

public class ConsumerDemo {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);

    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topicName = "java_demo";
        String groupId = "grup_genixnode";

        // Broker address, String deserializers for key and value, consumer group,
        // and start from the oldest message when the group has no committed offset
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));

        try {
            // Poll loop: each call waits at most 100 ms for new records
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("topic={} | partition={} | offset={} | value={}",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        } catch (Exception e) {
            log.error("An error occurred", e);
        } finally {
            consumer.close(); // leave the group and release network resources
        }
    }
}
💬 This code receives and logs messages from the java_demo topic.
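The StringDeserializer settings in the configuration above turn the raw key and value bytes fetched from Kafka back into Strings. As a rough, self-contained illustration of that idea (MiniStringDeserializer is a hypothetical class, not part of the Kafka client), decoding simply reverses the UTF-8 encoding that Kafka's String serializer/deserializer pair uses by default, and a missing key stays null:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; it is NOT Kafka's StringDeserializer.
class MiniStringDeserializer {

    // Decode the raw bytes of a key or value as UTF-8 text.
    // A null payload (e.g. a record sent without a key) stays null.
    static String deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Bytes as a producer with a UTF-8 String serializer would send them
        byte[] wireValue = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(wireValue)); // prints: hello kafka
        System.out.println(deserialize(null));      // prints: null
    }
}
```

If producer and consumer disagree on the encoding, the bytes still arrive intact but decode into the wrong characters, which is why both sides should use matching serializer and deserializer settings.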
🔧 Step 2 – Compiling and Running
Create a script called run-consumer.sh:
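#!/bin/bash
# Clean, compile and package the project, then start the consumer.
mvn clean package
# Quoted so the shell passes target/lib/* through to Java's classpath wildcard
java -cp "target/dokafka-1.0-SNAPSHOT.jar:target/lib/*" com.dokafka.ConsumerDemo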
Make the script executable and run it:
chmod +x run-consumer.sh
./run-consumer.sh
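💬 This script cleans the Maven project, packages it, and runs the consumer. The target/lib/* entry assumes the build copies the project's dependencies (Kafka client, SLF4J) into target/lib.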
🧼 Step 3 – Graceful Shutdown
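If the consumer process is stopped abruptly without calling close(), it does not leave its group cleanly: the group coordinator only notices after the session timeout, and offsets processed since the last commit are not committed. To prevent this, register a shutdown hook right after creating the consumer: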
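// Reference to the main thread, which runs the poll loop
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.info("Shutdown detected, calling consumer.wakeup()...");
    // wakeup() makes the blocked poll() throw a WakeupException
    consumer.wakeup();
    try {
        // Wait until the main thread has finished closing the consumer
        mainThread.join();
    } catch (InterruptedException e) {
        log.error("Interrupted while waiting for the consumer to close", e);
    }
}));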
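Then add a catch clause for WakeupException (imported from org.apache.kafka.common.errors) in front of the existing catch (Exception e) block: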
} catch (WakeupException e) {
    log.info("Consumer is shutting down..."); // expected after consumer.wakeup()
} catch (Exception e) {
💬 This ensures that the consumer exits the poll loop and closes properly when the application is stopped, e.g. with CTRL+C.
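The wakeup-and-catch pattern can be tried without a broker. In the sketch below, FakeConsumer and FakeWakeupException are hypothetical stand-ins that only mimic KafkaConsumer's wakeup behaviour, and an ordinary thread plays the role of the shutdown hook, since a real hook only runs when the JVM exits. One simplification: a real poll() that is blocked when wakeup() is called throws right away, while the fake one throws on its next call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
class FakeWakeupException extends RuntimeException {
}

// Hypothetical stand-in for KafkaConsumer that mimics only its wakeup behaviour:
// once wakeup() has been called from another thread, poll() throws.
class FakeConsumer {
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    volatile boolean closed = false;

    void poll() {
        if (wakeupRequested.getAndSet(false)) {
            throw new FakeWakeupException();
        }
        try {
            Thread.sleep(10); // pretend to wait for new records
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void wakeup() {
        wakeupRequested.set(true); // safe to call from any thread
    }

    void close() {
        closed = true;
    }
}

class GracefulShutdownDemo {

    // Runs an endless poll loop until another thread calls wakeup(),
    // then always closes the consumer in the finally block.
    static FakeConsumer runUntilWakeup(long stopAfterMillis) {
        FakeConsumer consumer = new FakeConsumer();
        // Plays the role of the shutdown hook (which would run on CTRL+C)
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(stopAfterMillis);
            } catch (InterruptedException ignored) {
                // fall through and wake the consumer anyway
            }
            consumer.wakeup();
        });
        stopper.start();
        try {
            while (true) {
                consumer.poll();
            }
        } catch (FakeWakeupException e) {
            System.out.println("Consumer is shutting down...");
        } finally {
            consumer.close();
        }
        try {
            stopper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = runUntilWakeup(50);
        System.out.println("closed=" + consumer.closed); // prints: closed=true
    }
}
```

The key design point carries over to the real client: the loop never checks a flag itself; the exception thrown by poll() moves control into the finally block, so close() runs however the loop ends.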
⚖️ Step 4 – Consumer Groups and Partitions
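Kafka topics are divided into partitions. Consumers that share the same group ID split these partitions among themselves: each partition is read by only one consumer in the group, so the load is balanced. Create a topic with two partitions to try this: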
bin/kafka-topics.sh --create --topic java_demo_partitions \
--bootstrap-server localhost:9092 --partitions 2
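Which consumer gets which partition is decided by the group's assignment strategy. The sketch below is modeled on a range-style strategy (contiguous blocks of partitions per consumer, in sorted member order) and uses hypothetical consumer IDs; it is not Kafka's own assignor code. It shows that with the two partitions of java_demo_partitions, a third consumer in the group would receive nothing and stay idle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch modeled on a range-style assignment; not Kafka's RangeAssignor code.
class RangeAssignmentSketch {

    // Splits partitions 0..numPartitions-1 of one topic into contiguous ranges.
    // When the split is uneven, the first consumers get one extra partition;
    // consumers beyond the number of partitions get an empty list (idle).
    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumerIds.isEmpty()) {
            return result; // nobody to assign to
        }
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted); // deterministic order for the assignment
        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        for (int i = 0; i < sorted.size(); i++) {
            int start = i * perConsumer + Math.min(i, extra);
            int length = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // java_demo_partitions was created with 2 partitions
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2"), 2));
        // prints: {consumer-1=[0], consumer-2=[1]}
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2", "consumer-3"), 2));
        // prints: {consumer-1=[0], consumer-2=[1], consumer-3=[]}
    }
}
```

The practical takeaway is that adding consumers to a group only increases throughput up to the number of partitions.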
💬 Messages with the same key are always written to the same partition, so their order is preserved within that partition.
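By default, keyed records are placed by hashing the key and taking the result modulo the number of partitions. The sketch below uses String.hashCode() as a stand-in hash (Kafka's default partitioner applies murmur2 to the serialized key bytes), so its partition numbers will not match a real cluster; what carries over is that the same key always maps to the same partition. Records without a key are not placed this way.

```java
// Hypothetical sketch: String.hashCode() stands in for Kafka's murmur2 hash,
// so the partition numbers will NOT match what a real producer chooses.
class KeyPartitionSketch {

    // Same key -> same hash -> same partition, which preserves per-key order.
    static int partitionFor(String key, int numPartitions) {
        int positiveHash = key.hashCode() & 0x7fffffff; // clear the sign bit
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // java_demo_partitions has 2 partitions
        for (String key : new String[] {"user-42", "user-7", "user-42"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Masking the sign bit instead of calling Math.abs() avoids a negative result for Integer.MIN_VALUE. Note that the mapping depends on the partition count, so adding partitions to an existing topic changes where new records for a given key land.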
🧩 Step 5 – Manual Offset Commit
Disable automatic offset commits and call commitSync() yourself once the polled records have been processed, so an offset is only committed after its record has been handled:
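// Before creating the KafkaConsumer: turn off automatic commits
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// In the poll loop: process the whole batch first...
for (ConsumerRecord<String, String> record : records) {
    log.info("topic={} | partition={} | offset={} | value={}",
            record.topic(), record.partition(), record.offset(), record.value());
}
// ...then commit the positions returned by the last poll() once
if (!records.isEmpty()) {
    consumer.commitSync(); // manual offset commit
}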
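💬 commitSync() blocks until the broker confirms the commit, retrying on recoverable errors, so it is slower than commitAsync() but the most reliable way to commit.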
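Why the commit belongs after the batch can be shown with a small in-memory simulation (CommitOrderSimulation and its crash model are hypothetical, not Kafka APIs). A consumer crashes after processing two of four records and restarts from its last committed offset. Committing the batch position before processing loses the unprocessed records, which is also what happens if the no-argument commitSync() is called after the first record of a batch; committing afterwards only causes some records to be processed twice (at-least-once delivery).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of a single partition; no Kafka APIs involved.
class CommitOrderSimulation {

    // One poll returns every record in `log`; the consumer crashes after
    // processing `crashAfter` of them, then restarts from the committed offset.
    // Returns every record that was processed across both runs.
    static List<String> run(List<String> log, boolean commitBeforeProcessing, int crashAfter) {
        List<String> processed = new ArrayList<>();
        int committed = 0; // as in Kafka: the committed offset is the NEXT offset to read

        // First run: the polled batch starts at the committed offset
        int batchStart = committed;
        if (commitBeforeProcessing) {
            committed = log.size(); // whole batch marked done before any work
        }
        for (int offset = batchStart; offset < log.size() && offset - batchStart < crashAfter; offset++) {
            processed.add(log.get(offset));
        }
        // Crash here: the rest of the batch is never processed, and with
        // commit-after-processing the commit is never reached either

        // Restart: resume from the last committed offset and finish the log
        for (int offset = committed; offset < log.size(); offset++) {
            processed.add(log.get(offset));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3");
        // Commit before processing: m2 and m3 are never processed
        System.out.println(run(log, true, 2));  // prints: [m0, m1]
        // Commit after processing: nothing lost, m0 and m1 are seen twice
        System.out.println(run(log, false, 2)); // prints: [m0, m1, m0, m1, m2, m3]
    }
}
```

Because duplicates are possible with commit-after-processing, the processing logic should tolerate seeing the same record more than once.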
❓ Frequently Asked Questions (FAQ)
- What does a deserializer do?
It converts the byte arrays received from Kafka back into the original data type (e.g. String).
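- Why is the consumer group ID important?
Consumers with the same group ID split the partitions of the subscribed topic among themselves, so the load is balanced; the group's committed offsets are also stored under this ID.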
- What does AUTO_OFFSET_RESET_CONFIG do?
It decides where the consumer starts when the group has no committed offset for a partition: earliest reads from the beginning, latest starts with new messages.
- How does commitAsync() differ from commitSync()?
commitAsync() does not block, so it performs better, but it does not retry failed commits; errors have to be handled in a callback.
- What does consumer.poll() do?
It fetches new records from Kafka, waiting at most the given timeout (here 100 ms) if none are available yet.
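The AUTO_OFFSET_RESET_CONFIG rule can be written out as a small decision function. This is a hypothetical helper that follows the documented meaning of auto.offset.reset, not client code: a valid committed offset always wins, and the policy only applies when there is none or it no longer exists on the broker (for example, after retention has deleted old records). Kafka's default policy is latest, so Step 1 sets earliest explicitly to also read messages that already exist.

```java
import java.util.OptionalLong;

// Hypothetical helper following the documented meaning of auto.offset.reset;
// it is not code from the Kafka client.
class OffsetResetSketch {

    // committed: the group's committed offset for the partition, if any
    // logStart/logEnd: first retained offset and the offset of the next new record
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String policy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                return offset; // a valid committed offset always wins
            }
        }
        // No committed offset, or it no longer exists on the broker
        switch (policy) {
            case "earliest":
                return logStart; // oldest record still retained
            case "latest":
                return logEnd;   // only records produced from now on
            case "none":
                // The real client also throws an exception in this case
                throw new IllegalStateException("no valid committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // The partition currently retains offsets 100..149
        System.out.println(startingOffset(OptionalLong.empty(), 100, 150, "earliest")); // prints: 100
        System.out.println(startingOffset(OptionalLong.empty(), 100, 150, "latest"));   // prints: 150
        System.out.println(startingOffset(OptionalLong.of(120), 100, 150, "latest"));   // prints: 120
    }
}
```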
🏁 Conclusion
In this guide:
Creating Kafka Consumer,
Safe shutdown (graceful shutdown) structure,
You learned about consumer groups and offset management.
💡 Now try this application on GenixNode and manage real-time data flow by setting up your own Kafka cluster. 🚀

