Managing “Exploding” Big Data | LinkedIn Engineering

The left side of the figure above shows the partitioned tables. A mapper is launched for each pair of partitions and loads the hash table slices for the member and item features. In the figure above, the logic is shown for a mapper that processes partition 4 of the member features and partition 1 of the item features. Then, for each record, we look up the member and item features in the hash tables and score them right away! The code is given in pseudo-form: setup is called once per mapper, map is called for each record, and load_[member|item]_slice loads the slice file directly from HDFS.
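The mapper logic described above can be sketched in Python. This is only an illustrative sketch, not the original pseudo-code: the in-memory MEMBER_SLICES and ITEM_SLICES dictionaries stand in for slice files on HDFS, the dot-product score function is a placeholder for whatever model is actually used, and the PairPartitionMapper and run_mapper names are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Features = List[float]

# Hypothetical stand-ins for slice files that would live on HDFS.
MEMBER_SLICES: Dict[int, Dict[str, Features]] = {
    4: {"m1": [0.5, 1.0], "m2": [2.0, 0.0]},
}
ITEM_SLICES: Dict[int, Dict[str, Features]] = {
    1: {"i1": [1.0, 1.0], "i2": [0.0, 3.0]},
}


def load_member_slice(partition: int) -> Dict[str, Features]:
    """Load one member-features slice as a hash table (assumed layout)."""
    return MEMBER_SLICES[partition]


def load_item_slice(partition: int) -> Dict[str, Features]:
    """Load one item-features slice as a hash table (assumed layout)."""
    return ITEM_SLICES[partition]


def score(member: Features, item: Features) -> float:
    """Placeholder scoring function: a plain dot product."""
    return sum(m * i for m, i in zip(member, item))


class PairPartitionMapper:
    """Handles one (member partition, item partition) pair."""

    def __init__(self, member_partition: int, item_partition: int) -> None:
        self.member_partition = member_partition
        self.item_partition = item_partition
        self.member_table: Dict[str, Features] = {}
        self.item_table: Dict[str, Features] = {}

    def setup(self) -> None:
        # Called once per mapper: load both hash table slices into memory.
        self.member_table = load_member_slice(self.member_partition)
        self.item_table = load_item_slice(self.item_partition)

    def map(self, record: Tuple[str, str]) -> Tuple[str, str, float]:
        # Called per record: two in-memory lookups, then score immediately.
        member_id, item_id = record
        return (
            member_id,
            item_id,
            score(self.member_table[member_id], self.item_table[item_id]),
        )


def run_mapper(
    mapper: PairPartitionMapper, records: Iterable[Tuple[str, str]]
) -> List[Tuple[str, str, float]]:
    """Drive the mapper the way the framework would: setup once, then map."""
    mapper.setup()
    return [mapper.map(r) for r in records]


results = run_mapper(PairPartitionMapper(4, 1), [("m1", "i1"), ("m2", "i2")])
```

Both joins become dictionary lookups inside map, so no joined intermediate table is shuffled or written.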

Observe how we have shifted our data read/write/shuffle patterns: previously we were shuffling and writing the intermediate data. With 2D Partitioned Hash Join, we have completely avoided that. Our cost now is reading the same partitions of the member and item features tables from several mappers. This is better, as we are reading the pre-join data (which is smaller than the joined output) and the read access is fully parallelized (which is faster).

The net effect? Loading the slices in memory takes less than 10 seconds. So, the two joins that used to take several hours are now done in seconds!

A wide middle in graphs

A key value of LinkedIn’s professional network is discovering connections with people you should know or reconnecting with old or lost connections. Over the years, the People You May Know (PYMK) feature has helped members build a healthy and thriving professional identity and network. One component of discovering connections (presented in simplified form here) is the second-degree network (also called the friends-of-friends network). If you and I are not already connected but we share several mutual connections, it is a good indicator that our professional identities overlap and we should “get in touch.”

The “standard” technique for generating this on a cluster is actually rather simple. The graph is stored as an adjacency list: each row of data is a member and a list of their connections—e.g., {member: Adam, connections: [Bob, Charlie, David]}. The mappers load the partitions of this graph and generate the pairs of second degree connections as:

{Bob, Charlie, mutual: Adam}
{Bob, David, mutual: Adam}
{Charlie, David, mutual: Adam}

That is, each input row produces several output rows. If each member has 100 connections on average, each input row yields on the order of 100 × 100 = 10,000 output rows, so the mappers send roughly 10,000x as many rows to the next stage of computation. This is the exploding data!
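The pair generation step can be sketched as follows. This is a minimal illustration that assumes each row is a dictionary with member and connections keys; the function name second_degree_pairs is hypothetical. It emits each unordered pair once, so a member with 100 connections yields 4,950 rows; emitting both orderings would roughly double that, which is the same order of magnitude as the figure quoted above.

```python
from itertools import combinations
from math import comb
from typing import Dict, Iterator, Tuple


def second_degree_pairs(row: Dict) -> Iterator[Tuple[str, str, str]]:
    """Emit one (a, b, mutual) row for every unordered pair of connections."""
    mutual = row["member"]
    for a, b in combinations(sorted(row["connections"]), 2):
        yield (a, b, mutual)


row = {"member": "Adam", "connections": ["Bob", "Charlie", "David"]}
pairs = list(second_degree_pairs(row))

# Rows emitted for a single member with 100 connections (unordered pairs).
rows_for_100_connections = comb(100, 2)
```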

Fortunately for us, this huge intermediate data is subsequently reduced in two ways. First, even though there may be, say, 50 mutual connections between Bob and Charlie, we group them and only count the number of common connections (instead of 50 rows from the mappers, we keep only one row in the reducer). Second, we discard pairs with few (say, fewer than three) common connections. Both factors taken together bring down the data by several orders of magnitude.
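The two reductions can be sketched as a single reducer-side function. This is an illustrative sketch rather than the production code: count_and_filter is a hypothetical name, and the default threshold of three follows the example in the text.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def count_and_filter(
    pairs: Iterable[Tuple[str, str, str]], min_common: int = 3
) -> Dict[Tuple[str, str], int]:
    """Collapse (a, b, mutual) rows to one count per pair, then drop weak pairs."""
    counts = Counter((a, b) for a, b, _mutual in pairs)
    return {pair: n for pair, n in counts.items() if n >= min_common}


mapper_output = [
    ("Bob", "Charlie", "Adam"),
    ("Bob", "Charlie", "Eve"),
    ("Bob", "Charlie", "Frank"),
    ("Bob", "David", "Adam"),
]
candidates = count_and_filter(mapper_output)
```

Here the three Bob and Charlie rows collapse into a single row with a count of 3, and the Bob and David pair is dropped because it has only one mutual connection.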

If we think in terms of relational algebra, we are joining the connections relation with itself on keys that are not unique in either table. This, as we now know, leads to data explosion.

Matrix-Multiply Join

We view the graph, G, as a 2-D (sparse) matrix with sources as rows and destinations as columns. The original graph is preprocessed twice. First, the graph is transposed, hash partitioned, and sorted within each partition on the dest vertex, then stored on HDFS. We call this the “left partitioned graph.” The second step is a little unconventional: the graph is partitioned on the dest vertex but sorted on the source vertex within each partition, and also stored on HDFS. We call this the “right partitioned graph.”
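The two preprocessing passes can be sketched with one small helper. This rests on several assumptions that the text does not confirm: vertices are integer ids, "hash partitioning" is modelled as id modulo the partition count, "dest" means the second column of whichever edge list is being processed, and the left graph is partitioned on the same column it is sorted on. The helper name partition_and_sort is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Edge = Tuple[int, int]
SRC, DEST = 0, 1  # column positions within an edge tuple


def partition_and_sort(
    edges: List[Edge], num_partitions: int, partition_col: int, sort_col: int
) -> Dict[int, List[Edge]]:
    """Partition edges on one column, then sort each partition on another."""
    partitions: Dict[int, List[Edge]] = defaultdict(list)
    for edge in edges:
        # Assumed hash function: vertex id modulo the number of partitions.
        partitions[edge[partition_col] % num_partitions].append(edge)
    return {
        p: sorted(rows, key=lambda e: (e[sort_col], e[1 - sort_col]))
        for p, rows in partitions.items()
    }


edges = [(1, 2), (1, 3), (2, 3), (4, 2)]

# Left partitioned graph (assumed reading): transpose, then partition and
# sort on the dest column of the transposed edge list.
transposed = [(d, s) for s, d in edges]
left = partition_and_sort(transposed, 2, partition_col=DEST, sort_col=DEST)

# Right partitioned graph: partition on dest, sort on source within partition.
right = partition_and_sort(edges, 2, partition_col=DEST, sort_col=SRC)
```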
