Sample Java Consumer

You need to first add the following dependency to the POM file:
/* This code is successfully tested for common-logging version 1.11 and 1.2. */

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SampleConsumer {
    // Set the stream and topic to read from
    public static String topic = "/<path to and name of the stream>:<name of topic>";

    // Declare a new consumer.
    public static KafkaConsumer<Integer, String> consumer;

    public static void main(String[] args) {

        // Subscribe to the topic.

        // Set the timeout interval for requests for unread messages.
        Duration pollTimeout = Duration.ofMillis(1000);

        try {
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(pollTimeout);
                records.forEach(record -> {
                    System.out.printf("%s %d %d %s %s \n", record.topic(),
                            record.partition(), record.offset(), record.key(), record.value());

        } finally {

    /* Set the value for a configuration parameter. 
       This configuration parameter specifies which 
       class to use to deserialize the value of each message. */
    public static void configureConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer(props);