Extending the Spark pipeline
For some time now Spark has been offering a Pipeline API (available in the MLlib module) which facilitates building sequences of transformers and estimators in order to process the data and build a model. Moreover, the MLlib module ships with a plethora of ready-to-use transformers that make the process of data transformation easy and painless. But what happens if there is no transformer that supports a particular use case?
What are Transformer and Estimator?
Spark MLlib exposes a Pipeline API, which facilitates a number of actions a developer may want an application to perform in order to prepare data for building a machine learning model (such as feature encoding, indexing, etc.) and to produce the model itself. The Pipeline accepts an array that consists of a number of Transformer and Estimator instances. There is a profound difference between these two entities, namely:
- A Transformer is an instance that facilitates either feature transformation or prediction with an already learned model. The most important method any Transformer exposes is transform, which takes a Dataset[_] and returns a DataFrame. The returned DataFrame usually contains additional columns that represent a transformed feature or a prediction result.
- An Estimator is responsible for learning a model from the input data. It exposes a fit method which accepts a DataFrame and returns a Transformer that hosts the fitted model.
The Pipeline wraps a series of Transformer and Estimator instances and executes them in a given order. There is a great reference to the whole Pipeline process on the Spark documentation site.
The use case
Let’s imagine we want to train a multiclass classification model based on the Random Forest implementation in Spark. We have a number of different features, most of them categorical, represented by strings. In order to transform the data and feed it to a RandomForestClassifier instance we need to go through a number of Transformer instances first, e.g. StringIndexer and VectorAssembler.
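For reference, a minimal sketch of such a pipeline might look like the following (the column names and the trainingDF input are made up for illustration):

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// Hypothetical categorical feature columns and a string label column.
val categoricalCols = Seq("country", "device", "browser")

// Index every categorical feature and the label into numeric columns.
val featureIndexers = categoricalCols.map { c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
}
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("label_idx")

// Assemble the indexed features into a single vector column.
val assembler = new VectorAssembler()
  .setInputCols(categoricalCols.map(c => s"${c}_idx").toArray)
  .setOutputCol("features")

val classifier = new RandomForestClassifier()
  .setLabelCol("label_idx")
  .setFeaturesCol("features")

val stages: Array[PipelineStage] =
  (featureIndexers :+ labelIndexer :+ assembler :+ classifier).toArray

val pipeline = new Pipeline().setStages(stages)
// val model = pipeline.fit(trainingDF)  // trainingDF: an assumed input DataFrame
```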
The problem that may emerge in this use case is that as the number of classes the data can be assigned to grows, the computational complexity rises and (if resources are scarce) we can quickly run into an OutOfMemory exception.
There are multiple techniques and optimization tricks that can prevent the application from hitting OOM, but let’s assume that we want to subsample the data so that it reduces the computation time and minimizes the risk of OOM.
Fortunately, RandomForestClassifier exposes a parameter that takes care of subsampling – subsamplingRate. We can set the subsampling ratio to a fairly small number, e.g. 0.3, if we feel comfortable with it. By doing so we significantly reduce the number of rows used for training, decrease the time needed to train the model and reduce the risk of OOM. Can we now fire the application and forget about it?
Not at all. With a high number of classes in a dataset there is a significant probability that the overall distribution of labels is skewed. In other words, we may discover that there is a long tail of underrepresented classes.
There are some techniques that minimize the skewness of data, e.g. resampling the underrepresented classes, but here we are more interested in keeping the distribution as it originally is. Are we certain that subsamplingRate is aware of the target class distribution and prevents some labels from being dropped entirely? Well, no.
Eventually, we make the decision to take some action to tackle the issue and introduce our own subsampling technique that will take into account the distribution of the target feature.
Attempt I – subsampling outside the Pipeline
There is a method available on any DataFrame that reduces the number of rows – sample. So the easiest way to limit the size of each class would be to split the DataFrame into as many smaller data frames as there are classes, subsample each of them, and perform a union. The code that performs the whole operation might look like this:
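(A sketch of that approach – labelCol, ratio and numPartitions are hypothetical parameters, and the label column is assumed to contain strings.)

```scala
import org.apache.spark.sql.DataFrame

// Split the data frame by class, subsample each part and merge the results back.
def sampleByClass(df: DataFrame, labelCol: String, ratio: Double, numPartitions: Int): DataFrame = {
  // Collect the distinct class labels to the driver.
  val labels = df.select(labelCol).distinct().collect().map(_.getString(0))

  labels
    .map { label =>
      df.filter(df(labelCol) === label)
        .sample(withReplacement = false, ratio)
        .repartition(numPartitions)  // each per-class frame gets reshuffled separately...
    }
    .reduce(_ union _)               // ...so their partitions pile up after the union
}
```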
There are some disadvantages to this approach. Firstly, depending on the number of classes, each data frame in the resulting sequence is reshuffled into the desired number of partitions, which means that after merging all those data frames with union the overall number of partitions in the final dataset may increase significantly.
Secondly, if we set the sampling ratio to a very small number and the dataset contains classes that are represented by only a few rows, we may lose those classes altogether.
Thirdly, let’s imagine that the model is going to be passed somewhere else, with the categorical features represented by the indices produced by StringIndexer. StringIndexer assigns indices based on label frequencies, so if we run the application multiple times and sample the data randomly before indexing, the mapping between the indices and the labels they represent may change from run to run. We may come to the conclusion that it would be best to sample the data after it has been indexed, inside the pipeline.
Attempt II – eating a cake and having it too
In order to solve our issues we need to change two things:
- create a set of operations that sample the data in a class-aware manner without increasing the number of partitions,
- wrap these operations into a single step of a pipeline.
Let’s address the first issue now.
Spark SQL to the rescue
If you have been working with SQL, either plain or any specific dialect, you may know that there is a set of window functions (sometimes called analytical functions) that perform calculations over a window of rows assigned to each row. In order to tackle our first issue we will resort to a bunch of these functions as implemented in Spark (HiveContext). Let’s dive straight into the code.
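Below is a sketch of the class-aware sampling expressed with Spark’s window functions; labelCol, ratio and the auxiliary column names are assumptions made for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank, rand}

// Class-aware subsampling based on window functions.
def sampleWithinClasses(df: DataFrame, labelCol: String, ratio: Double): DataFrame = {
  // Step 1: a column with a random double used only for ordering within a class.
  val withRand = df.withColumn("rand_col", rand())

  // Step 2: the percent rank of every row within its class, ordered by the random column.
  val window = Window.partitionBy(labelCol).orderBy("rand_col")
  val withRank = withRand.withColumn("rank_col", percent_rank().over(window))

  // Step 3: keep rows whose rank falls below the ratio and drop the auxiliary columns.
  withRank
    .filter(col("rank_col") <= ratio)
    .drop("rand_col", "rank_col")
}
```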
The example above is pretty much self-explanatory, yet let’s dissect it piece by piece. In the first step an additional column is created that contains a random double between 0 and 1; for all practical purposes the generated values are unique. The second step creates yet another column that holds the percent rank computed over each class of the target feature, ordered by the random column generated in the previous step.
This transformation ensures that there is always a row with a percent rank equal to 0, which facilitates keeping at least one row per class even if the number of observations for a certain class and the subsampling ratio are both extremely low. Finally, the subsampling ratio is applied to the percent-rank column and both auxiliary columns are dropped.
With the use of the SQL functionality available in Spark we have managed to subsample the data in a class-aware manner without unnecessary partition proliferation.
Now it’s time to implement it as a Transformer in order to inject it into a Pipeline.
Would you like me to wrap it up for you?
There are two ways of creating a custom transformer: extending either the UnaryTransformer or the plain Transformer abstract class. In this post I would like to discuss the latter. I will use Transformer from Spark 2.0.x, as its implementation is a little bit different than in previous versions of Spark.
In order to build a transformer we need to override three methods required by its ancestors: transform, transformSchema and copy. In our use case the implementation is a bit simpler than usual, as we do not change the schema of the dataset.
Let’s start with the declaration:
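(A sketch of the declaration – the class name and parameter names are made up; the three required methods follow in the next snippets.)

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.SparkSession

class ClassAwareSampler(
    val targetColumn: String,             // name of the target (class) column
    val samplingRatio: Double,            // fraction of rows to keep per class
    val withReplacement: Boolean = false) // sample with replacement or not
   (implicit spark: SparkSession)         // passed implicitly, as described below
  extends Transformer {

  override val uid: String = Identifiable.randomUID("classAwareSampler")

  // transform, transformSchema and copy are filled in below.
}
```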
As you can see, we specify the column name of the target feature, the sampling ratio and a replacement flag in case we want to sample with replacement. Additionally, we implicitly pass a SparkSession, the common entry point for Spark applications available since Spark 2.0, in order to perform SQL-like operations. Bear in mind that while instantiating the SparkSession you need to enable Hive support by invoking the enableHiveSupport method.
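For completeness, such a session (with an arbitrary application name) could be created along these lines:

```scala
import org.apache.spark.sql.SparkSession

// A SparkSession with Hive support, exposed implicitly so that the
// transformer's constructor can pick it up.
implicit val spark: SparkSession = SparkSession.builder()
  .appName("class-aware-sampling") // arbitrary name
  .enableHiveSupport()
  .getOrCreate()
```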
Now, let’s implement the core method – transform:
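(A sketch reusing the window-function logic from the earlier snippet; the auxiliary column names are again made up, the withReplacement flag is ignored here, and – since the DataFrame window API is used instead of raw SQL – the implicitly passed SparkSession is not strictly needed.)

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank, rand}

// (inside ClassAwareSampler)
override def transform(dataset: Dataset[_]): DataFrame = {
  // A random ordering column, unique for all practical purposes.
  val withRand = dataset.withColumn("rand_col", rand())

  // The percent rank of every row within its class, ordered by the random column.
  val window = Window.partitionBy(targetColumn).orderBy("rand_col")
  val withRank = withRand.withColumn("rank_col", percent_rank().over(window))

  // Keep rows whose rank falls below the sampling ratio and drop the helper columns.
  withRank
    .filter(col("rank_col") <= samplingRatio)
    .drop("rand_col", "rank_col")
}
```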
There you go! The implementation is almost identical to one of the previous snippets.
We won’t discuss the implementation of the transformSchema method in detail. In our case it simply returns the schema it receives as input; consider it a stopgap that satisfies the interface.
The last method, copy, can be implemented by passing its input attributes to the defaultCopy function exposed by Spark’s ml module.
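(Sketched under the same assumptions as above. Because the parameters in this sketch are plain constructor arguments rather than ml Params, the instance is simply re-created by hand; with Param-based fields, defaultCopy(extra) would be the one-liner referred to above.)

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.types.StructType

// (inside ClassAwareSampler)

// The sampler only removes rows, so the schema passes through untouched.
override def transformSchema(schema: StructType): StructType = schema

// Re-create the instance with the same settings; with Param-based fields
// this could be replaced by Spark's defaultCopy(extra).
override def copy(extra: ParamMap): Transformer =
  new ClassAwareSampler(targetColumn, samplingRatio, withReplacement)
```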
We should now have a fully functional Transformer that can extend any pipeline should it be necessary.
Summary
I hope I have managed to convince you that building a custom Transformer is not difficult and that by doing so we can easily extend any pipeline that modifies and processes data.
I have omitted some crucial aspects of sampling that might come to your mind, as the purpose of this post was to present a way of building a custom Transformer rather than to stay statistically correct. I hope you will forgive me that one day!