Интересности      Книги      Утилиты    

10 февраля 2017 г.

MapReduce


mapreduce_cpver

Что такое MapReduce?

Это подход, алгоритм, ну или паттерн, тут уж как кто назовет, параллельной обработки больших объемов сырых данных, например результатов работы краулеров или логов веб запросов, вообще по статистике до 80% задач могут маппится на MapReduce, и именно MapReduce драйвит NoSQL. Существуют разные имплементации MapReduce. Достаточно известна и запатентована реализация этого алгоритма и подхода Google. Или как пример MySpace Qizmt - MySpace’s Open Source Mapreduce Framework, также используется в Hadoop, MongoDb и еще много разных примеров можно привести. Более детально можно почитать в статье MapReduce: Simplified Data Processing on Large Clusters 

Алгоритм получает на вход 3 аргумента, исходную коллекцию, Map функцию, Reduce функцию, и возвращает новую коллекцию данных.

Collection MapReduce(Collection source, Function map, Function reduce)



Алгоритм состоит из нескольких шагов. В качестве первого шага выполняется Map функция к каждому элементу исходной коллекции. Map вернет ноль либо создаст экземпляры Key/Value объектов.

ArrayOfKeyValue Map(object itemFromSourceCollection)

То есть, можно сказать что обязанность Map функции конвертировать элементы исходной коллекции в ноль или несколько экземпляров Key/Value объектов. Это продемонстрировано ниже на изображении:

image

Следующим шагом, алгоритм отсортирует все пары Key/Value и создаст новые экземпляры объектов, где все значения (value) будут сгруппированы по ключу.

image

Последним шагом выполнится функция Reduce для каждого сгруппированного экземпляра Key/Value объекта

ItemResult Reduce(KeyWithArrayOfValues item)

Функция Reduce вернет новый экземпляр объекта, который будет включен в результирующую коллекцию.

image

Пример

В качестве примера реализуем очень простую c# имплементацию алгоритма. Пример считает количество гласных построчно в наборе строк.

В примере создана обобщённая функция MapReduce, как основная в алгоритме, которая просто вызывает функции Map и Reduce распараллеливая их выполнение. А также функции Map и Reduce, реализация которых является уже специфической для той задачи, которую мы пытаемся решить, в данном случае это “посчитать количество гласных в наборе строк”

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

namespace MapReduceSample
{
    // Элемент результирующей коллекции, гласная и ее количество
    class VocalCount
    {
        public char Vocal;
        public int Count;
    }

    class Program
    {
        static void Main(string[] args)
        {
            // "lines" это исходня коллекция.
            var lines = new[] {
                "How many vocals do",
                "these two lines have?"
            };

            foreach (var line in lines)
            {
                Console.WriteLine(line);
            }
            Console.WriteLine();

            // Вызывается MapReduce
            var results = MapReduce(lines, Map, Reduce); 

            // Отображение результата
            foreach (var result in results)
            {
                Console.WriteLine("{0} = {1}", result.Vocal, result.Count);
            }

            Console.ReadKey();
        }

        /// <summary>
        /// Функция Map считает количество гласных в строке
        /// </summary>
        /// <param name="sourceItem" />Строка для подсчета</param>
        /// <returns>Коллекция экземпляров Key/Value. 
        /// Где key - гласная, и значение ее количество.</returns>
        static IEnumerable<KeyValuePair<char, int>> Map(string sourceItem)
        {
            return sourceItem
                .ToLower()
                .Where(c => "aeiou".Contains(c))
                .GroupBy(c => c, (c, instances) => new KeyValuePair<char, int>(c, instances.Count())); 

        }

        /// <summary>
        /// Функция Reduce считает общее число каждой гласной
        /// </summary>
         /// <param name="reduceItem" />Экземпляр Key/Values, где key - гласная, 
        /// и value - список всех подсчетов гласных по строкам</param>
        /// <returns>Экземпляр элемента результирующей коллекции, VocalCount</returns>
        static VocalCount Reduce(KeyValuePair<char, IEnumerable<int>> reduceItem)
        {
            return new VocalCount
            {
                Vocal = reduceItem.Key,
                Count = reduceItem.Value.Sum()  // Computes total count
            };
        }

        /// <summary>
        /// Обобщенная реализация функции MapReduce
        /// </summary>
        /// <typeparam name="TSource">Тип элементов исходной коллекции</typeparam>
        /// <typeparam name="TKey">Типа ключа Key используемого в функциях Map и Reduce</typeparam>
        /// <typeparam name="TValue">Тип Value используемый в функциях Map и Reduce</typeparam>
        /// <typeparam name="TResult">Тип элементов результирующей коллекции</typeparam>
        /// <param name="source" />Исходная коллекция</param>
        /// <param name="map" />Функция Map</param>
        /// <param name="reduce" />Функция Reduce</param>        
        static IEnumerable<TResult>  MapReduce<TSource, TKey, TValue, TResult>(
            IEnumerable<TSource> source,
            Func<TSource, IEnumerable<KeyValuePair<TKey, TValue>>> map,
            Func<KeyValuePair<TKey, IEnumerable<TValue>>, TResult> reduce)
        {
            // Коллекция, где сохраним результаты map функции
            var mapResults = new ConcurrentBag<KeyValuePair<TKey, TValue>>();

            // Выолним функцию Map паралельно для каждого элемента исходной коллекции
            Parallel.ForEach(source, sourceItem =>
            {
                foreach (var mapResult in map(sourceItem))
                {
                    mapResults.Add(mapResult);
                }
            });

            // Сгрупируем все значения по ключам
            var reduceSources = mapResults.GroupBy(
                    item => item.Key,
                    (key, values) => new KeyValuePair<TKey, IEnumerable<TValue>>(key, values.Select(i=>i.Value)));

            var resultCollection = new BlockingCollection<TResult>();

            // Старуем reduce
            Task.Factory.StartNew(() =>
            {
                // Выполняем функцию Reduce паралельно для каждого элемента reduceSources
                Parallel.ForEach(reduceSources,
                                (reduceItem) => resultCollection.Add(reduce(reduceItem)));
                
                resultCollection.CompleteAdding();
            });
            
            return resultCollection.GetConsumingEnumerable();
        }
    }
}



Я не мог удержаться чтобы не сделать перевод, такого по сути красивого обьяснения этого алгоритма в виде трех картинок, почитать оригинал можно тут

1 комментарий: