MapReduce

The MapReduce pattern is designed for large-scale computations distributed across a cluster of servers, often over massive amounts of data.

"The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The developer expresses the computation as two Func delegates: Map and Reduce.

Map - takes a single input pair and produces a set of intermediate key/value pairs. The MapReduce function groups results by key and passes them to the Reduce function.

Reduce - accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's Reduce function via an iterator."

The canonical MapReduce example is counting word frequencies in a text file.
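To make the division of labor concrete, here is a minimal, sequential C# sketch of word counting written in explicit Map and Reduce form. The Map and Reduce local functions and the in-memory sample lines are illustrative assumptions, not part of any MapReduce library. LINQ's SelectMany and GroupBy stand in for the framework's job of running Map on every input and grouping the intermediate pairs by key.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input: each string stands in for one line of a text file.
string[] lines = { "the quick brown fox", "the lazy dog", "the fox" };

// Map: one input line -> a set of intermediate (word, 1) pairs.
IEnumerable<KeyValuePair<string, int>> Map(string line) =>
    line.Split(' ', StringSplitOptions.RemoveEmptyEntries)
        .Select(word => new KeyValuePair<string, int>(word, 1));

// Reduce: one intermediate key plus all of its values -> one output pair.
KeyValuePair<string, int> Reduce(string word, IEnumerable<int> ones) =>
    new KeyValuePair<string, int>(word, ones.Sum());

// The "framework" part: apply Map to every input, group the intermediate
// pairs by key, then apply Reduce once per group.
List<KeyValuePair<string, int>> wordCounts = lines
    .SelectMany(Map)
    .GroupBy(pair => pair.Key, pair => pair.Value)
    .Select(group => Reduce(group.Key, group))
    .ToList();

foreach (var entry in wordCounts)
    Console.WriteLine($"{entry.Key}: {entry.Value}");
```

Here each Reduce call yields exactly one output, matching the "zero or one output value" case described in the quote.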
MapReduce using DryadLINQ

DryadLINQ provides a simple and straightforward way to implement MapReduce operations. The implementation has two primary components: a data container that holds each word and its count (here a Tuple<string, int>), and a MapReduce method, which counts word frequencies and returns the k most frequent words.

The Tuple<string, int> has two properties: Item1, a string that holds the word (the key), and Item2, an int that holds the word count.
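// Requires System, System.Linq, and the DryadLINQ namespace that defines
// DryadDataContext, DryadTable<T>, and LineRecord.
public static IQueryable<Tuple<string, int>> MapReduce(
    string directory,
    string fileName,
    int k)
{
    // Open the input file as a table of text lines.
    DryadDataContext ddc = new DryadDataContext("file://" + directory);
    DryadTable<LineRecord> inputTable = ddc.GetTable<LineRecord>(fileName);

    // Map: split each line into words.
    IQueryable<string> words = inputTable.SelectMany(x => x.line.Split(' '));
    // Group identical words together (the word is the intermediate key).
    IQueryable<IGrouping<string, string>> groups = words.GroupBy(x => x);
    // Reduce: count the words in each group.
    IQueryable<Tuple<string, int>> counts = groups.Select(x => new Tuple<string, int>(x.Key, x.Count()));
    // Sort by descending count and keep the k most frequent words.
    IQueryable<Tuple<string, int>> ordered = counts.OrderByDescending(x => x.Item2);
    IQueryable<Tuple<string, int>> top = ordered.Take(k);
    return top;
}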
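For example, to print the 100 most frequent words in TestFile.txt:
// Debug.Print is defined in System.Diagnostics.
IQueryable<Tuple<string, int>> results = MapReduce(@"c:\DryadData\input",
    "TestFile.txt",
    100);
foreach (var wordCount in results)
    Debug.Print(wordCount.ToString());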
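The query in the body of MapReduce (everything after inputTable is created) can also be written as a single chained expression:
return inputTable
    .SelectMany(x => x.line.Split(' '))                      // map: split lines into words
    .GroupBy(x => x)                                         // group identical words
    .Select(x => new Tuple<string, int>(x.Key, x.Count()))   // reduce: count each group
    .OrderByDescending(x => x.Item2)                         // most frequent first
    .Take(k);                                                // keep the top k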
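Because the DryadLINQ query is ordinary LINQ, its logic can be sanity-checked locally with LINQ to Objects before running it on a cluster. The sketch below is a stand-in under stated assumptions: it applies the same operator chain to an in-memory array of strings rather than a DryadTable<LineRecord>, wraps it in a hypothetical TopWords helper, and adds a ThenBy tie-break (not present in the original query) so that words with equal counts come out in a deterministic order. It keeps the original Split(' ') call, so consecutive spaces would still yield empty "words".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: the DryadLINQ operator chain applied to in-memory lines
// (LINQ to Objects) instead of a DryadTable<LineRecord>.
List<Tuple<string, int>> TopWords(IEnumerable<string> lines, int k) =>
    lines
        .SelectMany(line => line.Split(' '))                    // map: lines -> words
        .GroupBy(word => word)                                   // group identical words
        .Select(g => new Tuple<string, int>(g.Key, g.Count()))  // reduce: count each group
        .OrderByDescending(t => t.Item2)                         // most frequent first
        .ThenBy(t => t.Item1, StringComparer.Ordinal)            // added tie-break (not in the original)
        .Take(k)
        .ToList();

string[] sample = { "a b a", "c a b" };
foreach (var entry in TopWords(sample, 2))
    Console.WriteLine(entry);  // Tuple.ToString() formats as (Item1, Item2), e.g. (a, 3)
```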
MapReduce using PLINQ

The pattern is relevant even on a single multi-core machine, however: we can write our own PLINQ MapReduce in a few lines.

The Map function takes a single input value and returns a set of mapped values; this corresponds to LINQ's SelectMany operator.

The mapped values are then grouped by an intermediate key; this corresponds to LINQ's GroupBy operator.

The Reduce function takes each intermediate key and the set of values for that key, and produces any number of outputs per key; this corresponds to LINQ's SelectMany operator again.
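The three correspondences can be seen directly with PLINQ operators. In this minimal sketch over hypothetical in-memory documents, the Reduce step is a SelectMany so that it can emit zero outputs for a key: words that occur only once produce nothing. The sample data and the "keep only repeated words" rule are illustrative assumptions. Because PLINQ does not guarantee output order, the results are sorted before use.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input documents.
string[] docs = { "red green blue", "green blue", "blue" };

List<string> repeated = docs.AsParallel()
    // Map -> SelectMany: one document produces many words.
    .SelectMany(doc => doc.Split(' '))
    // Group by the intermediate key (the word itself).
    .GroupBy(word => word)
    // Reduce -> SelectMany: each group may produce zero or more outputs.
    // Here, words seen only once produce nothing.
    .SelectMany(g => g.Count() > 1
        ? new[] { $"{g.Key}={g.Count()}" }
        : Array.Empty<string>())
    // PLINQ output order is not guaranteed; sort for a stable result.
    .OrderBy(s => s, StringComparer.Ordinal)
    .ToList();

Console.WriteLine(string.Join(", ", repeated));
```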
Putting these together, we can implement MapReduce in PLINQ as an extension method that returns a ParallelQuery<TResult>:
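using System;
using System.Collections.Generic;
using System.Linq;

// Extension methods must live in a static class; the class name is illustrative.
public static class ParallelMapReduce
{
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
        return source
            .SelectMany(map)        // Map: each input yields zero or more mapped values
            .GroupBy(keySelector)   // group mapped values by intermediate key
            .SelectMany(reduce);    // Reduce: each group yields zero or more results
    }
}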
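// Word count over every .txt file in a directory (requires System.IO).
// dirPath (the directory to scan) and delimiters (a char[] of word separators,
// e.g. new[] { ' ', '\t', ',', '.' }) are assumed to be defined by the caller.
var files = Directory.EnumerateFiles(dirPath, "*.txt").AsParallel();
var counts = files.MapReduce(
    path => File.ReadLines(path).SelectMany(line => line.Split(delimiters, StringSplitOptions.RemoveEmptyEntries)),
    word => word,
    group => new[] { new KeyValuePair<string, int>(group.Key, group.Count()) });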
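The usage above depends on files on disk. For a self-contained run, the sketch below declares the same MapReduce logic as a local function (C# local functions cannot be extension methods), feeds it hypothetical in-memory documents in place of File.ReadLines, and then orders the unordered results by descending count, breaking ties alphabetically. The documents, the delimiter set, and the top-3 cutoff are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same operator chain as the MapReduce extension method, written as a plain
// local function so the whole sketch fits in one file.
ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
    ParallelQuery<TSource> source,
    Func<TSource, IEnumerable<TMapped>> map,
    Func<TMapped, TKey> keySelector,
    Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce) =>
    source.SelectMany(map).GroupBy(keySelector).SelectMany(reduce);

// Hypothetical in-memory "files" standing in for Directory.EnumerateFiles + File.ReadLines.
string[] documents = { "to be or not to be", "to do is to be" };
char[] delimiters = { ' ', ',', '.' };  // assumption: a simple delimiter set

var counts = MapReduce(
    documents.AsParallel(),
    doc => doc.Split(delimiters, StringSplitOptions.RemoveEmptyEntries),
    word => word,
    group => new[] { new KeyValuePair<string, int>(group.Key, group.Count()) });

// The query result is unordered; sort by count (descending), then by word.
List<KeyValuePair<string, int>> top = counts
    .OrderByDescending(kv => kv.Value)
    .ThenBy(kv => kv.Key, StringComparer.Ordinal)
    .Take(3)
    .ToList();

foreach (var entry in top)
    Console.WriteLine($"{entry.Key}: {entry.Value}");
```

Sorting after the query, rather than relying on enumeration order, is what makes the printed list stable, since PLINQ's GroupBy does not promise any particular order across threads.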