dbms: experiments with hash tables [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-05-03 04:08:35 +04:00
parent 36b677e873
commit 04cb4d878b
4 changed files with 593 additions and 38 deletions

View File

@ -130,6 +130,10 @@ struct HashTableCell
/// Десериализация, в бинарном и текстовом виде.
void read(DB::ReadBuffer & rb) { DB::readBinary(key, rb); }
void readText(DB::ReadBuffer & rb) { DB::writeDoubleQuoted(key, rb); }
/// Нужны, если используется HashTableMergeCursor.
void swap(HashTableCell & rhs) { std::swap(key, rhs.key); }
bool less(HashTableCell & rhs) const { return key < rhs.key; }
};
@ -209,6 +213,10 @@ struct ZeroValueStorage<false, Cell>
};
template <typename Table>
class HashTableMergeCursor;
template
<
typename Key,
@ -230,6 +238,13 @@ protected:
typedef size_t HashValue;
typedef HashTable<Key, Cell, Hash, Grower, Allocator> Self;
typedef Cell cell_type;
template <typename Table>
friend class HashTableMergeCursor;
template <typename Table, typename MergeFunction, typename Callback>
friend void processMergedHashTables(std::vector<Table*> & tables, MergeFunction && merge_func, Callback && callback);
size_t m_size = 0; /// Количество элементов
Cell * buf; /// Кусок памяти для всех элементов кроме элемента с ключём 0.

View File

