Аналитика Больших Данных, изначально возникшая в ответ на потребности коммерческих компаний, находит сегодня все более широкое применение в государственных структурах, имеющих доступ к огромному массиву данных как в рамках отдельной территории, так и в стране в целом. Анализ всех имеющихся данных может быть полезен для решения множества экономических и социальных задач, таких как мониторинг и оптимизация дорожного трафика в городских ситуационных центрах или построение систем быстрого реагирования.

В ситуационные центры российских городов данные о трафике дорожного движения поступают от систем фотофиксации (например, «Одиссей», «АвтоУраган», «Стрелка» и др.) в виде потока текстовых сообщений по фактам фотофиксации номеров транспортных средств, прошедших рубежи наблюдения, и их фотографий. Интенсивность данного потока зависит от времени суток, количества рубежей контроля и может составлять несколько тысяч событий в секунду. Анализ этого потока позволяет решать следующие задачи:

  • выявление автотранспортных происшествий по факту изменения интенсивности потока с целью последующей оптимизации дорожного трафика, например ускорения прибытия различных служб быстрого реагирования;
  • определение оптимальных скоростей движения на отдельных участках трассы;
  • определение узких мест городской транспортной сети;
  • формирование рекомендаций по планированию ремонта отдельных участков трассы в зависимости от загруженности транспортных артерий, типа проходящих по ним автотранспортных средств и др.;
  • выделение из общего потока транспортных кортежей групп транспортных средств, последовательно проходящих рубежи наблюдения с превышением среднего для данного участка времени перемещения.

Стандартный подход для решения этих задач предполагает использование мощных и дорогостоящих вычислительных ресурсов, которые тем не менее не гарантируют приемлемой скорости обработки информации. Альтернативным вариантом может стать использование решений класса «высокопроизводительная аналитика данных» (High Performance Data Analytics) на основе программного стека технологий обработки Больших Данных Apache Hadoop [1, 2].

Рис. 1. Общая схема потоков обработки данных в рамках ситуационного центра
Рис. 1. Общая схема потоков обработки данных в рамках ситуационного центра

 

Как видно из рис. 1, поток данных, поступающих в ситуационный центр, содержит не только телеметрию от датчиков, но и сведения о состоянии дорожного полотна, данные об авариях и пробках, сообщения службы «112», данные по распознаванию лиц и т. п. Все эти сведения поступают на вход различных систем контроля и анализа, осуществляющих их индексацию, архивацию, оценку состояния оборудования и т. д. Для координации всех потоков и уменьшения накладных расходов на передачу данных применяются сервисные шины, которые делают возможным централизованный прием сообщений от различных источников, их сохранение и раздачу потребителям. Главным звеном сервисной шины сегодня, как правило, является реляционная СУБД, обеспечивающая хранение передаваемых данных и служебной информации о состоянии передачи по каждому источнику и потребителю. Из-за большой нагрузки и сложности механизмов, обеспечивающих гарантированную доставку сообщений при распределенной обработке в режиме кластера, на сервисных шинах могут происходить сбои. Как показывает опыт применения сервисной шины с реляционной СУБД Microsoft SQL Server, при интенсивности входного или выходного потока свыше нескольких десятков сообщений в секунду могут возникать проблемы с синхронизацией и передачей данных. Брокер сообщений Apache Kafka, входящий в стек решений Apache Hadoop, специально создан для выполнения высокопроизводительной распределенной обработки потоковых сообщений и позволяет устранить большинство перечисленных недостатков.

 

Технологии для ситуационного центра

Сегодня имеется два основных независимых поставщика технологий обработки Больших Данных Apache Hadoop: Cloudera и Hortonworks, дистрибутивы которых (CDH и HDP соответственно) поддерживают достаточно большой и быстро расширяющийся стек технологий. Однако HDP более открыт и включает систему управления кластером Ambari, что позволяет строить решения как для Linux, так и для Windows с возможной миграцией в облачную среду Azure HDInsight. Поэтому для реализации задач городского ситуационного центра в качестве базового дистрибутива был выбран HDP, позволяющий тиражировать решение на максимальное количество платформ.

Программный стек HDP
Программный стек HDP

 

