Jun 6, 2019 - Yuriy Drohobytskyi (Data Engineer)  

Spark Structured Streaming: customizing Kafka stream processing

This article is intended for data engineers and Spark developers with an intermediate level of experience who want to improve and expand their stream processing techniques.

According to Spark documentation:

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. … In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

The new approach introduced with Spark Structured Streaming makes it possible to write similar code for batch and streaming processing. It simplifies routine coding tasks, but it also brings new challenges to developers.

This article looks at problems, and their solutions, that arise when processing Kafka streams, controlling HDFS file granularity, and doing general stream processing, using a real project for data ingestion into Hadoop as the example.

Task

The task is to ingest data into Hadoop from external sources. A service that runs outside the Hadoop cluster retrieves data from several databases, transforms and enriches it, and pushes the compiled result into a Kafka topic. On the other side, a Spark application listens to the Kafka topic and stores the data in HDFS files, which are used later by other services. The ‘outside’ service runs once per day, which is enough for the purposes of the task. This is the main concept of the system.

Let’s consider the Spark application that reads data from Kafka and stores it in HDFS files. A typical streaming application runs indefinitely and processes data from Kafka as soon as it arrives. For our particular task this is overkill: the Hadoop cluster resources held by the Spark containers would be wasted almost all the time, because the useful load lasts only from minutes to hours. So the idea is to start the application at a specified time, wait until it has consumed all records from Kafka, and shut it down until the next run (usually about a day later). But how do we determine that the application has finished processing all the data in Kafka? There is no marker, and there cannot be one, that signals that this is all for now and no further messages will arrive until the next day.

Solution ideas

The idea is to wait for a suitable time period; if no messages arrive from Kafka during this period, we stop waiting, post-process the data, and quit the application until the next scheduled run. This way we do not waste cluster resources.

Solution implementation

To implement this idea we can split the processing time into equal periods and check whether any new messages arrived during the last period. To be reasonably sure that no more data will come during this application run, we check N such periods in a row, and if no messages arrived during any of them we stop the application. Let’s look at a typical Spark streaming application that consumes data from Kafka and stores it in Parquet files in HDFS. A real application has more details to take into consideration, but they are outside the scope of this article and will be discussed in later posts. All the code currently runs on Spark 2.3.2, and it is fully compatible with the latest Spark 2.4.x version. The following code starts listening to and consuming messages from a Kafka topic:

val kafkaMessages: DataFrame = spark.readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", bootstrapServers)
 .option("subscribe", topicIn)
 .option("startingOffsets", "latest")
 .load()

Here bootstrapServers is the list of Kafka broker addresses, topicIn is the name of the topic, and latest means that we wait only for new messages in the topic. The last setting is actually a bad choice, because the application would always have to be started before the service that produces data outside Hadoop. We describe a solution to this problem later; for now it is not important. For what follows we need the message payload as a string, together with the Kafka partition and offset of each message. For this we map each Kafka record to the following model:

case class KafkaMessage(
   partition: Int,
   offset: Long,
   value: String
)

All fields here are self-explanatory.

val message = kafkaMessages.selectExpr("partition", "offset", "CAST(value AS STRING)").as[KafkaMessage]

Now message is a Dataset of KafkaMessage objects. To write the messages to files we can start the following stream:

val fileStream: StreamingQuery = message.writeStream
 .format("parquet")
 .outputMode("append")
 .trigger(Trigger.ProcessingTime(triggerInterval))
 .option("checkpointLocation", checkpointLocation)
 .option("path", outFilePath)
 .queryName(queryName)
 .start()

Here checkpointLocation is the path where the streaming checkpoint data is stored. It is required because fault tolerance relies on the metadata Spark keeps there. queryName is an arbitrary name for the streaming query, and outFilePath is the output path on HDFS. triggerInterval is the period of time over which a Spark micro-batch is collected before it is processed by the Parquet writer in one go. So, at this point we have a stream that reads messages from Kafka and stores them in HDFS files, and the stream has a time granularity. Following the solution idea, we should be able to check how many messages we consume during each time period and stop the stream if we consume no messages N times in a row.

Spark provides an abstract class for listening to events from the stream, org.apache.spark.sql.streaming.StreamingQueryListener:

abstract class StreamingQueryListener {

  import StreamingQueryListener._

  /**
   * Called when a query is started.
   * @note This is called synchronously with
   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
   *       that is, `onQueryStart` will be called on all listeners before
   *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
   *       don't block this method as it will block your query.
   * @since 2.0.0
   */
  def onQueryStarted(event: QueryStartedEvent): Unit

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   *
   * @note This method is asynchronous. The status in [[StreamingQuery]] will always be
   *       latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
   *       may be changed before/when you process the event. E.g., you may find [[StreamingQuery]]
   *       is terminated when you are processing `QueryProgressEvent`.
   * @since 2.0.0
   */
  def onQueryProgress(event: QueryProgressEvent): Unit

  /**
   * Called when a query is stopped, with or without error.
   * @since 2.0.0
   */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit
}

To implement desired functionality we create the following listener:

class StreamQueryListener(val query: StreamingQuery, val maxEmptyTicks: Int = 3) extends StreamingQueryListener {

  private val queryId           = query.id
  private var currentEmptyCount = 0
  private var totalCount: Long  = 0

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    if (event.id == queryId) {
      !s"Query started. (id = $queryId)"
    }
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.id == queryId) {
      !s"Query progress. (id = $queryId)\n\tNumber of input rows = ${event.progress.numInputRows}, currentEmptyCount = $currentEmptyCount (total count = ${totalCount + event.progress.numInputRows})"
      event.progress.numInputRows match {
        case 0 =>
          currentEmptyCount += 1
          checkCounterLimit()
        case x =>
          currentEmptyCount = 0
          totalCount += x
      }
    }
  }
  private def checkCounterLimit(): Unit = {
    if (currentEmptyCount >= maxEmptyTicks) {
      !s"Query will be STOPPED! (id = $queryId)"
      query.stop()
    }
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    if (event.id == queryId) {
      !s"Query terminated. (id = $queryId)\n\tTotal rows processed= $totalCount"
    }
  }
}

We add this listener as follows:

spark.streams.addListener(new StreamQueryListener(fileStream, maxRetrives))

Here maxRetrives is the number of consecutive batches with no messages to wait for before stopping the stream.

The main logic is in the onQueryProgress method. We look at the event.progress.numInputRows value, which equals the number of rows received during the batch time window (set by .trigger(Trigger.ProcessingTime(triggerInterval))). If there are no messages in the batch, we increment the currentEmptyCount counter. When it reaches the maximum allowed value, we gracefully stop the stream with query.stop(). If we received any messages during the time window, we reset the counter and start monitoring from the beginning. We also count the total number of processed messages here. (The ! operator in front of a string puts the string into the logs.)
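The stopping rule itself does not depend on Spark, so it can be tried in isolation. The following is a minimal sketch, not the project's code: EmptyTickCounter and EmptyTickDemo are hypothetical names introduced only for illustration, and the batch sizes are made-up sample values standing in for event.progress.numInputRows.

```scala
// Hypothetical helper, not part of Spark: tracks consecutive empty micro-batches.
final class EmptyTickCounter(maxEmptyTicks: Int) {
  require(maxEmptyTicks > 0, "maxEmptyTicks must be positive")

  private var currentEmptyCount = 0
  private var totalCount: Long = 0L

  /** Records one batch and returns true when the stream should be stopped. */
  def onBatch(numInputRows: Long): Boolean = {
    if (numInputRows == 0L) {
      currentEmptyCount += 1
    } else {
      // Any non-empty batch resets the run of empty batches.
      currentEmptyCount = 0
      totalCount += numInputRows
    }
    currentEmptyCount >= maxEmptyTicks
  }

  /** Total number of rows seen so far. */
  def total: Long = totalCount
}

object EmptyTickDemo {
  /** Index of the batch at which the stop condition fires, or -1 if it never does. */
  def stopIndex(batchSizes: Seq[Long], maxEmptyTicks: Int): Int = {
    val counter = new EmptyTickCounter(maxEmptyTicks)
    batchSizes.indexWhere(rows => counter.onBatch(rows))
  }
}
```

For example, EmptyTickDemo.stopIndex(Seq(5L, 0L, 0L, 7L, 0L, 0L, 0L), 3) returns 6: the two empty batches after the first one do not stop the stream, because the batch of 7 rows resets the counter.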

To complete the application workflow we wait for the stream to terminate:

fileStream.awaitTermination()

That's all for our task. We have a streaming application that reads all data from the Kafka topic and stops when the topic becomes 'empty'. It does the job for our scheduled task.

Manual Kafka offsets management

Since our main goal is to run the streaming application on a schedule (usually once per day), it is a bad idea to re-read the topic from the beginning each time. Normally we should start reading from the point where the previous run stopped. If we use .option("startingOffsets", "earliest") for kafkaMessages, we always read the topic from the beginning. If we specify the starting offsets as "latest", we start reading from the end, which is also unsatisfactory because there could be new (and unread) messages in Kafka before the application starts. This needs to be fixed. Let's consider how Spark manages offsets for a Kafka stream consumer. There are several options here:

  1. Offset information can be stored by Kafka itself (older clients kept it in ZooKeeper; since Kafka 0.9 consumers commit offsets to an internal Kafka topic). For this purpose each consumer specifies its own group.id, and offsets are stored per group. This can be done automatically (the auto-commit option) or manually. This option is useless with the Structured Streaming API in the Spark versions used here, as there is no way to specify the group.id option (see the documentation): Spark assigns a different group.id to the consumer each time the application starts.
  2. The second option is what Spark does by default: offsets are stored in a directory called the checkpoint. Information about the output file writes is stored in this directory as well. Checkpoints hold intermediate information for fault tolerance: if a container fault, a JVM error, or any other failure occurs, the application automatically recovers from the last checkpointed state. This is a very powerful mechanism and it should be used for critical applications. Nevertheless, there are pitfalls in this approach. The first is that the offsets live there too, so we cannot remove this folder without losing the offset information. The second is that we cannot remove the output files: if we do, the next run of the application ends with errors, because the information in the checkpoint no longer matches the output files. In our case we would like to be able to remove the output files between subsequent application runs. The output physically consists of many part files (each part holds the data received during one processing time window), so after the application has read all data from the topic we want to aggregate all parts into one big file and delete the intermediate files. For this we would also have to remove the checkpoint directory. This is why we should look for another way to store Kafka offsets.
  3. Finally, we can manually store offsets and specify them when creating a stream. This demands more work but is the most flexible solution.

Implementation

There are multiple possible solutions. As shown above, we defined the case class KafkaMessage for received messages, and it contains the partition and offset information. So after all messages have been stored in the files and the stream has been gracefully stopped, we can post-process the messages. The goal is to aggregate the useful information into one big file and also to store the Kafka offsets for further use. To split the saved data (fragmentedMsgs in the snippets below) into offsets information and useful data, we can do the following:

val offsets: DataFrame =
      fragmentedMsgs
        .select($"partition", $"offset")
        .groupBy($"partition")
        .agg(
          max($"offset").alias("offset")
        )

for the offsets information. And:

    val entities: Dataset[DataLakeEntity] = fragmentedMsgs
      .select(from_json(col("value"), DataLakeEntitySchemas.dataLakeEntitySchema).as("json_str"))
      .select($"json_str.*")
      .as[DataLakeEntity]

for our DataLakeEntity information. (We use JSON for the messages in the Kafka topic; other applications may use a different format, e.g. protobuf.) As for the offsets, the work is almost done:

   val offsetsList = offsets.as[PartitionOffset].collect().toList

   if (offsetsList.nonEmpty) {
      // Store offsets somewhere, e.g.:
      offsetStore.insertOrUpdateOffsets(topicIn, offsetsList)
   }
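The offsetStore used above is not shown in this article. As a rough idea of what such a component might look like, here is a minimal in-memory sketch. Only the insertOrUpdateOffsets name comes from the snippet above; the OffsetStore trait, the readOffsets method, the InMemoryOffsetStore class and the fields of PartitionOffset are assumptions made for illustration, and a real store would persist the data outside the JVM.

```scala
import scala.collection.concurrent.TrieMap

// Assumed shape of the (partition, offset) rows produced by the aggregation.
final case class PartitionOffset(partition: Int, offset: Long)

// Hypothetical storage interface; a real implementation could use a database.
trait OffsetStore {
  def insertOrUpdateOffsets(topic: String, offsets: Seq[PartitionOffset]): Unit
  def readOffsets(topic: String): Seq[PartitionOffset]
}

// In-memory stand-in for experiments and tests: state is lost when the JVM exits.
final class InMemoryOffsetStore extends OffsetStore {
  // Key is (topic, partition), value is the last stored offset.
  private val data = TrieMap.empty[(String, Int), Long]

  override def insertOrUpdateOffsets(topic: String, offsets: Seq[PartitionOffset]): Unit =
    offsets.foreach { po =>
      // Insert a new entry or overwrite the existing one for this partition.
      data.update((topic, po.partition), po.offset)
    }

  override def readOffsets(topic: String): Seq[PartitionOffset] =
    data
      .collect { case ((t, partition), offset) if t == topic => PartitionOffset(partition, offset) }
      .toSeq
      .sortBy(_.partition)
}
```

Keeping the interface this small means the in-memory version can be swapped for a persistent one without touching the post-processing code.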

Now we need some storage for the offsets. We can write them to a Spark table, HBase, PostgreSQL, etc. When starting the stream we read the offsets back and pass them to the stream as an option, as with .option("startingOffsets", "latest"), but instead of "latest" we use the special JSON form:

{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}

This format is self-explanatory and is described in the documentation. (-1 stands for the latest offset, -2 for the earliest.)
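Building this JSON string from stored offsets can be sketched as follows. This is an illustration under stated assumptions, not the project's code: the StartingOffsets object and its toJson method are hypothetical names, and the fallback to "earliest" when nothing has been stored yet (for example on the very first run) is a design choice of this sketch. One detail matters here: the post-processing above stores the highest offset already written, while the value in startingOffsets is the offset of the first record to read, so the sketch adds one to each stored offset.

```scala
object StartingOffsets {

  /**
   * Builds the value for the Kafka source's "startingOffsets" option.
   *
   * @param topic         topic name (Kafka topic names need no JSON escaping)
   * @param lastProcessed partition -> highest offset already written by a previous run
   */
  def toJson(topic: String, lastProcessed: Map[Int, Long]): String =
    if (lastProcessed.isEmpty) {
      // Assumption of this sketch: with nothing stored, read the topic from the start.
      "earliest"
    } else {
      val partitions = lastProcessed.toSeq
        .sortBy { case (partition, _) => partition }
        // Resume from the record that follows the last processed one.
        .map { case (partition, offset) => s""""$partition":${offset + 1}""" }
        .mkString(",")
      s"""{"$topic":{$partitions}}"""
    }
}
```

With the stored offsets loaded into a map, the reader option would then look like .option("startingOffsets", StartingOffsets.toJson(topicIn, stored)), where stored is whatever the chosen storage returns for the topic.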

Conclusion

This article described a solution for reading data from a Kafka stream that is filled once per day. It proposed a way to manage the stream flow and gracefully stop the stream under specified conditions, and it considered various ways to manage Kafka offsets during stream processing.

Spark structured streaming programming guide

Title photo by: Caleb Ralston