The exabyte club: LinkedIn’s journey of scaling the Hadoop Distributed File System

Figure 2. Non-fair locking improves NameNode RPC latency by 10x.

Satellite cluster

The small files problem
HDFS is optimized for maintaining large files and provides high throughput for sequential reads and writes that are essential for batch data processing systems. Small files cause problems for most file systems, however, and HDFS is not an exception. The problem is well understood; see, e.g., HDFS Scalability: The limits to growth and the sequel Apache Hadoop: the Scalability Update.

A file is small if its size is less than a block size, which on our clusters is set to 512MB. Small files in HDFS disproportionally inflate metadata compared to the aggregate size of data referenced by it. Since all metadata is stored in NameNode’s RAM, it becomes a scalability limit and a performance bottleneck. Block-to-file ratio reflects how many small versus large files are in the system: if all files are small, then the ratio is 1, while if all files are large, then the ratio is at least 2.

The logging directory
We started the satellite cluster project with the block-to-file ratio at around 1.1, which means that 90% of files in our system were small. Our analysis of file size distribution revealed a system directory /system, primarily composed of very small files with an average size of less than 1MB, representing configurations and logs managed by Hadoop-owned services, such as YARN, Spark history servers, and other system applications. The directory contained half of the files of the entire namespace but used only a small fraction, 0.07%, of the cluster data capacity. Since the system directory is used exclusively by Hadoop internal systems, we could move it into a separate cluster without any user-facing changes. This prompted us to create a new (satellite) cluster with the NameNode of the same size as on the primary cluster, but with 100 times fewer DataNodes.

Bootstrapping the satellite cluster
Bootstrapping required us to move the logging data from the primary to the satellite cluster. While the amount of data (60TB) was relatively small, the number of files (100 million) was big, which presented the first challenge.

Initially, we tried a straightforward approach of copying all 100 million files using DistCp (distributed copy)—a standard Hadoop tool for large inter/intra-cluster parallel data copying. Because we had such a big number of files, the DistCp job got stuck on the initialization stage trying to collect all file paths that it needed to copy. The job produced such an exhaustive load on the NameNode that it became unresponsive and inaccessible for other clients during that time. We estimated that even if we split the copy job into multiple manageable steps, it would still take over 12 hours to complete. By that time, new data would have arrived and some of the copied data would become obsolete. The solution turned out to be non-viable.

We decided to instead build a custom HDFS-compatible file system driver, called FailoverFS, which always writes new logs to the satellite cluster, while presenting the combined read view of both clusters’ files. This allowed new jobs to write their logs to the satellite cluster, and services accessing those logs, such as the Spark history server, could read logs from both clusters. The log retention policy service, which is set to delete files older than 1 week, eventually removed the logs on the primary cluster.

Very large block reports
DataNodes in HDFS send periodic block reports to the NameNode to let it know of all block replicas they own. For each block, the report contains block id, its generation stamp, and the length. The block report is partitioned into chunks corresponding to blocks from the same volume (disk) of the DataNode. The NameNode processes each volume report contiguously, holding a global namespace lock.

The satellite cluster contains only 32 DataNodes, because the amount of data is small. Thus, we have a large number of blocks (roughly equal to the number of files) distributed among a small number of DataNodes, which makes the per-node replica count, and therefore the size of a block report, very big. For comparison, a DataNode on the satellite cluster contains 9 million block replicas, versus 200K replicas on the primary. This presented another challenge, as the block report processing time on the NameNode became overwhelming, making it unresponsive for other operations during the processing time.

The solution was to split each DataNode drive into ten virtual volumes. This partitions the block report into a larger number of smaller-sized volume chunks and makes the report processing on the NameNode more granular.

Overall, the introduction of the satellite cluster allowed us to manage another round of doubling of the infrastructure. It resulted in an improved block-to-file ratio of 1.4.

Motivation and requirements
In the era of Moore’s Law dramatically slowing down—and eventually ending by 2025—the main limiting factor for HDFS scalability becomes the NameNode performance, due to CPU speed restrictions. In anticipation of the next cycle of cluster growth, we realized that even though we can keep up with metadata object growth by increasing the heap size of the NameNode, we cannot rival the increase in metadata operation workloads with a single metadata server. The workload should be parallelized between multiple servers.

