Принятие решений на основе данных предусматривает поиск оптимальных ответов на запросы бизнеса путем анализа, в частности, исторической информации. Так, финансовые компании используют данные для оценки кредитных рисков, выявления мошенничества и инвестиционного моделирования, а телеком-операторы строят единый профиль клиента и повышают конверсию за счет индивидуальных тарифных планов и предложений. Основным инструментом принятия решения на основе данных является аналитическая платформа предприятия, объединяющая информацию из различных систем.

Ядро аналитической платформы обычно представлено корпоративным хранилищем данных и озером данных. Первое традиционно представляет собой кластер, на котором в структурированном формате хранятся исторические данные. Для их загрузки в хранилище инженеры платформы разрабатывают специальные ETL-процедуры в соответствии с требованиями бизнес-заказчиков, обычно работающих с хранилищем через SQL-интерфейс. Озеро данных — набор файлов в распределенной файловой системе, содержащих структурированные, неструктурированные и бинарные (видео, изображения) данные, загрузка которых в озеро, в отличие от ETL, происходит с помощью ELT-процессов (extraction, loading, transformation). Для чтения данных обычно применяются программы на базе Apache Spark.

Корпоративное хранилище предоставляет быстрый доступ к предварительно очищенным и подготовленным данным, но хранит только часть исторической информации. Озеро требует от пользователей больше усилий по извлечению данных, но компенсирует это высокой гибкостью в реализации сложных аналитических приложений. Комбинация «хранилище и озера» дает организациям мощный инструмент поддержки принятия решений.

Проблемы аналитических платформ

Компании, в полной степени раскрывающие потенциал своих данных, получают серьезные конкурентные преимущества. Например, телеком-оператор, который проверит больше гипотез поведения клиентов, сможет сформулировать более привлекательные тарифные пакеты и персонализированные предложения, а также направить свой маркетинг на наиболее перспективный сегмент аудитории пользователей. Однако, несмотря на обилие доступных продуктов и технологий, предприятия все чаще сталкиваются с ограничениями классической архитектуры «хранилище-озеро». Хранилища весьма инерционны и дороги — реализация новых требований бизнеса обычно требует разработки ETL-процедур для добавления в хранилище недостающих данных, что означает привлечение высококвалифицированных инженеров платформы данных. Попытки масштабирования принципа принятия решений на основе данных в организации приводят к росту нагрузки на дефицитные инженерные ресурсы, которые становятся «узким местом». В итоге время внедрения новых аналитических сценариев может составлять месяцы, а значит — налицо упущенная прибыль. Кроме того, хранилище содержит копию операционных данных в собственном внутреннем формате и требует резервирования избыточных серверных мощностей для обеспечения отказоустойчивости. Так как объем накопленной информации ежегодно увеличивается на 20%, наличие дополнительных копий приводит к значительному удорожанию серверной инфраструктуры.

Озера данных обычно хранят всю историческую информацию организации, предоставляя пользователям практически неограниченную гибкость в реализации новых аналитических сценариев. Вместе с тем чтение данных из озера обычно требует специального программного кода, что существенно сужает круг потребителей и не позволяет применять принцип принятия решений на основе данных в масштабе всей организации.

Подобные проблемы усугубляются, когда возникает необходимость объединения данных из одного или нескольких хранилищ и озера данных.

Классическая архитектура платформы данных со временем оказывается не в состоянии удовлетворить растущие требования бизнеса, будь то высокий темп внедрении изменений, увеличение объема данных, числа их источников или количества пользователей.

Trino

Trino — технология Open Source, позволяющая через единый SQL-интерфейс выполнять запросы к данным из разных источников: корпоративные хранилища, озера данных и операционные СУБД. За счет этого пользователи с минимальными расходами на интеграцию могут быстро анализировать все доступные им данные.

