Алексей, разработчик ClickHouse.
С 2008 занимался движком обработки данных в Яндекс.Метрике.
ClickHouse — это аналитическая СУБД.
Один запрос — много данных на входе, мало на выходе.
Данные нужно агрегировать налету.
SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u
FROM hits
WHERE MobilePhoneModel != ''
GROUP BY MobilePhoneModel
ORDER BY u DESC
Чтобы быстро обрабатывать запросы, данные необходимо:
Конвейер выполнения запроса:
— фильтрация, JOIN, агрегация, сортировка...
Бенчмарки должны быть:
/** Выполнять так: for file in MobilePhoneModel PageCharset Params URLDomain UTMSource Referer URL Title; do for size in 30000 100000 300000 1000000 5000000; do echo BEST_METHOD=0 BEST_RESULT=0 for method in {1..10}; do echo -ne $file $size $method ''; TOTAL_ELEMS=0 for i in {0..1000}; do TOTAL_ELEMS=$(( $TOTAL_ELEMS + $size )) if [[ $TOTAL_ELEMS -gt 25000000 ]]; then break; fi ./hash_map_string_3 $size $method < ${file}.bin 2>&1 | grep HashMap | grep -oE '[0-9\.]+ elem'; done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' | tee /tmp/hash_map_string_3_res; CUR_RESULT=$(cat /tmp/hash_map_string_3_res | tr -d '.') if [[ $CUR_RESULT -gt $BEST_RESULT ]]; then BEST_METHOD=$method BEST_RESULT=$CUR_RESULT fi; done; echo Best: $BEST_METHOD - $BEST_RESULT done; done */
Читаем данные в массив; сортируем по ключу; идём по группам ключей и считаем агрегатные функции.
Достоинства:
+ простота интерфейса агрегатных функций; + возможность более эффективной реализации агрегатных функций; + можно запускать произвольные скрипты для reduce в режиме streaming.
Недостатки:
− пусть N — общее число данных, а M — количество ключей; Отвратительно работает при N > M — в типичном случае. Тратится O(N) оперативки на промежуточные данные вместо O(M).
Читаем данные, кладём в ассоциативный массив
key tuple -> states of aggregate functions
обновляем состояния агрегатных функций.
Lookup-таблица. Хэш-таблица.
Бинарное дерево. Skip-лист. B-дерево.
Трай. Трай+хэш-таблица...
− слишком большой оверхед на элемент;
− отвратительная кэш-локальность;
− вообще тормозит.
− вообще для другой задачи;
+ прекрасно для агрегации по числовым ключам не более ~16 бит;
− не подходит для чуть более сложных случаев.
+ моя любимая структура данных;
− много деталей.
+ иногда кое что в этом есть, см. далее;
Разные потоки читают разные данные по мере возможности. Агрегируют независимо в свои локальные хэш-таблицы. Когда все данные прочитаны, мержим все хэш-таблицы в одну. Например, идём по всем локальным хэш-таблицам кроме первой и перекладываем всё в первую. Фаза чтения данных и предварительной агрегации распараллеливается. Фаза мержа выполняется последовательно. Пусть N — общее число данных, а M — количество ключей. O(M) работы выполняется последовательно и при большом M (кардинальность GROUP BY) работа плохо распараллеливается. Достоинства: тривиально. Недостатки: не масштабируется при большой кардинальности.
Для каждого блока данных, выполняем агрегацию в две стадии: Стадия 1. Разные потоки будут обрабатывать разные куски блока, какие успеют. В каждом потоке, с помощью отдельной хэш-функции, хэшируем ключ в номер потока и запоминаем его. hash: key -> bucket_num Стадия 2. Каждый поток идёт по всему блоку данных и берёт для агрегации только строки с нуждым номером корзины. Модификация: можно всё в одну стадию — тогда каждый поток будет вычислять хэш-функцию от всех строк заново: подходит, если это дёшево.
Достоинства: + хорошо масштабируется при большой кардинальности и равномерном распределении ключей; + идейная простота. Недостатки: − если объём данных распределён неравномерно по ключам, то стадия 2 плохо масштабируется. Это типичный случай. Почти всегда объём данных по ключам распределён по power law. Ещё недостатки: − если размер блока маленький, то получается слишком мелко-гранулированная многопоточность: большой оверхед на синхронизацию; − если размер блока большой, то плохая кэш-локальность; − на второй стадии, часть memory bandwidth умножается на число потоков; − нужно вычислять ещё одну хэш-функцию, она должна быть независима от той, что в хэш-таблице;
Отресайзим полученные в разных потоках хэш-таблицы к одному размеру. Разобъём их неявно на разные подмножества ключей. В разных потоках будем мержить соответствующие подмножества ключей хэш-таблиц. Рисунок на доске. Недостаток: − очень сложный код.
Для open addressing linear probing хэш-таблиц, или для chaining хэш-таблиц, данные в хэш-таблице расположены почти упорядоченно по остатку от деления хэш-функции на размер хэш-таблицы — с точностью до цепочек разрешения коллизий. Отресайзим полученные в разных потоках хэш-таблицы к одному размеру. Сделаем ordered iterator, который будет перебирать данные в хэш-таблице в фиксированном порядке. Объём работы на итерирование: количество цепочек разрешения коллизий * средний квадрат длин цепочек. Сделаем merging iterator, который с помощью heap (priority queue) будет перебирать все хэш-таблицы разом.
Достоинства: + не нужно никуда перекладывать элементы: мерж делается inplace. + бонус: подходит для внешней памяти. Недостатки: − отвратительно сложный код; − для open addressing linear probing хэш-таблиц, средний квадрат длин цепочек разрешения коллизий слишком большой; − priority queue тормозит; − стадия мержа не распараллеливается* * — можно совместить с предыдущим способом.
Если использовать Robin Hood хэш-таблицу, то данные (за исключением O(1) граничных цепочек разрешения коллизий) будут полностью упорядочены по остатку от деления хэш-функции на размер хэш-таблицы. Достоинства: + вроде бы красивый алгоритм. + бонус: подходит для внешней памяти. Недостатки: − вынуждает использовать robin-hood probing; − priority queue тормозит; − стадия мержа не распараллеливается*
Достоинства: очень просто. Недостатки: отрицательная масштабируемость.
В какую класть — выбирается с помощью отдельной хэш-функции. Недостатки: − в типичном случае данные распределены сильно неравномерно, и потоки будут конкурировать на одной горячей корзине. − в случае маленькой хэш-таблицы, слишком тормозит. Достоинства: если данные почему-то распределены равномерно, то кое-как масштабируется.
Недостатки: − spin-lock — это очень опасно; очень сложно тестировать производительность; вы обязательно сделаете отстой. − в типичном случае данные распределены сильно неравномерно, и потоки будут конкурировать на одной горячей ячейке.
Недостатки: − lock free хэш-таблицы либо нельзя ресайзить, либо они очень сложные; − в типичном случае данные распределены сильно неравномерно, и потоки будут конкурировать на одной горячей ячейке: false sharing, тормоза. − сложный код, много инструкций, всё тормозит; − я вообще недолюбливаю lock-free алгоритмы.
Пытаемся положить в shared хэш-таблицу путём блокирования ячейки; если ячейка уже заблокирована — кладём к локальную хэш-таблицу. Тогда горячие ключи попадут в локальные хэш-таблицы. Локальные хэш-таблицы будут маленькими. В конце мержим все локальные хэш-таблицы в глобальную. Дополнения: можно сначала смотреть на наличие ключа в локальной хэш-таблице. Достоинства: + отлично масштабируется; + сравнительно простая реализация. Недостатки: − много лукапов, много инструкций — в целом довольно медленно. Даже не смотря на то, что thread local хэш-таблица зачастую ещё и cache local.
На первой стадии, в каждом потоке независимо кладём данные в свои num_buckets = 256 хэш-таблиц, хранящих разные ключи. В какую из них класть (номер корзины) определяется другой хэш-функцией, либо отдельным байтом хэш-функции. Имеем num_threads * num_buckets хэш-таблиц. На второй стадии мержим состояния num_threads * num_buckets хэш-таблиц в одни num_buckets хэш-таблиц, распараллеливая мерж по bucket-ам.
Достоинства: + отлично масштабируется; + простота реализации; + бонус: ресайзы хэш-таблиц амортизируются; + бонус: нахаляву получаем в результате partitioning, который полезен для других стадий конвейера. + бонус: подходит для внешней памяти. Недостатки: − при большой кардинальности, во время мержа делается до такого же объёма работ как на первой стадии; − при маленькой кардинальности, слишком много отдельных хэш-таблиц; − при маленькой кардинальности, работает несколько медленнее тривиального способа;
Используем тривиальный способ. Когда разных ключей много, конвертируем в two-level.
Именно такой способ используется в ClickHouse :)
На разных машинах расположены части данных, которые надо обработать. Отличия от shared memory: — почти отсутствует возможность work stealing; — нужно явно передавать данные по сети.
Передаём промежуточные результаты на сервер-инициатор запроса. Последовательно кладём всё в одну хэш-таблицу. Достоинства: + тривиально; + хорошо масштабируется при маленькой кардинальности. Недостатки: − при большой кардинальности не масштабируется; − требуется оперативка на весь результат.
Передаём промежуточные результаты на сервер-инициатор запроса в заданном порядке. Мержим. Достоинства: + тратится O(1) оперативки; Недостатки: − при большой кардинальности не масштабируется; − мерж сортированных потоков (heap) — это медленная операция; − требуется либо сортировать результаты на удалённых серверах, либо использовать один из тех fancy алгоритмов выше.
Передаём промежуточные результаты на сервер-инициатор запроса, разбитыми на отдельные согласованные корзины-партиции, в заданном порядке корзин. Мержим по одной или по несколько корзин одновременно. Достоинства: + тратится до в num_buckets раз меньше оперативки, чем размер результата; + можно легко распараллелить, мержа сразу несколько корзин — отлично масштабируется по ядрам. Недостатки: − мерж делается на одном сервере — инициаторе запроса — эта стадия не масштабируется по серверам.
Именно такой способ используется в ClickHouse :)
На удалённых серверах получаем промежуточные результаты, разбитые на согласованные партиции. Затем передаём партиции между серверами так, чтобы на разных серверах были разные партиции, а данные одной партиции оказались на одном сервере. Мержим на всех серверах параллельно, да ещё и используя многие ядра. Достоинства: + прекрасно масштабируется; + при INSERT SELECT, результат можно вовсе не передавать на сервер-инициатор, а сразу сохранить в распределённую таблицу на кластере. Недостатки: − сложная координация серверов;
Можно задавать вопросы.