@ -0,0 +1,241 @@
#pragma once
#include <queue>
#include <DB/Common/HashTable/HashTable.h>
/** Позволяет проитерироваться по данным в поряке возрастания остатка от деления хэш-функции на размер таблицы,
* а для элементов с одинаковым остатком - в порядке ключа.
*
* Для разных хэш-таблиц с одинаковым размером буфера, итерация производится в одном и том же порядке.
* То есть, это может быть использовано, чтобы пройти одновременно несколько хэш-таблиц так,
* чтобы одинаковые элементы из разных хэш-таблиц обрабатывались последовательно.
*
* В процессе итерации, меняется расположение элементов в хэш-таблице.
*/
template <typename Table>
class HashTableMergeCursor
{
private:
typedef typename Table::cell_type Cell;
typedef typename Table::value_type value_type;
Table * container;
Cell * ptr;
/** Находимся ли мы в цепочке разрешения коллизий в самом начале буфера?
* В этом случае, цепочка может содержать ключи, которые должны были бы быть в конце буфера.
*/
bool in_first_chain;
/** Начало цепочки разрешения коллизий, которая доходит до конца хэш-таблицы,
* или buf + buf_size, если такой нет.
*/
Cell * begin_of_last_chain;
bool overlapped = false;
size_t place(const Cell * ptr) const
{
return container->grower.place(ptr->getHash(*container));
}
public:
HashTableMergeCursor(Table * container_) : container(container_)
{
in_first_chain = !container->buf->isZero(*container);
begin_of_last_chain = container->buf + container->grower.bufSize();
/// Есть ли цепочка, доходящая до конца таблицы? И если есть, то где её начало?
while (!begin_of_last_chain[-1].isZero(*container))
--begin_of_last_chain;
ptr = container->buf - 1;
next();
}
void next()
{
++ptr;
/// Дойдём до следующей цепочки разрешения коллизий.
while (ptr < container->buf + container->grower.bufSize() && ptr->isZero(*container))
{
in_first_chain = false;
++ptr;
}
/// Если мы в первой цепочке, над элементом, который должен был бы быть в конце, то пропустим все такие элементы.
if (in_first_chain)
{
while (!ptr->isZero(*container) && place(ptr) * 2 > container->grower.bufSize() && ptr < container->buf + container->grower.bufSize())
++ptr;
while (ptr->isZero(*container) && ptr < container->buf + container->grower.bufSize())
++ptr;
}
/// Если мы в последней цепочке, и, возможно, уже перешли через конец буфера, то пропустим все слишком маленькие элементы.
if (overlapped)
{
if (ptr == container->buf + container->grower.bufSize())
ptr = container->buf;
while (!ptr->isZero(*container) && place(ptr) * 2 < container->grower.bufSize())
++ptr;
/// Конец.
if (ptr->isZero(*container))
{
ptr = container->buf + container->grower.bufSize();
return;
}
}
if (ptr == container->buf + container->grower.bufSize())
return;
/// Положим под курсор минимальный элемент в цепочке разрешения коллизий, поменяв его местами с тем, что уже там есть.
size_t min_place_value = place(ptr);
Cell * cell_with_min_place_value = ptr;
if (ptr < begin_of_last_chain && !overlapped)
{
for (Cell * lookahead = ptr + 1; !lookahead->isZero(*container); ++lookahead)
{
size_t place_of_lookahead = place(lookahead);
// std::cerr << place_of_lookahead << ", " << min_place_value << std::endl;
if (place_of_lookahead < min_place_value
|| (place_of_lookahead == min_place_value && lookahead->less(*cell_with_min_place_value)))
{
min_place_value = place_of_lookahead;
cell_with_min_place_value = lookahead;
}
}
if (ptr != cell_with_min_place_value)
ptr->swap(*cell_with_min_place_value);
}
else
{
size_t lookahead_pos = container->grower.next(ptr - container->buf + 1);
overlapped = true;
for (; !container->buf[lookahead_pos].isZero(*container); lookahead_pos = container->grower.next(lookahead_pos))
{
size_t place_of_lookahead = place(&container->buf[lookahead_pos]);
if ((place_of_lookahead < min_place_value
|| (place_of_lookahead == min_place_value && container->buf[lookahead_pos].less(*cell_with_min_place_value)))
&& place_of_lookahead * 2 > container->grower.bufSize())
{
min_place_value = place_of_lookahead;
cell_with_min_place_value = &container->buf[lookahead_pos];
}
}
if (ptr != cell_with_min_place_value)
ptr->swap(*cell_with_min_place_value);
}
}
bool isValid() const
{
return ptr != container->buf + container->grower.bufSize();
}
value_type & get()
{
return ptr->getValue();
}
Cell * getCell()
{
return ptr;
}
/// Для priority_queue (инвертировано).
bool operator< (const HashTableMergeCursor & rhs) const
{
if (!isValid())
return true;
if (!rhs.isValid())
return false;
size_t lhs_place = place(ptr);
size_t rhs_place = place(rhs.ptr);
return lhs_place > rhs_place
|| (lhs_place == rhs_place && !ptr->less(*rhs.ptr));
}
};
/** Позволяет обработать одинаковые ключи нескольких разных хэш-таблиц.
* Если встречает два или более одинаковых ключей, то вызывает
* merge_func(value_type & dst, value_type & src) с одним аргументом dst.
* После обработки всех записей одного ключа, вызывает
* callback(value_type & dst).
*/
template <typename Table, typename MergeFunction, typename Callback>
void processMergedHashTables(std::vector<Table*> & tables, MergeFunction && merge_func, Callback && callback)
{
typedef HashTableMergeCursor<Table> Cursor;
typedef typename Table::cell_type Cell;
size_t tables_size = tables.size();
/// Определим максимальный размер таблиц.
size_t max_size = 0;
for (size_t i = 0; i < tables_size; ++i)
if (tables[i].size() > max_size)
max_size = tables[i].size();
/// Ресайзим все таблицы к этому размеру.
for (size_t i = 0; i < tables_size; ++i)
if (tables[i].size() < max_size)
tables[i].resize(max_size);
typedef std::priority_queue<Cursor> Queue;
Queue queue;
for (size_t i = 0; i < tables_size; ++i)
{
Cursor cursor(&tables[i]);
if (cursor.isValid())
queue.push(cursor);
}
Cell * prev_cell = nullptr;
while (!queue.empty())
{
Cursor cursor = queue.top();
queue.pop();
Cell * current_cell = cursor.getCell();
if (!prev_cell)
{
prev_cell = current_cell;
}
else if (!prev_cell->keyEquals(*current_cell))
{
callback(prev_cell->getValue());
prev_cell = current_cell;
}
else
{
merge_func(prev_cell->getValue(), current_cell->getValue());
}
cursor.next();
if (cursor.isValid())
queue.push(cursor);
}
if (prev_cell)
callback(prev_cell->getValue());
}

