2023-06-06

Динамические фильтры в Trino

Ускоряем обработку SQL-запросов в Trino c помощью динамической фильтрации (dynamic filtering)

Введение

Принцип большинства оптимизаций производительности в аналитических SQL-движках — ответить на запрос пользователя, затратив минимум вычислительных ресурсов. Динамические фильтры — это оптимизация, которая создает дополнительный предикат для одной из сторон оператора Join на основе данных другой стороны.

Так как аналитические запросы часто содержат операции Join и сканируют таблицы большого размера, наличие динамических фильтров позволяет существенно сократить объем обрабатываемой информации, а значит повысить производительность. Рассмотрим реализацию динамических фильтров на примере Trino.

Проблема

Аналитические запросы часто содержат операции Join между большой таблицей фактов и одним или несколькими измерениями, к которым применены фильтры. Следующий запрос из схемы TPC-DS возвращает статистику ежедневных продаж за один месяц:


SELECT d_date, SUM(ss_net_paid)
FROM store_sales JOIN date_dim 
  ON ss_sold_date_sk = d_date_sk
WHERE d_year = 1998 AND d_moy = 6
GROUP BY d_date

Запрос не содержит предикатов по таблице фактов `store_sales`, а значит в общем случае мы должны прочитать ее целиком. Если таблица содержит продажи за несколько лет, то большая часть прочитанных данных будет отброшена, так как нас интересуют факты только за один месяц. Каким образом можно оптимизировать данный запрос, что бы читать меньше данных из таблицы `store_sales`?

Принцип

Динамические фильтры — это оптимизация, которая создает дополнительные предикаты к различным операторам в процессе выполнения запроса. Распространенный вариант данной оптимизации — создание фильтра на основе данных одной стороны Join, и его применение к операторам сканирования на другой стороне. Это позволяет выбрать более оптимальную стратегию сканирования данных, даже если оригинальный запрос не содержит явных предикатов для конкретной таблицы.

В нашем запросе оператор Join объединяет данные из таблицы-справочника `date_dim` и большой таблицы фактов `store_sales`. Таблица `date_dim` имеет предикаты по атрибутам `d_year` и `d_month`, при этом Join происходит по суррогатному ключу `d_date_sk`. Таблица `date_dim` содержит одну строку на каждый день. Таким образом, если мы отсканируем таблицу `date_dim` и применим предикаты, мы получим 30 записей.


d_year=1998, d_moy=6, d_date_sk=2415051 // 1 июня 1998
d_year=1998, d_moy=6, d_date_sk=2415052 // 2 июня 1998
...
d_year=1998, d_moy=6, d_date_sk=2415080 // 30 июня 1998

Так как по условию запроса `ss_sold_date_sk = d_date_sk`, мы можем создать дополнительный предикат для атрибута `ss_sold_date_sk`. Если физическая реализация таблицы `store_sales` может использовать данный предикат для выбора более оптимального метода сканирования, запрос будет выполнен быстрее.


s_sold_date_sk IN (2415051, 2415052, ..., 2415080)

Например, в OLTP системах таблица `store_sales` вероятно будет иметь primary index по атрибуту `s_sold_date_sk`. Оптимизаторы промышленных СУБД обычно умеют использовать индексы для условий IN (см. наш блог про searchable arguments), поэтому благодаря динамическим фильтрам мы сможем заменить full scan на index seek.

Если мы работаем с аналитическим хранилищем или озером данных, то таблица `store_sales` может быть партиционирована по атрибуту `s_sold_date_sk`. В таком случае движки могут применить оптимизацию partition pruning для чтения только тех партишенов, которые удовлетворяют условию динамического фильтра.

Рис. 1 — Оптимизация Scan с помощью динамических фильтров

