avlasov (avlasov) wrote,
avlasov
avlasov

Categories:

Мини-учебнег по распределенным системам/протоколам. MapReduce как пример распределенной системы

Проанализируем теперь подробнее MapReduce фреймворк, который сильно облегчает создание распределенной системы, если ее работу можно описать последовательностью Map и Reduce операций. Точнее, там есть еще несколько промежуточных операций, но Map и Reduce - ключевые. Они весьма близки тем map/reduce операциям, что мы рассмотрели ранее, хотя есть и некоторые отличия.

Map - это не просто map, он преобразует пару элементов (key1, value1) в список list(key2, value2), т.е. это flatMap операция, и она работает не просто над элементами, а над парами ключ/значение.
Reduce - это чуть модифицированный reduce. Более точно, между Map и Reduce есть еще Shuffle фаза, т.е. результат работы Map операции группируется по ключам key2, и к каждой группе применяется Reduce. Т.е. это функция вида (key2, list(value2)) -> list(value3).
В-общем, идеология очень близка к тем операциям, что мы рассмотрели ранее, но по сути это map/groupBy/reduce который работает не на простых элементах, а на парах ключ/значение. Ну и доработанная под распределенную специфику.

В нашем случае не так важны детали реализации, хотелось бы рассмотреть проблему восстановления после сбоев в такой системе. Как эта система решает проблему параллелизации и ускорения вычислений, думаю и так должно быть понятно. Т.е. Map операции они и так очень параллелизуемы (при условии, что мы изначально можем разбить наши данные на много незавимисых кусков). Ну а группировка по ключу key2, если ключей много, тоже позволяет параллелизовать редуцирование пар key2/list(value2), даже если сама операция редуцирования не ассоциативна. Впрочем, там есть в какой-то мере параллелизовать и редуцирование, использование combine (но это уже детали).

Рассмотрим чуть подробнее исполнение MapReduce Job'а:
1 подготовка входных данных - их надо разбить на несколько частей, которые будут обрабатываться параллелельно mapper'ами
2 исполнение Map операций
3 shuffle, сортировка результатов работы мапперов, группировка и пересылка reducer'ам
4 исполнение Reduce операций
5 запись результатов в stable хранилище

Отмечу, что я тут рассматриваю не детали конкретной реализации MapReduce, а некую обобщенную схему реализации, детали в которой могут существенно варьироваться.

Граф операций

В Map/Reduce есть много возможностей для извлечения параллелизма, т.е. некоторые этапы можно разбить на параллельные подзадачи, которые можно исполнять разных нодах. Соответственно, между этой кучей задач есть зависимости, например, чтобы начать делать Reduce задачи нужно собрать результаты работы Map задач, а также их отсортировать и проч.

Рассмотрим структуру образующегося графа задач. Для простоты будем считать, что каждая подзадача завершается записью результата на диск - пусть и не обязательно в трехкратном количестве. Т.е. задачи в графе взаимодействуют через диск, точнее через файловую систему (HDFS). При этом, на самом деле, результаты могут кэшироваться в памяти, а могут держаться на локальном диске, а могут и реплицироваться на несколько дисков. Нам важно, чтобы самый конечный результат был в надежном хранилище, а к промежуточным результам нет таких жестких требований, ибо в случае чего можно их пересчитать заново. И так мы можем экономить сетевой трафик, при необходимости. Ибо если данных много или очень много, то перекачка по сети может занять сильно больше времени, нежели пересчитать заново.

Также для простоты считаем, что мы очередные фазы делаем после того, как закончились предыдущие фазы, т.е. готовы все входящие данные. На практике, конечно, иногда можно начинать и раньше, но это уже оптимизация и вне темы поста.

1 Input split. Разбиваем входные данные на N кусков, которые будут обрабатываться мапперами независимо. Для простоты считаем, что результат разбиение (начало и длина каждого из N кусков), записываем на диск.
2 Map. Каждый Map процесс читает свой кусок, разбивает его на пары ключ/значение и производит соответствующую map операцию. Опять-таки считаем что результат пишем на диск. Конечно, желательно, чтобы маппер гонялся на той же ноде, где расположен соответствующий кусок входных данных. Ну и результат тоже желательно пока писать на локальный диск (или кэшировать в памяти).
3 Partition. Нужно теперь разбить результаты работы мапперов на части, которые можно подать на вход редьюсерам. Точнее тут две подфазы: разбить все результаты работы всех мапперов на куски, которые относятся к разным редьюсерам. И внутри каждого куска нужно сгруппировать данные по ключу (и отсортировать ключи). Partition это первая фаза, она нужна ибо нам нужно определить какие данные куда пересылать. Для простоты считаем что partition делается на той же ноде где соответствующий маппер произвел результат. А вот группировака/сортировка делается на стороне редьюсера.
На самом деле, на стороне мапперам можно тоже уже сгруппировать данные и сделать операцию combine, т.е. частично редуцировать результат. Таким образом, можно гонять по сети меньше данных. Но это уже детали.
Итого на Partition стадии, результаты работы каждого маппера раскидываются по партициям, соответствующих разным целевым редьюсерам. Это делается с помощью partition функции, которая фактически есть обычная хэш-функция, ну типа ее задача максимально равномерно раскидать ключи по разным редьюсерам.
4 Sort. Далее считаем, что на стороне редьюсера читаются все партиции, относящиеся к данном редьюсеру, результаты читаются, группируются и сортируются по ключу. Для простоты тоже считаем что результат пишем на локальный диск.
5 Reduce. Далее работают функции reduce. Это уже конечный результат, так что пишем его сразу в стабильное хранилище. Но, на самом деле, оно сперва может писаться на локальный диск, а потом реплицироваться в нужное кол-во копий.

