Implement Custom Spark Evaluators in StreamSets

This post presents a step-by-step guide on how to implement custom Spark Evaluators in StreamSets.

The Spark Evaluator performs custom processing within a pipeline based on a Spark application. You can only use the Spark Evaluator processor in cluster pipelines that process data from a Kafka or MapR cluster in cluster streaming mode. For example, a pipeline in Cluster Yarn Streaming execution mode with Kafka Consumer origin is processed by the Data Collector in a spawned Spark Streaming application. Such pipeline can benefit from Spark Evaluators to transform the records with custom-developed code, allowing us to implement much complex transformation logic not possible using the provided Processors.

Implementing the SparkTransformer

The main class of your Spark Evaluator must extend the SparkTransformer abstract class and implement its init, transform and destroy methods.

The init method is optional and is called once when the pipeline starts to read arguments that you configure in the Spark Evaluator processor. You can use this method to create any connections to external systems.

void init(JavaSparkContext context, List<String> parameters)
// OR
void init(SparkSession session, List<String> parameters)

The transform method is required and is called for each batch of records that the pipeline processes. The Spark Evaluator processor passes the batch data with RDD of Records parameter.

TransformResult transform(JavaRDD<Record> recordRDD)

The destroy method is also optional and should be used to close any connections to external systems initialized in init method, since it is called when the pipeline stops.

void destroy()

Example

Before implementing your custom Spark Evaluator, you’ll need to include the required dependency in your project:

<dependency>
    <groupId>com.streamsets</groupId>
    <artifactId>streamsets-datacollector-spark-api</artifactId>
    <version>3.7.2</version>
    <scope>provided</scope>
</dependency>

The code snippet below represents a working example of a custom Spark processor, implemented by extending the SparkTransformer abstract class and overriding the init and transform methods.

package com.xpandit.bdu

import java.util.regex.Pattern

import com.streamsets.pipeline.api.Field.Type
import com.streamsets.pipeline.api.{Field, Record}
import com.streamsets.pipeline.spark.api.{SparkTransformer, TransformResult}
import com.xpandit.bdu.commons.core.Loggable
import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}

import scala.collection.JavaConverters._

class ValidateFieldTransformer extends SparkTransformer with Serializable with Loggable {

  var sparkContext: SparkContext = _
  var emptyRDD: JavaRDD[(Record, String)] = _
  var fieldRegexMap: Map[String, Pattern] = _

  override def init(javaSparkContext: JavaSparkContext, parameters: java.util.List[String]): Unit = {
    sparkContext = javaSparkContext.sc

    fieldRegexMap = parameters.asScala.map { param =>
      val sepIndex = param.indexOf(' ')
      val fieldName = param.substring(0, sepIndex)
      val regex = param.substring(sepIndex + 1)
      INFO(s"Got: '$fieldName' -> '$regex'")
      fieldName -> Pattern.compile(regex, Pattern.CASE_INSENSITIVE)
    }.toMap
  }

  override def transform(javaRDD: JavaRDD[Record]): TransformResult = {
    val rdd = javaRDD.rdd
    val errors = sparkContext.emptyRDD[(Record, String)]

    val fieldRegexMapLocal = fieldRegexMap   // Using directly 'fieldRegexMap' results in this class instance serialization

    val transformedRDD = rdd.map { record =>
      val rootField = record.get.getValueAsMap
      fieldRegexMapLocal.foreach { case (fieldName, pattern) =>
        if(rootField.containsKey(fieldName) && rootField.get(fieldName).getType == Type.STRING){
          val isValid = pattern.matcher(rootField.get(fieldName).getValueAsString).matches()
          rootField.put("valid" + fieldName.capitalize, Field.create(isValid))
        }
      }
      record
    }

    new TransformResult(transformedRDD.toJavaRDD(), new JavaPairRDD[Record, String](errors))
  }
}

This processor receives through the init method parameters strings containing a field name together with the RegEx that should be validated, which will be provided through the Data Collector UI as we will soon see. E.g. email ^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,6}$

The init method uses the received parameters to populate the Map[String, Pattern] collection containing a compiled representation of a regular expression for every field. Also init saves the instance of SparkContext contained within the received JavaSparkContext object. Saving its instance allows us to create empty RDDs in the transform method.

The transform method receives a JavaRDD[Record] parameter, where JavaRDD is just a wrapper around an RDD, used in Spark’s Java API. The Record contained in it is the internal class StreamSets uses to represent each of the processed records.

A Record contains a Header with attributes and a root Field containing the record’s data. Header’s attributes are represented by a key/value map collection and contain a set of origin or user defined entries. The Field is a data structure construct consisting of a type/value pair. It supports basic types (i.e. numbers, strings, dates) as well as structure oriented types (i.e. Maps and Lists).

Depending on the configured origin and its data format, the data is represented differently within the Record’s root Field. For example, with a Kafka Consumer and JSON data format as origin, the processed JSON is represented in StreamSets as follows:

{
  "email": "testing@gmail.com", 
  "digits" : "123ups456"
}