В общем случае динамические фильтры могут быть использованы и для того, что бы уменьшить количество данных, доставляемых до оператора Join. Предположим, что мы работаем с озером данных, в котором таблица `store_sales` не партиционирована по атрибуту `s_sold_date_sk`. В таком случае применение динамического фильтра не поможет уменьшить объем сканируемых данных. Тем не менее с помощью динамического фильтра мы можем уменьшить количество записей левой стороны Join, которые необходимо сопоставлять с записями правой стороны. Если в конкретном движке применение предиката к записи дешевле, чем ее сопоставление с записями другой таблицы (например, лукап в хэш-таблицы в случае hash join), мы можем получить прирост производительности. На практике такой прирост обычно не столь значителен, как замена full scan более оптимальным методом доступа.

Рис. 2 — Оптимизация Scan и Join с помощью динамических фильтров

Динамические фильтры в Trino

В процессе оптимизации запроса Trino определяет позиции динамических фильтров в плане. Источником динамических фильтров является оператор Join, в котором присутствует условие равенства атрибутов с обеих сторон (equi-join). В таком случае существует возможность получить значения атрибута Join справа и передать их влево. Пример подходящего запроса:


SELECT * FROM fact 
  INNER JOIN dim1 ON fact.a = dim1.id
  INNER JOIN dim2 ON fact.b = dim2.id

Join так же должен удовлетворять ряду дополнительных условий, учитывающих реляционную семантику и особенности движка Trino. Например, динамические фильтры нельзя применять для `LEFT OUTER` и `FULL OUTER` Join-ов, потому что в этих случаях запись из левой стороны Join должна присутствовать в результате, даже если нет подходящих записей справа, и дополнительная фильтрация может привести к некорректному результату.

За расстановку динамических фильтров в Trino отвечает ряд правил оптимизатора, которые конструируют динамические предикаты, и создают вспомогательные операторы. Основные правила: PredicatePushDown, RemoveUnsupportedDynamicFilters и AddDynamicFilterSource. Принципиально данные правила делают следующее:

  1. Происходит обход узлов плана сверху вниз.
  2. Встретив подходящий оператор Join, Trino создает два узла: узел `DynamicFilterSource` справа, который будет собирать статистики колонок equi-join условия с правой стороны; и узел `Filter` слева, которому будут переданы собранные значения колонок.
  3. Происходит попытка пробросить фильтры в нижестоящие операторы (pushdown) таким образом, что бы `Filter` оказался над оператором `TableScan`. Перед началом выполнения запроса, Trino объединяет `Filter` и `TableScan` в один монолитный оператор, что позволяет использовать предикат фильтра для выбора оптимального способа сканирования таблицы.

План до оптимизации:

Рис. 3 — План запроса до расстановки динамических фильтров

План после оптимизации:

Рис. 4 — План запроса после расстановки динамических фильтров

Динамические фильтры могут быть локальными для узла или распределенным. Локальные фильтры применимы тогда, когда можно сконструировать предикат и применить его в рамках одного драйвера (подробнее про понятие "драйвер" можно почитать в нашей статье про параллелизм Trino). Это происходит в случае выполнения Join в co-located режиме, когда данные слева и справа партиционированы по соответствующим атрибутам equi-join условия.

В большинстве случаев динамические фильтры являются распределенными. В этом случае перед запуском запроса узел-координатор Trino регистрирует динамические фильтры. Экземпляры оператора `DynamicFilterSource` на удаленных узлах собирают статистики прошедших через них данных и отправляют их на координатора. Когда координатор получает статистики от всех узлов, он объединяет их с помощью дизъюнкции, формирует финальный фильтр, который теперь может быть использован операторами `TableScan`, и отправляет его на остальные узлы.

Рис. 5 — Взаимодействие узлов в процессе создания распределенного динамического фильтра

Статистика может быть представлена конкретными значениями или диапазонами, см. Domain и TupleDomain. Например, при небольшом количестве входных данных статистика может состоять из нескольких конкретных значений, например `a := (1, 3, 100)`. Если значений становится слишком много, может произойти compaction, который создаст более широкий предикат, например `a := [1:100]`.

