dbms: continue [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-05-02 19:48:03 +04:00
parent b394bc5a61
commit 4aa1b5963b
2 changed files with 244 additions and 4 deletions

View File

@ -33,8 +33,10 @@ protected:
typedef size_t HashValue;
typedef TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator> Self;
public:
typedef ImplTable Impl;
protected:
size_t m_size = 0; /// Количество элементов
size_t hash(const Key & x) const { return Hash::operator()(x); }

View File

@ -1,5 +1,7 @@
#include <iostream>
#include <iomanip>
#include <mutex>
#include <atomic>
#include <DB/Interpreters/AggregationCommon.h>
@ -68,6 +70,24 @@ using TwoLevelHashMap = TwoLevelHashMapTable<Key, HashMapCell<Key, Mapped, Hash>
typedef TwoLevelHashMap<Key, Value> Map2;
struct __attribute__((__aligned__(64))) SmallLock
{
std::atomic<bool> locked {false};
char dummy[64 - sizeof(locked)];
bool tryLock()
{
bool expected = false;
return locked.compare_exchange_strong(expected, true, std::memory_order_acquire);
}
void unlock()
{
locked.store(false, std::memory_order_release);
}
};
void aggregate1(Map1 & map, Source::const_iterator begin, Source::const_iterator end)
{
for (auto it = begin; it != end; ++it)
@ -87,11 +107,68 @@ void merge2(Map2 * maps, size_t num_threads, size_t bucket)
maps[0].impls[bucket][it->first] += it->second;
}
void aggregate3(Map1 & local_map, Map1 & global_map, SmallLock & mutex, Source::const_iterator begin, Source::const_iterator end)
{
static constexpr size_t threshold = 65536;
for (auto it = begin; it != end; ++it)
{
Map1::iterator found = local_map.find(*it);
if (found != local_map.end())
++found->second;
else if (local_map.size() < threshold)
++local_map[*it]; /// TODO Можно было бы делать один lookup, а не два.
else
{
if (mutex.tryLock())
{
++global_map[*it];
mutex.unlock();
}
else
++local_map[*it];
}
}
}
void aggregate4(Map1 & local_map, Map2 & global_map, SmallLock * mutexes, Source::const_iterator begin, Source::const_iterator end)
{
DefaultHash<Key> hash;
static constexpr size_t threshold = 65536;
for (auto it = begin; it != end; ++it)
{
Map1::iterator found = local_map.find(*it);
if (found != local_map.end())
++found->second;
else if (local_map.size() < threshold)
++local_map[*it]; /// TODO Можно было бы делать один lookup, а не два.
else
{
size_t hash_value = hash(*it);
size_t bucket = hash_value >> 24;
if (mutexes[bucket].tryLock())
{
Map2::Impl::iterator inserted_it;
bool inserted;
global_map.impls[bucket].emplace(*it, inserted_it, inserted, hash_value);
mutexes[bucket].unlock();
}
else
++local_map[*it];
}
}
}
int main(int argc, char ** argv)
{
size_t n = atoi(argv[1]);
size_t num_threads = atoi(argv[2]);
size_t method = argc <= 3 ? 0 : atoi(argv[3]);
std::cerr << std::fixed << std::setprecision(2);
@ -114,6 +191,7 @@ int main(int argc, char ** argv)
<< std::endl;
}
if (!method || method == 1)
{
/** Вариант 1.
* В разных потоках агрегируем независимо в разные хэш-таблицы.
@ -139,9 +217,13 @@ int main(int argc, char ** argv)
<< " (" << n / time_aggregated << " elem/sec.)"
<< std::endl;
size_t size_before_merge = 0;
std::cerr << "Sizes: ";
for (size_t i = 0; i < num_threads; ++i)
{
std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
size_before_merge += maps[i].size();
}
std::cerr << std::endl;
watch.restart();
@ -154,7 +236,7 @@ int main(int argc, char ** argv)
double time_merged = watch.elapsedSeconds();
std::cerr
<< "Merged in " << time_merged
<< " (" << n / time_merged << " elem/sec.)"
<< " (" << size_before_merge / time_merged << " elem/sec.)"
<< std::endl;
double time_total = time_aggregated + time_merged;
@ -162,13 +244,17 @@ int main(int argc, char ** argv)
<< "Total in " << time_total
<< " (" << n / time_total << " elem/sec.)"
<< std::endl;
std::cerr << "Size: " << maps[0].size() << std::endl;
std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
}
if (!method || method == 2)
{
/** Вариант 2.
* В разных потоках агрегируем независимо в разные two-level хэш-таблицы.
* Затем сливаем их вместе, распараллелив по bucket-ам первого уровня.
* При использовании хэш-таблиц больших размеров (10 млн. элементов и больше),
* и большого количества потоков (8-32), слияние является узким местом,
* и преимущество в производительности достигает 4 раз.
*/
Map2 maps[num_threads];
@ -190,9 +276,13 @@ int main(int argc, char ** argv)
<< " (" << n / time_aggregated << " elem/sec.)"
<< std::endl;
size_t size_before_merge = 0;
std::cerr << "Sizes: ";
for (size_t i = 0; i < num_threads; ++i)
{
std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
size_before_merge += maps[i].size();
}
std::cerr << std::endl;
watch.restart();
@ -207,7 +297,7 @@ int main(int argc, char ** argv)
double time_merged = watch.elapsedSeconds();
std::cerr
<< "Merged in " << time_merged
<< " (" << n / time_merged << " elem/sec.)"
<< " (" << size_before_merge / time_merged << " elem/sec.)"
<< std::endl;
double time_total = time_aggregated + time_merged;
@ -220,7 +310,155 @@ int main(int argc, char ** argv)
for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i)
sum_size += maps[0].impls[i].size();
std::cerr << "Size: " << sum_size << std::endl;
std::cerr << "Size: " << sum_size << std::endl << std::endl;
}
if (!method || method == 3)
{
/** Вариант 3.
* В разных потоках агрегируем независимо в разные хэш-таблицы,
* пока их размер не станет достаточно большим.
* Если размер локальной хэш-таблицы большой, и в ней нет элемента,
* то вставляем его в одну глобальную хэш-таблицу, защищённую mutex-ом,
* а если mutex не удалось захватить, то вставляем в локальную.
* Затем сливаем все локальные хэш-таблицы в глобальную.
* Этот метод плохой - много contention-а.
*/
Map1 local_maps[num_threads];
Map1 global_map;
SmallLock mutex;
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate3,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
pool.wait();
watch.stop();
double time_aggregated = watch.elapsedSeconds();
std::cerr
<< "Aggregated in " << time_aggregated
<< " (" << n / time_aggregated << " elem/sec.)"
<< std::endl;
size_t size_before_merge = 0;
std::cerr << "Sizes (local): ";
for (size_t i = 0; i < num_threads; ++i)
{
std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
size_before_merge += local_maps[i].size();
}
std::cerr << std::endl;
std::cerr << "Size (global): " << global_map.size() << std::endl;
size_before_merge += global_map.size();
watch.restart();
for (size_t i = 0; i < num_threads; ++i)
for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
global_map[it->first] += it->second;
pool.wait();
watch.stop();
double time_merged = watch.elapsedSeconds();
std::cerr
<< "Merged in " << time_merged
<< " (" << size_before_merge / time_merged << " elem/sec.)"
<< std::endl;
double time_total = time_aggregated + time_merged;
std::cerr
<< "Total in " << time_total
<< " (" << n / time_total << " elem/sec.)"
<< std::endl;
std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
}
if (!method || method == 4)
{
/** Вариант 4.
* В разных потоках агрегируем независимо в разные хэш-таблицы,
* пока их размер не станет достаточно большим.
* Если размер локальной хэш-таблицы большой, и в ней нет элемента,
* то вставляем его в одну из 256 глобальных хэш-таблиц, каждая из которых под своим mutex-ом.
* Затем сливаем все локальные хэш-таблицы в глобальную.
* Этот метод тоже плохой.
*/
Map1 local_maps[num_threads];
Map2 global_map;
SmallLock mutexes[Map2::NUM_BUCKETS];
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate4,
std::ref(local_maps[i]),
std::ref(global_map),
&mutexes[0],
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
pool.wait();
watch.stop();
double time_aggregated = watch.elapsedSeconds();
std::cerr
<< "Aggregated in " << time_aggregated
<< " (" << n / time_aggregated << " elem/sec.)"
<< std::endl;
size_t size_before_merge = 0;
std::cerr << "Sizes (local): ";
for (size_t i = 0; i < num_threads; ++i)
{
std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
size_before_merge += local_maps[i].size();
}
std::cerr << std::endl;
size_t sum_size = 0;
for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i)
sum_size += global_map.impls[i].size();
std::cerr << "Size (global): " << sum_size << std::endl;
size_before_merge += sum_size;
watch.restart();
for (size_t i = 0; i < num_threads; ++i)
for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
global_map[it->first] += it->second;
pool.wait();
watch.stop();
double time_merged = watch.elapsedSeconds();
std::cerr
<< "Merged in " << time_merged
<< " (" << size_before_merge / time_merged << " elem/sec.)"
<< std::endl;
double time_total = time_aggregated + time_merged;
std::cerr
<< "Total in " << time_total
<< " (" << n / time_total << " elem/sec.)"
<< std::endl;
sum_size = 0;
for (size_t i = 0; i < Map2::NUM_BUCKETS; ++i)
sum_size += global_map.impls[i].size();
std::cerr << "Size: " << sum_size << std::endl << std::endl;
}
return 0;