View File

@ -0,0 +1,198 @@
#include <iostream>
#include <iomanip>
#include <vector>
#include <statdaemons/Stopwatch.h>
#include <DB/Common/HashTable/Hash.h>
#include <DB/Common/HashTable/HashTable.h>
#include <DB/Common/HashTable/HashTableMerge.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
typedef UInt64 Key;
typedef UInt64 Value;
template
<
typename Key,
typename Cell,
typename Hash,
typename Grower,
typename Allocator = HashTableAllocator
>
class HashTableWithDump : public HashTable<Key, Cell, Hash, Grower, Allocator>
{
public:
void dump() const
{
for (size_t i = 0; i < this->grower.bufSize(); ++i)
{
if (this->buf[i].isZero(*this))
std::cerr << "[ ]";
else
std::cerr << '[' << std::right << std::setw(4) << this->buf[i].getValue() << ']';
}
std::cerr << std::endl;
}
};
struct TrivialHash
{
size_t operator() (UInt64 x) const { return x; }
};
struct Grower
{
/// Состояние этой структуры достаточно, чтобы получить размер буфера хэш-таблицы.
/// Определяет начальный размер хэш-таблицы.
static const size_t initial_size_degree = 4;
UInt8 size_degree = initial_size_degree;
/// Размер хэш-таблицы в ячейках.
size_t bufSize() const { return 1 << size_degree; }
size_t maxFill() const { return 1 << (size_degree - 1); }
size_t mask() const { return bufSize() - 1; }
/// Из значения хэш-функции получить номер ячейки в хэш-таблице.
size_t place(size_t x) const { return x & mask(); }
/// Следующая ячейка в цепочке разрешения коллизий.
size_t next(size_t pos) const { ++pos; return pos & mask(); }
/// Является ли хэш-таблица достаточно заполненной. Нужно увеличить размер хэш-таблицы, или удалить из неё что-нибудь ненужное.
bool overflow(size_t elems) const { return false; }
/// Увеличить размер хэш-таблицы.
void increaseSize()
{
size_degree += size_degree >= 23 ? 1 : 2;
}
/// Установить размер буфера по количеству элементов хэш-таблицы. Используется при десериализации хэш-таблицы.
void set(size_t num_elems)
{
size_degree = num_elems <= 1
? initial_size_degree
: ((initial_size_degree > static_cast<size_t>(log2(num_elems - 1)) + 2)
? initial_size_degree
: (static_cast<size_t>(log2(num_elems - 1)) + 2));
}
};
typedef HashTableWithDump<Key, HashTableCell<Key, TrivialHash>, TrivialHash, HashTableGrower, HashTableAllocator> Set;
int main(int argc, char ** argv)
{
/* Set set;
set.dump();
set.insert(37);
set.dump();
set.insert(21);
set.dump();
set.insert(5);
set.dump();
set.insert(6);
set.dump();
set.insert(22);
set.dump();
set.insert(14);
set.dump();
set.insert(15);
set.dump();
set.insert(30);
set.dump();
set.insert(1);
set.dump();
std::cerr << std::endl;
for (HashTableMergeCursor<Set> it(&set); it.isValid(); it.next())
std::cerr << it.get() << std::endl;
set.resize(15);
set.dump();
std::cerr << std::endl;
for (HashTableMergeCursor<Set> it(&set); it.isValid(); it.next())
std::cerr << it.get() << std::endl;
*/
size_t n = atoi(argv[1]);
std::vector<Key> data(n);
{
Stopwatch watch;
DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
DB::CompressedReadBuffer in2(in1);
in2.readStrict(reinterpret_cast<char*>(&data[0]), sizeof(data[0]) * n);
watch.stop();
std::cerr << std::fixed << std::setprecision(2)
<< "Vector. Size: " << n
<< ", elapsed: " << watch.elapsedSeconds()
<< " (" << n / watch.elapsedSeconds() << " elem/sec.)"
<< std::endl;
}
{
Stopwatch watch;
Set set;
for (size_t i = 0; i < n; ++i)
set.insert(data[i]);
watch.stop();
std::cerr << std::fixed << std::setprecision(2)
<< "HashSet. Size: " << set.size()
<< ", elapsed: " << watch.elapsedSeconds()
<< " (" << n / watch.elapsedSeconds() << " elem/sec.)"
<< std::endl;
{
watch.restart();
size_t sum = 0;
for (auto x : set)
sum += x;
watch.stop();
std::cerr << std::fixed << std::setprecision(2)
<< "Iterated in: " << watch.elapsedSeconds()
<< " (" << set.size() / watch.elapsedSeconds() << " elem/sec.)"
<< " sum = " << sum
<< std::endl;
}
{
watch.restart();
size_t sum = 0;
for (HashTableMergeCursor<Set> it(&set); it.isValid(); it.next())
sum += it.get();
watch.stop();
std::cerr << std::fixed << std::setprecision(2)
<< "Ordered iterated in: " << watch.elapsedSeconds()
<< " (" << set.size() / watch.elapsedSeconds() << " elem/sec.)"
<< " sum = " << sum
<< std::endl;
}
}
return 0;
}