В Trino сканирование таблицы состоит из двух этапов:

  1. Координатор определяет части таблицы, называемые Split, которые могут быть отсканированы независимо. Например, при чтении файлов из озера данных единицей сканирования может быть конкретный файл или его часть.
  2. Координатор рассылает запросы на обработку сплитов по узлам.

Динамические фильтры могут быть использованы на обоих этапах.

При формировании списка сплитов динамический фильтр помогает отбросить те из них, которые заведомо не содержат искомые данные. Например, если таблица `fact` — это набор файлов Parquet, партиционированных по атрибуту `a`, и динамический фильтр представлен условием `a := (1, 3)`, то мы можем ограничить сканирование таблицы только теми файлами, которые находятся в партишенах `a=1` и `a=3`. Так как вычисление динамического фильтра может занять время, Trino позволяет опционально отложить рассылку сплитов на некоторое время, что бы увеличить вероятность того, что динамический фильтр будет задействован.

Если мы не смогли задействовать динамический фильтр при рассылке сплитов, то мы можем попробовать использовать его в процессе обработки конкретного сплита. Например, мы можем пропускать чтение некоторых row group Parquet, сравнивая их статистики со значением динамического фильтра.

Рис. 6 — Фильтрация партишенов и row group Apache Parquet с помощью составного динамического фильтра

Динамические фильтры включены в Trino по умолчанию. Соответствующий параметр конфигурации — `enable-dynamic-filtering`. Вы можете включать и выключать динамические фильтры для индивидуальных запросов с помощью параметра сессии `enable_dynamic_filtering`. Некоторые коннекторы поддерживают установку таймаута ожидания динамических фильтров перед рассылкой сплитов. В наиболее популярном коннекторе Hive за это отвечает параметр конфигурации `hive.dynamic-filtering.wait-timeout` и параметр сессии `<имя_каталога>.dynamic_filtering_wait_timeout`.

Пример

Рассмотрим выполнение запроса из начала статьи. Мы будем работать со схемой TPC-DS, scale factor 1000 (1 Tb сырых данных). Данные хранятся в озере в формате Parquet. Таблица `store_sales` партиционирована по атрибуту `ss_sold_date_sk`.


SELECT d_date, SUM(ss_net_paid)
FROM store_sales JOIN date_dim 
  ON ss_sold_date_sk = d_date_sk
WHERE d_year = 1998 AND d_month = 6
GROUP BY d_date

В отсутствие динамических фильтров мы вынуждены сканировать таблицу фактов целиком, это `2 879 987 999` записей и около 24 Gb сжатых данных. Время выполнения такого запроса на тестовом стенде составило почти 10 секунд.

Динамические фильтры дают нам возможность определить тридцать конкретных значений `ss_sold_date_sk` и использовать их для прунинга большинства партишенов. При этом происходит чтение лишь `26 090 827` записей (~250 Mb), а выполнение запроса занимает 2.5 секунды. Таким образом, с помощью динамических фильтров мы смогли ускорить запрос в 4 раза и уменьшить количество зачитываемых данных примерно в 100 раз.

Рис. 7 — Результат применения динамического фильтра

Заключение

Динамические фильтры — это важная оптимизация, которая позволяет создать дополнительный фильтр для одной из сторон Join на основании данных другой стороны. Если созданный фильтр удается задействовать в сканировании таблицы, это может существенно снизить количество сканируемых данных. Реализация динамических фильтров в Trino поддерживает широкий спектр запросов и в большинстве случаев значительно увеличивает производительность.

В следующий раз мы расскажем про то, как оптимизатор Trino планирует порядок Join-ов. Спойлер: там не все хорошо. Поэтому в CedrusData мы работаем над альтернативным алгоритмом планирования Join-ов. Почитать подробнее про данную инициативу можно в нашем публичном родмапе.

Не забывайте подписываться на наши канал и чат в Telegram, где мы публикуем интересный контент про Trino и CedrusData.