Технология Trino (изначально — Presto) была разработана компанией Facebook (прежнее название компании Meta Platforms; признана экстремистской и запрещена в России. — Прим. ред.) для решения задачи интерактивного анализа больших данных как альтернатива парадигме MapReduce [1]. Сегодня тысячи крупных организаций по всему миру успешно используют Presto и Trino для быстрого анализа своих данных.

Технически Trino представляет собой распределенный массивно-параллельный SQL-движок, который обрабатывает запросы пользователей, но данные не хранит. При получении запроса Trino определяет, какие источники содержат запрошенную информацию. Далее запрос преобразуется в последовательность команд к выбранным источникам, а после получения ответов из источников Trino производит финальную интеграцию данных и выдает ответ пользователю. Trino поддерживает работу с различными источниками:

  • озера данных на основе HDFS и S3 и форматов данных Apache Parquet, Apache ORC и Apache Iceberg;
  • корпоративные хранилища данных: Greenplum, ClickHouse и т. п.;
  • СУБД: Postgres, MySQL, Oracle, Microsoft SQL Server;
  • системы NoSQL: Apache Cassandra, Apache Kafka, Redis и т. п.

Перед запуском Trino необходимо сконфигурировать подключение к источникам данных, указав сетевой адрес источника и параметры аутентификации. После этого пользователь запускает один или несколько узлов Trino в зависимости от количества данных. Trino может быть запущен как на рабочей станции аналитика для обработки небольших объемов данных, так и в виде кластера из десятков и сотен узлов для анализа больших данных размером до десятков петабайт.

Узлы обрабатывают данные, которые находятся во внешних источниках. Такой подход, известный как shared storage («разделяемое хранилище»), позволяет независимо масштабировать вычислительные мощности и подсистему хранения, обеспечивая эластичную масштабируемость и упрощая эксплуатацию. Пользователи Trino могут быстро изменять размер кластера — запуск узлов не приводит к перемещению данных. Узлы Trino могут быть быстро развернуты не только локально (on-premises), но и в облаке с использованием технологий Docker и Kubernetes без необходимости конфигурирования подсистемы хранения. Переход к архитектуре shared storage стал одной из ключевых технических инноваций, обеспечивших взрывной рост популярности новых аналитических систем, таких как Snowflake [2] и Trino.

Рис. 1. Архитектуры shared nothing и shared storage

Для сравнения, корпоративные хранилища (Greenplum, Teradata, Vertica) обычно используют архтектуру shared nothing, в которой отдельный узел отвечает и за хранение данных, и за их обработку. В таких системах изменение количества узлов в кластере приводит к перемещению данных для обеспечения необходимого уровня избыточности. Следствием этого является невозможность быстрого изменения размера кластера, что приводит к повышенному потреблению ресурсов и необходимости резервирования избыточных вычислительных мощностей.

Trino выгодно отличается от других продуктов, способных объединять данные из разных источников.

Greenplum имеет специальный интерфейс PXF для работы с внешними данными. Однако сервис PXF отделен от ядра Greenplum [3], а потому имеет ограниченные возможности переноса вычислений в источники (pushdown) и требует дополнительных накладных расходов на передачу данных между PXF и ядром. По этой причине PXF подходит для интеграции небольших объемов данных, но в отличие от Trino не может быть использован для быстрой обработки больших данных.

Система виртуализации данных Denodo [4] позволяет объединять данные из разных источников, но в отличие от Trino не имеет собственного массивно-параллельного движка. По этой причине Denodo вынужден либо переносить максимум вычислений в источник, что не всегда желательно, либо делегировать вычисления в стороннюю систему [5] (например, Apache Spark), что приводит к дополнительным накладным расходам. Trino не только имеет высокопроизводительный массивно-параллельный движок, но и позволяет регулировать, какие вычисления передавать в источник.

Таким образом, открытый проект Trino — это: увеличение скорости внедрения новых аналитических сценариев за счет уменьшения потребности в ETL (бизнес-аналитики получают возможность проверять новые гипотезы в течение нескольких часов или дней вместо недель и месяцев); уменьшение порога входа для конечных пользователей за счет использования привычного языка SQL и интеграции с пользовательскими приложениями; удешевление инфраструктуры за счет сокращения количества копий данных и переноса нагрузки из дорогих корпоративных хранилищ в более дешевые озера; гибкость в выборе архитектуры платформы данных за счет отделения вычислений от хранения.

