Аналитика Больших Данных, изначально возникшая в ответ на потребности коммерческих компаний находит сегодня все более широкое применение, в частности, в государственных структурах, имеющих доступ к огромному массиву данных как в рамках отдельной территории, так и страны в целом. Анализ всех имеющихся данных может быть полезен для решения множества экономических и социальных задач, таких как мониторинг и оптимизация дорожного трафика в городских ситуационных центрах или построение систем быстрого реагирования.
В ситуационных центрах российских городов данные о трафике дорожного движения поступают от систем фотофиксации, таких как «Одиссей», «АвтоУраган», «Стрелка» и др. в виде потока текстовых сообщений по фактам фотофиксации номеров транспортных средств, прошедших рубежи наблюдения и потока их фотографий. Интенсивность данного потока зависит от времени суток, количества рубежей контроля и может составлять несколько тысяч событий в секунду. Анализ этого потока позволяет решать следующие задачи:
- выявление автотранспортных происшествий по факту изменения интенсивности потока с целью последующей оптимизации дорожного трафика, например ускорения прибытия различных служб быстрого реагирования;
- определение оптимальных скоростей движения на отдельных участках трассы;
- определение узких мест городской транспортной сети;
- формирование рекомендаций по планированию ремонта отдельных участков трассы в зависимости от загруженности транспортных артерий, типа проходящих по ним автотранспортных средств и др.;
- выделение из общего потока транспортных кортежей групп транспортных средств, последовательно проходящих рубежи наблюдения с превышением среднего для данного участка времени перемещения.
Применение стандартных подходов для решения этих задач неизбежно предполагает использование мощных и дорогостоящих вычислительных ресурсов, тем не менее не гарантирующих приемлимой скорости обработки информации. Использование решений класса  «высокопроизводительная аналитика данных (High Performance Data Analytics)» [1] на основе программного стека технологий обработки Больших Данных Apache Hadoop [2] позволило решить данные проблемы.
Как видно из рис. 1, поток данных, поступающих в ситуационный центр включает не только телеметрию от датчиков, но и множество других сведений: состояние дорожного полотна, данные об авариях и пробках,  сообщения службы 112, данные по распознаванию лиц, и т.п. Все эти сведения поступают на вход различных систем контроля и анализа, осуществляющих их индексацию, архивацию, оценку состояния оборудования и т. д. Для координации всех потоков и уменьшения накладных расходов на передачу данных применяются сервисные шины, обеспечивающие централизованный прием сообщений от различных источников, их сохранение и раздачу потребителям. Главным звеном сервисной шины сегодня, как правило, является реляционная СУБД, обеспечивающая хранение передаваемых данных и служебной информации о состоянии передачи по каждому источнику и потребителю. Из-за большой нагрузки и сложности механизмов обеспечения гарантированной доставки сообщений при распределенной обработке в режиме кластера на сервисных шинах могут происходить сбои. Как показывает опыт применения сервисной шины с реляционной базой MS SQL Server, при интенсивности входного или выходного потока свыше нескольких десятков сообщений в секунду могут возникать проблемы с синхронизацией и передачей данных. Брокер сообщений Apache Kafka из стека решений Apache Hadoop был специально создан для выполнения высокопроизводительной распределенной обработки потоковых сообщений и позволяет устранить большинство перечисленных недостатков.
[Врезка]
Технологии для ситуационного центра
Сегодня имеется два основных независимых поставщика технологий обработки Больших Данных Apache Hadoop: Cloudera и Hortonworks, дистрибутивы которых (CDH и HDP соответственно) поддерживают достаточно большой и быстро расширяющийся стек технологий, однако HDP более открыт и включает систему управления кластером Ambari, допуская строить решения как для ОС Linux, так и для Windows с возможной миграцией в облачную среду Azure HDInsight. Поэтому для реализации задач городского ситуационного центра в качестве базового дистрибутива был выбран дистрибутив HDP, позволяющий тиражировать решение на максимальное количество платформ.
Как видно из таблицы, ПО, входящее в дистрибутив HDP 2.3, позволяет обеспечить распределенную обработку на всех этапах анализа данных, используя одно из основных преимуществ систем, созданных в рамках технологии Apache Hadoop – горизонтальную масштабируемость. Имеющиеся алгоритмы могут эффективно функционировать даже на кластере, состоящим из одного сервера и масштабироваться при увеличении объема обрабатываемых данных в сотни раз путем добавления серверов в горячем режиме.
Таблица. Программный стек HDP
Этап обработки
Программное обеспечение
Прием входных потоков данных
Брокер сообщений Kafka
Индексация, поиск, группировка, фасетизация, кластеризация, простейшая статистическая обработка
Индексатор Solr
Online-обработка интенсивных потоковых данных
Apache Spark Streaming, Storm
Пакетная обработка больших объемов линейных данных
MapReduce, Apache Spark, Apache Spark DataFrame
Пакетная обработка больших графовых данных
GraphX, Apache Giraph
Статистическая обработка, машинное обучение, предиктивный анализ
Apache Spark MLlib