View File

@ -5,8 +5,11 @@
#include <DB/Interpreters/AggregationCommon.h>
#define DBMS_HASH_MAP_DEBUG_RESIZES
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Common/HashTable/TwoLevelHashTable.h>
#include <DB/Common/HashTable/HashTableWithSmallLocks.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
@ -20,7 +23,7 @@ typedef UInt64 Value;
typedef std::vector<Key> Source;
typedef HashMap<Key, Value> Map1;
typedef HashMap<Key, Value> Map;
struct TwoLevelGrower : public HashTableGrower
@ -67,53 +70,57 @@ template
>
using TwoLevelHashMap = TwoLevelHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator>;
typedef TwoLevelHashMap<Key, Value> Map2;
typedef TwoLevelHashMap<Key, Value> MapTwoLevel;
struct __attribute__((__aligned__(64))) SmallLock
struct __attribute__((__aligned__(64))) AlignedSmallLock : public SmallLock
{
std::atomic<bool> locked {false};
char dummy[64 - sizeof(locked)];
bool tryLock()
{
bool expected = false;
return locked.compare_exchange_strong(expected, true, std::memory_order_acquire);
}
void unlock()
{
locked.store(false, std::memory_order_release);
}
char dummy[64 - sizeof(SmallLock)];
};
void aggregate1(Map1 & map, Source::const_iterator begin, Source::const_iterator end)
struct FixedSizeGrower : public HashTableGrower
{
static const size_t initial_size_degree = 21;
FixedSizeGrower() { size_degree = initial_size_degree; }
};
typedef HashTableWithSmallLocks<
Key,
HashTableCellWithLock<
Key,
HashMapCell<Key, Value, DefaultHash<Key> > >,
DefaultHash<Key>,
FixedSizeGrower,
HashTableAllocator> MapSmallLocks;
void aggregate1(Map & map, Source::const_iterator begin, Source::const_iterator end)
{
for (auto it = begin; it != end; ++it)
++map[*it];
}
void aggregate2(Map2 & map, Source::const_iterator begin, Source::const_iterator end)
void aggregate2(MapTwoLevel & map, Source::const_iterator begin, Source::const_iterator end)
{
for (auto it = begin; it != end; ++it)
++map[*it];
}
void merge2(Map2 * maps, size_t num_threads, size_t bucket)
void merge2(MapTwoLevel * maps, size_t num_threads, size_t bucket)
{
for (size_t i = 1; i < num_threads; ++i)
for (auto it = maps[i].impls[bucket].begin(); it != maps[i].impls[bucket].end(); ++it)
maps[0].impls[bucket][it->first] += it->second;
}
void aggregate3(Map1 & local_map, Map1 & global_map, SmallLock & mutex, Source::const_iterator begin, Source::const_iterator end)
void aggregate3(Map & local_map, Map & global_map, AlignedSmallLock & mutex, Source::const_iterator begin, Source::const_iterator end)
{
static constexpr size_t threshold = 65536;
for (auto it = begin; it != end; ++it)
{
Map1::iterator found = local_map.find(*it);
Map::iterator found = local_map.find(*it);
if (found != local_map.end())
++found->second;
@ -132,14 +139,14 @@ void aggregate3(Map1 & local_map, Map1 & global_map, SmallLock & mutex, Source::
}
}
void aggregate4(Map1 & local_map, Map2 & global_map, SmallLock * mutexes, Source::const_iterator begin, Source::const_iterator end)
void aggregate4(Map & local_map, MapTwoLevel & global_map, AlignedSmallLock * mutexes, Source::const_iterator begin, Source::const_iterator end)
{
DefaultHash<Key> hash;
static constexpr size_t threshold = 65536;
for (auto it = begin; it != end; ++it)
{
Map1::iterator found = local_map.find(*it);
Map::iterator found = local_map.find(*it);
if (found != local_map.end())
++found->second;
@ -152,7 +159,7 @@ void aggregate4(Map1 & local_map, Map2 & global_map, SmallLock * mutexes, Source
if (mutexes[bucket].tryLock())
{
Map2::Impl::iterator inserted_it;
MapTwoLevel::Impl::iterator inserted_it;
bool inserted;
global_map.impls[bucket].emplace(*it, inserted_it, inserted, hash_value);
mutexes[bucket].unlock();
@ -163,6 +170,33 @@ void aggregate4(Map1 & local_map, Map2 & global_map, SmallLock * mutexes, Source
}
}
void aggregate5(Map & local_map, MapSmallLocks & global_map, Source::const_iterator begin, Source::const_iterator end)
{
// static constexpr size_t threshold = 65536;
for (auto it = begin; it != end; ++it)
{
/* Map::iterator found = local_map.find(*it);
if (found != local_map.end())
++found->second;
else if (local_map.size() < threshold)
++local_map[*it]; /// TODO Можно было бы делать один lookup, а не два.
else*/
{
SmallScopedLock lock;
MapSmallLocks::iterator insert_it;
bool inserted;
if (global_map.tryEmplace(*it, insert_it, inserted, lock))
++insert_it->second;
else
++local_map[*it];
}
}
}
int main(int argc, char ** argv)
{
@ -188,7 +222,7 @@ int main(int argc, char ** argv)
<< "Vector. Size: " << n
<< ", elapsed: " << watch.elapsedSeconds()
<< " (" << n / watch.elapsedSeconds() << " elem/sec.)"
<< std::endl;
<< std::endl << std::endl;
}
if (!method || method == 1)
@ -198,7 +232,7 @@ int main(int argc, char ** argv)
* Затем сливаем их вместе.
*/
Map1 maps[num_threads];
Map maps[num_threads];
Stopwatch watch;
@ -257,7 +291,7 @@ int main(int argc, char ** argv)
* и преимущество в производительности достигает 4 раз.
*/
Map2 maps[num_threads];
MapTwoLevel maps[num_threads];
Stopwatch watch;
@ -287,7 +321,7 @@ int main(int argc, char ** argv)
watch.restart();
for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i)
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2,
&maps[0], num_threads, i));
@ -307,7 +341,7 @@ int main(int argc, char ** argv)
<< std::endl;
size_t sum_size = 0;
for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i)
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
sum_size += maps[0].impls[i].size();
std::cerr << "Size: " << sum_size << std::endl << std::endl;
@ -325,9 +359,9 @@ int main(int argc, char ** argv)
* Этот метод плохой - много contention-а.
*/
Map1 local_maps[num_threads];
Map1 global_map;
SmallLock mutex;
Map local_maps[num_threads];
Map global_map;
AlignedSmallLock mutex;
Stopwatch watch;
@ -391,12 +425,12 @@ int main(int argc, char ** argv)
* Если размер локальной хэш-таблицы большой, и в ней нет элемента,
* то вставляем его в одну из 256 глобальных хэш-таблиц, каждая из которых под своим mutex-ом.
* Затем сливаем все локальные хэш-таблицы в глобальную.
* Этот метод тоже плохой.
* Этот метод не такой уж плохой при большом количестве потоков, но хуже второго.
*/
Map1 local_maps[num_threads];
Map2 global_map;
SmallLock mutexes[Map2::NUM_BUCKETS];
Map local_maps[num_threads];
MapTwoLevel global_map;
AlignedSmallLock mutexes[MapTwoLevel::NUM_BUCKETS];
Stopwatch watch;
@ -427,7 +461,7 @@ int main(int argc, char ** argv)
std::cerr << std::endl;
size_t sum_size = 0;
for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i)
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
sum_size += global_map.impls[i].size();
std::cerr << "Size (global): " << sum_size << std::endl;
@ -455,11 +489,78 @@ int main(int argc, char ** argv)
<< std::endl;
sum_size = 0;
for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i)
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
sum_size += global_map.impls[i].size();
std::cerr << "Size: " << sum_size << std::endl << std::endl;
}
if (!method || method == 5)
{
/** Вариант 5.
* В разных потоках агрегируем независимо в разные хэш-таблицы,
* пока их размер не станет достаточно большим.
* Если размер локальной хэш-таблицы большой, и в ней нет элемента,
* то вставляем его в одну глобальную хэш-таблицу, содержащую маленькие защёлки в каждой ячейке,
* а если защёлку не удалось захватить, то вставляем в локальную.
* Затем сливаем все локальные хэш-таблицы в глобальную.
*/
Map local_maps[num_threads];
MapSmallLocks global_map;
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate5,
std::ref(local_maps[i]),
std::ref(global_map),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
pool.wait();
watch.stop();
double time_aggregated = watch.elapsedSeconds();
std::cerr
<< "Aggregated in " << time_aggregated
<< " (" << n / time_aggregated << " elem/sec.)"
<< std::endl;
size_t size_before_merge = 0;
std::cerr << "Sizes (local): ";
for (size_t i = 0; i < num_threads; ++i)
{
std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
size_before_merge += local_maps[i].size();
}
std::cerr << std::endl;
std::cerr << "Size (global): " << global_map.size() << std::endl;
size_before_merge += global_map.size();
watch.restart();
for (size_t i = 0; i < num_threads; ++i)
for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
global_map.insert(std::make_pair(it->first, 0)).first->second += it->second;
pool.wait();
watch.stop();
double time_merged = watch.elapsedSeconds();
std::cerr
<< "Merged in " << time_merged
<< " (" << size_before_merge / time_merged << " elem/sec.)"
<< std::endl;
double time_total = time_aggregated + time_merged;
std::cerr
<< "Total in " << time_total
<< " (" << n / time_total << " elem/sec.)"
<< std::endl;
std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
}
return 0;
}