Параллельный и распределённый GROUP BY

Параллельный и распределённый GROUP BY

Обо мне

Алексей, разработчик ClickHouse.

С 2008 занимался движком обработки данных в Яндекс.Метрике.

 

ClickHouse — это аналитическая СУБД.

Один запрос — много данных на входе, мало на выходе.

Данные нужно агрегировать налету.

Метрика 2.0

Пример запроса

SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u
FROM hits
WHERE MobilePhoneModel != ''
GROUP BY MobilePhoneModel
ORDER BY u DESC

 

Чтобы быстро обрабатывать запросы, данные необходимо:

 

Конвейер выполнения запроса:

— фильтрация, JOIN, агрегация, сортировка...

Как тестировать производительность?

Бенчмарки должны быть:

Пример бенчмарка (не лучшего)

/** Выполнять так:
for file in MobilePhoneModel PageCharset Params URLDomain UTMSource Referer URL Title; do
 for size in 30000 100000 300000 1000000 5000000; do
  echo
  BEST_METHOD=0
  BEST_RESULT=0
  for method in {1..10}; do
   echo -ne $file $size $method '';
   TOTAL_ELEMS=0
   for i in {0..1000}; do
    TOTAL_ELEMS=$(( $TOTAL_ELEMS + $size ))
    if [[ $TOTAL_ELEMS -gt 25000000 ]]; then break; fi
    ./hash_map_string_3 $size $method < ${file}.bin 2>&1 |
     grep HashMap | grep -oE '[0-9\.]+ elem';
   done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' |
    tee /tmp/hash_map_string_3_res;
   CUR_RESULT=$(cat /tmp/hash_map_string_3_res | tr -d '.')
   if [[ $CUR_RESULT -gt $BEST_RESULT ]]; then
    BEST_METHOD=$method
    BEST_RESULT=$CUR_RESULT
   fi;
  done;
  echo Best: $BEST_METHOD - $BEST_RESULT
 done;
done
*/

Агрегация

Одна машина, одно ядро

Плохой способ

Читаем данные в массив; сортируем по ключу; идём по группам ключей и считаем агрегатные функции.

Достоинства:

+ простота интерфейса агрегатных функций; + возможность более эффективной реализации агрегатных функций; + можно запускать произвольные скрипты для reduce в режиме streaming.

Недостатки:

− пусть N — общее число данных, а M — количество ключей; Отвратительно работает при N > M — в типичном случае. Тратится O(N) оперативки на промежуточные данные вместо O(M).

Хороший способ

Читаем данные, кладём в ассоциативный массив

key tuple -> states of aggregate functions

обновляем состояния агрегатных функций.

Какой ассоциативный массив?

Lookup-таблица. Хэш-таблица.

Бинарное дерево. Skip-лист. B-дерево.

Трай. Трай+хэш-таблица...

Бинарное дерево

− слишком большой оверхед на элемент;

− отвратительная кэш-локальность;

− вообще тормозит.

Skip-лист. Трай. B-дерево...

− вообще для другой задачи;

Lookup-таблица

+ прекрасно для агрегации по числовым ключам не более ~16 бит;

− не подходит для чуть более сложных случаев.

Хэш-таблица

+ моя любимая структура данных;

− много деталей.

Трай+хэш-таблица

+ иногда кое что в этом есть, см. далее;

Одна машина, много ядер

1. Тривиальный способ

Разные потоки читают разные данные по мере возможности. Агрегируют независимо в свои локальные хэш-таблицы. Когда все данные прочитаны, мержим все хэш-таблицы в одну. Например, идём по всем локальным хэш-таблицам кроме первой и перекладываем всё в первую. Фаза чтения данных и предварительной агрегации распараллеливается. Фаза мержа выполняется последовательно. Пусть N — общее число данных, а M — количество ключей. O(M) работы выполняется последовательно и при большом M (кардинальность GROUP BY) работа плохо распараллеливается. Достоинства: тривиально. Недостатки: не масштабируется при большой кардинальности.

2. Partitioning способ

