Sample Uncached Consumer Application for Audit Stream
The ConsumerUncached.java application demonstrates how to connect to the HPE Ezmeral Data Fabric file system and consume the messages in a stream topic.
Before running this application, ensure that you have access to a cluster running the file system. To build and run this application:
- Set the classpath as shown below:
export CLASSPATH=`hadoop classpath`
- Compile the Java file as shown below:
javac -cp .:`mapr classpath` ConsumerUncached.java
- Run the resulting ConsumerUncached.class file. For example:
java -cp .:`mapr classpath` ConsumerUncached
This application requires the following classes:
org.apache.kafka.clients.consumer.ConsumerRecord
org.apache.kafka.clients.consumer.ConsumerRecords
org.apache.kafka.clients.consumer.KafkaConsumer
org.apache.hadoop.conf.Configuration
com.mapr.fs.MapRFileSystem
com.google.common.io.Resources
java.net.URI
java.io.IOException
java.io.InputStream
java.util.Iterator
java.util.Properties
java.util.Random
java.util.StringTokenizer
java.util.regex.Pattern
The application performs the actions described in the following sections.
- Initializes the consumer properties
- The configuration parameters for the consumer are stored in the consumer.props file. This file should be present in the current directory or in the mapr classpath. For example, your consumer.props file could look similar to the following:

#bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
## apps specific ?
session.timeout.ms=10000
# These buffer sizes are needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.
fetch.min.bytes=50000
# receive.buffer.bytes=262144 // fixed size buffer
max.partition.fetch.bytes=2097152
auto.offset.reset=earliest

The application initializes the consumer with the properties stored in the consumer.props file.

public static void main(String[] args) throws IOException, InterruptedException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        if (properties.getProperty("group.id") == null) {
            properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        consumer = new KafkaConsumer<>(properties);
    }
}
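The property-loading step can be exercised on its own, without a cluster or the Kafka client on the classpath. The following sketch is an illustration only: it loads a trimmed-down stand-in for the consumer.props file from an in-memory string and shows the same random group.id fallback that the application uses.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class PropsSketch {
    // Hypothetical stand-in for consumer.props; bootstrap.servers is
    // commented out just as in the example file, and group.id is omitted
    // so the fallback below fires.
    static final String PROPS =
        "#bootstrap.servers=localhost:9092\n" +
        "enable.auto.commit=true\n" +
        "auto.offset.reset=earliest\n";

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(PROPS));
        // Same fallback as the application: generate a group.id when none is set.
        if (properties.getProperty("group.id") == null) {
            properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        System.out.println(properties.getProperty("group.id"));
        System.out.println(properties.getProperty("auto.offset.reset"));
    }
}
```

Because `#bootstrap.servers` is a comment, `Properties.load()` ignores it; only the uncommented keys are visible to the consumer.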
- Subscribes to the topic to read from
- The application initializes the file system object with the last parameter set to true, so that the audit logs generated by the operations that convert fid to file path and volid to volume name are sent to the ExpandAudit.json.log file used by the expandaudit utility, and not to the stream. It then selects the stream and subscribes to the topic to read at the path /var/mapr/auditstream/auditlogstream:<clustername>.

Configuration conf = new Configuration();
String uri = MAPRFS_URI;
uri = uri + "mapr/";
conf.set("fs.default.name", uri);
MapRFileSystem fs = new MapRFileSystem();
fs.initialize(URI.create(uri), conf, true);
Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:<clustername>.+");
consumer.subscribe(pattern);
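The subscription uses a regular expression rather than a fixed topic name, because audit-stream topic names carry a per-node suffix after the cluster name. The matching behavior can be checked without a cluster; the sketch below substitutes a hypothetical cluster name ("my.cluster.com") and topic suffix into the same pattern form.

```java
import java.util.regex.Pattern;

public class PatternSketch {
    public static void main(String[] args) {
        // Same form as the pattern passed to consumer.subscribe(), with a
        // hypothetical cluster name filled in for <clustername>.
        Pattern pattern =
            Pattern.compile("/var/mapr/auditstream/auditlogstream:my.cluster.com.+");
        // Matches any topic for that cluster, whatever the node suffix:
        System.out.println(pattern.matcher(
            "/var/mapr/auditstream/auditlogstream:my.cluster.com_node1").matches()); // true
        // Does not match topics for other streams or clusters:
        System.out.println(pattern.matcher(
            "/var/mapr/auditstream/auditlogstream:other").matches()); // false
    }
}
```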
- Requests unread messages from the topic
- The application requests to read unread messages in the subscribed topic. It then
iterates through the returned records, extracts the value of each message, and prints
the value to the standard output.
boolean stop = false;
int pollTimeout = 1000;
while (!stop) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    if (iterator.hasNext()) {
        while (iterator.hasNext()) {
            ConsumerRecord<String, String> record = iterator.next();
            String value = record.value();
            String rvalue = value.replace("\"", "");
            String recordValue = processRecord(fs, rvalue, value);
            System.out.println(" Consumed Record: " + recordValue);
        }
    } else {
        Thread.sleep(1000);
    }
}
- Gets the record and expands individual fields
- The application then takes the record and expands the fid in the message to the file path using the getMountPathFid() API, and the volid in the message to the volume name using the getVolumeName() API.

public static String processRecord(MapRFileSystem fs, String rvalue, String value) {
    StringTokenizer st = new StringTokenizer(rvalue, ",");
    String lfidPath = "";
    String lvolName = "";
    while (st.hasMoreTokens()) {
        String field = st.nextToken();
        StringTokenizer st1 = new StringTokenizer(field, ":");
        while (st1.hasMoreTokens()) {
            String token = st1.nextToken();
            if (token.endsWith("Fid")) {
                String lfidStr = st1.nextToken();
                String path = null;
                try {
                    path = fs.getMountPathFid(lfidStr);
                } catch (IOException e) {
                }
                lfidPath = "\"FidPath\":\"" + path + "\",";
            }
            if (token.endsWith("volumeId")) {
                String volid = st1.nextToken();
                String name = null;
                try {
                    int volumeId = Integer.parseInt(volid);
                    name = fs.getVolumeName(volumeId);
                } catch (IOException e) {
                }
                lvolName = "\"VolumeName\":\"" + name + "\",";
            }
        }
    }
- Returns the record
- The application finally returns the record after expanding the fid and volid to the file path and volume name, respectively.

    String result = "";
    StringTokenizer st2 = new StringTokenizer(value, ",");
    while (st2.hasMoreTokens()) {
        String tokens = st2.nextToken();
        result = result + tokens + ",";
        if (tokens.contains("Fid")) {
            result = result + lfidPath;
        }
        if (tokens.contains("volumeId")) {
            result = result + lvolName;
        }
    }
    return result.substring(0, result.length() - 1);
}
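The expansion can be followed end to end without a cluster. The sketch below reuses the same tokenize-and-splice logic, but substitutes placeholder lookups for the MapRFileSystem calls and feeds in a hypothetical, simplified audit record; the record fields, fid, and volume id are invented for illustration only.

```java
import java.util.StringTokenizer;

public class ExpandSketch {
    // Placeholder lookups standing in for fs.getMountPathFid() and
    // fs.getVolumeName(); the real application resolves these via MapRFileSystem.
    static String mountPathFid(String fid) { return "/vol/dir/file"; }
    static String volumeName(int volid)   { return "auditVolume"; }

    // Same logic as processRecord(): scan the quote-stripped record for
    // Fid and volumeId tokens, resolve them, then re-insert the expanded
    // values after the fields that triggered them.
    public static String expand(String value) {
        String rvalue = value.replace("\"", "");
        String lfidPath = "", lvolName = "";
        StringTokenizer st = new StringTokenizer(rvalue, ",");
        while (st.hasMoreTokens()) {
            StringTokenizer st1 = new StringTokenizer(st.nextToken(), ":");
            while (st1.hasMoreTokens()) {
                String token = st1.nextToken();
                if (token.endsWith("Fid")) {
                    lfidPath = "\"FidPath\":\"" + mountPathFid(st1.nextToken()) + "\",";
                }
                if (token.endsWith("volumeId")) {
                    lvolName = "\"VolumeName\":\""
                        + volumeName(Integer.parseInt(st1.nextToken())) + "\",";
                }
            }
        }
        String result = "";
        StringTokenizer st2 = new StringTokenizer(value, ",");
        while (st2.hasMoreTokens()) {
            String tokens = st2.nextToken();
            result = result + tokens + ",";
            if (tokens.contains("Fid")) { result = result + lfidPath; }
            if (tokens.contains("volumeId")) { result = result + lvolName; }
        }
        return result.substring(0, result.length() - 1);
    }

    public static void main(String[] args) {
        // Hypothetical, simplified audit record:
        String record = "{\"volumeId\":111081849,\"srcFid\":\"2049.32.131224\",\"status\":0}";
        // Prints:
        // {"volumeId":111081849,"VolumeName":"auditVolume","srcFid":"2049.32.131224","FidPath":"/vol/dir/file","status":0}
        System.out.println(expand(record));
    }
}
```

Note that the expanded VolumeName and FidPath fields are inserted immediately after the volumeId and Fid fields of the original record, which keeps the output a valid JSON-style record.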
The complete ConsumerUncached.java application follows:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.conf.Configuration;
import com.mapr.fs.MapRFileSystem;
import com.google.common.io.Resources;
import java.net.URI;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

public class ConsumerUncached {
    // Set the stream and topic to read from.
    private static final String MAPRFS_URI = "maprfs:///";

    public static void main(String[] args) throws IOException, InterruptedException {
        // Configure the consumer.
        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            if (properties.getProperty("group.id") == null) {
                properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
            }
            consumer = new KafkaConsumer<>(properties);
        }
        Configuration conf = new Configuration();
        String uri = MAPRFS_URI;
        uri = uri + "mapr/";
        conf.set("fs.default.name", uri);
        MapRFileSystem fs = new MapRFileSystem();
        fs.initialize(URI.create(uri), conf, true);
        // Replace <clustername> with the name of the cluster.
        Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:<clustername>.+");
        // Subscribe to the topic.
        consumer.subscribe(pattern);
        boolean stop = false;
        int pollTimeout = 1000;
        while (!stop) {
            // Request unread messages from the topic.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            if (iterator.hasNext()) {
                while (iterator.hasNext()) {
                    ConsumerRecord<String, String> record = iterator.next();
                    // Iterate through returned records, extract the value
                    // of each message, and print the value to standard output.
                    String value = record.value();
                    String rvalue = value.replace("\"", "");
                    String recordValue = processRecord(fs, rvalue, value);
                    System.out.println(" Consumed Record: " + recordValue);
                }
            } else {
                Thread.sleep(1000);
            }
        }
        consumer.close();
        System.out.println("All done.");
    }

    public static String processRecord(MapRFileSystem fs, String rvalue, String value) {
        StringTokenizer st = new StringTokenizer(rvalue, ",");
        String lfidPath = "";
        String lvolName = "";
        while (st.hasMoreTokens()) {
            String field = st.nextToken();
            StringTokenizer st1 = new StringTokenizer(field, ":");
            while (st1.hasMoreTokens()) {
                String token = st1.nextToken();
                if (token.endsWith("Fid")) {
                    String lfidStr = st1.nextToken();
                    String path = null;
                    try {
                        path = fs.getMountPathFid(lfidStr);
                    } catch (IOException e) {
                    }
                    lfidPath = "\"FidPath\":\"" + path + "\",";
                }
                if (token.endsWith("volumeId")) {
                    String volid = st1.nextToken();
                    String name = null;
                    try {
                        int volumeId = Integer.parseInt(volid);
                        name = fs.getVolumeName(volumeId);
                    } catch (IOException e) {
                    }
                    lvolName = "\"VolumeName\":\"" + name + "\",";
                }
            }
        }
        String result = "";
        StringTokenizer st2 = new StringTokenizer(value, ",");
        while (st2.hasMoreTokens()) {
            String tokens = st2.nextToken();
            result = result + tokens + ",";
            if (tokens.contains("Fid")) {
                result = result + lfidPath;
            }
            if (tokens.contains("volumeId")) {
                result = result + lvolName;
            }
        }
        return result.substring(0, result.length() - 1);
    }
}