mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
560 lines
23 KiB
HTML
560 lines
23 KiB
HTML
<!DOCTYPE html>
|
||
<html lang="en">
|
||
<head>
|
||
<title>Параллельный и распределённый GROUP BY</title>
|
||
<meta charset="utf-8">
|
||
<meta http-equiv="x-ua-compatible" content="ie=edge">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||
<link rel="stylesheet" href="shower/themes/ribbon/styles/screen-16x10.css">
|
||
</head>
|
||
<body class="shower list">
|
||
<header class="caption">
|
||
<h1>Параллельный и распределённый GROUP BY</h1>
|
||
</header>
|
||
|
||
<section class="slide" id="cover">
|
||
<h1 style="margin-top: 200px">Параллельный и распределённый GROUP BY</h1>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Обо мне</h2>
|
||
<p>Алексей, разработчик ClickHouse.</p>
|
||
<p>С 2008 занимался движком обработки данных в Яндекс.Метрике.</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2> </h2>
|
||
<p><b>ClickHouse</b> — это аналитическая СУБД.</p>
|
||
<p>Один запрос — много данных на входе, мало на выходе.</p>
|
||
<p>Данные нужно агрегировать налету.</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Метрика 2.0</h2>
|
||
<img src="pictures/metrika2.png" style="height:70%"/>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Пример запроса</h2>
|
||
<p style="font-family: Monospace">SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u<br />
|
||
FROM hits<br />
|
||
WHERE MobilePhoneModel != ''<br />
|
||
<b>GROUP BY</b> MobilePhoneModel<br />
|
||
ORDER BY u DESC</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2> </h2>
|
||
<p>Чтобы быстро обрабатывать запросы, данные необходимо:</p>
|
||
<ul>
|
||
<li>быстро читать;</li>
|
||
<li><b>быстро считать.</b></li>
|
||
</ul>
|
||
<p> </p>
|
||
<p>Конвейер выполнения запроса:</p>
|
||
<p>— фильтрация, JOIN, <b>агрегация</b>, сортировка...</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 45px;">Как тестировать производительность?</h2>
|
||
<p>Бенчмарки должны быть:</p>
|
||
<ul>
|
||
<li>на реальных данных;</li>
|
||
<li>на разных наборах данных, покрывающих разные кейсы;</li>
|
||
<li>воспроизводимые;</li>
|
||
<li>автоматизированные.</li>
|
||
</ul>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Пример бенчмарка (не лучшего)</h2>
|
||
<pre style="white-space: pre; font-family: Monospace; font-size: 14px; line-height: 1.25em;">/** Выполнять так:
|
||
for file in <b>MobilePhoneModel PageCharset Params URLDomain UTMSource Referer URL Title</b>; do
|
||
for size in <b>30000 100000 300000 1000000 5000000</b>; do
|
||
echo
|
||
BEST_METHOD=0
|
||
BEST_RESULT=0
|
||
for method in {1..10}; do
|
||
echo -ne $file $size $method '';
|
||
TOTAL_ELEMS=0
|
||
for i in {0..1000}; do
|
||
TOTAL_ELEMS=$(( $TOTAL_ELEMS + $size ))
|
||
if [[ $TOTAL_ELEMS -gt 25000000 ]]; then break; fi
|
||
./hash_map_string_3 $size $method < ${file}.bin 2>&1 |
|
||
grep HashMap | grep -oE '[0-9\.]+ elem';
|
||
done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' |
|
||
tee /tmp/hash_map_string_3_res;
|
||
CUR_RESULT=$(cat /tmp/hash_map_string_3_res | tr -d '.')
|
||
if [[ $CUR_RESULT -gt $BEST_RESULT ]]; then
|
||
BEST_METHOD=$method
|
||
BEST_RESULT=$CUR_RESULT
|
||
fi;
|
||
done;
|
||
echo Best: $BEST_METHOD - $BEST_RESULT
|
||
done;
|
||
done
|
||
*/</pre>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Агрегация</h2>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Одна машина, одно ядро</h2>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Плохой способ</h2>
|
||
<p>Читаем данные в массив; сортируем по ключу;
|
||
идём по группам ключей и считаем агрегатные функции.</p>
|
||
<p>Достоинства:</p>
|
||
<p>+ простота интерфейса агрегатных функций;
|
||
+ возможность более эффективной реализации агрегатных функций;
|
||
+ можно запускать произвольные скрипты для reduce в режиме streaming.</p>
|
||
<p>Недостатки:</p>
|
||
<p>− пусть N — общее число данных, а M — количество ключей;
|
||
Отвратительно работает при N > M — в типичном случае.
|
||
Тратится O(N) оперативки на промежуточные данные вместо O(M).</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Хороший способ</h2>
|
||
<p>Читаем данные, кладём в ассоциативный массив</p>
|
||
<p><b>key tuple</b> -> <b>states of aggregate functions</b></p>
|
||
<p>обновляем состояния агрегатных функций.</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Какой ассоциативный массив?</h2>
|
||
<p>Lookup-таблица. Хэш-таблица.</p>
|
||
<p>Бинарное дерево. Skip-лист. B-дерево. </p>
|
||
<p>Трай. Трай+хэш-таблица...</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Бинарное дерево</h2>
|
||
<p>− слишком большой оверхед на элемент;</p>
|
||
<p>− отвратительная кэш-локальность;</p>
|
||
<p>− вообще тормозит.</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Skip-лист. Трай. B-дерево...</h2>
|
||
<p>− вообще для другой задачи;</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Lookup-таблица</h2>
|
||
<p>+ прекрасно для агрегации по числовым ключам не более ~16 бит;</p>
|
||
<p>− не подходит для чуть более сложных случаев.</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Хэш-таблица</h2>
|
||
<p>+ моя любимая структура данных;</p>
|
||
<p>− много деталей.</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Трай+хэш-таблица</h2>
|
||
<p>+ иногда кое что в этом есть, см. далее;</p>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>Одна машина, много ядер</h2>
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<h2>1. Тривиальный способ</h2>
|
||
|
||
<p>Разные потоки читают разные данные по мере возможности.
|
||
Агрегируют независимо в свои локальные хэш-таблицы.
|
||
Когда все данные прочитаны, мержим все хэш-таблицы в одну.
|
||
Например, идём по всем локальным хэш-таблицам кроме первой
|
||
и перекладываем всё в первую.
|
||
|
||
Фаза чтения данных и предварительной агрегации распараллеливается.
|
||
Фаза мержа выполняется последовательно.
|
||
|
||
Пусть N — общее число данных, а M — количество ключей.
|
||
O(M) работы выполняется последовательно
|
||
и при большом M (кардинальность GROUP BY)
|
||
работа плохо распараллеливается.
|
||
|
||
Достоинства: тривиально.
|
||
|
||
Недостатки: не масштабируется при большой кардинальности.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>2. Partitioning способ</h2>
|
||
|
||
<p>Для каждого блока данных, выполняем агрегацию в две стадии:
|
||
|
||
Стадия 1.
|
||
Разные потоки будут обрабатывать разные куски блока, какие успеют.
|
||
В каждом потоке, с помощью отдельной хэш-функции,
|
||
хэшируем ключ в номер потока и запоминаем его.
|
||
|
||
hash: key -> bucket_num
|
||
|
||
Стадия 2.
|
||
Каждый поток идёт по всему блоку данных
|
||
и берёт для агрегации только строки с нуждым номером корзины.
|
||
|
||
Модификация: можно всё в одну стадию — тогда каждый поток
|
||
будет вычислять хэш-функцию от всех строк заново:
|
||
подходит, если это дёшево.
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<p>
|
||
Достоинства:
|
||
+ хорошо масштабируется при большой кардинальности
|
||
и равномерном распределении ключей;
|
||
+ идейная простота.
|
||
|
||
Недостатки:
|
||
− если объём данных распределён неравномерно по ключам,
|
||
то стадия 2 плохо масштабируется.
|
||
Это типичный случай.
|
||
Почти всегда объём данных по ключам распределён по power law.
|
||
|
||
Ещё недостатки:
|
||
− если размер блока маленький, то получается слишком
|
||
мелко-гранулированная многопоточность:
|
||
большой оверхед на синхронизацию;
|
||
− если размер блока большой, то плохая кэш-локальность;
|
||
− на второй стадии, часть memory bandwidth умножается на число потоков;
|
||
− нужно вычислять ещё одну хэш-функцию,
|
||
она должна быть независима от той, что в хэш-таблице;</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>3. Параллельный мерж хэш-таблиц</h2>
|
||
|
||
<p>Отресайзим полученные в разных потоках хэш-таблицы к одному размеру.
|
||
Разобъём их неявно на разные подмножества ключей.
|
||
В разных потоках будем мержить соответствующие
|
||
подмножества ключей хэш-таблиц.
|
||
|
||
Рисунок на доске.
|
||
|
||
Недостаток:
|
||
− очень сложный код.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>4. Ordered мерж хэш-таблиц</h2>
|
||
|
||
<p>Для open addressing linear probing хэш-таблиц, или для chaining хэш-таблиц,
|
||
данные в хэш-таблице расположены почти упорядоченно
|
||
по остатку от деления хэш-функции на размер хэш-таблицы
|
||
— с точностью до цепочек разрешения коллизий.
|
||
|
||
Отресайзим полученные в разных потоках хэш-таблицы к одному размеру.
|
||
Сделаем ordered iterator, который будет
|
||
перебирать данные в хэш-таблице в фиксированном порядке.
|
||
|
||
Объём работы на итерирование:
|
||
количество цепочек разрешения коллизий * средний квадрат длин цепочек.
|
||
|
||
Сделаем merging iterator, который с помощью heap (priority queue)
|
||
будет перебирать все хэш-таблицы разом.
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<p>Достоинства:
|
||
|
||
+ не нужно никуда перекладывать элементы: мерж делается inplace.
|
||
|
||
+ бонус: подходит для внешней памяти.
|
||
|
||
|
||
Недостатки:
|
||
|
||
− отвратительно сложный код;
|
||
|
||
− для open addressing linear probing хэш-таблиц,
|
||
средний квадрат длин цепочек разрешения коллизий слишком большой;
|
||
|
||
− priority queue тормозит;
|
||
|
||
− стадия мержа не распараллеливается*
|
||
|
||
|
||
* — можно совместить с предыдущим способом.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 40px;">5. Robin Hood ordered мерж хэш-таблиц</h2>
|
||
|
||
<p>Если использовать Robin Hood хэш-таблицу, то данные
|
||
(за исключением O(1) граничных цепочек разрешения коллизий)
|
||
будут полностью упорядочены
|
||
по остатку от деления хэш-функции на размер хэш-таблицы.
|
||
|
||
Достоинства:
|
||
+ вроде бы красивый алгоритм.
|
||
+ бонус: подходит для внешней памяти.
|
||
|
||
Недостатки:
|
||
− вынуждает использовать robin-hood probing;
|
||
− priority queue тормозит;
|
||
− стадия мержа не распараллеливается*</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 40px;">6. Shared хэш-таблица под mutex-ом</h2>
|
||
|
||
<p>Достоинства: очень просто.
|
||
|
||
Недостатки: отрицательная масштабируемость.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 30px;">7. Много мелких хэш-таблиц под разными mutex-ами</h2>
|
||
|
||
<p>В какую класть — выбирается с помощью отдельной хэш-функции.
|
||
|
||
Недостатки:
|
||
|
||
− в типичном случае данные распределены сильно неравномерно,
|
||
и потоки будут конкурировать на одной горячей корзине.
|
||
|
||
− в случае маленькой хэш-таблицы, слишком тормозит.
|
||
|
||
Достоинства: если данные почему-то распределены равномерно,
|
||
то кое-как масштабируется.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 35px;">8. Shared хэш-таблица и в каждой ячейке spin-lock</h2>
|
||
|
||
<p>Недостатки:
|
||
|
||
− spin-lock — это очень опасно;
|
||
очень сложно тестировать производительность;
|
||
вы обязательно сделаете отстой.
|
||
|
||
− в типичном случае данные распределены сильно неравномерно,
|
||
и потоки будут конкурировать на одной горячей ячейке.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>9. Lock free shared хэш-таблица</h2>
|
||
|
||
<p>Недостатки:
|
||
|
||
− lock free хэш-таблицы либо нельзя ресайзить, либо они очень сложные;
|
||
|
||
− в типичном случае данные распределены сильно неравномерно,
|
||
и потоки будут конкурировать на одной горячей ячейке:
|
||
false sharing, тормоза.
|
||
|
||
− сложный код, много инструкций, всё тормозит;
|
||
|
||
− я вообще недолюбливаю lock-free алгоритмы.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 35px;">10. Shared хэш-таблица + thread local хэш-таблицы</h2>
|
||
|
||
<p>Пытаемся положить в shared хэш-таблицу путём блокирования ячейки;
|
||
если ячейка уже заблокирована — кладём к локальную хэш-таблицу.
|
||
|
||
Тогда горячие ключи попадут в локальные хэш-таблицы.
|
||
Локальные хэш-таблицы будут маленькими.
|
||
В конце мержим все локальные хэш-таблицы в глобальную.
|
||
|
||
Дополнения: можно сначала смотреть
|
||
на наличие ключа в локальной хэш-таблице.
|
||
|
||
Достоинства:
|
||
+ отлично масштабируется;
|
||
+ сравнительно простая реализация.
|
||
|
||
Недостатки:
|
||
− много лукапов, много инструкций — в целом довольно медленно.
|
||
|
||
Даже не смотря на то, что thread local хэш-таблица
|
||
зачастую ещё и cache local.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>11. Two-level хэш-таблица</h2>
|
||
|
||
<p>На первой стадии, в каждом потоке независимо
|
||
кладём данные в свои num_buckets = 256 хэш-таблиц,
|
||
хранящих разные ключи.
|
||
|
||
В какую из них класть (номер корзины)
|
||
определяется другой хэш-функцией,
|
||
либо отдельным байтом хэш-функции.
|
||
|
||
Имеем num_threads * num_buckets хэш-таблиц.
|
||
|
||
На второй стадии мержим состояния
|
||
num_threads * num_buckets хэш-таблиц
|
||
в одни num_buckets хэш-таблиц,
|
||
распараллеливая мерж по bucket-ам.
|
||
</section>
|
||
|
||
<section class="slide">
|
||
<p>
|
||
Достоинства:
|
||
|
||
+ отлично масштабируется;
|
||
+ простота реализации;
|
||
|
||
+ бонус: ресайзы хэш-таблиц амортизируются;
|
||
|
||
+ бонус: нахаляву получаем в результате partitioning,
|
||
который полезен для других стадий конвейера.
|
||
|
||
+ бонус: подходит для внешней памяти.
|
||
|
||
Недостатки:
|
||
|
||
− при большой кардинальности, во время мержа
|
||
делается до такого же объёма работ как на первой стадии;
|
||
|
||
− при маленькой кардинальности,
|
||
слишком много отдельных хэш-таблиц;
|
||
|
||
− при маленькой кардинальности,
|
||
работает несколько медленнее тривиального способа;</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 40px;">12. Тривиальный + two-level хэш-таблица</h2>
|
||
|
||
<p>Используем тривиальный способ.
|
||
|
||
Когда разных ключей много, конвертируем в two-level.</p>
|
||
<p>
|
||
Именно такой способ используется в ClickHouse :)
|
||
</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>Много машин, много ядер</h2>
|
||
|
||
<p>На разных машинах расположены части данных,
|
||
которые надо обработать.
|
||
|
||
Отличия от shared memory:
|
||
|
||
— почти отсутствует возможность work stealing;
|
||
— нужно явно передавать данные по сети.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>1. Тривиальный способ</h2>
|
||
|
||
<p>Передаём промежуточные результаты на сервер-инициатор запроса.
|
||
Последовательно кладём всё в одну хэш-таблицу.
|
||
|
||
Достоинства:
|
||
|
||
+ тривиально;
|
||
+ хорошо масштабируется при маленькой кардинальности.
|
||
|
||
Недостатки:
|
||
|
||
− при большой кардинальности не масштабируется;
|
||
− требуется оперативка на весь результат.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>2. Ordered merge</h2>
|
||
|
||
<p>Передаём промежуточные результаты на сервер-инициатор запроса
|
||
в заданном порядке. Мержим.
|
||
|
||
Достоинства:
|
||
+ тратится O(1) оперативки;
|
||
|
||
Недостатки:
|
||
− при большой кардинальности не масштабируется;
|
||
− мерж сортированных потоков (heap) — это медленная операция;
|
||
− требуется либо сортировать результаты на удалённых серверах,
|
||
либо использовать один из тех fancy алгоритмов выше.</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>3. Partitioned merge</h2>
|
||
|
||
<p>Передаём промежуточные результаты на сервер-инициатор запроса,
|
||
разбитыми на отдельные согласованные корзины-партиции,
|
||
в заданном порядке корзин.
|
||
|
||
Мержим по одной или по несколько корзин одновременно.
|
||
|
||
Достоинства:
|
||
+ тратится до в num_buckets раз меньше оперативки, чем размер результата;
|
||
+ можно легко распараллелить, мержа сразу несколько корзин
|
||
— отлично масштабируется по ядрам.
|
||
|
||
Недостатки:
|
||
− мерж делается на одном сервере — инициаторе запроса
|
||
— эта стадия не масштабируется по серверам.</p>
|
||
|
||
<p>
|
||
Именно такой способ используется в ClickHouse :)
|
||
</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2>4. Reshuffle + partitioned merge</h2>
|
||
|
||
<p>На удалённых серверах получаем промежуточные результаты,
|
||
разбитые на согласованные партиции.
|
||
|
||
Затем передаём партиции между серверами так,
|
||
чтобы на разных серверах были разные партиции,
|
||
а данные одной партиции оказались на одном сервере.
|
||
|
||
Мержим на всех серверах параллельно, да ещё и используя многие ядра.
|
||
|
||
Достоинства:
|
||
+ прекрасно масштабируется;
|
||
+ при INSERT SELECT, результат можно
|
||
вовсе не передавать на сервер-инициатор,
|
||
а сразу сохранить в распределённую таблицу на кластере.
|
||
|
||
Недостатки:
|
||
− сложная координация серверов;</p>
|
||
</section>
|
||
|
||
|
||
<section class="slide">
|
||
<h2 style="font-size: 100px;">Всё</h2>
|
||
|
||
<p style="font-size: 50px;">Можно задавать вопросы.</p>
|
||
</section>
|
||
|
||
|
||
|
||
<div class="progress"></div>
|
||
<script src="shower/shower.min.js"></script>
|
||
</body>
|
||
</html>
|