Для каждого блока данных, выполняем агрегацию в две стадии: Стадия 1. Разные потоки будут обрабатывать разные куски блока, какие успеют. В каждом потоке, с помощью отдельной хэш-функции, хэшируем ключ в номер потока и запоминаем его. hash: key -> bucket_num Стадия 2. Каждый поток идёт по всему блоку данных и берёт для агрегации только строки с нуждым номером корзины. Модификация: можно всё в одну стадию — тогда каждый поток будет вычислять хэш-функцию от всех строк заново: подходит, если это дёшево.

Достоинства: + хорошо масштабируется при большой кардинальности и равномерном распределении ключей; + идейная простота. Недостатки: − если объём данных распределён неравномерно по ключам, то стадия 2 плохо масштабируется. Это типичный случай. Почти всегда объём данных по ключам распределён по power law. Ещё недостатки: − если размер блока маленький, то получается слишком мелко-гранулированная многопоточность: большой оверхед на синхронизацию; − если размер блока большой, то плохая кэш-локальность; − на второй стадии, часть memory bandwidth умножается на число потоков; − нужно вычислять ещё одну хэш-функцию, она должна быть независима от той, что в хэш-таблице;

3. Параллельный мерж хэш-таблиц

Отресайзим полученные в разных потоках хэш-таблицы к одному размеру. Разобъём их неявно на разные подмножества ключей. В разных потоках будем мержить соответствующие подмножества ключей хэш-таблиц. Рисунок на доске. Недостаток: − очень сложный код.

4. Ordered мерж хэш-таблиц

Для open addressing linear probing хэш-таблиц, или для chaining хэш-таблиц, данные в хэш-таблице расположены почти упорядоченно по остатку от деления хэш-функции на размер хэш-таблицы — с точностью до цепочек разрешения коллизий. Отресайзим полученные в разных потоках хэш-таблицы к одному размеру. Сделаем ordered iterator, который будет перебирать данные в хэш-таблице в фиксированном порядке. Объём работы на итерирование: количество цепочек разрешения коллизий * средний квадрат длин цепочек. Сделаем merging iterator, который с помощью heap (priority queue) будет перебирать все хэш-таблицы разом.

Достоинства: + не нужно никуда перекладывать элементы: мерж делается inplace. + бонус: подходит для внешней памяти. Недостатки: − отвратительно сложный код; − для open addressing linear probing хэш-таблиц, средний квадрат длин цепочек разрешения коллизий слишком большой; − priority queue тормозит; − стадия мержа не распараллеливается* * — можно совместить с предыдущим способом.

5. Robin Hood ordered мерж хэш-таблиц

Если использовать Robin Hood хэш-таблицу, то данные (за исключением O(1) граничных цепочек разрешения коллизий) будут полностью упорядочены по остатку от деления хэш-функции на размер хэш-таблицы. Достоинства: + вроде бы красивый алгоритм. + бонус: подходит для внешней памяти. Недостатки: − вынуждает использовать robin-hood probing; − priority queue тормозит; − стадия мержа не распараллеливается*

6. Shared хэш-таблица под mutex-ом

Достоинства: очень просто. Недостатки: отрицательная масштабируемость.

7. Много мелких хэш-таблиц под разными mutex-ами

В какую класть — выбирается с помощью отдельной хэш-функции. Недостатки: − в типичном случае данные распределены сильно неравномерно, и потоки будут конкурировать на одной горячей корзине. − в случае маленькой хэш-таблицы, слишком тормозит. Достоинства: если данные почему-то распределены равномерно, то кое-как масштабируется.

8. Shared хэш-таблица и в каждой ячейке spin-lock

Недостатки: − spin-lock — это очень опасно; очень сложно тестировать производительность; вы обязательно сделаете отстой. − в типичном случае данные распределены сильно неравномерно, и потоки будут конкурировать на одной горячей ячейке.

9. Lock free shared хэш-таблица

Недостатки: − lock free хэш-таблицы либо нельзя ресайзить, либо они очень сложные; − в типичном случае данные распределены сильно неравномерно, и потоки будут конкурировать на одной горячей ячейке: false sharing, тормоза. − сложный код, много инструкций, всё тормозит; − я вообще недолюбливаю lock-free алгоритмы.

