Эволюция экосистемы Hadoop, взявшей курс на роль фактического стандарта в мире распределенной обработки, во многом повторяет путь, пройденный в 1950–1970-е годы программной индустрией, когда от пакетного режима она двинулась к интерактивным системам и далее к многозадачным системам с разделением времени. В ядре Hadoop 2.0 функции по управлению ресурсами и планированию заданий вынесены из модуля MapReduce в модуль YARN, который на высоком уровне абстракции и с подобающей гибкостью управляет кластером как мультиарендной системой, несущей одновременно разнопрофильные и разнокалиберные нагрузки. YARN часто называют кластерной операционной системой: как и ОС компьютера, этот модуль отвечает за взаимодействие с вычислительными ресурсами и предоставляет их по запросу другим программам с учетом текущих возможностей, приоритетов, расписаний и ограничений. Проекты Spark и Storm на полную мощность используют возможности YARN, привнося в мир Hadoop динамизм, ранее труднодостижимый в простой пакетной парадигме, а проект Tez призван стать мостом между YARN и разработчиками интерактивных и пакетных решений, которые получат доступ к распределенной обработке средствами как новых, так и уже созданных в рамках «старого» Hadoop приложений [1].
Проект Spark был инициирован в Калифорнийском университете в Беркли как замена MapReduce в распределенной обработке данных для кластера Hadoop. MapReduce обладает двумя серьезными недостатками. Первый состоит в том, что он работает только с данными постоянного хранения, на жестких дисках, даже если это промежуточные данные. Второй недостаток актуален для итерационных вычислений и интерактивного анализа данных — задания в MapReduce работают в один проход, и для повторной обработки надо запускать новое задание со всеми вытекающими отсюда накладными расходами. Spark устраняет эти недостатки и для соответствующего класса задач значительно повышает скорость обработки — до двух порядков по сравнению с MapReduce [2], если данные находятся в оперативной памяти, и до 10 раз, если они хранятся на жестком диске, благодаря сокращению операций чтения/записи. Кроме того, ускорение достигается за счет хранения информации о каждом операторе в оперативной памяти, что дает и еще одно важное преимущество — все процессы, связанные с данными (в том числе обработка потоков и пакетов информации и машинное обучение), происходят на одном и том же кластере, в одном и том же приложении. Все это, естественно, обеспечивается без потери способности фреймворка к горизонтальной масштабируемости и с сохранением «бесшовной» архитектуры приложения.
Spark состоит из пяти модулей (рис. 1), работающих в пределах одного кластера данных и одновременно распределенных горизонтально по всему кластеру Hadoop. Модуль SparkSQL обеспечивает интеграцию SQL-запросов со всеми элементами фреймворка. Благодаря представлению информации в виде так называемых упругих распределенных наборов данных (Resilient Distributed Dataset, RDD) удается обеспечить быстрый и простой доступ к данным через прикладные интерфейсы языков Python, Scala и Java. Кроме того, RDD позволяет запрашивать данные и одновременно запускать сложные алгоритмы их анализа. Модуль SparkStreaming предоставляет пользователю возможность писать и запускать приложения на Scala и Java в потоковом режиме. При этом приложение без существенных изменений в коде может работать и с потоками данных, и осуществлять пакетную обработку. Имеется также функция автоматического восстановления данных после сбоев.
Рис. 1. Модули Spark |
Следующие два модуля предназначены для решения конкретных задач. Библиотека машинного обучения MLLib обладает отличной интегрируемостью — ее можно подключать к прикладным интерфейсам, написанным на Java, Python и Scala. Еще два ключевых момента заключаются в скорости тестирования и обучения (в 100 раз быстрее по сравнению с MapReduce) и простом развертывании — библиотека инсталлируется на уже имеющемся кластере Hadoop и работает с уже имеющимися данными.
Последний модуль — прикладной интерфейс GraphX для работы с графами и графо-параллельных вычислений. Модуль обладает такими важными преимуществами, как гибкость («бесшовность» работы как с графами, так и с коллекциями данных) и высокая скорость работы.
Итак, основными преимуществами Saprk являются: высокая скорость работы на всех уровнях, начиная от SQL-запросов и кончая вычислениями на графах и машинным обучением; многофункциональность; сохранение сильных сторон MapReduce (горизонтальное масштабирование, отказоустойчивость) и полная совместимость с экосистемой Hadoop.
Проект Storm инициирован с целью разработки технологии для агрегатора BackType социальных медиа, созданием которого занимается одноименный стартап, финансируемый фондом Y Combinator. Руководитель команды разработки Storm Натан Марц достаточно хорошо известен в мире технологий Больших Данных и функционального программирования в качестве создателя Cascalog — реализации логического языка запросов Datalog к данным в Hadoop, написанной на Clojure (вариант Лиспа для виртуальных машин Java). В 2010–2011-е годы проект развивался по модели с закрытыми исходными кодами, но уже тогда интерес к нему был разогрет предложенной Марцем концепцией «λ-архитектуры», согласно которой большая система обработки входящих потоков данных должна быть устроена трехслойно: данные попадают одновременно в слои пакетной и быстрой обработки, комбинируясь в отдельном слое раздачи (рис. 2).
Трудно сказать, где в этой концепции греческая буква «λ», — явных пояснений автор на этот счет не дал. Скорее всего, будучи до мозга костей функциональным программистом, таким образом Марц прославил λ-исчисление, являющееся его теоретической основой, а потом уже обязательно как-нибудь свяжет λ-абстракцию с каким-либо важным свойством архитектуры. Возможно, это будет мысль о том, что данные в этой архитектуре рассматриваются как универсум, а запрос над ними должен олицетворять собой чистую (свободную от побочных эффектов) функцию над всем универсумом.
Идеологическую поддержку Storm как одной из ключевых технологий для реализации λ-архитектуры Марц усилил книгой с громким названием «Большие Данные. Принципы и лучшие практики масштабируемых систем работы с данными в реальном времени» (Big Data Principles and best practices of scalable realtime data systems), которая стала знаменитой еще задолго до подписания в печать, — по состоянию на конец 2014 года она еще не дописана и читателям доступны только ее отдельные главы в электронном виде. Вероятно, именно Storm, а не агрегатор социальных медиа стал решающим активом стартапа BackType, когда компания Twitter принимала решение о его поглощении в 2011 году. Для Storm итогом этого поглощения стали открытие исходных кодов и передача проекта в инкубатор фонда Apache, где за год с небольшим он стал проектом верхнего уровня, заняв видное и устойчивое место в экосистеме Hadoop.
Рис. 3. Пример топологии в Storm с метафорой «труба — задвижка» |
Что же такого особенного в Storm? Прежде всего это специфичный для мира Больших Данных подход к обработке — если верхним уровнем при обработке данных в кластере MapReduce является пакетное задание (job), то в Storm его роль выполняет «топология», некоторая конструкция обработки входящего потока. Задания запускаются и по мере выполнения завершаются, а «топология», будучи единожды созданной, функционирует постоянно, пока ее не остановит системный администратор. Иначе говоря, топология в Storm — это программно реализованная схема преобразования потока, непрерывного и потенциально бесконечного набора кортежей. По аналогии с потоком в системе водоснабжения разработчики Storm назвали два вида элементов топологии «труб» (spout) и «задвижек» (bolt) (рис. 3). Для создания топологии разработчик должен реализовать методы, заданные в интерфейсах «труб» и «задвижек», отражающие специфические для конкретной задачи атомарные преобразования, которые формируют целостный непрерывный процесс преобразования входящего потока сырых данных.
Кластер в Storm, как и в MapReduce, состоит из главного узла, отслеживающего топологию, и рабочих узлов, непосредственно занимающихся обработкой потока. Демон главного узла — Nimbus — распределяет код обработки по рабочим узлам кластера и занимается отслеживанием их работы. На рабочих узлах функционирует демон Supervisor, запускающий или приостанавливающий задания по командам от Nimbus. Координация между главным узлом и рабочими налажена через Apache Zookeeper, и, вообще, все в Storm изначально было направлено на обеспечение максимальной отказоустойчивости и стабильности.
Storm часто сравнивают со Spark, что в каком-то смысле правомерно — оба проекта являются альтернативами пакетному MapReduce, вносящими в экосистему Hadoop эффективные средства оперативной обработки. Заметна и культурная общность проектов: оба реализованы на модных языках программирования (Storm — на Clojure, а Spark — на функционально-объектном языке Scala, претендующем ни много ни мало на вытеснение Java), оба проекта достаточно быстро прошли «инкубацию» в системе проектов фонда Apache и за год вышли на верхний ее уровень, снискав значительную популярность. Иногда доходит даже до «религиозных войн» между апологетами того или иного проекта, в ходе которых сравнивают производительность (каждая сторона, естественно, в свою пользу) и утверждают единственную правильность выбранного языка и архитектуры. Однако предназначение у проектов разное — Storm представляет собой классическую распределенную систему обработки сложных событий (Complex Event Processing, CEP), выполненную в экосистеме Hadoop, а Spark — это интерактивно-аналитическая система, предназначенная для задач, использующих многопроходные обработки (машинное обучение, граф-анализ, глубинный анализ данных). Обработка входящих потоков событий в Spark Streaming — это, скорее, побочный эффект, равно как и побочным эффектом можно считать возможность в Spark решать «на лету» некоторые аналитические задачи.
Рис. 4. Место Tez в экосистеме Hadoop 2.0 |
Так же как и для создания программ, эффективно использующих возможности ОС, в случае с YARN, воспринимаемой как кластерная операционная система, нужны средства разработки и отладки, библиотеки и фреймворки с достаточно высоким уровнем абстракции. Иначе говоря, нужна среда, позволяющая реализовывать новые парадигмы распределенной обработки, заботясь в первую очередь об алгоритмической части проблематики и не отвлекаясь на низкоуровневые вопросы управления ресурсами. Проект Apache Tez создавался с целью разработки такой среды (рис. 4) и был инициирован в недрах компании Hortonworks. Tez предоставляет разработчикам основные примитивы для создания готовых распределенных приложений и, что особенно важно, механизмы распределенной обработки, предназначенные для проблемно-ориентированных приложений. Идеологическое ядро Tez — это представление стратегий вычислений в виде направленных ациклических графов. Получив задание, сформулированное на языке таких стратегий, Tez может осуществить манипуляции над графами и, обладая доступом к сведениям о ресурсах, передать в YARN задачи таким образом, чтобы решение было максимально эффективным, заранее рассчитывая нужные последовательности выполнения и определяя необходимые уровни параллелизма. Таким образом, миссия создателя нового «движка» распределенной обработки сводится к переводу проблемно-специфичного языка для своего класса задач на более универсальный и стандартный язык примитивов и стратегий Tez. При этом у разработчика, использующего Tez, сохраняется пространство для маневра — узлы направленных ациклических графов, являющиеся входами, выходами и обработчиками, представляют собой Java-классы, лишь реализующие соответствующие интерфейсы, заданные фреймворком. Такой подход роднит Tez со Storm, в котором разработчик создает топологию, реализуя на Java или Clojure интерфейсы «труб» и «задвижек». Но есть и существенное различие — топология Storm после развертывания работает сразу и непрерывно, с бесконечным входящим потоком, а стратегии выполнения Tez отправляются на выполнение по требованию и действуют над определенными наборами данных.
Одно из ключевых инженерных решений, используемых для повышения скорости выполнения заданий в Tez, — повторное использование контейнеров во время выполнения. Для каждого узла графа, представляющего вычислительную стратегию, Tez заранее определяет необходимые параметры контейнера (процессорные ресурсы, ресурсы памяти, окружение, опции командной строки) и, когда контейнеров в кластере еще нет, запрашивает у YARN их создание. В боевом кластере нередко уже существующие контейнеры пригодны для исполнения нового задания, и тогда Tez помечает их для повторного использования, чтобы они не высвобождали ресурсы и не отвлекались на повторную инициализацию, а, дожидаясь новой работы, занимались освобождением ресурсов и повторной инициализацией. Кроме того, часто бывает так, что большинство узлов кластера уже закончили выполнение очередного задания, и, чтобы они не простаивали в ожидании нескольких отстающих, Tez может перепоручить им часть оставшегося задания, если это возможно, либо загрузить их новым.
Еще одна ценная возможность Tez — мощные средства отладки, позволяющие локально «прогонять» стратегии вычислений в одном экземпляре виртуальной машины Java, без запуска на кластере. В этой же связи полезны и встроенные в Tez средства диагностики, позволяющие постфактум изучить статистику выполнения заданий и найти «бутылочные горла» с точки зрения производительности.
Tez уже прочно занял роль среды создания «движков» распределенной обработки над кластерной операционной системой YARN, и лучшее подтверждение этому — переход на использование Tez исторически первых интерактивных Hadoop-средств Hive и Pig. Но самое важное то, что появилось средство, признанное широкими массами разработчиков компонентов экосистемы Hadoop в качестве стандартного фреймворка для взаимодействия с YARN, избавляющего их от поиска в каждом случае уникальных низкоуровневых решений. В конечном счете все это должно внести изрядную долю стабильности и определенности во все усложняющуюся экосистему Hadoop.
***
Зародившись как набор библиотек вокруг пакетной среды MapReduce, Hadoop превратился в развитую экосистему Hadoop 2.0 для работы уже не только с пакетными нагрузками, что стало возможно благодаря проектам Spark, Storm и Tez, которые принесли в мир Hadoop динамизм, ранее труднодостижимый в простой пакетной парадигме.
Литература
- Vijay Agneeswaran. Big Data Analytics Beyond Hadoop: Real-Time Applications with Storm, Spark and More Hadoop Alternatives. — N. J.: Pearson FT Press, 2014, ISBN 9780133837940
- Леонид Черняк. Альтернативы MapReduce для реального времени // Открытые системы. СУБД. — 2014. — № 5. — С. 12–13. URL: http://www.osp.ru/os/2014/05/13041818 (дата обращения: 15.12.2014).
Дмитрий Волков (vlk@keldysh.ru) — сотрудник ИПМ им. М. В. Келдыша РАН; Андрей Николаенко (anikolaenko@acm.org) — системный архитектор, компания IBS. Статья подготовлена на основе материалов серии семинаров «Hadoop на практике. Новые инструменты и проекты».