Типичная задача работы с социальными сетями обычно состоит в отслеживании и анализе упоминаний о продукте, компании или персоне, что подразумевает следующие этапы: выгрузку данных из социальной сети; сохранение сырых данных в базе на случай, если данные будут удалены из социальной сети, и для оперативной работы приложений; индексацию данных и поиск. Как оказалось, система мониторинга социальных сетей, ограниченная данными из одного субъекта России, может быть легко организована классическими средствами: например, СУБД MySQL, сборщика — на языке PHP, подсистемы индексации и поиска — на базе Apache Solr [1]. Однако при увеличении объемов данных, собираемых из социальных сетей для их мониторинга, производительность такого решения резко падает на всех этапах обработки.
Тонкая оптимизация настроек MySQL, индексов и запросов позволяет вдвое увеличить производительность, но этого недостаточно — стало ясно, что требуется переработка всей системы хранения. Одно из решений — создание распределенной базы на основе нескольких серверов MySQL, оно успешно применяется, например, в сетях «ВКонтакте», Facebook и Badoo. Но для этого необходимо разработать систему управления серверами MySQL, обеспечить репликацию и автоматическое восстановление узлов, что требует дополнительных ресурсов. Поиск готового решения класса OpenSource привел к проектам Hadoop+HBase [2], в которых отказоустойчивость предлагается уже «из коробки» вместе с запуском задач MapReduce для распределенной обработки данных на узлах кластера. Однако, чтобы технологии из стека Hadoop действительно помогли организовать поиск в реальном времени по данным из социальных сетей без каких-либо ограничений, требуется найти оптимальную с экономической точки зрения конфигурацию кластеров Hadoop и Solr.
Типичный кластер Hadoop содержит физические управляющие узлы, узлы хранения и обработки, причем в небольших кластерах нагрузка на управляющие узлы минимальна и их можно разместить на виртуальных машинах. Стандартом де-факто для узлов хранения данных является двухпроцессорная серверная платформа с большим объемом памяти и более чем 12 дисками. Выбор такой конфигурации обусловлен тем, что обработка данных в Hadoop связана с постоянным чтением с дисков. И распараллеливание чтения данных с разных дисков увеличивает производительность. Если при выполнении операции чтения у сервера имеется достаточно процессорных ресурсов и оперативной памяти, то при увеличении количества дисков производительность растет линейно, причем оптимальным по стоимости является объем диска в 2 Тбайт.
Двухпроцессорные серверы достаточно дороги, а при росте производительности их стоимость растет нелинейно, поэтому был выбран однопроцессорный сервер на базе Xeon E3 (32 Гбайт памяти, шесть дисков по 2 Tбайт). Кроме этого, рассматривался более бюджетный вариант на базе Core i7, однако отсутствие в нем поддержки кода коррекции ошибок (ECC) на больших объемах данных приводит к появлению мусора в базе. Обычно для выполнения аналогичных задач приобретаются десятки серверов, для повышения надежности которых ведущие производители предлагают RAID-контроллеры, специальные системы хранения и другие дорогостоящие средства, тогда как архитектура Hadoop и HDFS рассчитана на работу на серверах стандартной архитектуры и не требует приобретения дополнительных устройств.
В качестве программной платформы был выбран стабильный дистрибутив CDH 4 от Cloudera, однако позже, при обновлении до CDH 5, обнаружилась несовместимость форматов файлов HBase, что вызвало остановку работы кластера. Поэтому перед выбором следует изучить все доступные версии и оценить возможные затраты при переходе на более свежую.
СУБД HBase взаимодействует с приложениями по собственному RPC-протоколу, реализация которого доступна только на Java, что в нашем случае вызывало сложности ввиду наличия большого объема кода на PHP. Для приложений, разработанных не на Java, предлагается использовать шлюзы Thrift или REST, производительность которых можно сравнить на синтетическом тесте, оценивающем скорость вставки. Производительность REST-шлюза вдвое выше, чем Thrift, однако лидером оказался Java RPC. На реальном проекте эти результаты подтвердились, однако при разработке нового приложения для работы с HBase надо выбирать платформу на Java.
Эксплуатация решения на базе HBase и Hadoop показала, что отдельные сервисы иногда «падают», хотя на работоспособности всей системы это не отражается. Стоит обратить внимание на то, что безупречная работа HBase возможна только при тонкой настройке, для выполнения которой нужен определенный опыт.
В HBase нет вторичных индексов, поэтому если требуется искать данные по какому-либо полю, то нужно вручную делать вторичный индекс в отдельной таблице. Такова плата за производительность и масштабируемость. Для построения вторичного индекса существуют два пути: применение MapReduce с записью индекса в другую таблицу (на формирование индекса требуется время — например, в кластере из 20 серверов для таблицы с 300 млн записями нужно около минуты); использование сопроцессоров (данные обрабатываются при записи в Hbase, что можно считать аналогом триггеров в MySQL, и тогда индекс формируется на лету непосредственно в момент записи).
HBase отлично справляется с хранением большого объема данных и предоставлением быстрого произвольного доступа к информации. Собрать кластер можно из недорогих серверов, его масштабирование осуществляется путем добавления новых серверов, при этом производительность растет линейно. Однако функциональность HBase не покрывает возможностей СУБД MySQL, и к тому же время доступа к данным у HBase выше. Поэтому полностью от MySQL отказываться нецелесообразно — в этой СУБД можно, например, хранить небольшие таблицы.
В уже имеющемся решении система индексации Solr была запущена на одном сервере в единственном экземпляре на индексе размером 300 Гбайт, и хотя обработка поискового запроса занимала приемлемое время, при увеличении объема данных в два раза время увеличивалось втрое, а время индексации — на порядок. Использование кластерного режима SolrCloud позволяет преодолеть эту проблему.
Скорость поиска в первую очередь зависит от размера части индекса, которая помещается в оперативную память, поэтому сохранить приемлемую скорость обработки запросов можно, ограничив размер индекса. Для работы с реальными объемами данных используется кластерная конфигурация, для которой индексы разделяются на независимые части, и операции поиска по каждой части происходят параллельно. Такая схема открывает возможность для использования нескольких дисков на одной машине, и тогда при параллельном поиске можно обрабатывать индексы большого объема. Использование SSD-дисков еще более увеличивает скорость поиска, сокращая время доступа и равномерно загружая диски и процессор, а размер одного индекса можно увеличивать без заметной потери производительности.
Apache Solr позволяет изменять существующий индекс, что дает возможность выполнять инкрементальную индексацию, при которой новые данные сразу становятся доступны для поиска. Такой подход показал неплохие результаты на тестовом стенде, однако на больших объемах данных оказался неудачным. Индексы большого объема использовать предпочтительнее, хотя их изменение приводит к значительным (от 20 секунд и более) задержкам. Избежать этого можно, разделив работу с большими индексами (содержащими историческую информацию) и индексами реального времени. Правда, такое разделение на фоне требования работы в режиме 24x7 неизбежно приводит к необходимости решения задач управления поиском и индексацией. К этим задачам относятся: поиск по историческим данным; репликация исторических индексов, автоматическое создание реплик и их запуск в аварийных ситуациях; построение индекса в режиме реального времени; поиск по данным реального времени; объединение данных реального времени и исторических данных.
Например, в соцсети «ВКонтакте» только за сутки генерируется более миллиарда новых документов, и индекс реального времени уже за день вырастает до критических значений, после чего добавление новых данных в индекс и поиск выполняются неприемлемо долго. Чтобы этого избежать, индексы реального времени периодически автоматически объединяются, оптимизируются и переводятся в разряд исторических. Для хранения исторических индексов и индексов реального времени целесообразно использовать серверы с дисками различной скорости чтения. Твердотельные накопители хорошо применять для чтения, однако они пока существенно дороже обычных, которые имеет смысл использовать для постоянной перезаписи данных. В любом случае при выборе SSD стоит учитывать особенности индекса и задачи поиска — для некоторых прикладных задач индексы можно организовать так, чтобы данные, по которым ведется поиск, лежали на диске «пачкой» (то есть размещались в том же порядке, в каком их запрашивает приложение). Естественно, чтение таких данных происходит быстрее, чем чтение произвольных.
Если историческая информация постоянно не участвует в поиске, время доступа к ней не критично для пользователя или хранить ее вообще не требуется, то можно отказаться от связки Hadoop+HBase в пользу более простых решений, например MongoDB или MySQL. В этом случае исторические данные хранятся в запакованном виде в системах хранения данных – это самый дешевый способ организации online-мониторинга. На российском рынке имеется множество решений, работаюющих именно по такой схеме.
Сложная обслуживающая инфраструктура Solr, разработанная для «быстрого» поиска по данным реального времени и историческим данным, не применима для многих аналитических задач. Это касается анализа: распространения информации в масштабах всей социальной сети; активности пользователей в масштабах всей сети; выявления трендов обсуждений в сети; выявление трендов в различных предметных областях; автоматическая классификация и кластеризация потока сообщений. Все эти задачи требуют совместного использования пакетной и потоковой обработки данных, применения Lambda-архитектуры и средств машинного обучения.
Для создания системы online-мониторинга, отслеживающей только новую информацию в социальных сетях, к подсистеме сбора нужно добавить комплекс интеллектуальных фильтров, выполняемых на уровне потоковой обработки и периодически переобучаемых на пакетных данных. Мы используем два типа фильтров: поиск по словарю и автоматическую классификацию, причем первый встроен в подсистему сбора. Таким образом, фильтрация по словарю не требует создания отдельной подсистемы и горизонтально масштабируется вместе с системой сбора.
Поток данных из социальных сетей, отфильтрованный по словарю, попадает в систему автоматической классификации. В каждой прикладной задаче используется своя пара словарь — классификатор, что позволяет не подвергать классификации весь поток данных. В качестве классификатора используется машинное обучение с учителем, реализованное на базе библиотеки liblinear и хорошо работающее с данными большой размерности. Данные от всех уровней фильтрации (по словарю, от классификатора и ручного ввода) постоянно отправляются в буфер для модерации, во время которой оператор комплекса SEUS оценивает релевантность документов, что позволяет постоянно повышать качество каждой связки словарь — классификатор. Классификатор выполнен в виде отдельного микросервиса и взаимодействует со сборщиком по протоколу HTTP через шлюз. Масштабирование в этом случае осуществляется достаточно просто – система сбора сама по себе распределенная и каждый классификатор работает со своими узлами системы сбора данных.
Результирующий поток получается небольшой и обрабатывается классическими средствами, например с помощью in-memory СУБД Redis либо любого хранилища, построенного на SQL СУБД.
Для решения аналитических задач система сбора постоянно собирает данные обо всех объектах социальной сети, например, для выявления тренда на основании данных о постоянно растущем количестве лайков какого-либо материала, приходится постоянно опрашивать эти показатели для всех материалов. Можно выделить три уровня мониторинга: информации, процессов и адресный мониторинг реакции на информацию.
Прикладной интерфейс социальных сетей устроен таким образом, что для получения новых данных требуется постоянно проводить полное сканирование всей сети, поэтому каждый последующий уровень мониторинга требует на порядок больше ресурсов от системы сбора. Входящий трафик из сети «ВКонтакте», который генерируется системой сбора данных для получения только новой информации – это более 10Гбайт/с, что позволяет в течение суток получать актуальную информацию. Соответственно для сокращения времени появления информации из сети в системе (при условии, что классификация, индексация и поиск не становятся узким местом) можно линейно увеличивать скорость канала доступа к серверам API социальной сети.
Для оптимизации потребления ресурсов используется интеллектуальная распределенная система сбора, автоматически выбирающая данные для сканирования. Решение принимается на основе данных пакетного уровня: распределение пользователей сети по активности; распределение активности пользователей сети по времени суток, географии; распределение реакции сети (активности по репостам, комментариям и лайкам) на появление нового контента в сети; особенностей прикладной задачи пользователя. Даже, применив интеллектуальную систему сбора с набором адаптируемых эвристик, для мониторинга процессов требуется загрузка API социальной сети со скоростью на порядок больше 10Гбайт/с.
Распределение системы сбора на множество небольших сборщиков позволяет использовать стандартное оборудование и не вызывает больших точечных нагрузок на серверы – каждый сборщик выбирает только новые данные и отправляет их в базу, а для выявления новых данных на сборщиках используются локальные базы, содержащие идентификаторы уже собранных данных. В качестве СУБД для такой базы применяется собственная разработка, оптимизированная под задачу сбора данных в условиях минимального объема оперативной памяти. За каждым сборщиком закреплен диапазон ID профилей в социальной сети, по аналогии организацией HBase с системой регионов. Данные из локальных БД через агрегирующие шлюзы в запакованном виде отправляются в систему хранения HBase.
Управление сетью сборщиков — отдельный централизованный сервис, содержащий реестр сборщиков и связанные с ними диапазоны ID профилей и групп социальных сетей. При выходе из строя одного из сборщиков, центральный узел некоторое время ожидает, а после дает команду на запуск нового сборщика.
Solr предоставляет механизм репликации индексов, и в случае нарушения работы сервера с основным индексом поисковые запросы будут обрабатываться репликами, которых может быть больше одной. Это имеет ряд преимуществ: репликация сразу доступна в Solr; если одна реплика отключается, остальные продолжают участвовать в обработке поисковых запросов без нарушения работоспособности; поисковый запрос может быть обработан репликой, что для ускорения работы позволяет распределить нагрузки. Однако это требует избыточных ресурсов на резервирование и увеличивает вероятность нестабильной работы, причем часто происходит рассинхронизация между репликой и оригиналом индекса, не позволяющая восстановить поиск при отключении оригинала. Следует учесть, что особенно часто такие проблемы возникают при аварийном завершении работы Solr, а система резервирования, призванная защитить от нештатных ситуаций, нередко их только усугубляет. Зная такие особенности работы Solr, надо в каждом конкретном случае оценивать целесообразность двойного резервирования ресурсов.
Другое решение задачи резервирования — хранение копий всех индексов в HDFS. В этом случае резервные копии не занимают места на поисковых машинах, однако при работе с HDFS необходимо время на восстановление поиска на резервных копиях, причем избыточность данных все равно сохраняется и даже увеличивается за счет собственной избыточности файловой системы HDFS (400% на HDFS против 200% для стандартной реплики Solr).
Как бы то ни было, Solr за счет распределенной работы обеспечивает поиск по большим массивам данных практически без потери производительности, а его простые настройки дают возможность создавать любую конфигурацию распределения индексов на диске и дисков на сервере. Свободное подключение новых индексов или отключение неактуальных без прерывания поиска открывает дополнительные широкие возможности для «горячей» замены рабочих индексов. Обратная сторона медали заключается в необходимости выполнения ручной настройки или создания сложной инфраструктуры для автоматической работы с индексами, требуемой для операций с реальными данными. Таким образом, стандартный Solr отлично справляется с большим объемом неизменяемых данных, однако в случае постоянного потока новых документов требуется усложнение логики его работы, учитывающей особенности конкретного поискового кластера и данных. В нашем случае это необходимость отдельного кластера для индексации потоковых данных, их автоматического объединения и размещения в поисковом кластере. Не рекомендуется развертывать поисковый кластер на одном оборудовании вместе с другими подсистемами, особенно если применяются однопроцессорные серверы.
***
Даже при ограниченных бюджетах можно быстро развернуть экономное решение для осуществления поиска в социальных сетях, и вполне реально найти оптимальную конфигурацию кластеров Hadoop и Solr для работы с большими потоками данных, используя типовые серверы стандартной архитектуры. Вместе с тем при выборе платформы нужно четко представлять себе объемы обрабатываемых данных, перспективы расширения решения и особенности организации поиска. В противном случае популярные сегодня бесплатные продукты для работы с Большими Данными не дадут ожидаемой отдачи.
Литература
- Дмитрий Морозов. Масштабируемое хранилище журналов // Открытые системы.СУБД. — 2015. — № 1. — С. 20–21. URL: http://www.osp.ru/os/2015/01/13045320 (дата обращения: 19.09.2015).
- Леонид Черняк. Платформы для Больших Данных // Открытые системы.СУБД. — 2012. — № 7. — С.10–14. URL: http://www.osp.ru/os/2012/07/13017635 (дата обращения: 21.09.2015).
Евгений Рабчевский — ассистент, Пермский государственный национальный исследовательский университет, Александр Безруков, Николай Пьянников (info@seuslab.ru) — сотрудники, компания SEUSLab (Пермь).