In transform implementation all the processing logic happens inside the rdd.map Spark transformation. For each record you get its root Field and validate the entries contained in it with the stored regular expression Patterns. For every checked field we add another one containing a boolean with the RegEx matching result:

Notice that the SparkContext instance is used to create an empty RDD for errors, which together with the transformed RDD are passed as parameter to create an instance of TransformResult, returned by the transform method. The transformed RDD will be then passed to the next pipeline stage and the error RDD processed according to the Spark Evaluator processor configuration.

Packaging the SparkTransformer implementation

Your custom Spark Evaluator implementation should be bundled in a JAR package.

If you use Maven to build your project, by default the dependant JARs are not packaged. In case you use any external libraries in your custom Spark Evaluator, you will need to create a fat/uber JAR with Maven Assembly Plugin.

An important detail not mentioned anywhere in StreamSets documentation and examples is that the included streamsets-datacollector-spark-api dependency should have provided scope, since it is already included by the Data Collector, otherwise the pipeline will crash in runtime with:

java.lang.RuntimeException: Validate errors when loading services: [Service interface com.streamsets.pipeline.api.service.dataformats.DataFormatParserService have multiple implementations., Service interface com.streamsets.pipeline.api.service.dataformats.DataFormatGeneratorService have multiple implementations., Service interface com.streamsets.pipeline.api.service.dataformats.DataFormatGeneratorService have multiple implementations., Service interface com.streamsets.pipeline.api.service.dataformats.DataFormatParserService have multiple implementations.]
	at com.streamsets.datacollector.stagelibrary.ClassLoaderStageLibraryTask.validateServices(ClassLoaderStageLibraryTask.java:683)
	at com.streamsets.datacollector.stagelibrary.ClassLoaderStageLibraryTask.initTask(ClassLoaderStageLibraryTask.java:295)
	at com.streamsets.datacollector.task.AbstractTask.init(AbstractTask.java:62)
	at com.streamsets.datacollector.task.CompositeTask.initTask(CompositeTask.java:44)
	at com.streamsets.datacollector.task.AbstractTask.init(AbstractTask.java:62)
	at com.streamsets.datacollector.task.TaskWrapper.init(TaskWrapper.java:40)
	at com.streamsets.datacollector.main.Main.lambda$doMain$1(Main.java:118)
	at java.security.AccessController.doPrivileged(Native Method)
	at com.streamsets.datacollector.security.SecurityUtil.doAs(SecurityUtil.java:92)
	at com.streamsets.datacollector.main.Main.doMain(Main.java:153)
	at com.streamsets.datacollector.main.DataCollectorMain.main(DataCollectorMain.java:55)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.streamsets.pipeline.BootstrapMain.main(BootstrapMain.java:307)

To check that the fat JAR was generated correctly, you can list out the content with:

jar -tf streamsets-spark-transformer-1.0.jar

Configuring the Spark Evaluator

After packaging the processor implementation in a JAR, you can configure a pipeline using your custom Spark Evaluator. This tutorial focuses exclusively on Spark Evaluator, so the configuration of pipeline origin and destinations are out of our scope. The steps below contain instructions to configure a Spark Evaluator:

1 - Create a pipeline with Kafka Consumer as origin, Spark Evaluator as processor and any destination of your choice.

2 - Click on Spark Evaluator and in General tab choose the Stage Library compatible with our Cluster Yarn Streaming execution mode (an error pops-up if you choose a non-compatible library) and if possible which matches the Spark 2 version installed in our cluster.

3 - Add the transformer implementing JAR to the External Libraries.

Unfortunately, the tested 3.7.2 version of StreamSets has a known bug which does not allow to upload the library on Spark Evaluator’s configuration. Clicking on the upload icon simply has no effect.

The library can still be uploaded using the Package Manager, by choosing the same Stage Library and providing the implementation JAR, see Install External Libraries.

After uploading the JAR the Data Collector should be restarted. Another problem I noticed is that the restart is not performed by clicking on Restart Data Collector. The Data Collector instance stops and is not started, until you start it manually.

Removing a previously uploaded JAR also seems not possible because the checkbox seems not be working as well (ugh!), but uploading a JAR with the same name replaces the existing one.

Additionally, it is also possible to directly replace the JAR in its directory. Removing the library also works as workaround, since it is correctly reflected in the UI.

[root@nos3 sdc-extras]# tree
.
└── streamsets-datacollector-cdh_spark_2_1_r1-lib
    └── lib
        └── streamsets-spark-transformer-1.0.jar

4 - Check that the External Library is available in Spark Evaluator configurations, if it is not, then you may’ve uploaded a JAR to a non-matching Spark Evaluator’s Stage Library.

5 - In Spark tab provide the package of the class implementing the Spark Transformer and the Init Method Arguments your class is expecting. Out tutorial implementation receives strings containing a field name together with the RegEx that should be validated.

6 - Finally, you can validate if the Spark Evaluator works as expected by running the Preview feature. In the example below you can see highlighted in green the added Fields containing the RegEx validity.

Leave a comment