[Paper Notes] Some Classic Distributed Systems
Scaling Memcache at Facebook
NSDI'13, by Facebook
TL;DR: Lessons learned from building a large-scale caching system at Facebook
Main goal: Show the important themes of system design at different scales
- E.g., performance, efficiency, fault-tolerance, and consistency
Why read it:
- Though it uses the memcache service as an example, it shows many general system design ideas at different scales
Designs:
Single server:
- Automatic expansion of memcached's hash table -> reduce lookup time
- Multi-threaded memcached with a global lock
- Own UDP port per thread to reduce contention
In a cluster:
- Settings:
- Thousands of servers
- Hundreds of memcached servers
- Use consistent hashing to distribute items across memcached servers (see the sketch after this list)
- Load a page requiring an average of 521 distinct items
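A minimal sketch of the consistent-hashing idea used to spread keys across memcached servers; the hash function, virtual-node count, and class names below are illustrative, not the actual implementation:

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Map keys to servers; adding/removing a server only remaps a small fraction of keys."""

    def __init__(self, servers, vnodes=100):
        self.ring = []  # sorted list of (hash, server)
        for s in servers:
            for v in range(vnodes):
                self.ring.append((self._hash(f"{s}#{v}"), s))
        self.ring.sort()

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def server_for(self, key):
        h = self._hash(key)
        # Pick the first virtual node clockwise from the key's position on the ring.
        i = bisect.bisect(self.ring, (h,)) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing(["mc1", "mc2", "mc3"])
print(ring.server_for("user:42:profile"))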
- Reducing latency: on the memcache client side (runs on each web server)
- Parallel requests and batching: construct a DAG of data dependencies to maximize the number of items fetched concurrently
- Client-server communication:
- get with UDP: simply discard dropped packets (extremely rare in production: 0.25%)
- set, delete with TCP through mcrouter (an instance runs on the same machine as the web server)
- mcrouter coalesces the TCP connections of web threads (connections are costly because web servers run with a high degree of parallelism and over-subscription to improve throughput)
- Incast congestion: balance the sliding-window size (number of outstanding requests); see the sketch after this list
- Too small: low utilization
- Too large: incast congestion
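A toy sketch of the sliding-window idea for capping outstanding requests per client; the window size and method names are hypothetical (the paper only says the window is tuned to balance utilization against incast congestion):

```python
from collections import deque

class OutstandingWindow:
    """Cap the number of in-flight memcache requests issued by one web server."""

    def __init__(self, window_size=20):  # hypothetical size; tuned in practice
        self.window_size = window_size
        self.in_flight = 0
        self.pending = deque()  # requests waiting for a window slot

    def submit(self, request, send_fn):
        if self.in_flight < self.window_size:
            self.in_flight += 1
            send_fn(request)
        else:
            self.pending.append(request)  # too many outstanding: queue locally

    def on_response(self, send_fn):
        self.in_flight -= 1
        if self.pending:  # a slot opened up: send the next queued request
            self.in_flight += 1
            send_fn(self.pending.popleft())

# Too small a window -> requests serialize and utilization drops;
# too large -> many servers reply at once and overflow switch buffers (incast).
```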
- Reducing load: reduce the frequency of more expensive fetches, like database queries
- Leases: the memcached server hands out a lease token bound to the key upon a cache miss; the lease is invalidated when the server receives a delete for that key (see the sketch just below)
- Stale values: a set carrying a stale value fails because its lease has already been invalidated
- Thundering herd (heavy read/write load on one key invalidates the cache repeatedly, so reads keep missing): the server issues a lease for a key at most once every 10 seconds; clients that fail to get the lease wait briefly, by which time another client has typically refilled the cache
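A simplified server-side sketch of the lease mechanism; the token scheme, timing, and method names are illustrative and not memcached's real protocol:

```python
import itertools, time

class LeaseServer:
    """Hand out a lease on a miss; reject sets whose lease was invalidated by a delete."""

    LEASE_RATE_S = 10  # at most one lease per key per 10 seconds (thundering herd)

    def __init__(self):
        self.cache = {}
        self.leases = {}       # key -> outstanding lease token
        self.last_lease = {}   # key -> time the last lease was issued
        self.tokens = itertools.count(1)

    def get(self, key):
        if key in self.cache:
            return self.cache[key], None
        now = time.time()
        if now - self.last_lease.get(key, 0) < self.LEASE_RATE_S:
            return None, None  # no lease granted: caller should wait and retry the get
        token = next(self.tokens)
        self.leases[key] = token
        self.last_lease[key] = now
        return None, token

    def set_with_lease(self, key, value, token):
        if self.leases.get(key) != token:
            return False  # lease was invalidated by a delete: the value may be stale
        self.cache[key] = value
        del self.leases[key]
        return True

    def delete(self, key):
        self.cache.pop(key, None)
        self.leases.pop(key, None)  # invalidate any outstanding lease for this key
```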
- memcache pools: partition the servers into separate pools to accommodate different working sets
- Replication within a pool: trade space for more bandwidth
In a region:
Settings:
- Split web and memcached servers into multiple frontend clusters
- Multiple clusters + one storage cluster = a region
Regional invalidations:
The storage cluster invalidates cached data in the frontend clusters
- Use the mcsqueal daemon: it tails the MySQL commit log and broadcasts the resulting deletes to the frontend clusters (see the sketch after this list)
- Optimization: batch the invalidations
- Optimization: a frontend cluster also invalidates its own local cache upon updates, to provide read-after-write semantics within that cluster
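A rough sketch of the mcsqueal-style invalidation flow: delete directives embedded in committed SQL are extracted from the commit log, batched, and fanned out to the frontend clusters. The log format, directive name, and batching threshold below are made up for illustration:

```python
import re

BATCH_SIZE = 64  # hypothetical batching threshold

def extract_deletes(commit_log_lines):
    """Pull memcache delete directives out of committed transactions."""
    for line in commit_log_lines:
        m = re.search(r"MCDELETE\((?P<key>[^)]+)\)", line)  # illustrative log format
        if m:
            yield m.group("key")

def broadcast_invalidations(commit_log_lines, frontend_clusters):
    batch = []
    for key in extract_deletes(commit_log_lines):
        batch.append(key)
        if len(batch) >= BATCH_SIZE:
            _send_batch(batch, frontend_clusters)
            batch = []
    if batch:
        _send_batch(batch, frontend_clusters)

def _send_batch(keys, frontend_clusters):
    # In the real system these go to mcrouters in each frontend cluster,
    # which route each delete to the right memcached server.
    for cluster in frontend_clusters:
        print(f"cluster {cluster}: delete {keys}")

broadcast_invalidations(
    ["UPDATE users SET ...; MCDELETE(user:42:profile)"], ["fe1", "fe2"])
```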
Regional pools:
- Multiple frontend clusters share the same set of memcached servers -> reduces key replication -> improves memory efficiency and lowers offline maintenance cost
Cold cluster warmup
- A cold (newly brought-up) cluster can retrieve data from a warm cluster at startup
- Consistency issue: a cold-cluster client may update the database and then fetch the old value from the warm cache before the database's invalidation reaches the warm cluster
- Fix: add a hold-off time to delete in the cold cluster (DB update -> cold-cache invalidation); within that window, an add from a cold client (which read the pre-invalidation value from the warm cache and tries to populate the cold cache with add) is rejected, signaling that a newer update exists (sketched below)
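A sketch of the warmup rule above: deletes mark the key with a hold-off, and an add arriving within the hold-off window is rejected so the client knows to go back to the database. The hold-off length and method names are placeholders:

```python
import time

HOLD_OFF_S = 2.0  # placeholder hold-off window

class ColdCluster:
    def __init__(self):
        self.cache = {}
        self.deleted_at = {}  # key -> time of the last delete

    def delete(self, key):
        self.cache.pop(key, None)
        self.deleted_at[key] = time.time()

    def add(self, key, value):
        """add only succeeds if the key is absent AND not recently deleted."""
        if key in self.cache:
            return False
        if time.time() - self.deleted_at.get(key, 0) < HOLD_OFF_S:
            return False  # a delete raced with this warm-cache read: refetch from the DB
        self.cache[key] = value
        return True

cold = ColdCluster()
cold.delete("user:42")                        # invalidation after a DB update
ok = cold.add("user:42", "stale warm value")  # populate attempt from the warm cache
print(ok)  # False: the client should re-read from the database instead
```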
Across regions:
Settings:
- Multiple geo-distributed regions, each with its own storage cluster
- One region has master storage while others have read-only replica storage
- Utilize MySQL's replication mechanism for storage consistency
Consistency model: best-effort eventual consistency
Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
NSDI'12, by Matei Zaharia, UC Berkeley
TL;DR: A data-sharing abstraction used by Spark; keeps data in memory; achieves fault tolerance
Main goal: Improve performance -> reduce disk/network -> reuse in-memory data / improve data locality
Key ideas:
- Use memory to store the intermediate data instead of disk between iterations
- Background: network was much slower than memory
- But how to provide fault tolerance?
- Log the coarse-grained transformations (map, filter, ...) applied in batch, instead of logging fine-grained data updates as traditional systems do
- Recovery: redo the computation based on the logged lineage (see the sketch after this list)
- other optimizations:
- Data locality: fully utilize in-memory data
- Checkpointing: avoid long lineage (dependency) chains -> reduce recomputation after failures
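A tiny sketch of the lineage idea: each RDD-like object records the coarse-grained transformation that produced it, so a lost in-memory partition can be recomputed from its parent rather than replicated. The class and field names are illustrative, not Spark's internals:

```python
class LineageRDD:
    """Records how it was derived; collect() replays the lineage on demand."""

    def __init__(self, compute, parent=None):
        self.compute = compute      # function: parent data -> this RDD's data
        self.parent = parent
        self.cached = None          # in-memory copy, may be lost on failure

    def map(self, f):
        return LineageRDD(lambda data: [f(x) for x in data], parent=self)

    def filter(self, pred):
        return LineageRDD(lambda data: [x for x in data if pred(x)], parent=self)

    def collect(self):
        if self.cached is None:                      # lost or never computed
            parent_data = self.parent.collect() if self.parent else []
            self.cached = self.compute(parent_data)  # replay the logged transformation
        return self.cached

source = LineageRDD(lambda _: list(range(10)))
result = source.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(result.collect())
result.cached = None          # simulate losing the in-memory partition
print(result.collect())       # recomputed from lineage; no data replication needed
```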
Programming model:
- Create an RDD object (lazy creation): .textFile("hdfs://")
- Run iterative processing:
- Some transformations (one RDD to another RDD, also lazy): .filter(_.startsWith("xxx")), .map(), .join()
- Some actions (launch the actual computation): .count(), .reduce()
- Persist an intermediate RDD for reuse: .persist()
- Note: by default all data is kept in memory, but some is flushed to disk when space runs out; .persist() asks Spark to keep the RDD in memory for reuse (see the end-to-end example after this list)
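An end-to-end version of the programming model above, written with PySpark (the Python counterpart of the Scala calls in these notes); the HDFS path and filter string are placeholders:

```python
from pyspark import SparkContext

sc = SparkContext(appName="rdd-notes-example")

# Lazy creation: nothing is read yet.
lines = sc.textFile("hdfs://namenode/path/to/logs")   # placeholder path

# Transformations are also lazy: they only extend the lineage graph.
errors = lines.filter(lambda line: line.startswith("ERROR"))
codes = errors.map(lambda line: line.split()[1])

# Ask Spark to keep the intermediate RDD in memory for reuse.
errors.persist()

# Actions launch the actual computation.
print(errors.count())            # first action: reads the file and caches `errors`
print(codes.distinct().count())  # reuses the cached `errors` RDD

sc.stop()
```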
Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center
NSDI'11, by UC Berkeley
TL;DR: The resource manager Spark originally ran on; scheduling is split into offers (by Mesos) and picks (by each framework)
Research problem: Build a scalable and efficient scheduler for different frameworks
Key Ideas:
- Two-level scheduling via resource offers (see the sketch after this list):
- Mesos decides how many resources to offer each framework
- Considerations: fair sharing, priority
- Format: list of <xxx CPU, xxx Mem, ...>
- Each framework decides which resources to accept and which tasks to run on them
- Considerations: efficiency, data locality
- Format: list of <task1, x1 CPU, y1 Mem, ...>, <task2, x2 CPU, y2 Mem, ...>
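A toy model of the two-level offer/pick loop; the data structures and the "offer everything free" policy are simplified stand-ins for Mesos's fair-sharing allocator and the frameworks' own schedulers:

```python
from dataclasses import dataclass

@dataclass
class Offer:
    node: str
    cpu: float
    mem_gb: float

@dataclass
class Task:
    name: str
    cpu: float
    mem_gb: float

class Master:
    """Level 1: decides which resources to offer each framework."""
    def __init__(self, free):
        self.free = free  # list of Offer

    def offer_to(self, framework):
        offers = self.free               # simplification: offer everything that is free
        accepted = framework.resource_offer(offers)
        used_nodes = {node for node, _ in accepted}
        self.free = [o for o in offers if o.node not in used_nodes]
        return accepted

class Framework:
    """Level 2: decides which offers to accept and which tasks to launch on them."""
    def __init__(self, tasks):
        self.tasks = tasks

    def resource_offer(self, offers):
        launched = []
        for offer in offers:   # pick offers it likes (e.g., for data locality)
            if self.tasks and self.tasks[0].cpu <= offer.cpu and self.tasks[0].mem_gb <= offer.mem_gb:
                launched.append((offer.node, self.tasks.pop(0)))
        return launched

master = Master([Offer("n1", 4, 16), Offer("n2", 8, 32)])
fw = Framework([Task("map-0", 2, 4), Task("map-1", 2, 4)])
print(master.offer_to(fw))
```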
Datacenter RPCs can be General and Fast
NSDI'19, by Anuj Kalia, CMU
Goal: Break performance/generality trade-off for DC RPC
Background
- DC networking is fast
- RDMA-based designs: rely on PFC for lossless Ethernet -> head-of-line blocking, pause-frame storms
- Switch buffer >> BDP (worked out below)
- BW: 25 GbE, RTT: 6 us -> BDP = 18.75 kB
- Switch buffer: tens of MB
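The bandwidth-delay product arithmetic behind the "switch buffer >> BDP" claim:

```python
# BDP = link bandwidth * round-trip time
bandwidth_bits_per_s = 25e9   # 25 GbE
rtt_s = 6e-6                  # 6 microseconds

bdp_bytes = bandwidth_bits_per_s * rtt_s / 8
print(f"BDP = {bdp_bytes / 1e3:.2f} kB")   # 18.75 kB, vs. tens of MB of switch buffer
```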
API
- Event-loop based processing
- Rpc: a communication endpoint per user thread
- TX and RX queues for I/O
- An event loop for driving RPC processing
- Several sessions for connecting to multiple remote endpoints
- Control flow (see the sketch after this list):
- Client:
- Call rpc->enqueue_request() for a new request
- Periodically run the event loop to make progress
- On receiving a response -> invoke the continuation callback
- Server:
- Run the event loop
- On receiving a request -> call the request handler
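A Python sketch of the event-loop-driven control flow above. eRPC itself is a C++ library; the class and method names here only mirror the calls mentioned in these notes (enqueue_request, the event loop, continuations) and are not the real API. Passing the continuation over the simulated "network" is a convenience; a real client matches responses to its pending requests:

```python
class Rpc:
    """One communication endpoint per user thread, driven by an event loop."""

    def __init__(self, request_handler=None):
        self.request_handler = request_handler  # set on the server side
        self.tx = []   # outgoing messages: (kind, payload, continuation)
        self.rx = []   # incoming messages

    # Client: queue a request together with the continuation to run on its response.
    def enqueue_request(self, request, continuation):
        self.tx.append(("req", request, continuation))

    # Called periodically by the owning thread; there are no background threads.
    def run_event_loop_once(self, peer):
        # RX: process received messages first.
        while self.rx:
            kind, payload, continuation = self.rx.pop(0)
            if kind == "req":      # server path: run the request handler, queue the response
                self.tx.append(("resp", self.request_handler(payload), continuation))
            else:                  # client path: invoke the continuation callback
                continuation(payload)
        # TX: hand queued messages to the peer (stands in for the NIC and network).
        while self.tx:
            peer.rx.append(self.tx.pop(0))

server = Rpc(request_handler=lambda req: f"echo:{req}")
client = Rpc()
client.enqueue_request("ping", lambda resp: print("continuation got:", resp))
client.run_event_loop_once(peer=server)   # client TX: request goes out
server.run_event_loop_once(peer=client)   # server RX -> handler -> response TX
client.run_event_loop_once(peer=server)   # client RX -> continuation is invoked
```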
- Request handler work-flow:
- Option 1: the dispatcher thread dispatches to a worker thread (e.g., RAMCloud)
- Cons: high inter-thread communication cost
- Option 2: the dispatcher thread handles the request itself (e.g., Accelio, FaRM)
- Cons: blocks the dispatcher thread -> increased tail latency
- Cons: prevents rapid server-to-client congestion feedback
- Mixed usage in eRPC (see the sketch after this list):
- User-defined per handler
- Processing time of up to a few hundred ns: run in the dispatch thread
- Longer: hand off to a worker thread
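A sketch of the mixed dispatch policy: the user's registration choice (modeled here as a boolean flag) decides whether a handler runs inline in the dispatch thread or is handed to a worker. Names and the worker-pool setup are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

workers = ThreadPoolExecutor(max_workers=4)

def dispatch(request, handler, runs_in_dispatch_thread):
    """runs_in_dispatch_thread is declared by the user when registering the handler."""
    if runs_in_dispatch_thread:
        return handler(request)               # short handler (hundreds of ns): run inline
    return workers.submit(handler, request)   # long handler: don't block the dispatcher

fast = lambda req: req.upper()            # e.g., a small in-memory lookup
slow = lambda req: sorted(req * 10_000)   # e.g., heavier computation

print(dispatch("get key", fast, runs_in_dispatch_thread=True))
future = dispatch("abc", slow, runs_in_dispatch_thread=False)
print(len(future.result()))
```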
Design
High level idea: optimize for the common cases
Scalability
RDMA writes scale poorly due to limited NIC SRAM (per-connection state)
-> Choose user-space packet I/O for eRPC
Zero-Copy
Key idea: message buffer management (see the layout sketch after this list)
- Buffer layout: a single DMA-capable buffer (can hold multiple packets, optimized for the first packet)
- Two requirements: a contiguous data region (used by the app as its buffer) and a contiguous first packet (header + data)
- Why: RPC messages are small in the common case; a contiguous first packet -> one DMA
- Receiver-side zero-copy for the fast path
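A sketch of one buffer layout satisfying both requirements above: the first packet's header sits directly before the contiguous data region, so a single-packet message (the common case) is one contiguous header-plus-data block needing one DMA, while headers for any later packets are placed after the data. Sizes are illustrative; this is not eRPC's exact structure:

```python
import math

HDR_SIZE = 16       # illustrative RPC header size
MTU_DATA = 1024     # illustrative per-packet data payload

def msgbuf_layout(data_len):
    """Return (offset, length, what) segments of a single DMA-capable buffer."""
    num_pkts = max(1, math.ceil(data_len / MTU_DATA))
    segments = [
        (0, HDR_SIZE, "header of packet 0"),             # contiguous with the data
        (HDR_SIZE, data_len, "contiguous data region"),  # what the app sees as its buffer
    ]
    off = HDR_SIZE + data_len
    for i in range(1, num_pkts):                         # trailing headers for multi-packet msgs
        segments.append((off, HDR_SIZE, f"header of packet {i}"))
        off += HDR_SIZE
    return segments

for seg in msgbuf_layout(100):     # small message: header + data form one contiguous block
    print(seg)
for seg in msgbuf_layout(3000):    # larger message: extra headers live past the data
    print(seg)
```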
Sessions
Congestion Control
Use Timely (from Google) as the congestion control algorithm: RTT-based; RTT increases -> congestion -> decrease the sending rate
Still common-case optimizations (see the sketch after this list):
- Timely bypass: if the measured RTT is below Timely's low threshold (50 us) -> skip the rate update
- Rate limiter bypass for uncongested sessions
- Batched timestamps for RTT measurements
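A sketch of the Timely bypass: skip the rate update (and its gradient computation) entirely when the measured RTT is below the low threshold. The constants and the congested-path update rule are simplified placeholders, not Timely's full algorithm:

```python
T_LOW_US = 50.0          # Timely's low RTT threshold from the notes
LINK_RATE_GBPS = 25.0
ADDITIVE_STEP = 0.5      # placeholder additive-increase step (Gbps)
BETA = 0.8               # placeholder multiplicative-decrease factor

def update_rate(rate_gbps, rtt_us, prev_rtt_us):
    # Common-case bypass: RTT below the low threshold -> no rate update at all.
    if rtt_us < T_LOW_US:
        return rate_gbps

    # Simplified stand-in for Timely's gradient-based rule:
    if rtt_us > prev_rtt_us:          # RTT rising -> congestion -> back off
        return rate_gbps * BETA
    return min(LINK_RATE_GBPS, rate_gbps + ADDITIVE_STEP)

print(update_rate(25.0, rtt_us=20.0, prev_rtt_us=18.0))   # below T_LOW: rate unchanged
print(update_rate(25.0, rtt_us=120.0, prev_rtt_us=90.0))  # RTT rising: decrease
```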