Поскольку мы считаем, что фазы коммуницируют через диск, то считаем, что каждая (под)задача выполнилась тогда, когда ее результат записан на диск, и он дожил до того, как его прочитал потребитель - а в случае конечного результата, она отреплицировалась нужное кол-во раз.
Если нода умерла в процессе работы, или умерла нода с промежуточными данными, и они не были реплицированы, то считаем, что соответствующая задача обломилась и нужно повторить попытку.

Восстановление после сбоев

Процедура восстановления после сбоев получается относительно простая (на уровне схемы, реально конечно есть множество деталей реализации). Мы всегда по описанию исходной задачи сконструировать граф задач (N мапперов и M редьюсеров): входные данные бьем на N частей, далее результат каждой части бьются на M партиций. Партиции с разных мапперов сливаются, группируются/сортируются, запускаются редьюсеры, результат пишется в нужное место.
Так как мы считаем, что задачи обмениваются результатами через диск, то мы всегда можем посмотреть на диск и определить завершилась соответствующая задача или нет. Конечно, тут можно отметить такой популярный прием: результат пишется во временный файл, а как только задача завершила работу, файл переименовывается в то имя, которое ожидается на выходе, ну типа там результат-работы-маппера-номер-xxx-попытка-yyy.
Итого, значительную часть состояния процесса можно хранить на диске (что весьма удобно). Если файловая система поддерживает кэширование, разные уровни репликации и проч, то еще удобнее.

Однако, нужно еще хранить адреса нод, которые участвуют в обработке результатов. Считаем что у нас есть два вида распределенных процессов: один JobTracker - координатор, который отслеживает исполнение MapReduce Job в целом, и TaskTracker'ы - процессы, по одному на каждую рабочую ноду, которые запускают задачи, отслеживают их состояние и проч. Для простоты мы игнорируем HDFS процессы (NameNode/DataNode).

В нашем случае, заботиться о восстановлении может только JobTracker. Т.е. он должен знать список worker'ов (TaskTracker'ов), отслеживать его изменения (добавления, удаления, сбои). JobTracker распределяет задачи (мапперы и редьюсеры) по TaskTracker'ам, перераспределяет в случае сбоев. Ну и ему нужно эту информации о распределении задач сохранять на диске, чтобы в случае сбоя, можно было восстановить картину исполнения Job'а.

Итого, если произошел сбой, то после перезапуска, JobTracker восстанавливает граф задач, смотрит на диск, выясняет какие задачи успели завершиться. И, соответственно, какие задачи еще нужно сделать.
Далее смотрим список TaskTracker'ов с учетом изменений их статуса (добавлены, исключены по тем или иным причинам). Нужно опросить TaskTracker'ы, потому как некоторые из них могут выжить и продолжать работать над своими задачами. Таким образом, можно выяснить какие из еще неисполненных задач находятся в процессе исполнения. Оставшиеся задачи, если они есть, нужно перераспределить по TaskTracker'ам в соответствии с scheduling политикой.

В принципе, процедура начального исполнения не сильно отличается от процедуры восстановления, просто исполненных задач еще нету, и задачи вообще никуда не назначены. Т.е. на диске в соответствующих директориях ничего не должно быть, ну и TaskTracker'ы тоже должны отвечать, что они ничем не заняты.

В принципе, TaskTracker'ы могут вообще не заботиться о восстановлении после сбоев. Типа если отвалился TaskTracker то и бог с ним. Но, можно и дополнить алгоритм, т.е. если допустим нода отвалилась от сети, а потом снова подключилась, причем JobTracker заметил что соответствующая нода не отвечает, и исключил ее из списка.
Соответственно, если нода снова вернулась в сеть и JobTracker ее обнаружил, то нужно смерджить состояние. Т.е. если JobTracker успел запустить соответствующие задачи на других нодах, то нужно выбрать кем пожертвовать. Если не успел, то тем проще. Ну и в любом случае, вновь объявившуюся ноду можно использовать для запуска на ней новых задач в соответствии со scheduling политикой.

Отмечу, что тут может быть потенциальный конфликт: JobTracker решил что нода отвалилась, и переназначил ее задачи другим нодам. Но тогда ведь они могут писать результаты под теми же именами. А что если первая нода не умерла, а просто отвалилась, а потом объявится и тоже будет писать результат в то же место? Чтобы сей конфликт разрешить, нужно чтобы файл с результатом содержал номер попытки. Т.е. изначально нода исполняет задачу и это типа попытка номер 1. Если JobTracker переназначает ноду, то это попытка номер 2. Ну и т.д. Таким образом, конфликт по именам разрешается.
И посему JobTracker должен записывать на диск какая попытка какой задачи (Task Attempt) назначена на какую ноду.
Subscribe
  • Post a new comment

    Error

    default userpic

    Your reply will be screened

    When you submit the form an invisible reCAPTCHA check will be performed.
    You must follow the Privacy Policy and Google Terms of use.
  • 0 comments