Subject: notes on mapreduce, FLuX
From: Joe Hellerstein
Date: Thu, 26 Oct 2006 01:09:44 -0700
To: joeh@cs.berkeley.edu

MAPREDUCE
===========

map(k1, v1) -> list(k2, v2)
reduce(k2, list(v2)) -> (k2, whatever)

Platform:
- Commodity PCs (dual processor), commodity network, 100's/1000's of machines, cheap IDE disks
- GFS for reliable file storage
- job = {tasks}, passed to scheduler

Basic Execution:
- Data is "automatically" partitioned into M "splits". Reduce partitioning is done by hashing mod R. M and R are specified by the user.
- Splits are of a single "file" into physical chunks (# of bytes).
- Mappers write to memory buffers. Periodically, buffers are flushed locally. Location is sent to the master.
- Master notifies Reduce workers about flushed results of Mappers. Reduce workers fetch them via RPC.
- Reduce then performs sort-based groupBy. Output is appended to the final output file in GFS for this reduce partition.
- When all M's and R's are done, Master wakes up the user program to "return" from the MapReduce call. R Reduce files are left in place, possibly to be reused in another MapReduce stage.

FT:
- Master pings workers to detect failure.
- Completed map tasks are reset to "idle" (i.e. not yet started) because their output is inaccessible on the failed node's disk. In-progress Maps and Reduces are also set to "idle". Completed Reduces are safely in GFS.
- All reducers need to be told of a failure, due to the "pull" model.
- Master failure can be handled by checkpointing of worker state.
- To ensure correctness, need ATOMIC commit of map and reduce. So tentatively write to private temp files (R per map task, 1 per reduce task).
- When a Mapper completes, it sends a message to the master with the names of the R files, which the master records in its data structure. Subsequent such messages are ignored.
- When a Reduce completes, the worker renames its temp output to the final output name. Atomic rename in GFS ensures that only one of possibly many redundant reducers "wins".
- Clearly, non-deterministic functions provide looser semantics.
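
The map/reduce signatures and the Basic Execution steps above can be sketched as a sequential, single-process program. This is only an illustration, not the real system: word count is an assumed example job, and the names map_fn, reduce_fn, partition and run_mapreduce are hypothetical. It shows map output being routed by hash mod R, then a sort-based groupBy in each of the R reduce partitions.

```python
import zlib
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def map_fn(k1, v1):
    """map(k1, v1) -> list(k2, v2): emit (word, 1) for each word in a document."""
    return [(word, 1) for word in v1.split()]


def reduce_fn(k2, values):
    """reduce(k2, list(v2)) -> (k2, whatever): here, the total count."""
    return (k2, sum(values))


def partition(k2, R):
    # Hash mod R. crc32 is an assumption made for this sketch: Python's
    # built-in hash() of str is salted per process, so it is not stable.
    return zlib.crc32(k2.encode("utf-8")) % R


def run_mapreduce(splits, R):
    # Map phase: each (k1, v1) split produces intermediate pairs,
    # which are routed to one of R buckets by hash mod R.
    intermediate = defaultdict(list)
    for k1, v1 in splits:
        for k2, v2 in map_fn(k1, v1):
            intermediate[partition(k2, R)].append((k2, v2))

    # Reduce phase: one output list per reduce partition,
    # built with a sort-based groupBy on the intermediate key.
    outputs = []
    for r in range(R):
        pairs = sorted(intermediate[r], key=itemgetter(0))
        outputs.append([
            reduce_fn(k2, [v2 for _, v2 in group])
            for k2, group in groupby(pairs, key=itemgetter(0))
        ])
    return outputs


if __name__ == "__main__":
    splits = [("doc1", "the cat sat"), ("doc2", "the dog sat down")]
    for r, out in enumerate(run_mapreduce(splits, R=3)):
        print(r, out)
```

Each key lands in exactly one of the R outputs, which is why the R result files can be fed directly into another MapReduce stage.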

Locality:
- Master assigns Mappers with an understanding of where the GFS blocks of the input are. Best on a machine with a replica, else "close" in the network (same switch).

Granularity:
- Many more M's and R's than machines. Good for load-balancing, and you need to load-balance after failures. Culler calls this "oversampling". Since the master has to bookkeep the M's and R's -- O(M*R) state and O(M+R) scheduling decisions -- don't want this insanely big. Also, may not want too many R's because the result is spread into many files.
- Rule of thumb: choose M so that each split is about 16MB-64MB, R a small multiple (e.g. 100) of the #machines. 2K machines, M=200K, R=5K.

Stragglers:
- Note causes: faulty but correctable disks, shared resource utilization, bugs in code.
- One solution: competition. Redundant execution of the last in-progress tasks. When useful, when not?

Combiner function:
- For commutative/associative Reduce: partial aggregation at the mapper. Output goes to the local intermediate file to be sent to the reducer.

Other stuff:
- Side effects: up to the programmer to make atomic and idempotent.
- Optionally skip bad input records: signal handler sends a UDP packet with the record's seqno to the Master. If the Master sees more than one such failure for a record, the record is skipped next time.
- Sequential local version for debugging.
- HTTP server in master for status, stderr, stdout.
- Running aggregation of "counters" for sanity checks.

Performance:
- 1800 machines, 2GHz Xeons, 4GB RAM, 160GB disk (!!), Gb Ethernet. Two-level tree-switched network with 100-200 Gbps aggregate at the root.
- Grep experiment: see startup cost in graph -- 1 minute startup, 150 seconds total! Startup includes code propagation, opening 1000 files in GFS, getting GFS metadata for locality optimization. WOW.
- Sort: 891 seconds. 1057 seconds was the TeraSort benchmark.

--------

FLUX
====

- Shared-nothing cluster of nodes, FIFO comm channels.
- Heisenbugs, fail-stop.
- Controller manages group membership, setup, teardown. Not a machine, a service.
  Can be distributed; use consensus to ensure controller messages arrive at all nodes in the same order despite failures.
- Controller knows nothing about in-flight data or operator state.
- NO stable storage assumed!
- Process pairs: quick failover.
- Assumptions
  -- Concentrate on dataflow fault tolerance
  -- Not end-to-end flow, not entry and exit points
  -- Every FT system has its boundaries
  -- Operators are piecewise deterministic
  -- Same input sequence -> same output sequence
  -- Attach sequence numbers (unique ids) to tuples
  -- Input is ordered (imposed)
  -- Connections: FIFO queues
  -- In-order delivery, e.g. TCP
  -- Contents lost on endpoint failure

Single-Site
---------------

Picture: INGRESS --> PAIRED LOCKBOX(S-ConsP - STUFF - S-ProdP) --> EGRESS
  -- Egress acks Secondary!
  -- S-ProdP produces output. S-ProdS buffers tuples and acks, which can come in either order!
  -- Ingress also buffers, needs to keep tuple and prim/sec acks.
  -- Loss-Free/Dup-Free

Take-Over: get the secondary producing tuples.
Catchup: get a new standby available.
Window of vulnerability in between.
- Need to checkpoint state to the standby, then fold the standby back into the protocol.

Take-Over: controller pushes a "Reverse" message to the secondary in the dataflow (which flushes the acks ahead of it). Secondary now produces tuples, no dups! Also informs Ingress to stop forwarding to the primary.

Catchup: initiated by S-Cons of the survivor. Quiesces, marshals and transfers state, unmarshals and installs on the other side, restarts.

- Operators as deterministic state machines -- given the same sequence of inputs, produce the same sequence of outputs.

HOW TO ACHIEVE THIS WITH PROCESS PAIRS?
- Regular processing is easy, due to FIFO comm and deterministic state machines.
- On failure it's tricky! Ensure Exactly-Once delivery: Loss-Free, Dup-Free.
- Interpose FLuX operators to assign seqnos to tuples. Consumer acks the REPLICA of the producer. Producer buffers until ACK, which ensures both properties in case of failure.
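
The secondary producer's side of take-over, as described above, can be sketched as follows. This is a simplified illustration rather than the FLuX implementation: the class StandbyProducer and its method names are hypothetical, message passing is replaced by direct method calls, and Catchup (buffering on behalf of a new standby after take-over) is left out. It assumes, as the notes do, that operators are deterministic so both replicas assign the same seqno to the same tuple, and that the "Reverse" message has already flushed all outstanding acks.

```python
class StandbyProducer:
    """Secondary S-Prod: buffers tuples and acks, which may arrive in either order."""

    def __init__(self):
        self.buffer = {}          # seqno -> tuple: produced here, not yet acked
        self.early_acks = set()   # acked by the consumer before we produced it
        self.next_seqno = 0
        self.is_primary = False

    def produce(self, tup):
        """Called for each output tuple; returns the (seqno, tuple) pairs to send."""
        seqno = self.next_seqno
        self.next_seqno += 1
        if seqno in self.early_acks:
            # Consumer already got this tuple from the old primary: never resend.
            self.early_acks.discard(seqno)
            return []
        if self.is_primary:
            return [(seqno, tup)]
        self.buffer[seqno] = tup  # standby: hold until the consumer acks
        return []

    def on_ack(self, seqno):
        """Consumer acks the replica once it has received the tuple from the primary."""
        if seqno in self.buffer:
            del self.buffer[seqno]
        else:
            self.early_acks.add(seqno)  # the primary got ahead of us

    def take_over(self):
        """After 'Reverse': send everything still unacked, in seqno order."""
        self.is_primary = True
        pending = sorted(self.buffer.items())
        self.buffer.clear()
        return pending


if __name__ == "__main__":
    s = StandbyProducer()
    for t in ["a", "b", "c", "d"]:
        s.produce(t)
    for seqno in (0, 1, 2):      # consumer received 0..2 from the primary
        s.on_ack(seqno)
    print(s.take_over())         # only the tuple the consumer never saw
```

Unacked buffered tuples are exactly those the consumer may have lost (loss-free), while acked or early-acked seqnos are never sent again (dup-free).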

Details of FT for single-site:
- Ingress and Egress are boundary operators on dataflow SEGMENTS. They are the boundaries of responsibility, and they do as much as they can with the outside world (e.g. acking at Ingress, waiting for ack at Egress). Ingress also assigns seqnos.
- S-Cons and S-Prod sit just inside Ingress and Egress, respectively.
- S-Cons acks to Ingress *before* forwarding. Ingress drops the item once it has acks from both copies of S-Cons.
- S-Prod assigns seqnos on output and buffers the tuple. Only the primary sends to Egress. Egress sends acks to the S-Prod *replica*, only *after* which it can forward the packet.
- Weird: an ack can arrive at the S-Prod replica before the tuple does, if the primary gets ahead.
- LOSS-FREE: in-order delivery, ack-ingress before consuming, ack-egress before assuming production.
- DUP-FREE? Requires careful management of buffers for tuples and acks in Ingress and S-Prod.
- MTTF: (MTTF_s)^2 / MTTR

Take-