CedrusData

Российская платформа CedrusData — это коммерческая распределенная аналитическая система на основе Trino, адаптированная для отечественного рынка и предоставляющая дополнительные возможности интеграции данных, улучшения производительности, а также сервисы поддержки и внедрения. Платформа предоставляет пользователям возможность через единый интерфейс анализировать корпоративные данные из разных источников (рис. 2).

Рис. 2. Принципиальная схема работы CedrusData

Созданием продукта занимается компания «Кверифай Лабс», инженеры которой ранее принимали участие в создании высокопроизводительных открытых систем ClickHouse, Apache Ignite, Apache Calcite и Hazelcast.

Для работы с данными необходимо предоставить параметры подключения к источникам. Например, для подключения к озеру данных на основе S3-совместимого объектного хранилища под управлением Hive Metastore пользователь предоставляет сетевой адрес Hive Metastore, сетевой адрес объектного хранилища, а также ключи доступа к нему:

connector.name=hive

hive.metastore.uri= thrift://<адрес metastore>

hive.s3.endpoint=<адрес хранилища>

hive.s3.aws-access-key=<ключ доступа>

hive.s3.aws-secret-key= <секретный ключ>

После конфигурации подключений к источникам пользователь запускает один или несколько узлов CedrusData. Процесс запуска занимает обычно нескольких минут, после чего система готова обрабатывать SQL-запросы.

При получении запроса оптимизатор CedrusData формирует стратегию его выполнения. По итогам оптимизации система делегирует часть вычислений источникам, а финальная обработка (например, операции Join, агрегации, функции визуализации и т. п.) выполняется массивно-параллельным движком CedrusData, который стремится задействовать все доступные ресурсы (центральный процессор, память, сеть) для максимально быстрого выполнения запроса.

Предположим, пользователь хочет получить объем продаж за определенный день по клиентам со штаб-квартирами в Москве. Записи о продажах находятся в озере данных, а справочник клиентов в базе данных СУБД PostgreSQL. Объекты озера данных и СУБД представлены пользователю CedrusData в виде таблиц в разных логических контейнерах — каталогах. Физическая реализация таблицы зависит от типа источника, например, для каталога озера данных таблицей является набор файлов в распределенной файловой системе, а таблица каталога PostgreSQL представлена соответствующей таблицей в этой СУБД. Пользователь пишет запрос, в котором обращается к таблице продаж и таблице-справочнику клиентов, указывая их логические каталоги, что позволяет CedrusData определить, какие источники будут задействованы в запросе (рис. 3).

Рис. 3. Пример выполнения SQL-запроса к двум источникам данных

После планирования запроса CedrusData принимает решение делегировать выполнение фильтра по городу в СУБД PostgreSQL (оптимизация filter pushdown), что позволяет избежать полного сканирования таблицы. Для получения данных из озера CedrusData формирует команды на чтение определенных диапазонов файлов через S3 API. При этом CedrusData предварительно определяет, какие колонки необходимо прочитать (оптимизация projection pushdown). При использовании колоночных форматов хранения, таких как Parquet, данная оптимизация позволяет читать только те колонки, которые нужны для выполнения запроса. Фильтр по дате может быть использован, чтобы cчитывать только те файлы, которые содержат факты за требуемую дату (оптимизация partition pruning). Кроме того, получив из PostgreSQL идентификатор города, CedrusData может использовать его для дополнительной фильтрации данных из озера (оптимизация filter pushdown), что дополнительно еще уменьшает количество данных, которые необходимо передать из озера. Таким образом, в большинстве случаев CedrusData читает только незначительные объемы информации из источников, что обеспечивает высокую производительность. Выполнение операций JOIN и GROUP BY происходит непосредственно в кластере CedrusData.

