/* Copyright (c) 2009 & onwards. MapR Tech, Inc., All rights reserved */

package com.mapr.streams;

import org.ojai.Document;
import org.ojai.DocumentStream;
import org.ojai.store.DocumentStore;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import com.mapr.streams.Streams;

/**
 * Command-line utility that opens a stream as a message store, iterates over
 * its messages, and optionally prints each message and/or the total message
 * count.
 *
 * <p>Configuration is held in public static fields that {@link #main(String[])}
 * fills in from the command line before calling {@link #runTest()}.
 */
public class StreamAnalyzer {

  /** Full name of the stream to analyze (set by {@code -path}); mandatory. */
  public static String streamName = null;
  /** Topic-name pattern (set by {@code -regex}); mutually exclusive with {@link #topicNames}. */
  public static Pattern topicRegex = null;
  /** Explicit topic names (set by {@code -topics}); mutually exclusive with {@link #topicRegex}. */
  public static String[] topicNames = null;
  /** Field names to project (set by {@code -projectFields}); {@code null} means no projection. */
  public static String[] projections = null;
  /** Whether to print the total number of messages once iteration finishes. */
  public static boolean countMessages = true;
  /** Whether to print one line per message. */
  public static boolean printMessages = false;

  /**
   * Prints the command-line help to standard error and terminates the JVM
   * with exit status 1. This method does not return.
   */
  public static void usage() {
    System.err.println("StreamAnalyzer -path <stream-full-name>");
    System.err.println("     [ -topics <comma separated topic names> ]");
    System.err.println("     [ -regex  <regular expression representing topic names> ]");
    System.err.println("     [ -countMessages <true/false> (default: true) ]");
    System.err.println("     [ -printMessages <true/false> (default: false) ]");
    System.err.println("     [ -projectFields <comma separated field names> ");
    System.err.println("                      (Valid field names: "
                                               + Streams.KEY + ", "
                                               + Streams.VALUE + ", "
                                               + Streams.OFFSET + ", "
                                               + Streams.TOPIC + ", "
                                               + Streams.PARTITION + ", "
                                               + Streams.PRODUCER + ".");
    System.err.println("                       Works only if printMessages is true.");
    System.err.println("                       Default: all fields.) ]");
    System.exit(1);
  }

  /**
   * Parses the command line into the static configuration fields, validates
   * the option combination, and runs the analysis.
   *
   * <p>Any unknown option, missing option value, invalid regular expression,
   * or invalid option combination prints the usage text and exits.
   *
   * @param args command-line arguments as described by {@link #usage()}
   * @throws IOException propagated from {@link #runTest()}
   */
  public static void main(String[] args) throws IOException {
    for (int i = 0; i < args.length; ++i) {
      String option = args[i];
      if (option.equals("-path")) {
        streamName = valueAt(args, ++i);
      } else if (option.equals("-topics")) {
        topicNames = valueAt(args, ++i).split(",");
      } else if (option.equals("-regex")) {
        String regex = valueAt(args, ++i);
        try {
          topicRegex = Pattern.compile(regex);
        } catch (PatternSyntaxException e) {
          // Report a bad pattern as a usage error instead of an uncaught stack trace.
          System.err.println("Invalid regular expression '" + regex + "': "
              + e.getDescription());
          usage();
        }
      } else if (option.equals("-countMessages")) {
        countMessages = Boolean.parseBoolean(valueAt(args, ++i));
      } else if (option.equals("-printMessages")) {
        printMessages = Boolean.parseBoolean(valueAt(args, ++i));
      } else if (option.equals("-projectFields")) {
        projections = valueAt(args, ++i).split(",");
      } else {
        usage();
      }
    }

    if (streamName == null) {
      System.err.println("Stream path is mandatory command-line option");
      usage();
    }

    if ((projections != null) && !printMessages) {
      System.err.println("Projections are only used if '-printMessages' is set to true");
      usage();
    }

    if ((topicNames != null) && (topicRegex != null)) {
      System.err.println("Only '-topics' or '-regex' allowed");
      usage();
    }

    runTest();
  }

  /**
   * Returns the option value at {@code index}, or prints the usage text and
   * exits when the command line ends before that position.
   */
  private static String valueAt(String[] args, int index) {
    if (index >= args.length) {
      usage();
    }
    return args[index];
  }

  /**
   * Opens the message store selected by the static configuration, iterates
   * over every document it returns, prints each one when
   * {@link #printMessages} is set, and prints the total when
   * {@link #countMessages} is set.
   *
   * <p>The document stream and the store are closed before this method
   * returns, including when iteration fails with an exception.
   *
   * @throws IOException declared for callers; see the store implementation
   */
  public static void runTest() throws IOException {
    DocumentStore store;
    if (topicNames != null) {
      store = Streams.getMessageStore(streamName, topicNames);
    } else if (topicRegex != null) {
      store = Streams.getMessageStore(streamName, topicRegex);
    } else {
      store = Streams.getMessageStore(streamName);
    }

    // long rather than int: the counter must not wrap on very large streams.
    long count = 0;
    try {
      DocumentStream<Document> rs;
      if (projections != null) {
        rs = (DocumentStream<Document>) store.find(projections);
      } else {
        rs = (DocumentStream<Document>) store.find();
      }

      try {
        Iterator<Document> iter = rs.iterator();
        while (iter.hasNext()) {
          Document d = iter.next();
          if (printMessages) {
            printMessage(d);
          }
          count++;
        }
      } finally {
        rs.close();
      }
    } finally {
      store.close();
    }

    if (countMessages) {
      System.out.println("Total number of messages: " + count);
    }
  }

  /**
   * Prints one line describing {@code d}: topic, partition, offset and
   * producer when available, plus markers for a binary key and value.
   */
  private static void printMessage(Document d) {
    String topic = d.getString(Streams.TOPIC);
    if (topic != null) {
      System.out.print(" topic: " + topic);
    }

    try {
      System.out.print(" partition: " + d.getInt(Streams.PARTITION));
    } catch (NoSuchElementException ignored) {
      // Presumably the field is absent (e.g. not projected); skip it.
    }

    try {
      System.out.print(" offset: " + d.getLong(Streams.OFFSET));
    } catch (NoSuchElementException ignored) {
      // Presumably the field is absent (e.g. not projected); skip it.
    }

    String producer = d.getString(Streams.PRODUCER);
    if (producer != null) {
      System.out.print(" producer: " + producer);
    }

    if (d.getBinary(Streams.KEY) != null) {
      System.out.print(" key: <binary>");
    }

    if (d.getBinary(Streams.VALUE) != null) {
      System.out.print(" value: <binary>");
    }

    System.out.println();
  }
}
