Loading Data from HPE Ezmeral Data Fabric Database as an Apache Spark Dataset

You can use one of three ways to load data from HPE Ezmeral Data Fabric Database into an Apache Spark Dataset:

  • Load the data into a Dataset.
  • Load the data into a DataFrame, and then convert it to a Dataset.
  • Load the data into a Dataset using a custom encoder.

Load into a Dataset

In Scala, apply the following method on a SparkSession object:
def loadFromMapRDB[T](table: String, schema: StructType).as[T]: Dataset[T]

import com.mapr.db.spark.sql._

// T is the type of each row, for example a case class
val ds = sparkSession.loadFromMapRDB[T]("/tmp/user_profiles").as[T]

In Java, apply the following method on a MapRDBJavaSession object:
def loadFromMapRDB[T <: java.lang.Object](tableName: String, schema: StructType, sampleSize: Double, clazz: Class[T]): Dataset[T]

import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);

// Called with only the table name, the method returns untyped rows
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles");

NOTE
The only required parameter to the methods is tableName. All the others are optional.

Load into DataFrame and Convert to Dataset

To load the data as a DataFrame, see Loading Data from HPE Ezmeral Data Fabric Database as an Apache Spark DataFrame. To convert the DataFrame to a Dataset, use the as[<type>] method. The <type> can be any of the basic types in Scala.

The following code example creates a Dataset[Person] using the as[<type>] method:

// Scala
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

case class Address(Pin: Integer, street: String, city: String)

case class Person(_id: String,
                  first_name: String,
                  last_name: String,
                  dob: java.sql.Date,
                  Interests: Seq[String],
                  address: Address)

// Supplies the implicit Encoder[Person] that as[Person] requires
import sparkSession.implicits._

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]

// Java
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Dataset;
import java.io.Serializable;
import java.sql.Date;
import java.util.List;

public static class Address implements Serializable {
     private Integer pin;
     private String street;
     private String city;

     public Integer getPin() { return pin; }
     public void setPin(Integer pin) { this.pin = pin; }
     public String getStreet() { return street; }
     public void setStreet(String street) { this.street = street; }
     public String getCity() { return city; }
     public void setCity(String city) { this.city = city; }
}

public static class Person implements Serializable {
     private String _id;
     private String firstName;
     private String lastName;
     private Date dob;
     // java.util.List is used here instead of Scala's Seq,
     // because it is a collection type that Java bean encoders handle
     private List<String> interests;
     private Address address;

     public String get_id() { return _id; }
     public void set_id(String _id) { this._id = _id; }
     public String getFirstName() { return firstName; }
     public void setFirstName(String firstName) { this.firstName = firstName; }
     public String getLastName() { return lastName; }
     public void setLastName(String lastName) { this.lastName = lastName; }
     public Date getDob() { return dob; }
     public void setDob(Date dob) { this.dob = dob; }
     public List<String> getInterests() { return interests; }
     public void setInterests(List<String> interests) { this.interests = interests; }
     public Address getAddress() { return address; }
     public void setAddress(Address address) { this.address = address; }
}

Dataset<Person> ds = maprSession.loadFromMapRDB(tableName, Person.class);

Load into Dataset Using Custom Encoder

You can create a custom encoder for a Java bean class by calling the Encoders.bean method. Encoders.bean supports only Java classes; to create a Dataset of a Scala class, use the as[<type>] approach shown in the previous section. The following example loads data into a Dataset by creating a custom encoder for a Java class named beanClass:

// Scala
import org.apache.spark.sql.{Encoders, SparkSession}
import com.mapr.db.spark.sql._

val ds = sparkSession.loadFromMapRDB("/tmp/user_profiles")
         .as(Encoders.bean(beanClass))

// Java
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Encoders;

maprSession.loadFromMapRDB("/tmp/user_profiles").as(Encoders.bean(beanClass));
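
Encoders.bean works from Java bean classes. As a hedged illustration of what "bean" means in practice, the following sketch uses only the JDK's java.beans.Introspector to list the properties that a hypothetical UserProfile class exposes through getter/setter pairs. It does not call Spark or the connector, and it makes no claim about how Encoders.bean inspects a class internally. UserProfile, BeanPropertyCheck, and readWriteProperties are names invented for this example.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical bean standing in for beanClass; not part of any real API.
    public static class UserProfile implements Serializable {
        private String firstName;
        private String lastName;
        private String nickname;

        public UserProfile() { }

        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public String getLastName() { return lastName; }
        public void setLastName(String lastName) { this.lastName = lastName; }
        // No setter: nickname is read-only, so it is not a full getter/setter pair.
        public String getNickname() { return nickname; }
    }

    // Returns the sorted names of properties that have both a getter and a setter.
    public static List<String> readWriteProperties(Class<?> beanClass) {
        List<String> names = new ArrayList<>();
        try {
            // Object.class as the stop class leaves out the inherited "class" property.
            PropertyDescriptor[] descriptors =
                Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors();
            for (PropertyDescriptor pd : descriptors) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        // Prints [firstName, lastName]; nickname is skipped because it has no setter.
        System.out.println(readWriteProperties(UserProfile.class));
    }
}
```

Running a check like this against your own class before passing it to Encoders.bean can surface a missing getter or setter early.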

Filter Pushdown

After you have loaded data into a Dataset, you can apply filter pushdowns. The following example filters on first_name:

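// Scala
import sparkSession.implicits._  // enables the $"..." column syntax

ds.filter($"first_name" === "David")

// Java
import static org.apache.spark.sql.functions.col;

ds.filter(col("first_name").equalTo("David")).show();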

See Projection and Filter Pushdown with Apache Spark DataFrames and Datasets for other examples.
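
Filter pushdown means that the filter is evaluated where the data is stored, so that only matching rows are returned to Spark. The toy Java sketch below illustrates that idea and nothing more: ToyTable is a hypothetical in-memory stand-in for a table, and it does not reflect how the connector or the database implements pushdown. All class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class PushdownSketch {

    // Hypothetical stand-in for a remote table; it counts the rows it hands back.
    public static class ToyTable {
        private final List<String> firstNames;
        private int rowsReturned = 0;

        public ToyTable(List<String> firstNames) { this.firstNames = firstNames; }

        // The predicate runs "at the source", so only matching rows leave the table.
        public List<String> scan(Predicate<String> sourceFilter) {
            List<String> out = new ArrayList<>();
            for (String name : firstNames) {
                if (sourceFilter.test(name)) {
                    out.add(name);
                    rowsReturned++;
                }
            }
            return out;
        }

        public int rowsReturned() { return rowsReturned; }
    }

    public static List<String> sampleNames() {
        return Arrays.asList("David", "Maria", "David", "Chen");
    }

    public static void main(String[] args) {
        // Without pushdown: fetch every row, then filter on the client.
        ToyTable unfiltered = new ToyTable(sampleNames());
        List<String> clientSide = new ArrayList<>();
        for (String row : unfiltered.scan(any -> true)) {
            if (row.equals("David")) {
                clientSide.add(row);
            }
        }
        // Prints: client-side filter: [David, David], rows returned by table: 4
        System.out.println("client-side filter: " + clientSide
            + ", rows returned by table: " + unfiltered.rowsReturned());

        // With pushdown: the table applies the filter and returns only the matches.
        ToyTable filtered = new ToyTable(sampleNames());
        List<String> pushedDown = filtered.scan(n -> n.equals("David"));
        // Prints: pushed-down filter: [David, David], rows returned by table: 2
        System.out.println("pushed-down filter: " + pushedDown
            + ", rows returned by table: " + filtered.rowsReturned());
    }
}
```

Both approaches produce the same result; the difference is how many rows have to leave the table to get it.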