An HA-enabled HDFS typically has two NameNodes, one in Active and another in Standby state. Any Standby node is a replica of the Active. Coordination of the metadata state between Active and Standby nodes is handled by Quorum Journal Manager. The Active NameNode publishes journal transactions to QJM, while the Standby tails the transactions from QJM in the same order in which they were executed on Active. This creates an opportunity for reading metadata from Standby instead of the Active NameNode. The Active will remain the only NameNode serving write requests as the sole source of truth for namespace updates.

Our analysis of metadata workloads in production showed that reads comprise 95% of all namespace operations. Therefore, load balancing of reads should substantially improve the total throughput of metadata operations.

However, traditional Standby NameNodes in HDFS are used only for failover and reject any client operations. To overcome this issue, we introduced a notion of an Observer node, which is a Standby that also serves read requests. The Observer node along with all related logic was fully implemented as part of HDFS-12943. We deployed it on our clusters and have been running it in production for over a year.

The main requirements that defined our design decisions included:

  1. Strong consistency. Clients should always get a consistent view of HDFS regardless of which NameNode they are connecting to.

  2. High throughput of metadata operations.

  3. Transparency for existing applications. Client APIs should treat NameNode as a single service.

Consistency model
In HA settings Active, Standby, and Observer nodes follow an identical sequence of events, where each event modifies the state of the namespace. “Namespace state id” characterizes the state of the namespace as it evolves through modifications. State id is a number monotonically increasing with each modification. Every modification on the NameNode corresponds to a journal transaction, so the state id is implemented as the id of that transaction.

The states of the Active and Standby nodes are the same when they reach the same state id. But Standby is a follower of its leader, Active, and therefore is always behind, as it consumes transactions that the Active has already executed.

The Stale Read Problem: from a client viewpoint, the namespace state of Observer is the same as Active’s, except for a limited set of recently modified “new” objects. Even though it can obtain correct information for the majority of “old” files and directories from either of the NameNodes, the Observer may return outdated information about the “new” objects.

The Consistency Principle informally states that clients should never see past states. It is a common assumption, which guarantees that clients look forward through the history of events even when they switch between NameNodes. More formally:

  • If client C sees or modifies an object at state s1 at time t1, then in any future time t2 > t1, C will see the state of that object at s2 >= s1.

We distinguish two major scenarios in which the consistency principle can be violated:

  1. Read your own writes (RYOW)
    If an HDFS client modifies the namespace on Active and then reads from an Observer, it should be able to see the same or a later state of the namespace, but not an earlier one.

  2. Third-party communication (3PC)
    If one client modifies the namespace and passes that knowledge to other clients, the latter ones should be able to read from Observer the same or a later state of the namespace, but not an earlier state.

An example of RYOW is when a client creates a directory on Active and then tries to ls it via Observer. If Observer has not yet processed the corresponding mkdir transaction, it will respond that the directory does not exist. This violates the consistency principle.

The 3PC scenario can occur, e.g., during MapReduce or Spark job submission. First, the job client creates job configuration and JAR files on HDFS. Then, it notifies the YARN ResourceManager to schedule the job tasks. Each task reads the job configuration and JAR files during startup. If the files are still not available on the Observer due to delayed transactions, the job may fail. The third-party communication between the job client and the ResourceManager here happens outside of the HDFS protocol.

In the next sections we explain how our design addresses both scenarios.

Journal tailing: Fast path
In order to guarantee consistency and performance of reads from Observer, journal tailing—the process of fetching new transactions from the QJM—should be fast, minimizing the delay in state updates between Standby and Active nodes. With a traditional implementation of journal tailing, the delay is on the order of minutes—2 minutes by default—but on our large clusters, it was up to 8 minutes. This would increase latency of read operations to minutes, while the maximum tolerable threshold should not exceed 50 milliseconds.

Active NameNode publishes journal transactions to a quorum of Journal nodes comprising the QJM service. When a Journal node receives transactions, it is required to persist them to disk in order to prevent data loss in case of failure. The transactions are written in segments, where a new segment file is rolled based on a configurable time interval (2 mins by default) or the number of transactions received. In the legacy implementation, Standby nodes could tail only entire segments, reading the last segment from disk via an HTTP call once the segment file was finished (closed) by a Journal node.

HDFS-13150 proposed and implemented Fast Path tailing through the following key modifications:

  1. It introduced in-memory caching of recent journal transactions. Journal nodes serve cached transactions to Standby nodes from memory instead of disk.

  2. A Standby node can request a sequence of transactions starting from a specified startTxId. Journal nodes then return all known transactions from the starting id up to a specified maximum batch size. This allows for a fine-grained journal tailing, since typically such requests would return only a few of the latest unapplied transactions.

  3. A request for the latest transactions is an RPC call, which is faster than segment file transfers via HTTP. The RPC call is implemented as a quorum read from Journal nodes to guarantee that Standbys see committed transactions only.

