diff --git a/dbms/include/DB/Common/HashTable/HashTable.h b/dbms/include/DB/Common/HashTable/HashTable.h index 554c80c9b74..f5db6b9f0de 100644 --- a/dbms/include/DB/Common/HashTable/HashTable.h +++ b/dbms/include/DB/Common/HashTable/HashTable.h @@ -130,6 +130,10 @@ struct HashTableCell /// Десериализация, в бинарном и текстовом виде. void read(DB::ReadBuffer & rb) { DB::readBinary(key, rb); } void readText(DB::ReadBuffer & rb) { DB::writeDoubleQuoted(key, rb); } + + /// Нужны, если используется HashTableMergeCursor. + void swap(HashTableCell & rhs) { std::swap(key, rhs.key); } + bool less(HashTableCell & rhs) const { return key < rhs.key; } }; @@ -209,6 +213,10 @@ struct ZeroValueStorage }; +template +class HashTableMergeCursor; + + template < typename Key, @@ -230,6 +238,13 @@ protected: typedef size_t HashValue; typedef HashTable Self; + typedef Cell cell_type; + + template + friend class HashTableMergeCursor; + + template + friend void processMergedHashTables(std::vector & tables, MergeFunction && merge_func, Callback && callback); size_t m_size = 0; /// Количество элементов Cell * buf; /// Кусок памяти для всех элементов кроме элемента с ключём 0. diff --git a/dbms/include/DB/Common/HashTable/HashTableMerge.h b/dbms/include/DB/Common/HashTable/HashTableMerge.h new file mode 100644 index 00000000000..01696ac6179 --- /dev/null +++ b/dbms/include/DB/Common/HashTable/HashTableMerge.h @@ -0,0 +1,241 @@ +#pragma once + +#include + +#include + + +/** Позволяет проитерироваться по данным в поряке возрастания остатка от деления хэш-функции на размер таблицы, + * а для элементов с одинаковым остатком - в порядке ключа. + * + * Для разных хэш-таблиц с одинаковым размером буфера, итерация производится в одном и том же порядке. + * То есть, это может быть использовано, чтобы пройти одновременно несколько хэш-таблиц так, + * чтобы одинаковые элементы из разных хэш-таблиц обрабатывались последовательно. + * + * В процессе итерации, меняется расположение элементов в хэш-таблице. + */ +template +class HashTableMergeCursor +{ +private: + typedef typename Table::cell_type Cell; + typedef typename Table::value_type value_type; + + Table * container; + Cell * ptr; + + /** Находимся ли мы в цепочке разрешения коллизий в самом начале буфера? + * В этом случае, цепочка может содержать ключи, которые должны были бы быть в конце буфера. + */ + bool in_first_chain; + + /** Начало цепочки разрешения коллизий, которая доходит до конца хэш-таблицы, + * или buf + buf_size, если такой нет. + */ + Cell * begin_of_last_chain; + + bool overlapped = false; + + size_t place(const Cell * ptr) const + { + return container->grower.place(ptr->getHash(*container)); + } + +public: + HashTableMergeCursor(Table * container_) : container(container_) + { + in_first_chain = !container->buf->isZero(*container); + + begin_of_last_chain = container->buf + container->grower.bufSize(); + + /// Есть ли цепочка, доходящая до конца таблицы? И если есть, то где её начало? + while (!begin_of_last_chain[-1].isZero(*container)) + --begin_of_last_chain; + + ptr = container->buf - 1; + next(); + } + + void next() + { + ++ptr; + + /// Дойдём до следующей цепочки разрешения коллизий. + while (ptr < container->buf + container->grower.bufSize() && ptr->isZero(*container)) + { + in_first_chain = false; + ++ptr; + } + + /// Если мы в первой цепочке, над элементом, который должен был бы быть в конце, то пропустим все такие элементы. + if (in_first_chain) + { + while (!ptr->isZero(*container) && place(ptr) * 2 > container->grower.bufSize() && ptr < container->buf + container->grower.bufSize()) + ++ptr; + + while (ptr->isZero(*container) && ptr < container->buf + container->grower.bufSize()) + ++ptr; + } + + /// Если мы в последней цепочке, и, возможно, уже перешли через конец буфера, то пропустим все слишком маленькие элементы. + if (overlapped) + { + if (ptr == container->buf + container->grower.bufSize()) + ptr = container->buf; + + while (!ptr->isZero(*container) && place(ptr) * 2 < container->grower.bufSize()) + ++ptr; + + /// Конец. + if (ptr->isZero(*container)) + { + ptr = container->buf + container->grower.bufSize(); + return; + } + } + + if (ptr == container->buf + container->grower.bufSize()) + return; + + /// Положим под курсор минимальный элемент в цепочке разрешения коллизий, поменяв его местами с тем, что уже там есть. + size_t min_place_value = place(ptr); + Cell * cell_with_min_place_value = ptr; + + if (ptr < begin_of_last_chain && !overlapped) + { + for (Cell * lookahead = ptr + 1; !lookahead->isZero(*container); ++lookahead) + { + size_t place_of_lookahead = place(lookahead); + +// std::cerr << place_of_lookahead << ", " << min_place_value << std::endl; + + if (place_of_lookahead < min_place_value + || (place_of_lookahead == min_place_value && lookahead->less(*cell_with_min_place_value))) + { + min_place_value = place_of_lookahead; + cell_with_min_place_value = lookahead; + } + } + + if (ptr != cell_with_min_place_value) + ptr->swap(*cell_with_min_place_value); + } + else + { + size_t lookahead_pos = container->grower.next(ptr - container->buf + 1); + overlapped = true; + + for (; !container->buf[lookahead_pos].isZero(*container); lookahead_pos = container->grower.next(lookahead_pos)) + { + size_t place_of_lookahead = place(&container->buf[lookahead_pos]); + if ((place_of_lookahead < min_place_value + || (place_of_lookahead == min_place_value && container->buf[lookahead_pos].less(*cell_with_min_place_value))) + && place_of_lookahead * 2 > container->grower.bufSize()) + { + min_place_value = place_of_lookahead; + cell_with_min_place_value = &container->buf[lookahead_pos]; + } + } + + if (ptr != cell_with_min_place_value) + ptr->swap(*cell_with_min_place_value); + } + } + + bool isValid() const + { + return ptr != container->buf + container->grower.bufSize(); + } + + value_type & get() + { + return ptr->getValue(); + } + + Cell * getCell() + { + return ptr; + } + + /// Для priority_queue (инвертировано). + bool operator< (const HashTableMergeCursor & rhs) const + { + if (!isValid()) + return true; + if (!rhs.isValid()) + return false; + + size_t lhs_place = place(ptr); + size_t rhs_place = place(rhs.ptr); + + return lhs_place > rhs_place + || (lhs_place == rhs_place && !ptr->less(*rhs.ptr)); + } +}; + + +/** Позволяет обработать одинаковые ключи нескольких разных хэш-таблиц. + * Если встречает два или более одинаковых ключей, то вызывает + * merge_func(value_type & dst, value_type & src) с одним аргументом dst. + * После обработки всех записей одного ключа, вызывает + * callback(value_type & dst). + */ +template +void processMergedHashTables(std::vector & tables, MergeFunction && merge_func, Callback && callback) +{ + typedef HashTableMergeCursor Cursor; + typedef typename Table::cell_type Cell; + + size_t tables_size = tables.size(); + + /// Определим максимальный размер таблиц. + size_t max_size = 0; + for (size_t i = 0; i < tables_size; ++i) + if (tables[i].size() > max_size) + max_size = tables[i].size(); + + /// Ресайзим все таблицы к этому размеру. + for (size_t i = 0; i < tables_size; ++i) + if (tables[i].size() < max_size) + tables[i].resize(max_size); + + typedef std::priority_queue Queue; + Queue queue; + + for (size_t i = 0; i < tables_size; ++i) + { + Cursor cursor(&tables[i]); + if (cursor.isValid()) + queue.push(cursor); + } + + Cell * prev_cell = nullptr; + while (!queue.empty()) + { + Cursor cursor = queue.top(); + queue.pop(); + + Cell * current_cell = cursor.getCell(); + + if (!prev_cell) + { + prev_cell = current_cell; + } + else if (!prev_cell->keyEquals(*current_cell)) + { + callback(prev_cell->getValue()); + prev_cell = current_cell; + } + else + { + merge_func(prev_cell->getValue(), current_cell->getValue()); + } + + cursor.next(); + if (cursor.isValid()) + queue.push(cursor); + } + + if (prev_cell) + callback(prev_cell->getValue()); +} diff --git a/dbms/src/Common/tests/ordered_iterator.cpp b/dbms/src/Common/tests/ordered_iterator.cpp new file mode 100644 index 00000000000..14350ac0d35 --- /dev/null +++ b/dbms/src/Common/tests/ordered_iterator.cpp @@ -0,0 +1,198 @@ +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include + + +typedef UInt64 Key; +typedef UInt64 Value; + + +template +< + typename Key, + typename Cell, + typename Hash, + typename Grower, + typename Allocator = HashTableAllocator +> +class HashTableWithDump : public HashTable +{ +public: + void dump() const + { + for (size_t i = 0; i < this->grower.bufSize(); ++i) + { + if (this->buf[i].isZero(*this)) + std::cerr << "[ ]"; + else + std::cerr << '[' << std::right << std::setw(4) << this->buf[i].getValue() << ']'; + } + std::cerr << std::endl; + } +}; + + +struct TrivialHash +{ + size_t operator() (UInt64 x) const { return x; } +}; + + +struct Grower +{ + /// Состояние этой структуры достаточно, чтобы получить размер буфера хэш-таблицы. + + /// Определяет начальный размер хэш-таблицы. + static const size_t initial_size_degree = 4; + + UInt8 size_degree = initial_size_degree; + + /// Размер хэш-таблицы в ячейках. + size_t bufSize() const { return 1 << size_degree; } + + size_t maxFill() const { return 1 << (size_degree - 1); } + size_t mask() const { return bufSize() - 1; } + + /// Из значения хэш-функции получить номер ячейки в хэш-таблице. + size_t place(size_t x) const { return x & mask(); } + + /// Следующая ячейка в цепочке разрешения коллизий. + size_t next(size_t pos) const { ++pos; return pos & mask(); } + + /// Является ли хэш-таблица достаточно заполненной. Нужно увеличить размер хэш-таблицы, или удалить из неё что-нибудь ненужное. + bool overflow(size_t elems) const { return false; } + + /// Увеличить размер хэш-таблицы. + void increaseSize() + { + size_degree += size_degree >= 23 ? 1 : 2; + } + + /// Установить размер буфера по количеству элементов хэш-таблицы. Используется при десериализации хэш-таблицы. + void set(size_t num_elems) + { + size_degree = num_elems <= 1 + ? initial_size_degree + : ((initial_size_degree > static_cast(log2(num_elems - 1)) + 2) + ? initial_size_degree + : (static_cast(log2(num_elems - 1)) + 2)); + } +}; + + +typedef HashTableWithDump, TrivialHash, HashTableGrower, HashTableAllocator> Set; + + +int main(int argc, char ** argv) +{ +/* Set set; + + set.dump(); + set.insert(37); + set.dump(); + set.insert(21); + set.dump(); + set.insert(5); + set.dump(); + set.insert(6); + set.dump(); + set.insert(22); + set.dump(); + + set.insert(14); + set.dump(); + set.insert(15); + set.dump(); + set.insert(30); + set.dump(); + set.insert(1); + set.dump(); + + std::cerr << std::endl; + for (HashTableMergeCursor it(&set); it.isValid(); it.next()) + std::cerr << it.get() << std::endl; + + set.resize(15); + set.dump(); + + std::cerr << std::endl; + for (HashTableMergeCursor it(&set); it.isValid(); it.next()) + std::cerr << it.get() << std::endl; + + */ + + size_t n = atoi(argv[1]); + std::vector data(n); + + { + Stopwatch watch; + DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO); + DB::CompressedReadBuffer in2(in1); + + in2.readStrict(reinterpret_cast(&data[0]), sizeof(data[0]) * n); + + watch.stop(); + std::cerr << std::fixed << std::setprecision(2) + << "Vector. Size: " << n + << ", elapsed: " << watch.elapsedSeconds() + << " (" << n / watch.elapsedSeconds() << " elem/sec.)" + << std::endl; + } + + { + Stopwatch watch; + + Set set; + + for (size_t i = 0; i < n; ++i) + set.insert(data[i]); + + watch.stop(); + std::cerr << std::fixed << std::setprecision(2) + << "HashSet. Size: " << set.size() + << ", elapsed: " << watch.elapsedSeconds() + << " (" << n / watch.elapsedSeconds() << " elem/sec.)" + << std::endl; + + { + watch.restart(); + + size_t sum = 0; + for (auto x : set) + sum += x; + + watch.stop(); + std::cerr << std::fixed << std::setprecision(2) + << "Iterated in: " << watch.elapsedSeconds() + << " (" << set.size() / watch.elapsedSeconds() << " elem/sec.)" + << " sum = " << sum + << std::endl; + } + + { + watch.restart(); + + size_t sum = 0; + for (HashTableMergeCursor it(&set); it.isValid(); it.next()) + sum += it.get(); + + watch.stop(); + std::cerr << std::fixed << std::setprecision(2) + << "Ordered iterated in: " << watch.elapsedSeconds() + << " (" << set.size() / watch.elapsedSeconds() << " elem/sec.)" + << " sum = " << sum + << std::endl; + } + } + + return 0; +} diff --git a/dbms/src/Common/tests/parallel_aggregation.cpp b/dbms/src/Common/tests/parallel_aggregation.cpp index 7224f4a3dd3..d0477b798ec 100644 --- a/dbms/src/Common/tests/parallel_aggregation.cpp +++ b/dbms/src/Common/tests/parallel_aggregation.cpp @@ -5,8 +5,11 @@ #include +#define DBMS_HASH_MAP_DEBUG_RESIZES + #include #include +#include #include #include @@ -20,7 +23,7 @@ typedef UInt64 Value; typedef std::vector Source; -typedef HashMap Map1; +typedef HashMap Map; struct TwoLevelGrower : public HashTableGrower @@ -67,53 +70,57 @@ template > using TwoLevelHashMap = TwoLevelHashMapTable, Hash, Grower, Allocator>; -typedef TwoLevelHashMap Map2; +typedef TwoLevelHashMap MapTwoLevel; -struct __attribute__((__aligned__(64))) SmallLock +struct __attribute__((__aligned__(64))) AlignedSmallLock : public SmallLock { - std::atomic locked {false}; - char dummy[64 - sizeof(locked)]; - - bool tryLock() - { - bool expected = false; - return locked.compare_exchange_strong(expected, true, std::memory_order_acquire); - } - - void unlock() - { - locked.store(false, std::memory_order_release); - } + char dummy[64 - sizeof(SmallLock)]; }; -void aggregate1(Map1 & map, Source::const_iterator begin, Source::const_iterator end) +struct FixedSizeGrower : public HashTableGrower +{ + static const size_t initial_size_degree = 21; + FixedSizeGrower() { size_degree = initial_size_degree; } +}; + +typedef HashTableWithSmallLocks< + Key, + HashTableCellWithLock< + Key, + HashMapCell > >, + DefaultHash, + FixedSizeGrower, + HashTableAllocator> MapSmallLocks; + + +void aggregate1(Map & map, Source::const_iterator begin, Source::const_iterator end) { for (auto it = begin; it != end; ++it) ++map[*it]; } -void aggregate2(Map2 & map, Source::const_iterator begin, Source::const_iterator end) +void aggregate2(MapTwoLevel & map, Source::const_iterator begin, Source::const_iterator end) { for (auto it = begin; it != end; ++it) ++map[*it]; } -void merge2(Map2 * maps, size_t num_threads, size_t bucket) +void merge2(MapTwoLevel * maps, size_t num_threads, size_t bucket) { for (size_t i = 1; i < num_threads; ++i) for (auto it = maps[i].impls[bucket].begin(); it != maps[i].impls[bucket].end(); ++it) maps[0].impls[bucket][it->first] += it->second; } -void aggregate3(Map1 & local_map, Map1 & global_map, SmallLock & mutex, Source::const_iterator begin, Source::const_iterator end) +void aggregate3(Map & local_map, Map & global_map, AlignedSmallLock & mutex, Source::const_iterator begin, Source::const_iterator end) { static constexpr size_t threshold = 65536; for (auto it = begin; it != end; ++it) { - Map1::iterator found = local_map.find(*it); + Map::iterator found = local_map.find(*it); if (found != local_map.end()) ++found->second; @@ -132,14 +139,14 @@ void aggregate3(Map1 & local_map, Map1 & global_map, SmallLock & mutex, Source:: } } -void aggregate4(Map1 & local_map, Map2 & global_map, SmallLock * mutexes, Source::const_iterator begin, Source::const_iterator end) +void aggregate4(Map & local_map, MapTwoLevel & global_map, AlignedSmallLock * mutexes, Source::const_iterator begin, Source::const_iterator end) { DefaultHash hash; static constexpr size_t threshold = 65536; for (auto it = begin; it != end; ++it) { - Map1::iterator found = local_map.find(*it); + Map::iterator found = local_map.find(*it); if (found != local_map.end()) ++found->second; @@ -152,7 +159,7 @@ void aggregate4(Map1 & local_map, Map2 & global_map, SmallLock * mutexes, Source if (mutexes[bucket].tryLock()) { - Map2::Impl::iterator inserted_it; + MapTwoLevel::Impl::iterator inserted_it; bool inserted; global_map.impls[bucket].emplace(*it, inserted_it, inserted, hash_value); mutexes[bucket].unlock(); @@ -163,6 +170,33 @@ void aggregate4(Map1 & local_map, Map2 & global_map, SmallLock * mutexes, Source } } +void aggregate5(Map & local_map, MapSmallLocks & global_map, Source::const_iterator begin, Source::const_iterator end) +{ +// static constexpr size_t threshold = 65536; + + for (auto it = begin; it != end; ++it) + { +/* Map::iterator found = local_map.find(*it); + + if (found != local_map.end()) + ++found->second; + else if (local_map.size() < threshold) + ++local_map[*it]; /// TODO Можно было бы делать один lookup, а не два. + else*/ + { + SmallScopedLock lock; + MapSmallLocks::iterator insert_it; + bool inserted; + + if (global_map.tryEmplace(*it, insert_it, inserted, lock)) + ++insert_it->second; + else + ++local_map[*it]; + } + } +} + + int main(int argc, char ** argv) { @@ -188,7 +222,7 @@ int main(int argc, char ** argv) << "Vector. Size: " << n << ", elapsed: " << watch.elapsedSeconds() << " (" << n / watch.elapsedSeconds() << " elem/sec.)" - << std::endl; + << std::endl << std::endl; } if (!method || method == 1) @@ -198,7 +232,7 @@ int main(int argc, char ** argv) * Затем сливаем их вместе. */ - Map1 maps[num_threads]; + Map maps[num_threads]; Stopwatch watch; @@ -257,7 +291,7 @@ int main(int argc, char ** argv) * и преимущество в производительности достигает 4 раз. */ - Map2 maps[num_threads]; + MapTwoLevel maps[num_threads]; Stopwatch watch; @@ -287,7 +321,7 @@ int main(int argc, char ** argv) watch.restart(); - for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i) + for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i) pool.schedule(std::bind(merge2, &maps[0], num_threads, i)); @@ -307,7 +341,7 @@ int main(int argc, char ** argv) << std::endl; size_t sum_size = 0; - for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i) + for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i) sum_size += maps[0].impls[i].size(); std::cerr << "Size: " << sum_size << std::endl << std::endl; @@ -325,9 +359,9 @@ int main(int argc, char ** argv) * Этот метод плохой - много contention-а. */ - Map1 local_maps[num_threads]; - Map1 global_map; - SmallLock mutex; + Map local_maps[num_threads]; + Map global_map; + AlignedSmallLock mutex; Stopwatch watch; @@ -391,12 +425,12 @@ int main(int argc, char ** argv) * Если размер локальной хэш-таблицы большой, и в ней нет элемента, * то вставляем его в одну из 256 глобальных хэш-таблиц, каждая из которых под своим mutex-ом. * Затем сливаем все локальные хэш-таблицы в глобальную. - * Этот метод тоже плохой. + * Этот метод не такой уж плохой при большом количестве потоков, но хуже второго. */ - Map1 local_maps[num_threads]; - Map2 global_map; - SmallLock mutexes[Map2::NUM_BUCKETS]; + Map local_maps[num_threads]; + MapTwoLevel global_map; + AlignedSmallLock mutexes[MapTwoLevel::NUM_BUCKETS]; Stopwatch watch; @@ -427,7 +461,7 @@ int main(int argc, char ** argv) std::cerr << std::endl; size_t sum_size = 0; - for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i) + for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i) sum_size += global_map.impls[i].size(); std::cerr << "Size (global): " << sum_size << std::endl; @@ -455,11 +489,78 @@ int main(int argc, char ** argv) << std::endl; sum_size = 0; - for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i) + for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i) sum_size += global_map.impls[i].size(); std::cerr << "Size: " << sum_size << std::endl << std::endl; } + if (!method || method == 5) + { + /** Вариант 5. + * В разных потоках агрегируем независимо в разные хэш-таблицы, + * пока их размер не станет достаточно большим. + * Если размер локальной хэш-таблицы большой, и в ней нет элемента, + * то вставляем его в одну глобальную хэш-таблицу, содержащую маленькие защёлки в каждой ячейке, + * а если защёлку не удалось захватить, то вставляем в локальную. + * Затем сливаем все локальные хэш-таблицы в глобальную. + */ + + Map local_maps[num_threads]; + MapSmallLocks global_map; + + Stopwatch watch; + + for (size_t i = 0; i < num_threads; ++i) + pool.schedule(std::bind(aggregate5, + std::ref(local_maps[i]), + std::ref(global_map), + data.begin() + (data.size() * i) / num_threads, + data.begin() + (data.size() * (i + 1)) / num_threads)); + + pool.wait(); + + watch.stop(); + double time_aggregated = watch.elapsedSeconds(); + std::cerr + << "Aggregated in " << time_aggregated + << " (" << n / time_aggregated << " elem/sec.)" + << std::endl; + + size_t size_before_merge = 0; + std::cerr << "Sizes (local): "; + for (size_t i = 0; i < num_threads; ++i) + { + std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size(); + size_before_merge += local_maps[i].size(); + } + std::cerr << std::endl; + std::cerr << "Size (global): " << global_map.size() << std::endl; + size_before_merge += global_map.size(); + + watch.restart(); + + for (size_t i = 0; i < num_threads; ++i) + for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it) + global_map.insert(std::make_pair(it->first, 0)).first->second += it->second; + + pool.wait(); + + watch.stop(); + double time_merged = watch.elapsedSeconds(); + std::cerr + << "Merged in " << time_merged + << " (" << size_before_merge / time_merged << " elem/sec.)" + << std::endl; + + double time_total = time_aggregated + time_merged; + std::cerr + << "Total in " << time_total + << " (" << n / time_total << " elem/sec.)" + << std::endl; + + std::cerr << "Size: " << global_map.size() << std::endl << std::endl; + } + return 0; }