Как видно из таблицы, программное обеспечение, входящее в дистрибутив HDP 2.3, позволяет обеспечить распределенную обработку на всех этапах анализа данных, используя одно из основных преимуществ систем, созданных в рамках технологии Apache Hadoop, — горизонтальную масштабируемость. Имеющиеся алгоритмы могут эффективно функционировать даже на кластере, состоящем из одного сервера, и масштабироваться при увеличении объема обрабатываемых данных в сотни раз путем добавления серверов в горячем режиме.

Посредством этих технологий можно обрабатывать квазиструктурированные данные больших объемов, выполнять автоматическое горизонтальное масштабирование с пропорциональным увеличением производительности и объема обрабатываемых данных, осуществлять быстрый поиск с минимальным использованием операций соединения (join), оперативно обрабатывать потоки событий большой интенсивности и многое другое.

 

В потоке выделяются темы (topic в терминологии Kafka) сообщений, каждая из которых может включать один или несколько разделов (partition). Производители (producer) формируют поток сообщений для одного или нескольких разделов одной или нескольких тем, а получатели (consumer) подписываются на один или несколько разделов темы. Производительность Kafka по передаче потоковых сообщений может достигать десятков тысяч сообщений в секунду на одном сервере, а за счет линейной масштабируемости можно обрабатывать десятки миллионов сообщений, как, например, происходит в системе LinkedIn. Такие показатели достигаются благодаря использованию следующих решений:

  • каждая часть потока (раздел) записывается в очередь (файл) согласно дисциплине FIFO (накладные расходы на ведение линейного файла, по сравнению с SQL-базой, минимальны);
  • кэширование файлов-очередей средствами ОС, позволяющее не записывать большинство сообщений на диск, а передавать их через оперативную память от производителя потребителям;
  • для передачи сообщений по сети используется оптимизированный системный вызов sendFile операционной системы Unix, обеспечивающий минимальные накладные расходы на передачу данных (на серверах Kafka не выполняется сбор информации о состоянии очереди для каждого клиента — эта функция остается за клиентом, что позволяет легко распараллелить алгоритм ведения очередей сообщений, обеспечивая практически линейную масштабируемость производительности).

Каждый экземпляр Kafka ведет свой набор разделов очередей. При этом копия каждого раздела поддерживается на другом сервере, и в случае выхода из строя одного из них производители и потребители сообщений незаметно для задачи переключаются на другой.

Аналитическая система городского ситуационного центра должна обрабатывать большие объемы данных, одновременно предоставляя удобный интерфейс для отображения как поступающих данных, так и результатов их обработки. Технологии Apache Hadoop, Apache Spark и Apache Spark Streaming [2] позволяют обрабатывать данные объемом от сотен терабайт до сотен петабайт. Для быстрого доступа к результатам обработки используется сервер-индексатор Solr (рис. 2).

Рис. 2. Примеры интерфейсов системы анализа трафика городского ситуационного центра
Рис. 2. Примеры интерфейсов системы анализа трафика городского ситуационного центра

 

Сервер Solr выполняет индексацию потоковых данных практически в режиме реального времени (фиксация событий в индексе производится раз в несколько секунд) и поддерживает различные типы записей в различных ядрах (таблицах), неограниченный объем индексируемых данных за счет горизонтального масштабирования при использовании облачного режима, богатый язык поисковых запросов. Кроме того, обеспечиваются быстрый поиск (несколько миллисекунд) по большинству запросов, фасетизация, группировка и простой статистический анализ запрашиваемых данных, а также кластеризация хранимых данных. Ключевыми особенностями Solr являются его горизонтальная масштабируемость, а также высокая скорость поиска и обработки информации на данных большого (сотни миллионов записей) объема. Основной поток данных поступает на вход Solr (рис. 1) через адаптеры, получающие данные по индексируемым потокам (темам) от брокера Kafka. Данные могут поступать и от процессов, выполняющих анализ в реальном времени или в автономном режиме. Часть данных может корректироваться, добавляться или удаляться в процессе улучшения их качества, например в результате автоматической корректировки неверно распознанных номеров машин.

