Sample Cached Consumer Application for Audit Stream
The Consumer.java application demonstrates how to connect to the file system and consume the messages in a stream topic.
Before running this application, ensure that you have access to a cluster running the file system. To build and run this application:
- Set the classpath as shown below:
  export CLASSPATH=`hadoop classpath`
- Compile the Java file as shown below:
  javac -cp .:`mapr classpath` Consumer.java
- Run the resulting Consumer.class file. For example:
  java -cp .:`mapr classpath` Consumer
This application requires the following imports:
org.apache.kafka.clients.consumer.ConsumerRecord
org.apache.kafka.clients.consumer.ConsumerRecords
org.apache.kafka.clients.consumer.KafkaConsumer
org.apache.hadoop.conf.Configuration
com.mapr.fs.MapRFileSystem
com.google.common.io.Resources
java.net.URI
java.io.IOException
java.io.InputStream
java.util.Iterator
java.util.Properties
java.util.Random
java.util.StringTokenizer
java.util.regex.Pattern
The application performs the actions described in the following sections.
- Initializes the consumer properties
- The configuration parameters for the consumer are stored in the consumer.props file. This file must be present in the current directory or on the mapr classpath. For example, your consumer.props file could look similar to the following:

#bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
## apps specific ?
session.timeout.ms=10000
# These buffer sizes are needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.
fetch.min.bytes=50000
# receive.buffer.bytes=262144 // fixed size buffer
max.partition.fetch.bytes=2097152
auto.offset.reset=earliest

The application initializes the consumer properties stored in the consumer.props file.

public static void main(String[] args) throws IOException, InterruptedException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        if (properties.getProperty("group.id") == null) {
            properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        consumer = new KafkaConsumer<>(properties);
    }
}
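Note that Resources.getResource() loads consumer.props from the classpath; the -cp .: option shown above keeps the current directory on the classpath so the file is found. As a minimal alternative sketch (not part of the sample), the file can also be read directly from the working directory:

// Sketch only: load consumer.props from the working directory instead of the classpath.
// Assumes the same property keys shown above.
Properties properties = new Properties();
try (InputStream props = new java.io.FileInputStream("consumer.props")) {
    properties.load(props);
}
if (properties.getProperty("group.id") == null) {
    properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
}
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);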
- Subscribes to the topic to read from
- The application initializes the file system object with the last parameter set to true, so that the audit logs generated by the operations that convert fid to file path and volid to volume name are sent to the ExpandAudit.json.log file used by the expandaudit utility, and not to the stream. It then selects the stream and subscribes to the topic to read at the path /var/mapr/auditstream/auditlogstream:<clustername>.

Configuration conf = new Configuration();
String uri = MAPRFS_URI;
uri = uri + "mapr/";
conf.set("fs.default.name", uri);
MapRFileSystem fs = new MapRFileSystem();
fs.initialize(URI.create(uri), conf, true);

Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:<clustername>.+");
consumer.subscribe(pattern);
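If you want to read a single, known audit topic instead of matching a pattern, the consumer can also subscribe by topic name. This is only a sketch; <clustername> and <hostname> are placeholders for your cluster and node:

// Sketch: subscribe to one specific audit topic instead of a regex pattern.
String topic = "/var/mapr/auditstream/auditlogstream:<clustername>_<hostname>";
consumer.subscribe(java.util.Arrays.asList(topic));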
- Requests unread messages from the topic
- The application requests unread messages from the subscribed topic. It then iterates through the returned records, extracts the value of each message, and prints the value to standard output.
boolean stop = false;
int pollTimeout = 1000;
while (!stop) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    if (iterator.hasNext()) {
        while (iterator.hasNext()) {
            ConsumerRecord<String, String> record = iterator.next();
            String value = record.value();
            String rvalue = value.replace("\"", "");
            String recordValue = processRecord(fs, rvalue, value);
            System.out.println(" Consumed Record: " + recordValue);
        }
    } else {
        Thread.sleep(1000);
        //stop = true;
    }
}
- Gets the record and expands individual fields
- The application then takes the record and expands the fid in the message to the file path using the getMountPathFidCached() API, and the volid in the message to the volume name using the getVolumeNameCached() API.

while (st.hasMoreTokens()) {
    String field = st.nextToken();
    StringTokenizer st1 = new StringTokenizer(field, ":");
    while (st1.hasMoreTokens()) {
        String token = st1.nextToken();
        if (token.endsWith("Fid")) {
            String lfidStr = st1.nextToken();
            String path = null;
            try {
                path = fs.getMountPathFidCached(lfidStr); // Expand FID to file path
            } catch (IOException e) {
            }
            lfidPath = "\"FidPath\":\"" + path + "\",";
        }
        if (token.endsWith("volumeId")) {
            String volid = st1.nextToken();
            String name = null;
            try {
                int volumeId = Integer.parseInt(volid);
                name = fs.getVolumeNameCached(volumeId); // Cached API to convert volume id to volume name
            } catch (IOException e) {
            }
            lvolName = "\"VolumeName\":\"" + name + "\",";
        }
    }
}
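For reference, the two cached lookups can be isolated into small helper methods. This is only a sketch derived from the calls above; the helper names are illustrative and not part of the sample:

// Illustrative helpers wrapping the cached lookup calls used above.
static String fidToPath(MapRFileSystem fs, String fidStr) {
    try {
        return fs.getMountPathFidCached(fidStr); // FID -> file path
    } catch (IOException e) {
        return null; // leave unresolved on error
    }
}

static String volumeIdToName(MapRFileSystem fs, String volIdStr) {
    try {
        return fs.getVolumeNameCached(Integer.parseInt(volIdStr)); // volume id -> volume name
    } catch (IOException | NumberFormatException e) {
        return null;
    }
}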
- Returns the record
- The application finally returns the record after expanding the fid and volid to the file path and volume name, respectively.

String result = "";
StringTokenizer st2 = new StringTokenizer(value, ",");
while (st2.hasMoreTokens()) {
    String tokens = st2.nextToken();
    result = result + tokens + ",";
    if (tokens.contains("Fid")) {
        result = result + lfidPath;
    }
    if (tokens.contains("volumeId")) {
        result = result + lvolName;
    }
}
return result.substring(0, result.length() - 1);
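As a usage sketch, the effect of processRecord() can be pictured on a hypothetical record fragment; the field values below are placeholders, and only the field names and inserted keys come from the code above:

// Hypothetical input fragment (values are placeholders):
//   ...,"<name>Fid":"<fid>","volumeId":<id>,...
// After the call below, the returned record additionally contains
//   "FidPath":"<resolved path>" after the Fid field and
//   "VolumeName":"<resolved name>" after the volumeId field.
String rvalue = value.replace("\"", "");
String expanded = processRecord(fs, rvalue, value);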
The complete Consumer.java application follows:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.hadoop.conf.Configuration;
import com.mapr.fs.MapRFileSystem;
import com.google.common.io.Resources;

import java.net.URI;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

public class Consumer {
    // Set the stream and topic to read from.
    private static final String MAPRFS_URI = "maprfs:///";

    public static void main(String[] args) throws IOException, InterruptedException {
        // Configure the consumer.
        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            if (properties.getProperty("group.id") == null) {
                properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
            }
            consumer = new KafkaConsumer<>(properties);
        }

        // Initialize the file system object; the last parameter (true) sends the
        // audit records generated by the cached lookups to ExpandAudit.json.log.
        Configuration conf = new Configuration();
        String uri = MAPRFS_URI;
        uri = uri + "mapr/";
        conf.set("fs.default.name", uri);
        MapRFileSystem fs = new MapRFileSystem();
        fs.initialize(URI.create(uri), conf, true);

        // final String topic = "/var/mapr/auditstream/auditlogstream:<clustername>_atsqa4-130.qa.lab"; // Replace <clustername> with the name of the cluster
        Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:<clustername>.+");
        // Subscribe to the topic.
        consumer.subscribe(pattern);

        boolean stop = false;
        int pollTimeout = 1000;
        while (!stop) {
            // Request unread messages from the topic.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            if (iterator.hasNext()) {
                while (iterator.hasNext()) {
                    // Iterate through the returned records, extract the value
                    // of each message, and print the value to standard output.
                    ConsumerRecord<String, String> record = iterator.next();
                    String value = record.value();
                    String rvalue = value.replace("\"", "");
                    String recordValue = processRecord(fs, rvalue, value);
                    System.out.println(" Consumed Record: " + recordValue);
                }
            } else {
                Thread.sleep(1000);
                //stop = true;
            }
        }
        consumer.close();
        System.out.println("All done.");
    }

    /* Get the record and expand individual fields */
    public static String processRecord(MapRFileSystem fs, String rvalue, String value) {
        StringTokenizer st = new StringTokenizer(rvalue, ",");
        String lfidPath = "";
        String lvolName = "";
        while (st.hasMoreTokens()) {
            String field = st.nextToken();
            StringTokenizer st1 = new StringTokenizer(field, ":");
            while (st1.hasMoreTokens()) {
                String token = st1.nextToken();
                /* If the field has a fid, expand it using the cached API */
                if (token.endsWith("Fid")) {
                    String lfidStr = st1.nextToken();
                    String path = null;
                    try {
                        path = fs.getMountPathFidCached(lfidStr); // Expand FID to file path
                    } catch (IOException e) {
                    }
                    lfidPath = "\"FidPath\":\"" + path + "\",";
                }
                /* If the field has a volume id, expand it using the cached API */
                if (token.endsWith("volumeId")) {
                    String volid = st1.nextToken();
                    String name = null;
                    try {
                        int volumeId = Integer.parseInt(volid);
                        name = fs.getVolumeNameCached(volumeId); // Convert volume id to volume name
                    } catch (IOException e) {
                    }
                    lvolName = "\"VolumeName\":\"" + name + "\",";
                }
            }
        }
        String result = "";
        StringTokenizer st2 = new StringTokenizer(value, ",");
        while (st2.hasMoreTokens()) {
            String tokens = st2.nextToken();
            result = result + tokens + ",";
            if (tokens.contains("Fid")) {
                result = result + lfidPath;
            }
            if (tokens.contains("volumeId")) {
                result = result + lvolName;
            }
        }
        // Return the record after expansion of fid and volume id.
        return result.substring(0, result.length() - 1);
    }
}