Transport: Towards Logical Independence Using Translatable Portable UDFs


In a recent blog post, we touched upon Dali’s new architecture, which is designed to make data and logic seamlessly accessible and shareable across LinkedIn’s diverse environments. Dali achieves this vision by adopting the principles of physical and logical independence. Physical independence refers to enabling users to transparently access data regardless of its physical location, storage format, partitioning strategy, or even sharding strategy across multiple clusters. Logical independence refers to making Dali user-defined logic available in any engine or data processing framework, regardless of its language interface or data processing capabilities. As a layer for sharing data and logic, Dali strives to achieve these goals while providing virtually the same performance guarantees as the mission-critical systems it serves.

This blog post is the first in a series of articles where we detail the steps we are taking to realize this vision. In this post, we shed light on one of the most difficult, yet interesting, pieces for achieving logical independence: Translatable and Portable User-Defined Functions, or Transportable UDFs.

Dali logic is expressed in the form of SQL views. A SQL view is a set of logical transformations on a set of input datasets (which could be base datasets or other views). Dali views are used extensively across LinkedIn to summarize and aggregate data, clean noisy data, extract interesting or relevant information, apply privacy filters, or combine data from different sources to create insights and business value out of raw data. UDFs are used extensively in views to apply transformations that cannot be expressed using SQL expressions alone, and usually involve complex operations expressed in an imperative language such as Java. True logical independence can only be achieved if both the view logic and the UDF definitions are portable across engines. While the relational algebra expressions used to express view transformations can be mapped to the different declarative language interfaces of different data processing engines, that is not the case with UDF definitions, which are imperative and opaque; hence the challenge.

UDF APIs differ vastly among data processing engines, since those APIs have to take into account each engine’s internal data representation of choice, and have to provide a way to connect that data representation to the relational schema. This variation places a burden on application developers, who have to learn the UDF API and the internal data model of each engine, and then re-implement the same logic using different APIs whenever there is a need to move the logic from one engine to another, or even to share the same logic across engines. This introduces what we call UDF denormalization, i.e., harmful redundancy that negatively impacts productivity and craftsmanship.

Given these challenges, the question becomes: how can we enable our users to write user-defined function definitions once, and reuse them in any engine without sacrificing performance? To solve this challenge, we decided to pursue the journey of translatable, portable user-defined functions, a framework we refer to as Transport. While it seemed like a crazy, fuzzy idea at the beginning, fast forward to today: a number of users already employ these functions in production pipelines, the framework covers three engines (Hive, Presto, and Spark) and a data format (Avro), and we are expanding the set of engines it can be integrated with.

Transport is an API and a framework. Users implement their UDFs against the Transport UDF API, and the framework transforms those UDFs into native UDFs for a variety of target engines. For example, a user can write a single implementation of their UDF against the Transport UDF API, and the framework can transparently convert it to a native Hive UDF, as if the user had originally written it as a Hive UDF. Now, if the user wants to run this UDF in another engine, no problem! For example, if the user wants to run the same UDF as part of a Presto query, the framework can also transparently convert it to a native Presto UDF, as if the user had originally written it as a Presto UDF.
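
To make this concrete, below is a sketch of what a Transport UDF can look like: a function that zips two arrays into a map, written once against the engine-neutral API. It is modeled on the open-source Transport API; class and package names such as StdUDF2 and StdMap reflect one version of the API and may differ in others.

```java
import com.google.common.collect.ImmutableList;
import com.linkedin.transport.api.StdFactory;
import com.linkedin.transport.api.data.StdArray;
import com.linkedin.transport.api.data.StdMap;
import com.linkedin.transport.api.types.StdType;
import com.linkedin.transport.api.udf.StdUDF2;
import java.util.List;

// Written once against the Transport API; the framework produces native
// Hive, Presto, and Spark versions of this UDF from this single class.
public class MapFromTwoArraysFunction extends StdUDF2<StdArray, StdArray, StdMap> {

  private StdType _mapType;

  @Override
  public List<String> getInputParameterSignatures() {
    // Declarative type signatures; K and V are generic type parameters.
    return ImmutableList.of("array(K)", "array(V)");
  }

  @Override
  public String getOutputParameterSignature() {
    // The output type is expressed in terms of the input type parameters.
    return "map(K,V)";
  }

  @Override
  public void init(StdFactory stdFactory) {
    super.init(stdFactory);
    _mapType = getStdFactory().createStdType(getOutputParameterSignature());
  }

  @Override
  public StdMap eval(StdArray keys, StdArray values) {
    if (keys.size() != values.size()) {
      return null;
    }
    // Platform-neutral data objects (StdMap, StdArray) wrap whatever
    // representation the underlying engine happens to use.
    StdMap map = getStdFactory().createMap(_mapType);
    for (int i = 0; i < keys.size(); i++) {
      map.put(keys.get(i), values.get(i));
    }
    return map;
  }
}
```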

Before describing how Transportable UDFs work, we first explore the motivation behind the idea by describing two phenomena that characterize today’s UDF landscape: UDF API disparity and UDF API complexity.

UDF API disparity

There are many engines in the data processing world, and each comes with its own set of features that make it suitable for certain use cases. Similarly, each engine comes with its own UDF API that differs considerably from one engine to another. In this section, we detail some of those differences, focusing on UDF APIs of three popular engines: Hive, Presto, and Spark.

UDF type validation and inference
UDF APIs usually offer their users some means to specify which data types the UDF expects (i.e., type validation) and how the output type of the UDF relates to its input types (i.e., type inference). Some UDF APIs, such as Presto’s, use type signatures to declare what types the UDF expects, while others, such as Hive’s and Spark’s, expect the user to write imperative code that performs type validation and inference by traversing the given type object tree.
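
As an illustration of the signature-based style, here is a sketch of a Presto UDF whose types are fully declared through annotations, so the engine performs validation and inference itself (the function name is hypothetical; the annotations are from Presto’s SPI):

```java
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class ConcatFunction {
  // The annotations declare the full signature: (varchar, varchar) -> varchar.
  // The engine checks argument types and knows the output type up front;
  // no imperative type-checking code is needed.
  @ScalarFunction("my_concat")
  @SqlType(StandardTypes.VARCHAR)
  public static Slice myConcat(
      @SqlType(StandardTypes.VARCHAR) Slice left,
      @SqlType(StandardTypes.VARCHAR) Slice right) {
    Slice result = Slices.allocate(left.length() + right.length());
    result.setBytes(0, left);
    result.setBytes(left.length(), right);
    return result;
  }
}
```

In Hive and Spark, the equivalent checks are hand-written inside the UDF; the “UDF API complexity” section below walks through what that looks like.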

Engine’s underlying data model
Different platforms use different data models to represent the data being processed in their execution engines, and therefore expose those data models directly in their UDF APIs as well. For example, Presto UDFs use Presto “Type” objects, along with Block, Slice, long, boolean, double, and similar data types to describe the data in those objects, while Hive UDFs use ObjectInspectors along with Objects. Similar differences exist for other engines, too.
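
As a rough sketch of how different these models feel in practice, the two helpers below read the same logical string value through each engine’s data model (names are illustrative):

```java
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.VarcharType;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class DataModelExamples {

  // Presto: values live in engine-internal containers (Block) and are read
  // through Type objects into low-level representations such as Slice.
  static String readStringPresto(Block block, int position) {
    return VarcharType.VARCHAR.getSlice(block, position).toStringUtf8();
  }

  // Hive: values are opaque Objects whose structure is only known through
  // the ObjectInspector that accompanies them.
  static String readStringHive(Object value, StringObjectInspector inspector) {
    return inspector.getPrimitiveJavaObject(value);
  }
}
```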

The UDF definition API
Additionally, the way users are expected to define UDFs, i.e., the APIs they implement, differs from engine to engine. For example, Presto UDFs use special type annotations to declare that a class represents a UDF. Hive UDF classes extend the GenericUDF Hive abstract class, while Spark UDFs implement either the Spark SQL UDF API or, alternatively, the Expression API.
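
The skeletons below sketch the Hive and Spark definition styles for a trivial uppercasing UDF (class names are illustrative; the Presto annotation style is shown in the previous snippet):

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

// Hive: UDF classes extend the GenericUDF abstract class.
public class MyUpperHiveUDF extends GenericUDF {
  private StringObjectInspector inputOI;

  @Override
  public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    // Imperative type validation (checks elided for brevity).
    inputOI = (StringObjectInspector) args[0];
    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
  }

  @Override
  public Object evaluate(DeferredObject[] args) throws HiveException {
    String s = inputOI.getPrimitiveJavaObject(args[0].get());
    return s == null ? null : s.toUpperCase();
  }

  @Override
  public String getDisplayString(String[] children) {
    return "my_upper(" + children[0] + ")";
  }
}
```

```java
import org.apache.spark.sql.api.java.UDF1;

// Spark: the SQL UDF API is a simple functional interface, registered with
// something like: spark.udf().register("my_upper", new MyUpperSparkUDF(),
// DataTypes.StringType).
public class MyUpperSparkUDF implements UDF1<String, String> {
  @Override
  public String call(String s) {
    return s == null ? null : s.toUpperCase();
  }
}
```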

UDF API features
Finally, not all UDF APIs offer the same set of features. For example, Hive UDFs offer hooks to add files to the MapReduce distributed cache, allowing UDFs executed on different workers to process those files. Such a feature does not exist in Presto or Spark UDFs. Even when a feature exists in multiple engines, the way it is expressed in the API can sometimes be fundamentally different. Presto UDFs allow users to declare which UDF arguments are nullable (i.e., can receive null values; otherwise, the engine automatically returns null without invoking the UDF), while Hive and Spark UDFs delegate null handling to the user. Presto implements UDF overloading (i.e., a UDF accepting more than one signature) by implementing the UDF a number of times under the same name, while in Hive and Spark, users use the same class but manually check whether the input types conform to one of the expected signatures.
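
To illustrate the last two points, the sketch below shows Presto-style declared nullability and overloading under a single SQL function name (function names are hypothetical):

```java
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlNullable;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class FeatureExamples {

  // Nullability: without @SqlNullable, the engine returns null automatically
  // for null input and never invokes the method. With it, the method sees nulls.
  @ScalarFunction("str_or_default")
  @SqlType(StandardTypes.VARCHAR)
  public static Slice strOrDefault(@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice s) {
    return s == null ? Slices.utf8Slice("N/A") : s;
  }

  // Overloading: the same SQL name is declared on multiple methods, one per
  // accepted signature; the engine dispatches on the argument types.
  @ScalarFunction("double_it")
  @SqlType(StandardTypes.BIGINT)
  public static long doubleLong(@SqlType(StandardTypes.BIGINT) long v) {
    return 2 * v;
  }

  @ScalarFunction("double_it")
  @SqlType(StandardTypes.DOUBLE)
  public static double doubleDouble(@SqlType(StandardTypes.DOUBLE) double v) {
    return 2 * v;
  }
}
```

In Hive and Spark, the UDF author writes the equivalent behavior by hand: checking arguments for null and inspecting the input types to determine which signature the UDF was invoked with.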

UDF API complexity

Current engine APIs come with varying degrees of complexity and varying expectations of their users’ (i.e., UDF developers’) skill sets. For example, Hive UDFs require users to understand the Object + ObjectInspector model (ObjectInspectors are metadata that describe the type of the data stored in the corresponding Objects being processed by the UDF). Users are expected to capture the input ObjectInspectors upon UDF initialization and inspect them level by level, in a tree-traversal fashion, to validate that they conform to the expected types. Furthermore, users are expected to bind arguments explicitly by capturing subtrees of the input ObjectInspectors and creating new ObjectInspector trees out of them to return as the output ObjectInspectors. Let us say that a UDF expects a Map from a String to an Array of some type K, and returns a Map from K to K, and that at query execution time, the UDF is invoked with a Map from a String to an Array of Integers. The following ObjectInspector tree is passed to the UDF’s initialize method:
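
(A minimal reconstruction follows, using Hive’s standard ObjectInspector interfaces; the tree is drawn in the comments, and the validation checks are abbreviated.)

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

// For an input of type map(string, array(int)), initialize() receives:
//
//   MapObjectInspector
//    |-- key:   StringObjectInspector
//    |-- value: ListObjectInspector
//                |-- element: IntObjectInspector   <-- this subtree is "K"
//
// The UDF must walk this tree level by level to validate it, capture the K
// subtree, and assemble a new ObjectInspector tree for the output map(K, K).
public class MapTransformUDF extends GenericUDF {

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    // Level 1: the argument must be a map.
    if (!(arguments[0] instanceof MapObjectInspector)) {
      throw new UDFArgumentException("Expected a map argument");
    }
    MapObjectInspector mapOI = (MapObjectInspector) arguments[0];
    // Level 2: the map key must be a string.
    if (!(mapOI.getMapKeyObjectInspector() instanceof StringObjectInspector)) {
      throw new UDFArgumentException("Expected map keys of type string");
    }
    // Level 2: the map value must be an array (list).
    if (!(mapOI.getMapValueObjectInspector() instanceof ListObjectInspector)) {
      throw new UDFArgumentException("Expected map values of type array");
    }
    // Bind K: capture the array's element-inspector subtree...
    ObjectInspector kOI = ((ListObjectInspector) mapOI.getMapValueObjectInspector())
        .getListElementObjectInspector();
    // ...and build the output ObjectInspector tree for map(K, K) from it.
    return ObjectInspectorFactory.getStandardMapObjectInspector(kOI, kOI);
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    // The actual construction of the output map(K, K) is elided; the point
    // here is the amount of ceremony initialize() alone requires.
    return null;
  }

  @Override
  public String getDisplayString(String[] children) {
    return "map_transform(" + children[0] + ")";
  }
}
```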


