Getting Started with Spark Interactive Shell

After you have a basic understanding of Apache Spark and have it installed and running on your cluster, you can use it to load datasets, apply schemas, and query data from the Spark interactive shell.

Reading Data from file system

  1. Copy sample data into file system:

    - This example uses a CSV file that contains a list of auctions.

    - Download the file from GitHub: https://github.com/mapr-demos/getting-started-spark-on-mapr/tree/master/data.

    - Copy the file into the /apps/ directory on your cluster, using either cp/scp or the hadoop fs -put command:
    $ scp ./data/auctiondata.csv mapr@[mapr-cluster-node]:/mapr/[cluster-name]/apps/
    or
    $ hadoop fs -put ./data/auctiondata.csv /apps

    - This dataset is from eBay online auctions. The dataset contains the following fields:
    auctionid - Unique identifier of an auction.
    bid - Proxy bid placed by a bidder.
    bidtime - Time (in days) that the bid was placed from the start of the auction.
    bidder - eBay username of the bidder.
    bidderrate - eBay feedback rating of the bidder.
    openbid - Opening bid set by the seller.
    price - Closing price that the item sold for (equivalent to the second highest bid + an increment). 
    item - Type of item.
    daystolive - Duration of the auction, in days.

    The table below shows the fields with some sample data:

    auctionid   bid  bidtime   bidder    bidderrate  openbid  price  item  daystolive
    8213034705  95   2.927373  jake7870  0           95       117.5  xbox  3

  2. Start the Spark interactive shell:

    - $SPARK_HOME represents the home of your Spark installation in MapR, for example: /opt/mapr/spark/spark-2.2.1/.

    $ $SPARK_HOME/bin/spark-shell --master local[2]
  3. Once the Spark shell is ready, load the dataset:
     val auctionData = spark.read.textFile("/apps/auctiondata.csv")
  4. Display the first entry:
     auctionData.first()
  5. Count the number of entries:
     auctionData.count()
  6. Use other Spark actions:
    // Displays the first 20 lines
    auctionData.show()

    // Returns the first 3 lines as an array - change the value to get more or fewer
    auctionData.take(3)
  7. Transform the dataset into a new one that contains only xbox lines, and count them:
     val auctionWithXbox = auctionData.filter(line => line.contains("xbox"))
     auctionWithXbox.count()

    - This could also be done in a single line by chaining transformations and actions:

     auctionData.filter(line => line.contains("xbox")).count()
  8. Use Spark DataFrames:
     val auctionDataFrame = spark.read.format("csv").option("inferSchema", true).load("/apps/auctiondata.csv").toDF("auctionid","bid","bidtime","bidder","bidderrate","openbid","price","item","daystolive")
  9. Use a filter transformation on the DataFrame:
     auctionDataFrame.filter($"price" < 30).show()
Figure 1. Schematic representation of performing transformations and actions on a dataset
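
The steps above can also be tried without the CSV file. The following is a minimal sketch, assuming Spark is on the classpath; it runs the same filter and count on a small in-memory dataset. Only the first line comes from the sample table; the other two rows, and the names lines, xboxLines, and sampleFrame, are hypothetical and exist only for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell a session named `spark` already exists, so these two lines
// can be skipped there; they only keep the sketch self-contained.
val spark = SparkSession.builder().master("local[2]").appName("auction-sketch").getOrCreate()
import spark.implicits._

// First line: the sample row from the table above.
// Other lines: hypothetical rows invented for this sketch.
val lines = Seq(
  "8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3",
  "1000000001,20,1.5,hypothetical_a,12,10,25.0,palm,5",
  "1000000002,15,0.8,hypothetical_b,3,15,28.5,xbox,7"
).toDS()

// Transformation: describes a new Dataset lazily; nothing is computed yet.
val xboxLines = lines.filter(line => line.contains("xbox"))

// Action: triggers the computation and returns the result to the driver.
val xboxCount = xboxLines.count()

// Split each line into typed columns to obtain a DataFrame shaped like auctionDataFrame.
val sampleFrame = lines.map { line =>
  val f = line.split(",")
  (f(0), f(1).toDouble, f(2).toDouble, f(3), f(4).toInt,
   f(5).toDouble, f(6).toDouble, f(7), f(8).toInt)
}.toDF("auctionid", "bid", "bidtime", "bidder", "bidderrate",
       "openbid", "price", "item", "daystolive")

// Same filter as step 9, followed by a count action instead of show().
val cheapCount = sampleFrame.filter($"price" < 30).count()
```

Because filter is a transformation, Spark does no work until count() (or another action such as show() or take()) is called.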

Writing Data to file system

Using the same dataset, save all xbox items as a file in file system:

- Use the filter($"item" === "xbox") transformation to select the rows, then write.json (or another writer method) to save the result to file system.

 auctionDataFrame.filter($"item" === "xbox").write.json("/apps/results/json/xbox")
This command creates the /apps/results/json/xbox directory in which you will see the JSON file(s) created. You can use the same command to create Parquet or any other file format:
auctionDataFrame.filter($"item" === "xbox").write.parquet("/apps/results/parquet/xbox")
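
By default, a Spark write fails if the target directory already exists, so running the same write command twice raises an error. The following is a minimal sketch of one way to handle reruns, using SaveMode.Overwrite. The two-row DataFrame and the temporary output directory are hypothetical stand-ins for auctionDataFrame and /apps/results/json/xbox.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

// In spark-shell `spark` already exists; these lines keep the sketch self-contained.
val spark = SparkSession.builder().master("local[2]").appName("write-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for auctionDataFrame, reduced to two columns.
val sample = Seq(("xbox", 117.5), ("palm", 25.0)).toDF("item", "price")

// Hypothetical local output location; on the cluster this would be a path
// such as /apps/results/json/xbox.
val outDir = Files.createTempDirectory("xbox-json").resolve("xbox").toUri.toString

val xboxOnly = sample.filter($"item" === "xbox")

// First run: the directory does not exist yet, so the default mode succeeds.
xboxOnly.write.json(outDir)

// Rerun: the default mode would fail here because the directory exists.
// SaveMode.Overwrite replaces the existing output instead.
xboxOnly.write.mode(SaveMode.Overwrite).json(outDir)

// Read the result back to check what was written.
val rowsWritten = spark.read.json(outDir).count()
```

SaveMode.Append and SaveMode.Ignore are the other non-default modes; which one fits depends on whether earlier results should be kept.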

Writing Data to HPE Ezmeral Data Fabric Database JSON

The first step when you are working with HPE Ezmeral Data Fabric Database JSON is to define a document _id that uniquely identifies the document.

Add a new _id field to the data from the CSV file and populate it with generated UUIDs.

To load the DataFrame into an HPE Ezmeral Data Fabric Database JSON table, use saveToMapRDB:
 dataframe.saveToMapRDB("tableName", createTable = true, bulkInsert = false, idFieldPath = "_id")

The following commands create the /apps/auction_json_table table and insert the data into it:

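 import spark.implicits._
 import java.util.UUID
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.functions.udf
 import com.mapr.db.spark.sql._  // import the MapR-DB OJAI Connector
 val generateUUID = udf(() => UUID.randomUUID().toString) // create UDF to generate UUID
 // showing that you can create your own schema
 val customSchema =
   StructType(
     Array(
       StructField("auctionid",StringType,true),
       StructField("bid",DoubleType,true),
       StructField("bidtime",DoubleType,true),
       StructField("bidder",StringType,true),
       StructField("bidderrate",IntegerType,true),
       StructField("openbid",DoubleType,true),
       StructField("price",DoubleType,true),
       StructField("item",StringType,true),
       StructField("daystolive",IntegerType,true)
     )
   )
 // load the CSV with the custom schema and add a generated _id to each row
 // (auctionWithId is an illustrative name)
 val auctionWithId = spark.read.format("csv").schema(customSchema).load("/apps/auctiondata.csv").withColumn("_id", generateUUID()).cache()
 // create the table and insert the data, following the saveToMapRDB signature shown above
 auctionWithId.saveToMapRDB("/apps/auction_json_table", createTable = true, idFieldPath = "_id")
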
You can now query the table using the HPE Ezmeral Data Fabric Database shell. Open a terminal on your cluster and run the following command:
$ mapr dbshell
maprdb mapr:> find /apps/auction_json_table --limit 10

Reading Data from HPE Ezmeral Data Fabric Database JSON

Now that the data is in HPE Ezmeral Data Fabric Database JSON, you can create and query a Spark DataFrame using the following commands:
 import com.mapr.db.spark.sql._
 import org.apache.spark.sql.SparkSession
 val dataFromMapR = spark.loadFromMapRDB("/apps/auction_json_table")
 dataFromMapR.printSchema
 dataFromMapR.count

 dataFromMapR.filter($"price" < 30).show()   // use a filter