Projection and Filter Pushdown with Apache Spark DataFrames and Datasets
Projection and filter pushdown improve query performance. When you apply the select and filter methods on DataFrames and Datasets, the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark pushes these operations down to HPE Ezmeral Data Fabric Database where possible.
Projection Pushdown
Projection pushdown minimizes data transfer between HPE Ezmeral Data Fabric Database and the Apache Spark engine by omitting unnecessary fields from table scans. It is especially beneficial when a table contains many columns.
When you invoke the following select method on a DataFrame, the connector pushes the projection down:
Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")

Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> df = maprSession.loadFromMapRDB("/tmp/user_profiles");
df.select("_id", "first_name", "last_name");

Python:

from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")
The equivalent example using Datasets is as follows:
Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
ds.select("_id", "first_name", "last_name")

Java:

import org.apache.spark.sql.Dataset;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Person> ds = maprSession.loadFromMapRDB("/tmp/user_profiles", Person.class);
ds.select("_id", "first_name", "last_name");
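To confirm that a projection was pushed down, you can inspect the physical plan. The following is a minimal Scala sketch; the exact plan text depends on the connector and Spark version, but a pushed projection typically shows up as a scan node whose read schema contains only the selected fields rather than every column in the table:

// Minimal sketch: verify projection pushdown by printing the physical plan.
// Uses the df loaded from /tmp/user_profiles in the examples above.
val projected = df.select("_id", "first_name", "last_name")

// If the projection was pushed down, the scan node reads only these three
// fields (for example, in its ReadSchema) instead of the full document.
projected.explain()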
Filter Pushdown
Filter pushdown improves performance by reducing the amount of data passed between HPE Ezmeral Data Fabric Database and the Apache Spark engine when filtering data.
Consider the following examples:

Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")

Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> df = maprSession.loadFromMapRDB("/tmp/user_profiles");
df.filter("first_name = 'Bill'");

Python:

from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark pushes the filter first_name = 'Bill' down to HPE Ezmeral Data Fabric Database.
The equivalent example using Datasets is as follows:
Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
ds.filter($"first_name" === "Bill")

Java:

import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Person> ds = maprSession.loadFromMapRDB("/tmp/user_profiles", Person.class);
ds.filter(col("first_name").equalTo("Bill"));
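You can verify a filter pushdown the same way. The following is a minimal Scala sketch; the plan details vary by connector and Spark version, but filters that a data source accepts are conventionally listed on the scan node rather than appearing in a separate Filter operator above it:

// Minimal sketch: verify filter pushdown by printing the physical plan.
// If the connector accepted the filter, it appears on the scan node
// (conventionally in a PushedFilters list) instead of in a Filter
// operator that Spark evaluates after the scan.
df.filter($"first_name" === "Bill").explain()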
The following DataFrame filters those rows in which first_name is either "David" or "Peter":

Scala:

df.filter($"first_name" === "David" || $"first_name" === "Peter")

Java:

df.filter(col("first_name").equalTo("David").or(col("first_name").equalTo("Peter")));

Python:

from pyspark.sql.functions import col

df.filter((col("first_name") == "David") | (col("first_name") == "Peter"))
The following DataFrame retrieves only the rows in which first_name is "David" and last_name is "Jones":

Scala:

df.filter($"first_name" === "David" && $"last_name" === "Jones")

Java:

df.filter(col("first_name").equalTo("David").and(col("last_name").equalTo("Jones")));

Python:

df.filter((col("first_name") == "David") & (col("last_name") == "Jones"))
The following examples apply a not condition to return rows where first_name is not "David" and last_name is not "Peter":

Scala:

df.filter(not($"first_name" === "David" || $"last_name" === "Peter"))

Java:

df.filter(not(col("first_name").equalTo("David").or(col("last_name").equalTo("Peter"))));

Python:

df.filter(~((col("first_name") == "David") | (col("last_name") == "Peter")))
The HPE Ezmeral Data Fabric Database OJAI Connector pushes down all of the filters shown in the earlier examples. It can push down the following types of filters, provided that the field is not an Array or Map:

- Equal To (=)
- Not Equal To (!=)
- Less Than (<)
- Less Than or Equal To (<=)
- Greater Than (>)
- Greater Than or Equal To (>=)
- In Predicate (IN)
- Like Predicate (LIKE)
- AND, OR, and NOT
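As a quick illustration of these predicate types, the following Scala sketch applies IN, LIKE, and range comparisons to the user_profiles DataFrame from the earlier examples. The age field is assumed here for illustration; it does not appear in the examples above:

import org.apache.spark.sql.functions.col

// IN predicate: first_name is one of the listed values
df.filter(col("first_name").isin("David", "Peter", "Bill"))

// LIKE predicate: last_name starts with "Jo"
df.filter(col("last_name").like("Jo%"))

// Range comparisons combined with AND (age is an assumed scalar field)
df.filter(col("age") >= 21 && col("age") < 65)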
Restrictions
Pushdowns with DataFrames and Datasets are not supported in the following scenarios:
- Filters on complex types, including arrays, maps, and structs. For example, a filter on a field in a map, as shown in the following examples, is not pushed down:

Scala:

df.filter($"address.city" === "Milpitas")

Java:

df.filter(col("address.city").equalTo("Milpitas"));

Python:

df.filter(col("address.city") == "Milpitas")
- Filters with the functions sizeof, typeof, and matches. Spark SQL does not support these functions.
- Projections on complex types, including arrays, maps, and structs. For example, if you select an element of an array, as shown in the following examples, the projection is not pushed down:

Scala:

ds.select($"hobbies"(0))

Java:

df.select(col("hobbies").getItem(0));

Python:

df.select(col("hobbies").getItem(0))
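These restrictions affect performance, not correctness: a query that cannot be pushed down still runs, but Spark evaluates the filter or projection itself after the connector returns full rows. The following Scala sketch illustrates this, reusing the address.city example above; the mitigation in the second query relies on general Spark behavior (pushing the pushable conjunct of a filter) rather than connector-specific advice:

// Not pushed down: Spark applies this nested-field filter itself after
// the connector scans full rows, so the result is still correct.
df.filter($"address.city" === "Milpitas")

// The scalar first_name predicate can still be pushed down, which
// reduces how many rows Spark must post-filter on address.city.
df.filter($"first_name" === "David" && $"address.city" === "Milpitas")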