Пользователи CedrusData могут формировать SQL-запросы вручную или через сторонние инструменты. Например, бизнес-аналитики строят отчеты на основе данных через BITableau, Apache Superset и т. п.; исследователи данных проектируют сложные ML-модели, подключившись к CedrusData из Python; разработчики создают новые приложения, которые работают с CedrusData через интерфейс JDBC.

Главная цель разработки CedrusData — повышение производительности, обеспечение безопасности и мониторинга данных, а также включение дополнительных средств интеграции с источниками данных.

Ключевые отличия CedrusData от Trino:

  • высокопроизводительные коннекторы к Greenplum и Teradata с возможностью параллельного сканирования таблиц;
  • локальное кэширование данных на узлах для повышения производительности чтений и уменьшения нагрузки на источники;
  • новый cost-based-оптимизатор, ускоряющий выполнение запросов за счет выбора оптимальной схемы путем оценки ее «стоимости» и перераспределения данных в кластере;
  • улучшения производительности запросов к файлам в формате Apache Parquet (бинарный, колоночно-ориентированный формат хранения больших данных в среде Hadoop) в озерах данных;
  • расширенные возможности аутентификации и авторизации пользователей во внешних системах.
Рис. 4. Сравнение производительности Trino и CedrusData

Следствием выполненных усовершенствований стало существенное повышение производительности, по сравнению с исходной реализацией Tinto. На рис. 4 представлены данные по тесту TPC-DS (99 различных SQL-запросов) времени выполнения запросов к данным, которые хранятся в файлах Apache Parquet в озере. CedrusData выполняет запросы быстрее за счет выбора более оптимальных планов, а также кэширования повторно используемых данных на локальных дисках узлов кластера. Например, время выполнения TPC-DS запроса 28 в кластере CedrusData занимает в три раза меньше времени, чем в аналогичном кластере Trino.

***

Платформа CedrusData, построенная от открытой технологии Trino, позволяет бизнесу не только через единый интерфейс анализировать корпоративные данные, но и ускорять реализацию новых аналитических сценариев. Инженеры платформы данных получают дополнительную свободу в выборе архитектуры работы с различными источниками данных и средства обеспечения экономия ресурсов. Платформа включена в реестр российского ПО, пользователи получают профессиональную поддержку, помощь во внедрении и участие в формировании публичной дорожной карты ее развития.

Литература

1. Леонид Черняк. Платформы для Больших Данных // Открытые системы.СУБД. — 2012. — № 7. — С. 10–14. URL: https://www.osp.ru/os/2012/07/13017635 (дата обращения: 21.06.2023).

2. Midhul Vuppalapati, Justin Miron, etc. Building An Elastic Query Engine on Disaggregated Storage // URL: https://www.usenix.org/conference/nsdi20/presentation/vuppalapati (дата обращения: 21.06.2023).

3. Venkatesh Raghavan, Alexander Denissov, etc. Platform Extension Framework (PXF): Enabling Parallel Query Processing Over Heterogeneous Data Sources In Greenplum // Portland'20, June 2020, Portland, Oregon, USA. ACM. URL: https://s3.amazonaws.com/greenplum.org/wp-content/uploads/2020/05/12171437/pxf-paper.pdf (дата обращения: 21.06.2023).

4. Наталья Дубова. Еще одна виртуализация // Открытые системы.СУБД. — 2013. — № 6. — С. 50–53. URL: https://www.osp.ru/os/2013/06/13036857 (дата обращения: 21.06.2023).

5. Parallel Processing // User Manual. URL: https://community.denodo.com/docs/html/browse/7.0/vdp/administration/optimizing_queries/parallel_processing/parallel_processing (дата обращения: 21.06.2023).

Владимир Озеров (vozerov@querifylabs.com) – генеральный директор, CedrusData (Москва). Статья подготовлена на основе материалов выступления на форуме BIG DATA&AI 2023 .