Loading Data into a DataFrame Using an Explicit Schema

If you know the schema of your data, you can specify an explicit schema when loading a DataFrame.

The following examples, in Scala, Java, and Python, load data from a user profile table (/tmp/user_profiles) into a DataFrame using an explicit schema:

Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import com.mapr.db.spark.sql._

// Nested schema for the address sub-document
val addressSchema = StructType(StructField("Pin", IntegerType) ::
                    StructField("city", StringType) ::
                    StructField("street", StringType) :: Nil)

// Top-level schema; StructField is nullable by default
val personSchema = StructType(StructField("_id", StringType) ::
                   StructField("first_name", StringType) ::
                   StructField("last_name", StringType) ::
                   StructField("address", addressSchema) ::
                   StructField("interests", ArrayType(StringType)) :: Nil)

// sparkSession is an existing SparkSession; loadFromMapRDB is provided by com.mapr.db.spark.sql._
val df = sparkSession.loadFromMapRDB("/tmp/user_profiles", personSchema)
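
Java:

import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

// Nested schema for the address sub-document
StructField[] addressSchema = {
    new StructField("Pin", IntegerType, true, Metadata.empty()),
    new StructField("city", StringType, true, Metadata.empty()),
    new StructField("street", StringType, true, Metadata.empty())
};
// Top-level schema; in Java the nullable flag is passed explicitly
StructField[] schemaFields = {
    new StructField("_id", StringType, true, Metadata.empty()),
    new StructField("first_name", StringType, true, Metadata.empty()),
    new StructField("last_name", StringType, true, Metadata.empty()),
    new StructField("address", new StructType(addressSchema), true, Metadata.empty()),
    new StructField("interests", new ArrayType(StringType, true), true, Metadata.empty())
};
StructType personSchema = new StructType(schemaFields);
// Wrap the existing SparkSession (sparkSession) to call loadFromMapRDB from Java
MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> df = maprSession.loadFromMapRDB("/tmp/user_profiles", personSchema);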

Python:

from pyspark.sql import SparkSession
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

# Nested schema for the address sub-document
addressSchema = [StructField("Pin", IntegerType(), True),
                 StructField("city", StringType(), True),
                 StructField("street", StringType(), True)]
# Top-level schema for each user profile document
schemaFields = [StructField("_id", StringType(), True),
                StructField("first_name", StringType(), True),
                StructField("last_name", StringType(), True),
                StructField("address", StructType(addressSchema), True),
                StructField("interests", ArrayType(StringType()), True)]
personSchema = StructType(schemaFields)

# spark_session is an existing SparkSession
df = spark_session.loadFromMapRDB("/tmp/user_profiles", personSchema)

To create the DataFrame named df, invoke the loadFromMapRDB method on a SparkSession object and pass the table path and the schema as parameters. In Java, first wrap the SparkSession in a MapRDBJavaSession and invoke loadFromMapRDB on that object.

The resulting schema of the object is the following:

df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- Pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- interests: array (nullable = true)
 |    |-- element: string (containsNull = true)
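To make the tree layout above easier to read, the following sketch reproduces it in plain Python without Spark. It assumes a hypothetical tuple-based description of the schema: each field is (name, type, nullable), a struct type is a list of fields, and an array type is ("array", element_type, contains_null). The helper render_tree is illustrative only, not part of Spark or the HPE Ezmeral Data Fabric API. It shows the two rules that shape the output: children of a struct are indented one level, and an array has a single child named element whose flag is containsNull rather than nullable.

```python
# Hypothetical, Spark-free sketch of the printSchema tree layout.
# Field: (name, type, nullable). Type: a string such as "string",
# a list of fields (struct), or ("array", element_type, contains_null).

def _type_name(dtype):
    """Return the type name shown after the field name."""
    if isinstance(dtype, list):
        return "struct"
    if isinstance(dtype, tuple):
        return "array"
    return dtype

def _render(name, dtype, flag, prefix, out):
    out.append(f"{prefix}-- {name}: {_type_name(dtype)} ({flag})")
    child_prefix = prefix + "    |"
    if isinstance(dtype, list):  # struct: each child one level deeper
        for child_name, child_type, nullable in dtype:
            _render(child_name, child_type,
                    f"nullable = {str(nullable).lower()}", child_prefix, out)
    elif isinstance(dtype, tuple):  # array: a single child named "element"
        _, element_type, contains_null = dtype
        _render("element", element_type,
                f"containsNull = {str(contains_null).lower()}", child_prefix, out)

def render_tree(fields):
    """Return the schema tree as a list of lines, starting with 'root'."""
    out = ["root"]
    for name, dtype, nullable in fields:
        _render(name, dtype, f"nullable = {str(nullable).lower()}", " |", out)
    return out

address = [("Pin", "integer", True), ("city", "string", True),
           ("street", "string", True)]
person = [("_id", "string", True), ("first_name", "string", True),
          ("last_name", "string", True), ("address", address, True),
          ("interests", ("array", "string", True), True)]

print("\n".join(render_tree(person)))
```

Running it prints the same ten lines as the printSchema output for the user profile schema.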

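When you define a StructField in a schema, you can optionally specify whether the field is nullable. In Scala and Python, a StructField is nullable unless you specify otherwise; in Java, the nullable flag is passed explicitly to the StructField constructor. In the examples above, all fields are nullable.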

Depending on the nullability of each field in the schema and on which fields exist in the HPE Ezmeral Data Fabric Database table, the load throws an InvalidSchema exception in the following cases:

  • The schema contains a non-nullable field and the load attempts to put a NULL value into the field.
  • The schema contains a non-nullable field and the field does not exist in the HPE Ezmeral Data Fabric Database table.
  • The HPE Ezmeral Data Fabric Database table has fields that do not exist in the specified schema.
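The three cases above can be summarized as a small checking function. This is a minimal sketch in plain Python, not the connector's actual validation code: the function schema_violations is hypothetical, the schema is simplified to a flat mapping from field name to nullable flag, and a single document (a dict) stands in for the table's fields.

```python
# Hypothetical sketch of the three InvalidSchema cases described above.
# schema: {field_name: nullable}; document: a dict standing in for the table.

def schema_violations(schema, document):
    """Return the reasons the document would not fit the schema."""
    problems = []
    for field, nullable in schema.items():
        if nullable:
            continue  # nullable fields may be missing or NULL
        if field not in document:
            problems.append(f"non-nullable field '{field}' does not exist in the table")
        elif document[field] is None:
            problems.append(f"non-nullable field '{field}' would receive NULL")
    for field in document:
        if field not in schema:
            problems.append(f"table field '{field}' is not in the schema")
    return problems

schema = {"_id": False, "first_name": True, "last_name": True}
print(schema_violations(schema, {"_id": "u1", "first_name": "Ann"}))
print(schema_violations(schema, {"_id": None, "first_name": "Ann", "age": 30}))
```

The first call returns an empty list because last_name is nullable and may be absent. The second reports a NULL in the non-nullable _id field and an extra table field, age, that the schema does not declare.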