The use of Fast Path tailing is a configurable option. It is required for Observers. During startup or if a Standby node falls too far behind for reading from QJM cache, it automatically switches back to reading persisted segments, which has much higher latency but also higher throughput.

Performance evaluation of Fast Path showed substantial improvement from the original 2-minute lag:

  • The average client-perceived lag of an Observer node in RYOW scenario (the time in which a client could see a transaction on the Observer after applying it on the Active) is under 6 milliseconds.

  • The average transaction processing time, starting from the moment it is submitted on Active until it is applied on Observer, is 25-30 milliseconds.

Fast Path tailing allows Observers to keep their state very close to Active NameNode, but does not eliminate stale reads from Observer, which we will discuss next.

Read your own writes
NameNodes maintain their latest state id as LastWrittenStateId, which corresponds to the latest update of the namespace and the respective transaction written to NameNode’s journal.

We introduce LastSeenStateId for each HDFS client to indicate the latest state of the namespace observed by the client. It is automatically updated with every call to a NameNode and is set to the respective LastWrittenStateId. E.g., when client C sends a write request to Active node, the call once executed will also set C.LastSeenStateId = A.LastWrittenStateId of the Active. Now C sends a read request to Observer passing along its C.LastSeenStateId. The Observer verifies that its own O.LastWrittenStateId matches or exceeds client’s C.LastSeenStateId and postpones the request execution until it is caught up. The call to Observer also resets C.LastSeenStateId = O.LastWrittenStateId., so subsequent calls to Observer will not be delayed for C. The client can always switch to the Active without a delay because:

A.LastWrittenStateId >= O.LastWrittenStateId >= C.LastSeenStateId.

LastSeenStateId is passed seamlessly between servers and clients as part of the RPC header.

As we have seen with Fast Path tailing, the state lag of Observer to Active is small enough to be comparable to the time needed for a client to switch from one node to another. If an Observer falls behind the client state farther than an allowable threshold, it notifies the client, and the client switches over to another Observer or the Active NameNode.

LastSeenStateId guarantees that clients never see past states of the namespace in an RYOW scenario.

Third-party communication: msync()
In the 3PC scenario, one client C1 creates, e.g., a file on Active NameNode while another client C2 is expected to see it on an Observer. C2 has the knowledge that the file was created but does not know LastSeenStateId of C1, which can cause stale information to be read. We introduced a new HDFS API FileSystem.msync() to guarantee consistent reads for C2, along with a series of enhancements to avoid actually calling msync() in common cases.

msync() is similar to HDFS hsync(). The latter guarantees that the data is available for other clients to read, while msync() provides the same guarantee for metadata. When C2 calls msync() the client contacts the Active NameNode with the sole purpose of forcing the update of its C2.LastSeenStateId. Now that it has learned the current state of the namespace, which is guaranteed to be at least C1.LastSeenStateId, C2 can safely read the file from Observer.

Adding an explicit msync() call to existing applications contradicts our transparency requirement (#3) listed above. It is not feasible to expect all applications to change before the new feature is deployed in production. This prompted us to make clients automatically synchronize their states, thus avoiding explicit msync() in most cases.

  1. No-cost automatic client state synchronization on startup. When an HDFS client starts, it discovers the HA status of existing NameNodes. The namespace state id is piggybacked on these calls.

  2. Periodic synchronization forces clients to automatically call msync() after a configurable period of time.

  3. Always-msync mode is a special case of the former when the time period equals 0. It forces the client to automatically call msync() before every read request.

  4. Always-active mode. The client is configured not to use Observer for reads.

Startup synchronization (1) particularly solved the stale read problem for job submission frameworks, as discussed earlier. When a new MapReduce or Spark task starts, it instantiates a new HDFS client, which automatically catches up with the latest state of the Active node and thus prevents stale reads from Observer.

Always-msync (3) is an expensive option, as every metadata operation results in two RPC calls.

The enhancements above prevent explicit use of msync() for the majority of use cases. It remains necessary for a narrow type of long-running read-only clients. Such clients exclusively read from Observer and can be dragged to fall behind along with the Observer. They should either call explicit msync() at key moments controlled by the application logic or use always-active mode (4).

Source link