First, add the following dependency to your project's POM file:
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
/* This code has been tested successfully with commons-logging versions 1.1.1 and 1.2. */
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer sample: subscribes to a single topic and prints every
 * record it receives to standard output. The poll loop has no exit condition,
 * so it runs until the process is stopped or an exception is thrown.
 */
public class SampleConsumer {

    /** Stream and topic to read from; replace the placeholders before running. */
    public static String topic = "/<path to and name of the stream>:<name of topic>";

    /** Consumer used by {@link #main}; assigned by {@link #configureConsumer()}. */
    public static KafkaConsumer<Integer, String> consumer;

    /**
     * Configures the consumer, subscribes it to {@link #topic}, and then polls
     * in an endless loop, printing the topic, partition, offset, key, and value
     * of each record received.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        configureConsumer();

        // Timeout passed to each poll() request for unread messages.
        Duration pollTimeout = Duration.ofMillis(1000);

        try {
            // Subscribe inside the try block so that the consumer is still
            // closed by the finally clause if subscribing fails.
            consumer.subscribe(Collections.singletonList(topic));

            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(pollTimeout);
                records.forEach(record -> {
                    System.out.printf("%s %d %d %s %s \n", record.topic(),
                            record.partition(), record.offset(), record.key(), record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Builds the consumer configuration and creates the shared {@link #consumer}.
     *
     * <p>The configuration sets the consumer group id to {@code "consumer-group"},
     * the auto-offset-reset policy to {@code "earliest"}, and the classes used to
     * deserialize record keys ({@code IntegerDeserializer}) and record values
     * ({@code StringDeserializer}).
     */
    public static void configureConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Diamond operator instead of the raw type, so the assignment to the
        // KafkaConsumer<Integer, String> field is type-checked.
        consumer = new KafkaConsumer<>(props);
    }
}