Параллельно с индексацией событий сервером Solr, поток событий записывается в файловую систему HDFS для их последующей распределенной обработки по технологиям MapReduce и Spark. MapReduce используется в проектах ситуационного центра для решения задач, которые не требуют сложной итеративной обработки данных и запускаются с определенной периодичностью (раз в час, сутки, неделю). К таким задачам относятся анализ изображений, статистический анализ и т. п. Обработка всего массива данных происходит сразу на всех серверах кластера, что сокращает время анализа, причем для части задач возможно использование только шага Map — в этом случае результат записывается в файловую систему, передается на вход Solr или реляционную базу данных. Если объем данных невелик и при развитии системы не предполагается кратного увеличения их объема, то для ускорения обработки данных можно применять обычные непараллельные алгоритмы.

Для решения сложных аналитических задач, требующих комплексной многоступенчатой обработки или обработки с использованием графов, предпочтительна технология Apache Spark [2], применение которой для определенного класса задач позволяет на порядок увеличить производительность по сравнению с технологией MapReduce за счет выполнения операций непосредственно в оперативной памяти. Apache Spark включает в себя базовый набор операций с множествами, набор функций для доступа к обрабатываемым данным в SQL-формате (Apache Spark DataFrames), набор статистических функций и функций машинного обучения (Apache Spark MLlib), набор функций для распределенной обработки графов (Apache Spark GraphX).

Apache Spark поддерживает широкий класс операций, позволяющих обрабатывать данные, распределенные по серверам кластера, что дает возможность максимально эффективно задействовать процессорную мощность кластера. При грамотном делении на разделы можно свести к минимуму передачу промежуточных данных по сети, это обеспечит практически линейное увеличение производительности кластера при увеличении числа серверов.

Ключевым элементом технологии Spark является RDD (Resilient Distributed Dataset) — набор данных определенной структуры, используемый на всех этапах обработки (загрузка/выгрузка, обмены с HDFS, формирование выходных данных: реляционных таблиц, индексов Solr и др.). Как правило, в процессе вычисления создаются промежуточные RDD, которые могут повторно использоваться либо в текущем расчете, либо после кэширования на диск в других расчетах. Например, исходный поток RDD-событий с помощью Spark-трансформаций Map, GroupByKey и ReduceByKey преобразуется в RDD трех типов:

  • «события по времени» (ключ — временной интервал: минуты, часы,.. .);
  • «события по рубежам» (ключ — номер рубежа контроля);
  • «события по номерам транспортных средств» (ключ — номер ТС).

Каждый тип RDD, в свою очередь, порождает новые типы RDD: в частности, RDD типа «события по номерам ТС» порождает RDD типа «число рубежей, пройденных ТС» (ключ — номер ТС, значение — число рубежей), а также «трасса ТС» (ключ — номер ТС, значение — список пройденных рубежей).

Операционная среда Apache Spark обеспечивает распределенную обработку RDD и их восстановление в случае сбоя или выхода из строя одного или нескольких серверов кластера, обрабатывающих RDD-таблицы. С помощью технологии Spark при анализе трафика дорожного движения выполняются следующие операции: формирование RDD ежедневного перемещения транспортного средства через рубежи наблюдения; выделение из общего потока транспортных кортежей групп ТС, последовательно проходящих рубежи наблюдения через небольшие (менее минуты) интервалы времени; определение на основе изучения ежедневного трафика оптимальных скоростей движения на участках трассы между рубежами; автоматическая корректировка ошибочно распознанных номеров; подготовка данных для других задач анализа.

Автоматическая корректировка номеров по результатам анализа номеров машин в рамках одного рубежа позволила на 4% улучшить качество распознавания номеров. Это неплохой результат, если принять во внимание большие объемы данных (20 млн фактов фотофиксации за месяц). Для дальнейшего повышения точности используется усовершенствованный алгоритм корректировки, который учитывает расстояние между рубежами при помощи технологий графов и ориентирован на корректировку номеров в режиме реального времени.