Использование данных технологий позволяет обрабатывать квазиструктурированные данные больших объемов, выполнять автоматическое горизонтальное масштабирование с пропорциональным увеличением производительности и объема обрабатываемых данных, осуществлять быстрый поиск с минимальным использованием операций соединения (join), оперативно обрабатывать потоки событий большой интенсивности и многое другое.
[]
[kafkacurrent3.png]

Рис. 1. Общая схема потоков обработки  данных в рамках ситуационного центра
В потоке выделяются темы (topic в терминологии Kafka) сообщений, каждая из которых может включать один или несколько разделов (partition). Производители (producer) формируют поток сообщения для одного или нескольких разделов одной или нескольких тем, а получатели (consumer) подписываются на один или несколько разделов темы. Производительность Kafka по передаче потоковых сообщений может достигать десятков тысяч сообщений в секунду на одном сервере, а за счет линейной масштабируемости можно обрабатывать десятки миллионов сообщений, как например происходит в системе LinkedIn. Такие показатели достигаются за счет использования следующих решений:
- каждая часть потока (раздел) записывается в очередь (файл) согласно дисциплины FIFO (накладные расходы на ведение линейного файла, по сравнению с SQL-базой, минимальны);
- за счет кэширования файлов-очередей средствами ОС большинство сообщений не записываются на диск, а передаются через оперативную память от производителя потребителям;
- для передачи сообщений по сети используется оптимизированный системный вызов sendFile ОС UNIX, обеспечивающий минимальные накладные расходы на передачу данных (на серверах Kafka не выполняется сбор информации о состоянии очереди для каждого клиента – эта функция остается за клиентом, что позволяет легко распараллелить алгоритм ведения очередей сообщений, обеспечивая практически линейную масштабируемость производительности).
Каждый экземпляр Kafka ведет свой набор разделов очередей. При этом копия каждого раздела поддерживается на другом сервере и в случае выхода из строя одного из них, производители и потребители сообщений незаметно для задачи переключаются на другой.
Аналитическая система городского ситуационного центра должна обрабатывать большие объемы данных, одновременно предоставляя удобный интерфейс для отображения как поступающих данных, так и результатов их обработки. Технологии Apache Hadoop, Apache Spark и Apache Spark Streaming [2]  позволили обрабатывать данные объемом от сотен терабайт до сотен петабайт. Для быстрого доступа к результатам обработки используется сервер-индексатор Solr (рис. 2).
[collage.png]