10. Shared хэш-таблица + thread local хэш-таблицы

Пытаемся положить в shared хэш-таблицу путём блокирования ячейки; если ячейка уже заблокирована — кладём к локальную хэш-таблицу. Тогда горячие ключи попадут в локальные хэш-таблицы. Локальные хэш-таблицы будут маленькими. В конце мержим все локальные хэш-таблицы в глобальную. Дополнения: можно сначала смотреть на наличие ключа в локальной хэш-таблице. Достоинства: + отлично масштабируется; + сравнительно простая реализация. Недостатки: − много лукапов, много инструкций — в целом довольно медленно. Даже не смотря на то, что thread local хэш-таблица зачастую ещё и cache local.

11. Two-level хэш-таблица

На первой стадии, в каждом потоке независимо кладём данные в свои num_buckets = 256 хэш-таблиц, хранящих разные ключи. В какую из них класть (номер корзины) определяется другой хэш-функцией, либо отдельным байтом хэш-функции. Имеем num_threads * num_buckets хэш-таблиц. На второй стадии мержим состояния num_threads * num_buckets хэш-таблиц в одни num_buckets хэш-таблиц, распараллеливая мерж по bucket-ам.

Достоинства: + отлично масштабируется; + простота реализации; + бонус: ресайзы хэш-таблиц амортизируются; + бонус: нахаляву получаем в результате partitioning, который полезен для других стадий конвейера. + бонус: подходит для внешней памяти. Недостатки: − при большой кардинальности, во время мержа делается до такого же объёма работ как на первой стадии; − при маленькой кардинальности, слишком много отдельных хэш-таблиц; − при маленькой кардинальности, работает несколько медленнее тривиального способа;

12. Тривиальный + two-level хэш-таблица

Используем тривиальный способ. Когда разных ключей много, конвертируем в two-level.

Именно такой способ используется в ClickHouse :)

Много машин, много ядер

На разных машинах расположены части данных, которые надо обработать. Отличия от shared memory: — почти отсутствует возможность work stealing; — нужно явно передавать данные по сети.

1. Тривиальный способ

Передаём промежуточные результаты на сервер-инициатор запроса. Последовательно кладём всё в одну хэш-таблицу. Достоинства: + тривиально; + хорошо масштабируется при маленькой кардинальности. Недостатки: − при большой кардинальности не масштабируется; − требуется оперативка на весь результат.

2. Ordered merge

Передаём промежуточные результаты на сервер-инициатор запроса в заданном порядке. Мержим. Достоинства: + тратится O(1) оперативки; Недостатки: − при большой кардинальности не масштабируется; − мерж сортированных потоков (heap) — это медленная операция; − требуется либо сортировать результаты на удалённых серверах, либо использовать один из тех fancy алгоритмов выше.

3. Partitioned merge

Передаём промежуточные результаты на сервер-инициатор запроса, разбитыми на отдельные согласованные корзины-партиции, в заданном порядке корзин. Мержим по одной или по несколько корзин одновременно. Достоинства: + тратится до в num_buckets раз меньше оперативки, чем размер результата; + можно легко распараллелить, мержа сразу несколько корзин — отлично масштабируется по ядрам. Недостатки: − мерж делается на одном сервере — инициаторе запроса — эта стадия не масштабируется по серверам.

Именно такой способ используется в ClickHouse :)

4. Reshuffle + partitioned merge

На удалённых серверах получаем промежуточные результаты, разбитые на согласованные партиции. Затем передаём партиции между серверами так, чтобы на разных серверах были разные партиции, а данные одной партиции оказались на одном сервере. Мержим на всех серверах параллельно, да ещё и используя многие ядра. Достоинства: + прекрасно масштабируется; + при INSERT SELECT, результат можно вовсе не передавать на сервер-инициатор, а сразу сохранить в распределённую таблицу на кластере. Недостатки: − сложная координация серверов;

Всё

Можно задавать вопросы.