Advanced schema management for Spark applications at scale

Schema management workflow

Given the scale of LinkedIn’s data processing ecosystem, any solution must impose minimal overhead on the data processing application development lifecycle. In addition to addressing the challenges stated in the previous section, we had to ensure that our solution relied on standard software development practices to include and build schemas. For that purpose, we built infrastructure to serve dataset schemas as software artifacts. This entails capturing Avro schemas of datasets from the Hive Metastore, generating their corresponding SpecificRecord classes, and then compiling and pushing them to an artifact repository manager (Artifactory, for example). Each table corresponds to an artifact that is named after the Hive database and table names. When a table schema changes, a new artifact is generated and pushed with a new version number.
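As a concrete illustration of the naming scheme described above, the mapping from a Hive table to Maven-style artifact coordinates can be sketched as follows (the helper function and its details are assumptions for illustration, not LinkedIn's actual implementation; only the group ID and the database/table-derived artifact ID come from the example in this post):

```python
def schema_artifact_coordinates(database, table, version):
    """Map a Hive table to a 'group:artifact:version' coordinate string.

    Illustrative sketch only: the group ID matches the example used in
    this post, and the artifact ID is derived from the Hive database and
    table names, e.g. ("example", "company") -> "example-company".
    """
    group_id = "com.linkedin.dali.schema"
    artifact_id = f"{database}-{table}"
    return f"{group_id}:{artifact_id}:{version}"

print(schema_artifact_coordinates("example", "company", "0.0.1"))
# com.linkedin.dali.schema:example-company:0.0.1
```

When the table's schema changes, the same coordinates are published again with a bumped version number, so consumers pick up the change through ordinary dependency resolution.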

From the application development perspective, developers consume schemas using standard dependency management systems such as Gradle. For example, say our Company schema is associated with the Hive table company in the database example. Let us also say the corresponding generated artifact has the group ID “com.linkedin.dali.schema” and the artifact ID “example-company” (derived from the database and table names). The first version of such an artifact will be 0.0.1. Therefore, to include this schema in an application that processes company data, the build script should simply declare a dependency along the lines of:

compile 'com.linkedin.dali.schema:example-company:0.0.1'

Because data schemas are integrated into a standard dependency management process, version conflicts are handled in a standard way as well: tools such as Gradle resolve conflicting schema versions automatically, avoiding classpath conflict situations.

As the Company schema evolves, possibly by introducing new fields, new artifacts are generated with new version numbers and again pushed to Artifactory, our artifact repository manager. Users can always stay on the latest schema by declaring a dependency on the latest artifact version:

compile 'com.linkedin.dali.schema:example-company:+'

When table schemas change, derived view schemas potentially change as well. For example, if the Address field schema evolves to include the Unit Number, all views built on top of the Company table should expose that field as well. In the following section, we discuss our design choices for schema evolution management techniques for both tables and views.
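To make the Address example above concrete, the following sketch shows an Avro-style record schema gaining a unit number field. The field names and types are assumptions for illustration; the key point is Avro's standard compatibility rule that an added field should carry a default so readers of the new schema can still decode records written with the old one:

```python
import json

# Hypothetical Address schema before evolution (field names are assumptions).
address_v1 = {
    "type": "record", "name": "Address",
    "fields": [
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"},
    ],
}

# Evolved schema: the new unitNumber field is nullable with a default of
# null, which keeps the change backward compatible under Avro's schema
# resolution rules for added fields.
address_v2 = json.loads(json.dumps(address_v1))  # deep copy via round-trip
address_v2["fields"].append(
    {"name": "unitNumber", "type": ["null", "string"], "default": None}
)
```

Once the table's Address schema evolves this way, every view selecting that column must have its own schema artifact regenerated to expose the new field.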

Schema evolution management 

As new tables and views are created, and as table schemas evolve, schemas of views dependent on them also change. A change management mechanism should capture those changes, generate new Avro schemas for new tables or views, and publish new versions of their respective artifacts.

Snapshot- and event-driven models
In general, there are two broad schema evolution management models: snapshot-driven and event-driven. In the snapshot-driven model, our schema management system takes a snapshot of the metastore schema information at regular intervals, creates an artifact for each table or view, and publishes the artifacts to Artifactory. 

In the event-driven model, our schema management system captures change events on the metastore—such as those triggered by ALTER TABLE, CREATE TABLE, or CREATE VIEW commands—before extracting the new schema information and publishing it to Artifactory (the next section will touch on how to generate Avro schemas for views). Trade-offs between the two models mainly revolve around schema freshness requirements: the event-driven model offers near real-time schema artifact generation and publication, while the snapshot-driven model's freshness is bounded by the snapshot interval.
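The event-driven model can be sketched as a handler that reacts only to schema-changing metastore events. This is a simplified illustration under assumed event names and helper callables; real metastore listener APIs differ in their details:

```python
# Event types assumed to change a table's or view's schema (illustrative).
SCHEMA_CHANGING_EVENTS = {"ALTER_TABLE", "CREATE_TABLE", "CREATE_VIEW"}

def on_metastore_event(event_type, database, table,
                       extract_avro_schema, publish_artifact):
    """On a schema-changing event, regenerate and publish the schema artifact.

    `extract_avro_schema` and `publish_artifact` are hypothetical hooks
    standing in for the metastore-extraction and Artifactory-publication
    steps described in the text.
    """
    if event_type not in SCHEMA_CHANGING_EVENTS:
        return None  # event does not affect any schema; nothing to publish
    schema = extract_avro_schema(database, table)
    return publish_artifact(database, table, schema)
```

Because the handler runs as events arrive, artifacts are published moments after a schema changes rather than at the next snapshot.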

Snapshot-driven algorithm
A very basic snapshot-driven algorithm for artifact generation—assuming S denotes the set of tables or views—looks like this:
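A minimal sketch of such a loop, assuming hypothetical helpers for schema extraction, version tracking, and artifact publication (the helper names are illustrative, not LinkedIn's actual interfaces):

```python
def snapshot_publish(S, extract_avro_schema, latest_published_schema,
                     next_version, publish_artifact):
    """Publish a new artifact version for each dataset whose schema changed.

    S is the set of tables and views; the four callables stand in for the
    metastore-extraction, artifact-lookup, versioning, and publication
    steps described in the text.
    """
    published = []
    for dataset in S:                          # each table or view
        schema = extract_avro_schema(dataset)  # snapshot from the metastore
        if schema != latest_published_schema(dataset):
            version = next_version(dataset)    # bump the artifact version
            publish_artifact(dataset, schema, version)
            published.append((dataset, version))
    return published
```

Running this on a schedule yields one new artifact version per changed table or view per snapshot interval, while unchanged datasets are skipped.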