Многие задачи ситуационного центра хорошо решаются с применением теории графов — например, анализ трафика на транспортной сети города, анализ связей между персонами на основе социальных сетей и т. п. Для решения задач такого класса на распределенных графах большого объема применяют специализированные алгоритмы — в частности, модель вычислений Pregel, предложенную Google и реализованную в продукте Apache Giraph. Модель строится путем последовательного выполнения супершагов (superstep). На первом шаге вершины графа, распределенные на серверах кластера, загружаются в оперативную память и выбираются начальные активные вершины. Затем активные вершины обмениваются сообщениями со связанными вершинами, а далее вершины, получившие сообщения в рамках предыдущего супершага, обрабатывают поступившие сообщения и формируют, в зависимости от состояния вершины и поступивших сообщений, новые сообщения и передают их связанным с ними вершинам. Работа алгоритма заканчивается, когда ни одна вершина не сгенерировала нового сообщения в рамках очередного супершага. Полученное в итоге супершагов состояние графа и является решением задачи. Данная модель позволяет решать большинство графовых задач: вычисление кратчайшего пути, подсчет ранга страниц (Page Ranking) и др. Преимущество этой модели в том, что она позволяет в парадигме MapReduce выполнить распределенные вычисления на графе в рамках одного шага Map, благодаря чему значительно ускоряется скорость обработки.

Помимо вершинно-ориентированной модели вычисления, в которой происходит распределенная загрузка вершин графа для проекта ситуационного центра, была предложена ориентированная на распределенную загрузку ребер графа модель, которая более эффективна на сильносвязанных графах.

Модуль GraphX, входящий в состав Apache Spark, поддерживает обе модели и позволяет создавать эффективные алгоритмы распределенных вычислений как на слабосвязанных, так и на сильносвязанных графах.

Входящий в состав Apache Spark модуль MLlib статистического анализа и машинного обучения позволяет в рамках анализа трафика дорожного движения выполнять следующие задачи: определение интенсивности движения транспортных средств по уровням наблюдения (камера/полоса, направление, рубеж наблюдения, транспортный район) и по интервалам времени; вычисление средних значений и дисперсии трафика дорожного движения по всем уровням наблюдения и интервалам; кластеризация направления и рубежи по типам графиков интенсивности суточного движения; предсказание трафика движения по рубежам и интервалам движения.

Для решения задач оперативного характера (обнаружение искомых транспортных средств, контроль соответствия трафика по рубежам среднестатистическим значениям и др.) используется потоковая обработка поступающих данных на основе модуля Apache Spark Streaming. Для каждой оперативной задачи запускается процесс, в котором осуществляется чтение соответствующего потока из брокера сообщений Kafka и анализ его в зависимости от поставленной задачи: обнаружение в потоке событий искомых транспортных средств; анализ интенсивности потоков от камер, направлений, рубежей, сравнение их со среднестатистическими значениями и уведомление оператора о нестандартных ситуациях; формирование картины текущих транспортных потоков уличной сети города, предсказание мест возникновения пробок и способов их ликвидации.

***

Использование стека решений Apache Hadoop в рамках аналитической системы городского ситуационного центра позволило преодолеть проблемы, возникающие в случае применения стандартных решений на основе реляционных моделей: неспособность обработки потоков данных высокой интенсивности и больших объемов, медленный поиск и др. С помощью новых технологий даже в рамках одного сервера удалось повысить скорость обработки потока с десятков до тысячи событий в секунду, а время поиска данных на полугодовом интервале сократилось с минут до секунд. Кроме того, на тех же вычислительных мощностях появилась возможность значительно повысить качество данных и уровень подготовки аналитической информации, что в рамках прежнего подхода потребовало бы существенно увеличить мощность кластера.

Построенная система анализа данных фотофиксации тестируется сегодня на данных трафика дорожного движения города Перми. В дальнейшем к системе анализа трафика будут подключаться дополнительные модули, необходимые для функционирования ситуационного центра.

Литература

  1. Дмитрий Волков, Лев Левин. Большие Данные и суперкомпьютеры // Открытые системы.СУБД. — 2014. — № 7. — С. 21–22. URL: http://www.osp.ru/os/2014/07/13042912 (дата обращения: 18.09.2015).
  2. Андрей Николаенко, Дмитрий Волков. Новые инструменты Hadoop // Открытые системы.СУБД. — 2014. — № 10. — С.12–14. URL: http://www.osp.ru/os/2014/10/13044382 (дата обращения: 18.09.2015).

Алексей Костарев (kaf@nevod.ru) — директор, компания «Невод» (Пермь), Игорь Постаногов (ipostanogov@outlook.com) — ассистент, кафедра «Математическое обеспечение вычислительных систем», Пермский государственный национальный исследовательский университет. Статья подготовлена на основе материалов доклада, представленного авторами на семинаре «Hadoop на практике: проекты и инструменты».