Concourse: Generating Personalized Content Notifications in Near-Real-Time


Feature management/decoration
For the purposes of scoring, the relevance algorithm needs information that is not directly part of the notification data. This might include LinkedIn’s estimate of the strength of the connection between the content creator and a candidate recipient (learned from their interactions on the site and other sources), the countries of the sender and recipient, and so on. This data is called feature data; to rank a notification, the ranker needs access to four sets of feature data (a minimal sketch of how they come together at scoring time follows the list):

  1. Features of the source (actor) (ActorFeatures).
  2. Features of the notification content (ItemFeatures).
  3. Features of the recipient (RecipientFeatures).
  4. Features of the relationship between the sender and recipient (EdgeFeatures).
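To make this concrete, here is a minimal sketch, in Java, of how the four feature sets come together at scoring time. The record fields and the linear combination are illustrative stand-ins for the actual learned relevance model; only the four set names come from the description above.

```java
// Minimal sketch (hypothetical field names): the scorer combines the four
// feature sets to produce a relevance score for one (notification, recipient) pair.
public final class NotificationScorer {

    /** Plain holders for the four feature sets; fields are illustrative. */
    public record ActorFeatures(double activityLevel) {}
    public record ItemFeatures(double contentQuality) {}
    public record RecipientFeatures(double engagementRate) {}
    public record EdgeFeatures(double connectionStrength) {}

    /**
     * Returns a relevance score. The real model is learned; this linear
     * combination is only a placeholder to show which inputs are involved.
     */
    public double score(ActorFeatures actor,
                        ItemFeatures item,
                        RecipientFeatures recipient,
                        EdgeFeatures edge) {
        return 0.2 * actor.activityLevel()
             + 0.3 * item.contentQuality()
             + 0.2 * recipient.engagementRate()
             + 0.3 * edge.connectionStrength();
    }
}
```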

This data is easily accessible at LinkedIn, either via online REST APIs or through tables in LinkedIn’s shared HDFS clusters. Actor features and notification content features can be fetched before we do the fanout and can be sent as part of the notification payload. These features do not change after fanout, and the volume of messages before fanout is much lower compared to after fanout. This allows us to make network calls to fetch this data without impacting throughput. We chose to implement this feature decoration functionality in a separate, “prefanout” Samza instance.
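Below is a hedged sketch of what this “prefanout” decoration step might look like. The RestClient, Activity, and payload types are hypothetical stand-ins for LinkedIn-internal APIs; the point is only that actor and item features are fetched once, before fanout, and then travel with the payload.

```java
// Minimal sketch of the "prefanout" decoration step. RestClient, Activity,
// and DecoratedActivity are hypothetical stand-ins for internal types.
public final class PrefanoutDecorator {

    /** Hypothetical client for LinkedIn's online REST APIs. */
    public interface RestClient {
        ActorFeatures fetchActorFeatures(long actorId);
        ItemFeatures fetchItemFeatures(String contentUrn);
    }

    public record Activity(long actorId, String contentUrn) {}
    public record ActorFeatures(double activityLevel) {}
    public record ItemFeatures(double contentQuality) {}
    public record DecoratedActivity(Activity activity,
                                    ActorFeatures actorFeatures,
                                    ItemFeatures itemFeatures) {}

    private final RestClient restClient;

    public PrefanoutDecorator(RestClient restClient) {
        this.restClient = restClient;
    }

    /**
     * Called once per activity *before* fanout, where message volume is low,
     * so the blocking network calls do not hurt throughput. The fetched
     * features ride along in the payload and never need to be re-fetched
     * for each of the (possibly millions of) fanned-out copies.
     */
    public DecoratedActivity decorate(Activity activity) {
        ActorFeatures actor = restClient.fetchActorFeatures(activity.actorId());
        ItemFeatures item = restClient.fetchItemFeatures(activity.contentUrn());
        return new DecoratedActivity(activity, actor, item);
    }
}
```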

Since the recipient is identified only after fanout, features about the recipient and features of the relationship between the sender and recipient need to be fetched by the scorer before ranking a candidate notification. As mentioned earlier, the scorer cannot make network calls to fetch this data, so these features need to be made available in the scorer’s local Key-Value store to support fast lookups at scale. This data set is also large, often running to several terabytes. Therefore, we cannot push the full data set into every scorer instance; the data needs to be partitioned across multiple scorer instances.

So we need to ensure that the scorer instance that ranks notifications for a recipient has cached locally both the feature data about that recipient and the feature data for all edges that include that recipient. Samza also manages this for us. When a Samza job processes multiple Kafka topics, Samza can be configured to process data for the same partition across all topics in the same Samza instance. However, for this to work, all the Kafka topics need to use an identical partitioning key and an identical number of partitions. This feature data is generated from multiple data sets, all of which have their own keys and requirements. This necessitates another Samza job, called “features-processor,” which consumes data from multiple sources at LinkedIn, re-keys each message based on the recipient, and emits the data to a different Kafka stream. This Kafka stream is then consumed by the scorer job to index the data in its local Key-Value stores.
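A minimal sketch of this re-keying step, written against Samza’s low-level StreamTask API, is shown below. The system, topic, and FeatureRecord names are illustrative rather than the actual LinkedIn configuration; what matters is that the outgoing message is keyed by the recipient’s member ID and sent to a topic whose partition count matches that of the notification topic.

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class FeatureRekeyTask implements StreamTask {

    /** Hypothetical shape of an upstream feature message. */
    public interface FeatureRecord {
        Long getRecipientId();
    }

    // Output stream consumed by the scorer job; it must have the same
    // partition count as the notification topic so Samza can co-locate a
    // recipient's notifications and feature data in one scorer instance.
    private static final SystemStream RECIPIENT_KEYED_FEATURES =
        new SystemStream("kafka", "recipient-keyed-features");

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        FeatureRecord record = (FeatureRecord) envelope.getMessage();

        // Re-key by recipient so the partitioner sends all of a recipient's
        // feature data to the same partition, and hence the same scorer.
        collector.send(new OutgoingMessageEnvelope(
            RECIPIENT_KEYED_FEATURES, record.getRecipientId(), record));
    }
}
```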

Challenges and design decisions

In order to provide LinkedIn’s more than 550 million members with the best possible user experience, Concourse is dedicated to generating the highest possible liquidity of candidate notifications and, by leveraging machine learning and AI technologies, selecting the most relevant ones to send to our members in a timely manner. Besides requirements for redundancy and low latency, Concourse faces the following scalability challenges:

  1. Concourse needs to support scalable scoring on the order of millions of candidate notifications per second and make a send or no-send decision for each notification in real time.
  2. To enable machine learning-based scoring, Concourse also needs to support scalable retrieval of features, which is on the order of hundreds of billions of records and several terabytes in data size.

A few design decisions and optimizations have been made throughout Concourse development to overcome the above challenges.

Why Samza?
Leveraging Samza brings a number of benefits along with it:

  1. Concourse needs to store data to perform fanout and to rank notifications. Samza manages this state for us and takes care of backups and bootstrapping this data on a new machine.
  2. When a host goes down, Samza’s YARN scheduler automatically moves the job to a new host from the host pool.
  3. Samza’s YARN scheduler also efficiently uses the resources available on the host pool by scheduling multiple jobs on the same machines.

Partitioning metadata and scoring requests by recipient
The mission of the Concourse platform is to make a personalized decision for each notification, for each recipient, on the fly. Concourse adopts a recipient-based partitioning strategy for all of the incoming Kafka topics in its scoring processor. This design ensures that all notifications and metadata for a specific recipient are routed to the same scoring host, which allows the scoring of candidate notifications to be distributed and therefore scalable. Some superactors (such as Bill Gates) have extremely large numbers of connections and followers (in the millions), so each of their activities generates millions of candidate notifications and results in spiky traffic, which would be a severe system challenge if not handled properly. Partitioning by recipient spreads the load evenly across the scoring processors and thus enables Concourse to easily handle the spiky traffic caused by superactors or influencers. It also avoids the need to replicate the entire metadata set on every host, which would be prohibitively expensive in terms of storage. Instead, each host only needs to store metadata for the recipients whose member IDs hash to that host.
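The sketch below illustrates the effect of recipient-based partitioning. The hash is a stand-in for the routing that Kafka’s default partitioner performs on the message key, and the partition count is illustrative.

```java
// Illustration of recipient-based partitioning: every topic the scorer
// consumes is keyed by recipient member ID, so the same recipient always
// hashes to the same partition (and therefore the same scoring host).
public final class RecipientPartitioning {

    public static int partitionFor(long recipientMemberId, int numPartitions) {
        // Math.floorMod keeps the result non-negative for any hash value.
        return Math.floorMod(Long.hashCode(recipientMemberId), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 1024;  // illustrative partition count
        // A superactor's post fans out to millions of recipients, but each
        // candidate notification is routed by *recipient*, so the load is
        // spread across partitions instead of landing on one "actor" shard.
        for (long recipient : new long[] {101L, 202L, 303L}) {
            System.out.printf("recipient %d -> partition %d%n",
                recipient, partitionFor(recipient, partitions));
        }
    }
}
```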

Storing external signals in local store
To enable machine learning model-based scoring, Concourse consumes various signals, like member metadata and edge metadata. To allow for scalable scoring (~500k QPS at peak), Concourse first retrieves these signals from both online data stores and offline HDFS and then puts this data into the RocksDB instance residing on the disk of the processing host. These local RocksDB stores allow for fast reads while scoring each candidate message and thus increase the scoring throughput. While the client-side latency of remote calls can range from a couple of milliseconds to hundreds of milliseconds, a read from RocksDB has a much lower latency, on the order of tens of microseconds.
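As a rough sketch, a scoring task might read these local stores as follows, using an older version of Samza’s key-value store API. The store names, key formats, and the helper that extracts the actor ID are assumptions, not the actual implementation.

```java
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class ScoringTask implements StreamTask, InitableTask {
    private KeyValueStore<String, byte[]> recipientFeatures;
    private KeyValueStore<String, byte[]> edgeFeatures;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        // Local stores are declared in the job config and backed by RocksDB
        // on the host's disk; reads take tens of microseconds rather than
        // the milliseconds of a remote call.
        recipientFeatures = (KeyValueStore<String, byte[]>) context.getStore("recipient-features");
        edgeFeatures = (KeyValueStore<String, byte[]>) context.getStore("edge-features");
    }

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        // The candidate notification is keyed by recipient, so this task's
        // local stores are guaranteed to hold that recipient's features.
        String recipientId = (String) envelope.getKey();
        byte[] recipientBlob = recipientFeatures.get(recipientId);
        byte[] edgeBlob = edgeFeatures.get(recipientId + ":" + actorIdOf(envelope));
        // ... deserialize the blobs, score the notification, decide send/no-send.
    }

    private String actorIdOf(IncomingMessageEnvelope envelope) {
        // Hypothetical helper: extract the actor ID from the message payload.
        return String.valueOf(envelope.getMessage());
    }
}
```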

Distributing incoming traffic across data centers
To further boost the scalability of the Concourse platform, we decided to let Concourse consume raw activity content from local Kafka topics instead of the ones aggregated across data centers. At LinkedIn, any Kafka event is first queued in a local Kafka topic (within each data center), which only contains events that are routed to the current hosting data center. Afterwards, each event is copied from the local Kafka topic to a global Kafka topic, which contains all the events that have been queued in all data centers. By consuming from local Kafka topics, Concourse hosts in a particular data center only need to handle the traffic routed to that data center, which increases the processing capacity of Concourse roughly fourfold compared with consuming from the global Kafka topic.

Over-partitioning Kafka topics
The number of Samza processes that process this workload in parallel is determined by the number of Kafka partitions in the topic between the fanout and scoring Samza jobs. A higher partition count implies a higher degree of parallelism, and each individual task then becomes more manageable in terms of CPU cost. So we over-partitioned the Kafka topic that feeds into the scoring job so that each process has to score at most a few thousand records per second. The scoring job also needs to store feature data, which is also partitioned. We also needed to ensure that each Samza job stores at most a few GBs of feature data for Samza to effectively distribute these jobs across the shared set of machines.
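A back-of-the-envelope sketch of this sizing logic is below. The rates and sizes are assumed for illustration; the post itself only states millions of scoring requests per second overall, at most a few thousand records per second per scoring process, a few GBs of feature data per instance, and several terabytes of feature data in total.

```java
// Illustrative partition-count sizing; all concrete numbers are assumptions.
public final class PartitionSizing {
    public static void main(String[] args) {
        long peakRecordsPerSecond = 5_000_000L;  // assumed peak scoring load
        long targetPerTaskPerSecond = 2_000L;    // "at most a few thousand records per second"
        long maxStatePerInstanceBytes = 2L << 30; // "a few GBs" of feature data per instance
        long totalFeatureBytes = 4L << 40;        // assumed ~4 TB of total feature data

        // Enough partitions that no single task exceeds the per-task rate.
        long partitionsForThroughput =
            (peakRecordsPerSecond + targetPerTaskPerSecond - 1) / targetPerTaskPerSecond;
        // Enough partitions that no single instance stores too much feature data.
        long partitionsForState =
            (totalFeatureBytes + maxStatePerInstanceBytes - 1) / maxStatePerInstanceBytes;

        // The topic must be partitioned finely enough to satisfy both limits.
        System.out.println("partitions needed: "
            + Math.max(partitionsForThroughput, partitionsForState));
    }
}
```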

The feature data, which runs to several terabytes and multiple billions of records, is also produced in Hadoop by joining multiple different data sets. Pushing this full data set from Hadoop, re-partitioning it, and indexing it locally in the scoring jobs takes almost a day, since these pipelines are shared and have a limited amount of bandwidth per stream. The scoring job also needs to process and index this data while it is ranking notifications in real time. Pushing this data also puts a lot of strain on the underlying Kafka systems.

We realized that each underlying data set gets refreshed at a different frequency. For example, one data set gets fully refreshed each week in a single large batch. Other data is more live in nature; that is, it is constantly updated based on live traffic. So, to scale the feature push job better, we implemented a solution where a delta is computed in Hadoop, selecting only the rows that have actually changed since the last push. This smaller data set is pushed every day, which significantly reduces the cost. We also push the full data set once a week, spread over several days, to reduce pressure on the underlying systems.
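Conceptually, the delta is just a comparison of the current snapshot against the previously pushed one. The sketch below shows that idea with in-memory maps; the real computation runs as a Hadoop join over billions of rows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Conceptual sketch of the delta computation: only rows that are new or whose
// value changed since the last push are selected for the daily push.
public final class FeatureDelta {

    public static Map<String, String> delta(Map<String, String> previousSnapshot,
                                            Map<String, String> currentSnapshot) {
        Map<String, String> changedRows = new HashMap<>();
        for (Map.Entry<String, String> row : currentSnapshot.entrySet()) {
            String previousValue = previousSnapshot.get(row.getKey());
            if (!Objects.equals(previousValue, row.getValue())) {
                changedRows.put(row.getKey(), row.getValue()); // new or updated row
            }
        }
        return changedRows; // typically far smaller than the full snapshot
    }
}
```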

Next steps

Concourse now powers all content recommendation notifications about activity in a member’s network. We realized that there are other use cases at LinkedIn that are not necessarily around content, but that also require near-real-time fanout and relevance-based filtering and scoring. One such example is the notification that gets sent when a member updates their profile with a job or title change. We are in the process of onboarding other types of notifications to Concourse.

We also realized that fetching a list of candidate members from a service, such as fetching connections from the LinkedIn Graph, is just one method of performing fanout. Edges other than connections are also possible. For example, we might want to notify certain LinkedIn members working in the New York metropolitan area about highly relevant local news, or notify certain members working at social media companies about new GDPR regulations. The set of members for such edges can be very large, and it might not be practical to fetch them page-by-page using an online API. The approach we are considering is to index these attributes in a local Key-Value store, partitioned across multiple fanout instances, and to compute the fanout by iterating through this table.
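A sketch of what that attribute-indexed fanout could look like, using Samza’s key-value store API, is below. The key layout ("attribute:memberId") and types are assumptions about a design still under consideration, not something that has shipped.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;

public final class AttributeFanout {

    /** Returns the member IDs indexed under the given attribute on this fanout instance. */
    public static List<Long> membersFor(KeyValueStore<String, Long> attributeIndex,
                                        String attribute) {
        List<Long> members = new ArrayList<>();
        // Range scan over "<attribute>:" .. "<attribute>;" covers every key with
        // that prefix (';' is the next character after ':' in ASCII ordering).
        KeyValueIterator<String, Long> it =
            attributeIndex.range(attribute + ":", attribute + ";");
        try {
            while (it.hasNext()) {
                Entry<String, Long> entry = it.next();
                members.add(entry.getValue());
            }
        } finally {
            it.close(); // iterators over the store must be closed explicitly
        }
        return members;
    }
}
```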

Acknowledgments

Designing and releasing Concourse to production was a very challenging endeavor that would not have been as smooth without the support of our partner teams: Samza, Kafka, Communications Relevance, and others. We’d also like to thank the following individuals for their support and contributions to this project: Banu Muthukumar, Chien-Chun Hung, Eric Brownrout, Ankit Gupta, Curtis Wang, Ajith Muralidharan, Viral Gupta, Shaunak Chatterjee, Shipeng Yu, and Parin Shah.

We look forward to continued collaboration as we expand Concourse’s capabilities!
