Loading Data from HPE Ezmeral Data Fabric Database as an Apache Spark Dataset
You can load data from HPE Ezmeral Data Fabric Database into an Apache Spark Dataset in one of three ways:
- Load the data into a Dataset.
- Load the data into a DataFrame, and then convert it to a Dataset.
- Load the data into a Dataset using a custom encoder.
Load into a Dataset
To load the data directly into a Dataset, call the loadFromMapRDB method on a SparkSession object:

def loadFromMapRDB[T](table: String, schema: StructType): Dataset[T]

import com.mapr.db.spark.sql._

val ds = sparkSession.loadFromMapRDB[T]("/tmp/user_profiles").as[T]
In Java, call the loadFromMapRDB method on a MapRDBJavaSession object:

def loadFromMapRDB[T <: java.lang.Object](tableName: String, schema: StructType, sampleSize: Double, clazz: Class[T]): Dataset[T]
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles");
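The signature above also accepts an explicit schema, a sample size, and a bean class. The following is a minimal sketch of that four-argument overload; the schema fields and sample size are illustrative assumptions, and Person is the bean class defined later on this page:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Assumed schema for the user_profiles table; adjust to your documents.
StructType schema = new StructType()
    .add("_id", DataTypes.StringType)
    .add("first_name", DataTypes.StringType)
    .add("last_name", DataTypes.StringType);

// Pass the sample size and the bean class explicitly to get a typed Dataset.
Dataset<Person> typedDs =
    maprSession.loadFromMapRDB("/tmp/user_profiles", schema, 1000.0, Person.class);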
Load into DataFrame and Convert to Dataset
To load the data as a DataFrame, see Loading Data from HPE Ezmeral Data Fabric Database as an Apache Spark DataFrame. To convert the DataFrame to a Dataset, use the as[<type>] method, where <type> can be a Scala case class or one of the basic Scala types.
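In Java, the same conversion takes an explicit encoder. A minimal sketch, reusing the maprSession object from the previous example and assuming the table has a first_name field:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Load as a DataFrame, project one field, and convert it to a
// Dataset of a basic type with an explicit encoder.
Dataset<String> names = maprSession.loadFromMapRDB("/tmp/user_profiles")
    .select("first_name")
    .as(Encoders.STRING());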
The following code example creates a Dataset[Person] using the as[<type>] method:
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

case class Address(Pin: Integer, street: String, city: String)

case class Person(_id: String,
                  first_name: String,
                  last_name: String,
                  dob: java.sql.Date,
                  Interests: Seq[String],
                  address: Address)

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
The following Java example defines the corresponding bean classes and loads the table into a Dataset<Person>:

import java.io.Serializable;
import java.sql.Date;
import scala.collection.Seq;
import org.apache.spark.sql.Dataset;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

public static class Address implements Serializable {
    private Integer pin;
    private String street;
    private String city;

    public Integer getPin() { return pin; }
    public void setPin(Integer pin) { this.pin = pin; }
    public String getStreet() { return street; }
    public void setStreet(String street) { this.street = street; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
}

public static class Person implements Serializable {
    private String _id;
    private String firstName;
    private String lastName;
    private Date dob;
    private Seq<String> interests;

    public String get_id() { return _id; }
    public void set_id(String _id) { this._id = _id; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public Date getDob() { return dob; }
    public void setDob(Date dob) { this.dob = dob; }
    public Seq<String> getInterests() { return interests; }
    public void setInterests(Seq<String> interests) { this.interests = interests; }
}

Dataset<Person> ds = maprSession.loadFromMapRDB("/tmp/user_profiles", Person.class);
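Once the Dataset is typed, you can work with Person objects directly. A short usage sketch:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;

// Map each Person to a last name; the cast disambiguates the Java
// MapFunction overload from the Scala one.
ds.map((MapFunction<Person, String>) Person::getLastName, Encoders.STRING()).show();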
Load into Dataset Using Custom Encoder
You can create a custom encoder for a Java bean class by calling the Encoders.bean method. Encoders.bean supports only Java classes; to create a Dataset of a Scala class, use the code shown earlier. The following example loads data into a Dataset by creating a custom encoder for a Java class named beanClass:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders
import com.mapr.db.spark.sql._

val ds = sparkSession.loadFromMapRDB("/tmp/user_profiles")
  .as(Encoders.bean(beanClass))
import org.apache.spark.sql.Encoders;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

maprSession.loadFromMapRDB("/tmp/user_profiles").as(Encoders.bean(beanClass));
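For reference, a hedged sketch of what such a bean class might look like and how it plugs into Encoders.bean; the UserProfile name and its fields are assumptions for illustration:

import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Minimal Java bean: a no-arg constructor plus getters and setters is
// all Encoders.bean needs to derive the schema.
public static class UserProfile implements Serializable {
    private String _id;
    private String firstName;

    public String get_id() { return _id; }
    public void set_id(String _id) { this._id = _id; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
}

Dataset<UserProfile> profiles = maprSession.loadFromMapRDB("/tmp/user_profiles")
    .as(Encoders.bean(UserProfile.class));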
Filter Pushdown
After you have loaded data into a Dataset, you can apply filters that are pushed down to the database. The following Scala and Java examples filter on the first_name field:
ds.filter($"first_name" === "David")
ds.filter(col("first_name").equalTo("David")).show();
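To check whether a filter was actually pushed down, you can print the physical plan; a sketch in Java (assuming a static import of org.apache.spark.sql.functions.col):

// Print the physical plan; a pushed-down predicate appears in the
// table scan node rather than as a separate Spark Filter stage.
ds.filter(col("first_name").equalTo("David")).explain();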
See Projection and Filter Pushdown with Apache Spark DataFrames and Datasets for other examples.