FastIngest: Low-latency Gobblin with Apache Iceberg and ORC format


Since the containers are already provisioned (except for new container allocations needed for cluster expansion), job restarts in a Gobblin-on-Yarn cluster are lightweight and bounded only by the time Helix needs to reassign tasks.

Besides the replanner, the Kafka source has been enhanced to use historical statistics (such as the rate of traffic production and the average record size) to estimate the work size of each topic partition and evenly distribute work across Gobblin workers. Traffic production into a Kafka topic is tracked by hour of day and day of week to account for daily and weekly variations. The traffic estimates and record sizes are tracked and updated by the data pipeline inside the Kafka extractor (which is responsible for polling records from Kafka) and written to the watermark store along with the offsets when data is flushed.
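A minimal sketch of how such an estimate could be computed, using hypothetical class and method names rather than Gobblin's actual Kafka source API: production rates are bucketed by day of week and hour of day, then multiplied by the running average record size to obtain a byte rate that serves as the partition's work size.

import java.time.DayOfWeek;
import java.util.HashMap;
import java.util.Map;

public class PartitionWorkEstimator {

  // Observed production rate (records/sec) keyed by "<dayOfWeek>-<hourOfDay>",
  // so that daily and weekly traffic patterns are captured separately.
  private final Map<String, Double> recordsPerSecByBucket = new HashMap<>();

  // Running average record size in bytes, updated when offsets are flushed.
  private double avgRecordSizeBytes = 1024.0;

  // Called by the extractor at flush time, together with the offset checkpoint.
  public void update(DayOfWeek day, int hourOfDay, double recordsPerSec, double avgRecordSizeBytes) {
    this.recordsPerSecByBucket.put(day + "-" + hourOfDay, recordsPerSec);
    this.avgRecordSizeBytes = avgRecordSizeBytes;
  }

  // Estimated bytes/sec the partition will produce in the given time bucket;
  // a planner can use this value to balance partitions across Gobblin workers.
  public double estimatedBytesPerSec(DayOfWeek day, int hourOfDay) {
    double recordsPerSec = this.recordsPerSecByBucket.getOrDefault(day + "-" + hourOfDay, 0.0);
    return recordsPerSec * this.avgRecordSizeBytes;
  }
}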

Direct ORC ingest
A key decision we made as we began working on the new pipeline architecture was to write the data out directly in ORC format. We wanted to bring the benefits of ORC, such as improved I/O efficiency and predicate pushdown, to compute engines like Spark and Presto without incurring the additional latency of the Avro-to-ORC conversion pipeline.

Given the goal of writing data directly in ORC format with low latency, we ran benchmarking experiments to compare the write performance of different ORC writer implementations and writer configurations. Based on the results of these experiments (which are consistent with other published studies of ORC write performance), we decided to use the native ORC writer with dictionary encoding disabled (a minimal configuration sketch follows the list below). We concluded that disabling dictionary encoding was an acceptable tradeoff between write performance and query performance, for two main reasons:

  1. The data landed by FastIngest is re-materialized by a daily compaction job, which enables dictionary encoding when writing out ORC data.
  2. Only a small fraction of the data queried by a user (i.e., the “recent” data) will typically be in a sub-optimal format. 
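As an illustration, here is a minimal sketch of writing a file with the native Apache ORC writer while disabling dictionary encoding. The path and schema are placeholders; "orc.dictionary.key.threshold" is the standard ORC setting whose value of 0 turns dictionary encoding off.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class NativeOrcWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A key threshold of 0 means no column qualifies for dictionary encoding,
    // trading some query-side efficiency for cheaper, faster writes.
    conf.setDouble("orc.dictionary.key.threshold", 0.0);

    TypeDescription schema =
        TypeDescription.fromString("struct<memberId:bigint,eventName:string>");

    Writer writer = OrcFile.createWriter(
        new Path("/tmp/fastingest-sample.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
    // ... fill and add VectorizedRowBatch instances here ...
    writer.close();
  }
}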

Continuous data publish
Batch pipelines, which have a finite work size, fetch the latest schema at the start of the job, use it to write out data on HDFS, and publish the data when tasks finish. In contrast, the streaming pipeline, with its long-running tasks, must be able to publish data periodically and handle schema changes dynamically.

To this end, we introduced the concept of “Control Messages,” which are injected into the event stream and passed down the Gobblin construct chain. Each Gobblin construct (such as the converter and the writer) is enhanced to handle these messages when operating in streaming mode.
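The sketch below illustrates the general idea with simplified, hypothetical types (Gobblin's actual stream and control-message classes differ): a construct receives a single stream of entities and branches on whether an entity is an ordinary record or a control message.

public class StreamingConstructSketch<D> {

  // Everything flowing through the construct chain, records and control
  // messages alike, is a stream entity.
  interface StreamEntity {}
  static final class Record<D> implements StreamEntity {
    final D payload;
    Record(D payload) { this.payload = payload; }
  }
  static final class FlushControlMessage implements StreamEntity {}
  static final class MetadataUpdateControlMessage implements StreamEntity {}

  public void onEntity(StreamEntity entity) {
    if (entity instanceof FlushControlMessage) {
      closeCurrentFile();   // seal the file so the extractor can publish it
      ack(entity);          // tell the extractor the flush has been handled
    } else if (entity instanceof MetadataUpdateControlMessage) {
      closeCurrentFile();   // schema changed: the next record opens a new file
    } else if (entity instanceof Record) {
      @SuppressWarnings("unchecked")
      Record<D> record = (Record<D>) entity;
      writeRecord(record.payload);
    }
  }

  private void closeCurrentFile() { /* close the currently open ORC file */ }
  private void ack(StreamEntity entity) { /* acknowledge back upstream */ }
  private void writeRecord(D record) { /* append the record to the open file */ }
}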

To flush, the Kafka extractor injects a “Flush” control message into the stream at regular intervals defined by the ingestion SLA (every five minutes at LinkedIn, based on our latency settings). The message is injected as a record into the Kafka event stream; when the downstream writer sees it, it closes the current file and acknowledges the control message. After the control message is “acked,” the extractor publishes the data files and then the checkpoint offsets.
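A simplified sketch of the extractor side of this flow, again with hypothetical names rather than Gobblin's real API: a timer injects the “Flush” message at the configured interval, and files and offsets are published only after the writer acknowledges it.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FlushingExtractorSketch {

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private volatile CountDownLatch pendingAck;

  public void start(long flushIntervalMinutes) {
    scheduler.scheduleAtFixedRate(
        this::flush, flushIntervalMinutes, flushIntervalMinutes, TimeUnit.MINUTES);
  }

  private void flush() {
    try {
      this.pendingAck = new CountDownLatch(1);
      injectFlushControlMessage();  // enqueue the message behind the records read so far
      this.pendingAck.await();      // wait until the writer has closed its files and acked
      publishDataFiles();           // publish the files written since the last flush
      commitOffsets();              // then checkpoint offsets to the watermark store
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Called by the writer once it has handled the "Flush" control message.
  public void onAck() {
    this.pendingAck.countDown();
  }

  private void injectFlushControlMessage() { /* add a Flush record to the stream */ }
  private void publishDataFiles() { /* move sealed files to their final location */ }
  private void commitOffsets() { /* write offsets (and traffic stats) to the watermark store */ }
}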

To handle schema changes, we have a special converter that tracks the latest schema from the Kafka Schema Registry. It also caches every schema it has seen since the beginning of the current execution. When it sees a record with a new schema, it updates its latest schema and sends a “MetadataUpdate” control message to signal the downstream writer to close the current file. If it receives a record with an old schema, it up-converts the record to the latest schema.
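A hypothetical converter sketch illustrating this behavior for Avro records (the names and the notion of schema ordering are placeholders; the real converter works against the Kafka Schema Registry):

import java.util.HashSet;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class SchemaTrackingConverterSketch {

  // All schemas observed since the start of the current execution.
  private final Set<Schema> seenSchemas = new HashSet<>();
  private Schema latestSchema;

  public GenericRecord convert(GenericRecord record) {
    Schema recordSchema = record.getSchema();
    this.seenSchemas.add(recordSchema);

    if (this.latestSchema == null) {
      this.latestSchema = recordSchema;
    } else if (!recordSchema.equals(this.latestSchema) && isNewerThanLatest(recordSchema)) {
      // A newer schema has arrived: remember it and tell the downstream writer
      // to close its current file before records with the new schema land.
      this.latestSchema = recordSchema;
      emitMetadataUpdateControlMessage(this.latestSchema);
      return record;
    }

    // Records carrying an older schema are up-converted to the latest schema.
    return recordSchema.equals(this.latestSchema)
        ? record
        : upConvert(record, this.latestSchema);
  }

  private boolean isNewerThanLatest(Schema candidate) { /* e.g., compare registry versions */ return true; }
  private void emitMetadataUpdateControlMessage(Schema schema) { /* signal the downstream writer */ }
  private GenericRecord upConvert(GenericRecord record, Schema target) { /* project onto target */ return record; }
}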

Monitoring and alerting
The data ingestion pipeline is instrumented to emit tracking events into Kafka, which are used to monitor the health of the ingestion for each topic partition. The tracking events, which are emitted by each Gobblin task, contain information such as the low/high watermarks, bytes consumed, record counts, and ingestion latency. In addition, the events carry information, such as the Yarn container ID, that allows pipeline operators to quickly locate a problematic container. Each Yarn container also reports container health metrics such as process CPU load, overall system load, memory usage, and GC counts and durations. A separate Samza-based monitoring pipeline consumes these tracking events, populates a Superset dashboard, and alerts on pipeline health issues. The figure below describes the overall architecture of the monitoring sub-system.
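For illustration, the per-task tracking event described above might carry fields along these lines (the field names are hypothetical, not the exact event schema):

public class IngestionTrackingEvent {
  String topicPartition;     // e.g. "PageViewEvent-12"
  long lowWatermark;         // first Kafka offset ingested in this interval
  long highWatermark;        // last Kafka offset ingested in this interval
  long bytesConsumed;
  long recordCount;
  long ingestionLatencyMs;   // emit time minus the timestamp of the newest record
  String yarnContainerId;    // lets operators quickly locate a problematic container
}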


