Incremental MapReduce for Abelian-group reduction functions

Kragen Javier Sitaker, 2015-09-03 (4 minutes)

MapReduce is a batch-processing framework. Manuel Simoni wrote about incremental MapReduce in 2008 and is still working on it. He explains that in Damien Katz’s proposed solution, “you have to store intermediate results and do recomputation when the inputs change,” but you can avoid this if “map() stays the same, but reduce() is extended so that it takes a diff of the map outputs from before and after a document update.” Simoni’s design has an additional benefit over Katz’s: Katz’s nearly always re-executes the entire reduce() stage. Simoni’s design does, however, involve storing the final reduce() output.

This is a very interesting idea. Simoni comments that it has a drawback:

[R]educe functions get more complex in this scheme, but the Google papers on MapReduce and Sawzall suggest that map functions are much more often user defined than reduce functions.

There’s actually a whole algebraic landscape associated with this statement.

To summarize algebra in a few lines, a magma that’s associative is a semigroup, a semigroup that’s commutative and idempotent is a semilattice, a semigroup with identity is a monoid, a monoid with inverses is a group, and a commutative group is an Abelian group.
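
As a minimal Haskell sketch of that tower (the primed class names are mine, not from any library; base’s Semigroup and Monoid cover the first layers, but base has no Group class):

    class Semigroup' a where          -- associative: (x <+> y) <+> z == x <+> (y <+> z)
      (<+>) :: a -> a -> a

    class Semigroup' a => Monoid' a where  -- identity: e <+> x == x == x <+> e
      e :: a

    class Monoid' a => Group' a where      -- inverses: x <+> inv x == e
      inv :: a -> a

    -- An Abelian group further promises x <+> y == y <+> x; Haskell
    -- can't enforce these laws, they are just stated.
    instance Semigroup' Integer where (<+>) = (+)
    instance Monoid'    Integer where e = 0
    instance Group'     Integer where inv = negate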

In many cases, you can construct your reduce-function by composing an Abelian group operation with some other operation. Undoing an Abelian group operation is relatively easy: because the operation is associative (because semigroup), it doesn’t matter what order the operations were done in, and because it’s commutative, it doesn’t even matter where in the sequence the removed element was. So you can just apply the inverse of the removed element to the previous reduce-function result.
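
In code, reusing the hypothetical Group' class above, the incremental update is one line (a sketch; the name update is mine):

    -- Previous reduce result `state`; map output `x` was removed and
    -- `y` was added by the document update.
    update :: Group' g => g -> g -> g -> g
    update state x y = state <+> inv x <+> y

For a sum over ℤ, for instance, update 10 3 7 is 10 - 3 + 7 = 14, with no need to revisit the other inputs.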

As one broadly applicable example, if we extend bags over some set A, A → ℕ, to allow negative multiplicities, A → ℤ, then every bag has an antibag, which is its inverse under the extended bag sum ⨄.
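
A sketch of such extended bags in Haskell, using Data.Map from the containers package (the function names are mine):

    import qualified Data.Map.Strict as M

    -- A bag with possibly-negative multiplicities: a map A -> Z.
    type Bag a = M.Map a Integer

    -- Extended bag sum: add multiplicities pointwise.
    bagSum :: Ord a => Bag a -> Bag a -> Bag a
    bagSum = M.unionWith (+)

    -- The antibag negates every multiplicity; summing a bag with its
    -- antibag leaves only zero entries.
    antibag :: Ord a => Bag a -> Bag a
    antibag = M.map negate

    -- Dropping zero entries makes the identity literally the empty map.
    normalize :: Ord a => Bag a -> Bag a
    normalize = M.filter (/= 0)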

So if you store the result of the Abelian group operation, you can apply the diff to it, then redo the final operation.

For example, to maintain the set of unique words present in a set of documents, you can maintain in storage an extended bag that counts the number of times each word occurs, and then just discard the multiplicities to produce the final result.
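A sketch of that, on top of the Bag type above (mapDoc, updateDoc, and uniqueWords are illustrative names):

    -- map(): each document contributes a bag of its words.
    mapDoc :: String -> Bag String
    mapDoc doc = M.fromListWith (+) [(w, 1) | w <- words doc]

    -- On a document update, subtract its old bag and add its new one.
    updateDoc :: Bag String -> String -> String -> Bag String
    updateDoc state old new =
      normalize (state `bagSum` antibag (mapDoc old) `bagSum` mapDoc new)

    -- Final step: discard the multiplicities.
    uniqueWords :: Bag String -> [String]
    uniqueWords = M.keys . M.filter (> 0)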

As another example, if you want to know the largest one-second price jump in a large collection of stock-market data, you can divide your input data into shingles that overlap by a second, use a map function that outputs just the largest jump in each shingle, and then use max-by-jump-size as your reduce function. Unfortunately, max, like semilattice operations generally, does not admit element inverses. A general incremental solution at the point of reduce would involve maintaining, at least, a max-heap of existing records, to which you could add and remove records. The max-heap, then, stands in for the Abelian group here. But this has the same space cost (modulo overhead) as Katz’s solution of storing previous map results and re-executing affected reductions.
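
For concreteness, here is a sketch of that reduce-side structure, using Data.Map as an ordered multiset rather than a literal heap (lookupMax is in containers ≥ 0.5.9; the names are mine):

    import qualified Data.Map.Strict as M

    -- Jumps keyed by size, with counts so duplicates can be removed.
    type Jumps = M.Map Double Int

    addJump, removeJump :: Double -> Jumps -> Jumps
    addJump    j = M.insertWith (+) j 1
    removeJump j = M.update dec j
      where dec n = if n <= 1 then Nothing else Just (n - 1)

    -- The current answer: the largest jump present, if any.
    largestJump :: Jumps -> Maybe Double
    largestJump = fmap fst . M.lookupMax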

If you cannot decompose your reduce-function into an Abelian group operation, the problem is more complex; but in fact reduce-functions in MapReduce are almost invariably intended to be commutative and associative, because the order in which the different map-results are presented to them is more or less arbitrary.

The max-heap example points up an inadequacy in the group-based, or even magma-based, analysis of incrementally updatable reduce-functions: it requires that the things coming from the map-functions be of the same type as the internal state of the reduce-function. In Haskell, foldl has type (a → b → a) → a → [b] → a — it takes an initial a, produces a final a, and in the middle it folds a list of bs into it using an a → b → a function. For example, a could be a max-heap of stock-market price moves, and b could be a single stock-market price move.
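
Concretely, building the reduce-side state from scratch is then an ordinary left fold over the map outputs (a sketch reusing the Jumps type above, with a = Jumps and b = Double):

    -- foldl :: (a -> b -> a) -> a -> [b] -> a, here at a = Jumps, b = Double.
    buildJumps :: [Double] -> Jumps
    buildJumps = foldl (flip addJump) M.empty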
