A Deep Dive into Custom Spark Transformers for ML Pipelines


CrowdStrike data scientists often explore novel approaches for creating machine learning pipelines especially when processing a large volume of data. The CrowdStrike Security Cloud stores more than 15 petabytes of data in the cloud and gathers data from trillions of security events per day, using it to secure millions of endpoints, cloud workloads and containers around the globe with the power of machine learning and indicators of attack.

When processing so much data, making use of modern Spark Pipelines is a powerful way to take advantage of off-the-shelf libraries, reduce boilerplate code and improve readability. Because the built-in Transformers may not fit every use case, it’s important to understand how to correctly construct a custom Spark Transformer that integrates with Spark Pipelines, and to understand the components of a Transformer.

Note: For this blog, we assume usage of PySpark version 3.0+

Machine learning workflows generally consist of multiple high-level steps: gathering and preparing data, extracting features, training and tuning a model, evaluating it, and deploying it.

The code and structure of each step vary greatly, and if implemented inconsistently they can hurt the readability and flexibility of a data scientist’s workflow. In addition, data scientists often reuse components of their workflow, with slight modifications, across repeated experiments. This is why commonly used frameworks like scikit-learn and Spark provide pipeline frameworks to more flexibly express and assemble common high-level workflows.

Such frameworks give the user a consistent approach for building out the steps required to conduct experiments, and they are easy to extend. A less obvious advantage of this framework is the reduction of complexity for collaborators: pipelines provide a common code structure that is more readable and thus lowers the barrier to entry into your codebase.

The following is a simple example of a pipeline applied to a small text dataset:
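The sketch below is modeled on the standard Spark ML text-classification example; an active SparkSession named spark is assumed, and the rows, column names and hyperparameters are purely illustrative.

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Toy training data with id, text and label columns.
training_df = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)

# Each stage's output column feeds the next stage's input column.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

# The Pipeline records the stages and the order in which they are applied.
pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
model = pipeline.fit(training_df)
```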

Because experiments should be reproducible, we may need to save information about the state of each transformation and the model hyperparameters. Without a pipeline, each transformer and model may need to be saved separately, and the order of transformations must be preserved manually. Using a Spark Pipeline allows us to save the entire pipeline (including transformer states, order and hyperparameters) as a single object and reload it easily. From an experimentation and engineering perspective, this reduces the ambiguity of experiment configurations and makes integrating the model and pipeline downstream more straightforward.
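As a minimal sketch, assuming the fitted model from the example above and an illustrative output path:

```python
from pyspark.ml import PipelineModel

# Persist the fitted pipeline (stages, their order, transformer state and hyperparameters)
# as a single artifact on disk.
model.write().overwrite().save("/tmp/example_pipeline_model")

# Reload everything in one call; there is no need to reassemble the stages by hand.
reloaded_model = PipelineModel.load("/tmp/example_pipeline_model")
```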

With these improvements in flexibility and readability comes some additional work: we must conform our code to a structure accepted by modern pipelines. One very common set of tasks used in pipelines is the transform step of the ETL process, where we take our raw data and pass it through a series of data transformation steps. The outputs of these transforms are the vectors and labels used for model training. Though many manipulations of Spark data can already be done through either native functions or Spark SQL, there are often custom transforms we must apply to every row of our data that require custom code.

Let’s take as an example a simple text manipulation task on a Spark DataFrame containing id, text and label columns:
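A toy version of such a DataFrame (the rows are illustrative, and an active SparkSession named spark is again assumed) could be created as follows:

```python
# Small text dataset with id, text and label columns; some rows contain the word "spark".
df = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)
```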

It is recommended that data transformations be expressed as Spark SQL or native DataFrame functions when possible, due to their under-the-hood integration with the Spark query optimizer and the JVM. However, this is sometimes not possible for more complex transformations. In such cases, we can use a Spark user-defined function (UDF) to write our transformations. (Note that Python UDFs are generally slower than native Spark SQL, because data must be serialized and passed between the JVM and the Python interpreter.)

We’d like to apply a transform such that if we see the string spark in the text, we append an additional signal string to the end of it. A simple way to apply this transform to each row is to write a plain Python function and run it as a UDF:
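A sketch of that approach is shown below; the helper name append_string, the output column text_appended and the signal string " hadoop" are illustrative choices.

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


def append_string(text, append_str=""):
    """Append `append_str` to `text` whenever `text` contains the string 'spark'."""
    if text is not None and "spark" in text:
        return text + append_str
    return text


# Bind the extra argument with a lambda so the UDF receives a single column as input.
append_udf = F.udf(lambda text: append_string(text, " hadoop"), StringType())

df_transformed = df.withColumn("text_appended", append_udf(F.col("text")))
```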

Note that although this applies the correct transform, there are a few inconveniences: the transformation cannot be saved and reloaded as part of the experiment’s state, it cannot be added as a stage of a Spark Pipeline, and its parameters (such as the string to append) must be tracked by hand.

To make our transformation function savable, loadable and usable as part of a Pipeline, we will inherit from the SparkML Transformer class along with a few mixins that ensure API conformity with SparkML. The converted custom transformer would look like the following:
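Below is a sketch of such a transformer; the class name StringAppender is an illustrative choice, and it reuses the append_string() helper from the UDF example above.

```python
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


class StringAppender(Transformer, HasInputCol, HasOutputCol,
                     DefaultParamsReadable, DefaultParamsWritable):
    """Appends append_str to the input column whenever the text contains 'spark'."""

    # Custom Param for the string to append; inputCol and outputCol come from the mixins.
    append_str = Param(
        Params._dummy(),
        "append_str",
        "string to append when the text contains 'spark'",
        typeConverter=TypeConverters.toString,
    )

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, append_str=""):
        super().__init__()
        # Only the custom Param needs a default here; the mixins handle their own Params.
        self._setDefault(append_str="")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, append_str=""):
        # Copy the stored keyword arguments into the corresponding Param values.
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, value):
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        return self._set(outputCol=value)

    def getAppendStr(self):
        # Params are retrieved via getOrDefault() rather than as plain attributes.
        return self.getOrDefault(self.append_str)

    def _transform(self, dataset):
        input_col = self.getOrDefault(self.inputCol)
        output_col = self.getOrDefault(self.outputCol)
        append_str = self.getAppendStr()

        # Wrap append_string() (from the UDF example above) so it takes a single input.
        append_udf = F.udf(lambda text: append_string(text, append_str), StringType())
        return dataset.withColumn(output_col, append_udf(F.col(input_col)))
```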

Let’s break down some components of this wrapper and discuss each in detail:

Every custom transformer must at least inherit pyspark.ml.Transformer as the abstract base class.

We must also, at a minimum, override the _transform() function so that the Transformer knows how to transform our data. The input passed to _transform() is the entire input Dataset, including all the columns, so we will need to retrieve the input and output column names (usually set by the constructor).

Now that we have the input dataset and the input and output column names, we can wrap our transformation function append_string(). Note that if the transformation function requires arguments beyond the single input column (such as the string to append), you will need to convert it into a function that accepts a single input; you can do this with a lambda.

As part of constructing the custom transformer, we will need to generate pyspark.ml.param.shared.Param objects for each of the following: the input column name (inputCol), the output column name (outputCol) and the string to append (append_str).

Param objects can be set to a value like normal variables, but they enable us to more easily read and write them to/from file using Spark’s native methods. Generally these are set at initialization in the constructor (__init__()). However, because we inherit from HasInputCol and HasOutputCol, the Param member variables inputCol and outputCol, respectively, are created automatically for us. Thus we only need to create the append_str Param object. See the next section for more information on the mixins.

The typeConverter parameter here helps implicitly apply type conversions if the data type is different.

Inheriting the mixins HasInputCol and HasOutputCol reduces the amount of boilerplate code we must write to create these Params ourselves. HasInputCol creates a Param member variable for your custom transformer class called inputCol that can then be set, retrieved and written to file; HasOutputCol does the same for the member variable outputCol. Additionally, these mixins can initialize default values for their Params (HasOutputCol, for example, provides a default output column name).

Optionally, you can implement setInputCol()  and setOutputCol() to conform more closely with standard transformers available in SparkML.

There are also additional mixins that can be inherited if needed (e.g., for a list of input columns or output columns). For more information, please refer to the PySpark API documentation.

To correctly create a custom transformer, we must be able to store the inputs used to create the transformer. The inputs will be stored as Param type member variables within our custom transformer class. Let’s break down how this is done.

Here, @keyword_only stores the input keyword arguments (inputCol, outputCol and append_str in our example) as an internal map inside the Transformer, in a protected variable called _input_kwargs. After the input arguments are stored, we must manually set a default (using _setDefault()) for any custom Param we pass in that isn’t part of the mixins we inherited from. Specifically, because we inherited from HasInputCol and HasOutputCol, we do not need to set defaults for inputCol and outputCol manually, only for append_str. This ensures we can safely retrieve the values later using the inherited member function getOrDefault().

Next we set the Param type member variables (by calling setParams()) using our map _input_kwargs so that we can correctly retrieve the true assigned values when we need them later. 

Finally, when we decide to retrieve variables such as inputCol or append_str, we need to call getOrDefault(), as in self.getOrDefault(self.append_str). This is different from how we normally retrieve a variable in Python because each variable is a Param object. See the definition of getAppendStr() for more detail.
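For instance, with the illustrative StringAppender sketched above:

```python
appender = StringAppender(inputCol="text", outputCol="text_appended", append_str=" hadoop")

# Params are not plain attributes, so values are read back through getOrDefault()
# (or a dedicated getter that wraps it).
appender.getOrDefault(appender.append_str)  # ' hadoop'
appender.getAppendStr()                     # ' hadoop'
appender.getOrDefault(appender.inputCol)    # 'text'
```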

The final component of creating a custom transformer is to inherit the mixins DefaultParamsReadable and DefaultParamsWritable, which allow us to correctly write the transformer to file and read it back, both as part of a pipeline and on its own. These mixins read and write the Params we have created.

Not inheriting these mixins may lead to errors like the following when attempting to save a custom transformer:
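For example, calling save() on the transformer alone fails because no write support exists, and saving a pipeline that contains such a stage raises a ValueError. With the illustrative StringAppender class, the messages look roughly like the following (exact wording varies by Spark version):

```
AttributeError: 'StringAppender' object has no attribute 'save'

ValueError: Pipeline write will fail on this pipeline because stage StringAppender_... of type <class '__main__.StringAppender'> is not MLWritable
```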

Once the custom transformer is built, it’s easy to add it as a component of a pipeline. We will need to initialize our custom transformer by setting the correct input/output columns and the append string to use, and then add it as a stage to our pipeline. For example, if we extend the pipeline from the section “Pipeline Framework” above, we will have:
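A sketch of the extended pipeline, reusing the illustrative StringAppender, toy data and stages from the earlier snippets, might look like this:

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Configure the custom transformer and place it ahead of the original stages.
appender = StringAppender(inputCol="text", outputCol="text_appended", append_str=" hadoop")
tokenizer = Tokenizer(inputCol="text_appended", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

pipeline = Pipeline(stages=[appender, tokenizer, hashing_tf, lr])
model = pipeline.fit(training_df)

# The whole pipeline, custom stage included, can now be saved and reloaded as one object.
model.write().overwrite().save("/tmp/example_pipeline_with_appender")
```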

As we can see, converting a custom processing function into a custom transformer step requires us to implement the pattern discussed in this post. Although there are some non-trivial components to wrapping functions, the pattern is consistent, so it can be applied to most processing functions. Additionally, custom transformers can then be used as part of a pipeline to further improve code readability and integration with the native Spark pipeline framework. Finally, setting up your processing functions as transformers allows us to save entire pipelines to disk, where they can be more easily shared and used by collaborators downstream of your workflow.