Сервер Solr выполняет индексацию потоковых данных почти в режиме реального времени (фиксация событий в индексе производятся раз в несколько секунд), поддерживает: различные типы записей в различных ядрах (таблицах); неограниченный объем индексируемых данных за счет горизонтального масштабирования при использовании облачного (cloud) режима; богатый язык поисковых запросов. Кроме этого обеспечивается быстрый поиск (несколько миллисекунд) по большинству запросов, фасетизация, группировка и простой статистический анализ запрашиваемых данных, а также кластеризация хранимых данных. Одна из ключевых особенностей  Solr – его горизонтальная масштабируемость, высокая скорость поиска и обработки информации на данных большого (сотни миллионов записей) объема. Основной поток данных на вход Solr (рис. 1) поступает через адаптеры, получающие данные по индексируемым потокам (темам) от брокера Kafka. Кроме этого данные могут поступать от процессов, выполняющих анализ в реальном времени или в режиме offline. Часть данных может корректироваться, добавляться или удаляться в процессе улучшения их качества, например в результате автоматической корректировки неверно распознанных номеров машин.
Параллельно с индексацией событий сервером Solr поток событий записывается в файловую систему HDFS для их последующей распределенной обработки по технологиям MapReduce и Spark. MapReduce используется в проекта ситуационного центра для решения задач, не требующих сложной итеративной обработки данных и запускаемых с определенной периодичностью (раз в час, сутки, неделю): анализ изображений, статистический анализ и т.п. Обработка всгое массива данных происходит сразу на всех серверах кластера, что сокращает время анализа, причем для части задач возможно использование только шага Map – в этом случае результат записывает в файловую систему, передается на вход Solr или реляционную базу данных. Если объем данных невелик и при развитии системы не предполагается кратного увеличения их объема, то для ускорения обработки данных возможно использование обычных непараллельных алгоритмов обработки.
Для решения сложных аналитических задач, требующих комплексной многоступенчатой обработки или обработки с использованием графов применяются технологии Apache Spark [2], использование которой для определенного класса задач позволяет на порядок увеличить производительность по сравнению с технологией MapReduce за счет выполнения операций непосредственно в оперативной памяти. Apache Spark включает в себя: базовый набор операций с множествами; набор функций для доступа к обрабатываемым данных в SQL-формате (Apache Spark DataFrames); набор статистический функций и функций машинного обучения (Apache Spark Mllib); набор функций для распределенной обработки графов (Apache Spark GraphX).
Apache Spark поддерживает широкий класс операций, позволяющих обрабатывать данные, распределенные по серверам кластера, что дает возможность максимально эффективно использовать процессорную мощность кластера. При грамотном делении на разделы можно свести к минимуму передачу промежуточных данных по сети, что обеспечит практически линейное увеличение производительности кластера при увеличении числа серверов.
Ключевым элементом технологии Spark являются RDD (Resilient Distributed Dataset) – набор данных определенной структуры, используемый на всех этапах обработки (загрузка\выгрузка, обмены с HDFS, формировании выходных данных: реляционных таблиц, индексов Solr и др.). Как правило, в процессе вычисления создаются промежуточные RDD, которые могут быть повторно использоваться либо в текущем расчете, либо после кеширования на диск в других расчетах. Например, исходный поток RDD-событий за счет Spark-трансформаций Map, GroupByKey и ReduceByKey преобразуется в RDD трех типов:
- «события по времени» (ключ -  временной интервал: минуты, часы, …);
- «события по рубежам» (ключ — номер рубежа контроля);
- «события по номерам транспортных средств» (ключ — номер ТС).
Каждый тип RDD также порождает новые типы RDD, в частности, RDD типа  «события по  номерам ТС» порождает RDD типа: «число рубежей пройденных ТС» (ключ: номер ТС, значение — число рубежей); «трасса ТС» - ключ: номер ТС, значение — список пройденных рубежей).
Операционная среда Apache Spark обеспечивает распределенную обработку RDD и их восстановление в случае сбоя или выхода из строя одного или нескольких серверов кластера, обрабатывающих RDD-таблицы. Технология Spark используется при анализе трафика дорожного движения для выполнения следующих операций: формирование RDD ежедневного перемещения транспортного средства через рубежи наблюдения; выделение из общего потока транспортных кортежей — групп ТС, последовательно проходящих рубежи наблюдения через небольшие (менее минуты) интервалы времени; определение на основе данных ежедневного трафика оптимальных скоростей движения на участках трассы между рубежами; автоматическая корректировка ошибочно распознанных номеров; подготовка данных для других задач анализа.
Автоматическая корректировка номеров по результатам анализа номеров машин в рамках одного рубежа позволила на 4% улучшить качество распознавания  номеров, что является неплохим результатом, учитывая большие объемы данных (20 млн. фактов фотофиксации за месяц). Дополнительно в системе используется усовершенствованный алгоритм корректировки, учитывающий расстояние между рубежами при помощи технологий графов и ориентированный на корректировку номеров в режиме реального времени.
Многие задачи ситуационного центра хорошо решаются с применением теории графов: анализ трафика на транспортной сети города, анализ связей между персонами на основе социальных сетей и т. п. Для решения задач такого класса на распределенных графах большого объема применяют специализированные алгоритмы, в частности, модель вычислений Pregel, предложенную Google и реализованную в продукте Apache Giraph. Модель строится путем последовательного выполнения супершагов (superstep).  На первом шаге вершины графа, распределенные на серверах кластера загружаются в оперативную память и выбираются начальные активные вершины. Затем активные вершины обмениваются сообщениями со связанными вершинами, а далее вершины, получившие сообщения в рамках предыдущего супершага обрабатывают поступившие сообщения и формируют, в зависимости от состояния вершины и поступивших сообщений, новые сообщения и передавая их связанным с ними вершинам. Работа алгоритма заканчивается, когда ни одна вершина не сгенерировала нового сообщения в рамках очередного супершага. Полученное в итоге супершагов состояние графа и является решением задачи. Данная модель позволяет решать большинство графовых задач: вычисление кратчайшего пути, подсчет ранга страниц (Page-Ranging) и др. Преимущество данной модели в том, что она позволяет в парадигме MapReduce выполнить распределенные вычисление на графе в рамках одного шага Map, что значительно ускоряет скорость обработки.
Помимо вершинно-ориентированной модели вычисления, в которой происходит распределенная загрузка вершин графа для проекта ситуационного центра была предложена модель, ориентированная на распределенную загрузку ребер графа, которая более эффективна на сильносвязанных графах.
Модуль GraphX, входящий в состав Apache Spark, поддерживает обе модели и позволяет создавать эффективные алгоритмы распределенных вычислений как на слабосвязанных, так и на сильносвязанных графах.
Входящий в состав Apache Spark модуль MLlib статистического анализа и машинного обучения позволяет в рамках анализа трафика дорожного движения выполнять такие задачи как: определение интенсивности движения транспортных средств по уровням наблюдения (камера/полоса, направление, рубеж наблюдения, транспортный район) и по интервалам времени; вычисление средних значений и дисперсии трафика дорожного движения по всем уровням наблюдения и интервалам; кластеризация направления и рубежи по типам графиков интенсивности суточного движения; предсказание трафика движения по рубежам и интервалам движения.
Для решения задач оперативного характера (например: оперативное обнаружение искомых транспортных средств, контроль соответствия трафика по рубежам среднестатистическим значениям и другие) используется потоковая обработка поступающих данных на основе модуля Apache Spark Streaming. Для каждой оперативной задачи запускается процесс, читающий соответствующий поток из брокера сообщений Kafka и анализирующий его в зависимости от поставленной задачи: обнаружение в потоке событий искомых транспортных средств; анализ интенсивности потоков от камер, направлений, рубежей, сравнение их со среднестатистическими значениями и сигнализация оператору о нестандартных ситуациях; формирование картины текущих транспортных потоков уличной сети города, предсказание мест возникновения пробок и способов их ликвидации.
***
Использование стека решений Apache Hadoop в рамках аналитической системы городского ситуационного центра позволило преодолеть проблемы, возникающие в случае использования стандартных решений на основе реляционных моделей: неспособность обработки потоков данных высокой интенсивности и больших объемов, медленный поиск и др. В результате использования новых технологий даже в рамках одного сервера удалось повысить скорость обработки потока с десятков до тысячи событий в секунду, а время поиска данных на полугодовом интервале уменьшить с минут до секунд. Кроме этого на тех же вычислительных мощностях появилась возможность существенно повысить качество данных и уровень подготовки аналитической информации, что в рамках прежнего подхода потребовало бы существенно увеличить мощность кластера.
Построенная система анализа данных фотофиксации тестируется сегодня на данных трафика дорожного движения города Перми. В дальнейшем к системе анализа трафика будут подключаться дополнительные модули, необходимые для функционирования ситуационного центра.

Литература
1. Дмитрий Волков, Лев Левин. Большие Данные и суперкомпьютеры // Открытые системы.СУБД. — 2014. — №7. — С.21-22. URL:http://www.osp.ru/os/2014/07/13042912 (дата обращения: 18.09.2015).
2. Андрей Николаенко, Дмитрий Волков. Новые инструменты Hadoop // Открытые системы.СУБД. — 2014. — №10. — С.12-14. URL: http://www.osp.ru/os/2014/10/13044382 (дата обращения: 18.09.2015).

Алексей Костарев (kaf@nevod.ru) – директор, компания «НЕВОД» (Пермь),
Игорь Постаногов (ipostanogov@outlook.com) -- инженер кафедры «Математическое Обеспечение Вычислительных Систем» Пермский Государственный Научно-Исследовательский Университет. Статья подготовлена на основе материалов доклада, представленного авторами на семинаре «Hadoop на практике: проекты и инструменты». www.ospcon.ru/event/4-i-prakticheskii-seminar-hadoop-na-praktike-proekty-i-instrumenty_121.html