dbms: experiments with hash tables [#METR-2944].

Alexey Milovidov 2014-05-03 05:45:34 +04:00
parent 04cb4d878b
commit 2dc708e70c
5 changed files with 221 additions and 69 deletions


@ -71,6 +71,10 @@ struct HashMapCell
DB::assertString(",", rb);
DB::readDoubleQuoted(value.second, rb);
}
/// Needed if HashTableMergeCursor is used.
void swap(HashMapCell & rhs) { std::swap(value, rhs.value); }
bool less(HashMapCell & rhs) const { return value.first < rhs.value.first; }
};


@ -178,6 +178,11 @@ struct HashTableGrower
? initial_size_degree
: (static_cast<size_t>(log2(num_elems - 1)) + 2));
}
void setBufSize(size_t buf_size_)
{
size_degree = static_cast<size_t>(log2(buf_size_ - 1) + 1);
}
};
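The new setBufSize picks the smallest size_degree whose buffer, 1 << size_degree, is at least buf_size_. Below is a standalone check of that formula; degreeFor is an assumed helper mirroring the body of setBufSize, not part of the commit, and it requires buf_size >= 2.
#include <cassert>
#include <cmath>
#include <cstddef>
/// Mirrors HashTableGrower::setBufSize: round buf_size up to a power of two.
static size_t degreeFor(size_t buf_size)
{
    return static_cast<size_t>(log2(buf_size - 1) + 1);
}
int main()
{
    assert((1ULL << degreeFor(256)) == 256);    /// an exact power of two is kept
    assert((1ULL << degreeFor(257)) == 512);    /// anything above rounds up
    assert((1ULL << degreeFor(1024)) == 1024);
}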
@ -285,7 +290,7 @@ protected:
/// Increase the buffer size.
void resize(size_t for_num_elems = 0)
void resize(size_t for_num_elems = 0, size_t for_buf_size = 0)
{
#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
Stopwatch watch;
@ -300,6 +305,12 @@ protected:
if (grower.bufSize() <= old_size)
return;
}
else if (for_buf_size)
{
grower.setBufSize(for_buf_size);
if (grower.bufSize() <= old_size)
return;
}
else
grower.increaseSize();


@ -24,16 +24,18 @@ private:
Table * container;
Cell * ptr;
/** Are we in a collision resolution chain at the very beginning of the buffer?
* If so, the chain may contain keys that should have been at the end of the buffer.
*/
bool in_first_chain;
/** The start of the collision resolution chain that reaches the end of the hash table,
* or buf + buf_size if there is no such chain.
*/
Cell * begin_of_last_chain;
size_t current_place_value;
/** Are we in a collision resolution chain at the very beginning of the buffer?
* If so, the chain may contain keys that should have been at the end of the buffer.
*/
bool in_first_chain;
bool overlapped = false;
size_t place(const Cell * ptr) const
@ -140,6 +142,8 @@ public:
if (ptr != cell_with_min_place_value)
ptr->swap(*cell_with_min_place_value);
}
current_place_value = min_place_value;
}
bool isValid() const
@ -157,19 +161,10 @@ public:
return ptr;
}
/// For priority_queue (inverted).
bool operator< (const HashTableMergeCursor & rhs) const
{
if (!isValid())
return true;
if (!rhs.isValid())
return false;
size_t lhs_place = place(ptr);
size_t rhs_place = place(rhs.ptr);
return lhs_place > rhs_place
|| (lhs_place == rhs_place && !ptr->less(*rhs.ptr));
return current_place_value < rhs.current_place_value
|| (current_place_value == rhs.current_place_value && ptr->less(*rhs.ptr));
}
};
@ -189,33 +184,41 @@ void processMergedHashTables(std::vector<Table*> & tables, MergeFunction && merg
size_t tables_size = tables.size();
/// Determine the maximum size of the tables.
size_t max_size = 0;
size_t max_buf_size = 0;
for (size_t i = 0; i < tables_size; ++i)
if (tables[i].size() > max_size)
max_size = tables[i].size();
if (tables[i]->grower.bufSize() > max_buf_size)
max_buf_size = tables[i]->grower.bufSize();
std::cerr << "max_buf_size: " << max_buf_size << std::endl;
/// Resize all tables to this size.
for (size_t i = 0; i < tables_size; ++i)
if (tables[i].size() < max_size)
tables[i].resize(max_size);
if (tables[i]->grower.bufSize() < max_buf_size)
tables[i]->resize(0, max_buf_size);
typedef std::priority_queue<Cursor> Queue;
for (size_t i = 0; i < tables_size; ++i)
std::cerr << "buf_size: " << tables[i]->grower.bufSize() << std::endl;
typedef std::vector<Cursor> Queue;
Queue queue;
queue.reserve(tables_size);
for (size_t i = 0; i < tables_size; ++i)
{
Cursor cursor(&tables[i]);
Cursor cursor(tables[i]);
if (cursor.isValid())
queue.push(cursor);
queue.emplace_back(cursor);
}
Cell * prev_cell = nullptr;
while (!queue.empty())
{
Cursor cursor = queue.top();
queue.pop();
size_t min_pos = 0;
for (size_t i = 1, size = queue.size(); i < size; ++i)
if (queue[i] < queue[min_pos])
min_pos = i;
Cell * current_cell = cursor.getCell();
Cell * current_cell = queue[min_pos].getCell();
if (!prev_cell)
{
@ -231,9 +234,9 @@ void processMergedHashTables(std::vector<Table*> & tables, MergeFunction && merg
merge_func(prev_cell->getValue(), current_cell->getValue());
}
cursor.next();
if (cursor.isValid())
queue.push(cursor);
queue[min_pos].next();
if (!queue[min_pos].isValid())
queue.erase(queue.begin() + min_pos);
}
if (prev_cell)
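The merge loop above drops the std::priority_queue of cursors in favour of a plain std::vector that is scanned linearly for the smallest cursor on every step and shrunk once a cursor is exhausted, which can be cheaper than maintaining a heap when only a few tables are merged. Here is a self-contained sketch of that loop shape, with an assumed Cursor over sorted int vectors instead of the hash table cursors used in the commit.
#include <cstddef>
#include <iostream>
#include <vector>
struct Cursor
{
    const std::vector<int> * source;
    size_t pos;
    bool isValid() const { return pos < source->size(); }
    int get() const { return (*source)[pos]; }
    void next() { ++pos; }
    bool operator<(const Cursor & rhs) const { return get() < rhs.get(); }
};
int main()
{
    std::vector<std::vector<int>> sources{{1, 4, 7}, {2, 5}, {0, 3, 6}};
    std::vector<Cursor> queue;
    for (const auto & s : sources)
        queue.push_back(Cursor{&s, 0});
    while (!queue.empty())
    {
        /// Linear scan for the minimum instead of a heap.
        size_t min_pos = 0;
        for (size_t i = 1; i < queue.size(); ++i)
            if (queue[i] < queue[min_pos])
                min_pos = i;
        std::cout << queue[min_pos].get() << ' ';
        queue[min_pos].next();
        if (!queue[min_pos].isValid())
            queue.erase(queue.begin() + min_pos);
    }
    std::cout << '\n';   /// prints: 0 1 2 3 4 5 6 7
}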


@ -3,13 +3,14 @@
#include <mutex>
#include <atomic>
#include <DB/Interpreters/AggregationCommon.h>
#define DBMS_HASH_MAP_DEBUG_RESIZES
#include <DB/Interpreters/AggregationCommon.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Common/HashTable/TwoLevelHashTable.h>
#include <DB/Common/HashTable/HashTableWithSmallLocks.h>
//#include <DB/Common/HashTable/HashTableWithSmallLocks.h>
#include <DB/Common/HashTable/HashTableMerge.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
@ -73,6 +74,22 @@ using TwoLevelHashMap = TwoLevelHashMapTable<Key, HashMapCell<Key, Mapped, Hash>
typedef TwoLevelHashMap<Key, Value> MapTwoLevel;
struct SmallLock
{
std::atomic<int> locked {false};
bool tryLock()
{
int expected = 0;
return locked.compare_exchange_strong(expected, 1, std::memory_order_acquire);
}
void unlock()
{
locked.store(0, std::memory_order_release);
}
};
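A minimal usage sketch of SmallLock (assumed code, not from the commit): try to take the latch and, if that fails, buffer the update locally instead of blocking, in the spirit of the aggregate5 variant below.
#include <atomic>
#include <cassert>
struct SmallLock
{
    std::atomic<int> locked {0};
    bool tryLock()
    {
        int expected = 0;
        return locked.compare_exchange_strong(expected, 1, std::memory_order_acquire);
    }
    void unlock() { locked.store(0, std::memory_order_release); }
};
int main()
{
    SmallLock lock;
    int shared_value = 0;   /// protected by the latch
    int local_value = 0;    /// per-thread fallback buffer
    if (lock.tryLock())     /// uncontended here, so this succeeds
    {
        ++shared_value;
        lock.unlock();
    }
    else
        ++local_value;      /// latch is busy: buffer locally, merge later
    assert(shared_value == 1 && local_value == 0);
}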
struct __attribute__((__aligned__(64))) AlignedSmallLock : public SmallLock
{
char dummy[64 - sizeof(SmallLock)];
@ -85,14 +102,14 @@ struct FixedSizeGrower : public HashTableGrower
FixedSizeGrower() { size_degree = initial_size_degree; }
};
typedef HashTableWithSmallLocks<
/*typedef HashTableWithSmallLocks<
Key,
HashTableCellWithLock<
Key,
HashMapCell<Key, Value, DefaultHash<Key> > >,
DefaultHash<Key>,
FixedSizeGrower,
HashTableAllocator> MapSmallLocks;
HashTableAllocator> MapSmallLocks;*/
void aggregate1(Map & map, Source::const_iterator begin, Source::const_iterator end)
@ -169,20 +186,20 @@ void aggregate4(Map & local_map, MapTwoLevel & global_map, AlignedSmallLock * mu
}
}
}
/*
void aggregate5(Map & local_map, MapSmallLocks & global_map, Source::const_iterator begin, Source::const_iterator end)
{
// static constexpr size_t threshold = 65536;
static constexpr size_t threshold = 65536;
for (auto it = begin; it != end; ++it)
{
/* Map::iterator found = local_map.find(*it);
Map::iterator found = local_map.find(*it);
if (found != local_map.end())
++found->second;
else if (local_map.size() < threshold)
++local_map[*it]; /// TODO We could do a single lookup instead of two.
else*/
else
{
SmallScopedLock lock;
MapSmallLocks::iterator insert_it;
@ -194,7 +211,7 @@ void aggregate5(Map & local_map, MapSmallLocks & global_map, Source::const_itera
++local_map[*it];
}
}
}
}*/
@ -495,9 +512,9 @@ int main(int argc, char ** argv)
std::cerr << "Size: " << sum_size << std::endl << std::endl;
}
if (!method || method == 5)
/* if (!method || method == 5)
{
/** Variant 5.
*/ /** Variant 5.
* In different threads, we aggregate independently into separate hash tables,
* until their size becomes large enough.
* If the local hash table is large and the element is not in it,
@ -505,7 +522,7 @@ int main(int argc, char ** argv)
* and if the latch could not be acquired, insert into the local one.
* Then merge all local hash tables into the global one.
*/
/*
Map local_maps[num_threads];
MapSmallLocks global_map;
@ -560,6 +577,78 @@ int main(int argc, char ** argv)
<< std::endl;
std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
}*/
if (!method || method == 6)
{
/** Variant 6.
* In different threads, we aggregate independently into separate hash tables.
* Then we "merge" them, traversing them in the same key order.
* Quite a slow variant.
*/
Map maps[num_threads];
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
pool.wait();
watch.stop();
double time_aggregated = watch.elapsedSeconds();
std::cerr
<< "Aggregated in " << time_aggregated
<< " (" << n / time_aggregated << " elem/sec.)"
<< std::endl;
size_t size_before_merge = 0;
std::cerr << "Sizes: ";
for (size_t i = 0; i < num_threads; ++i)
{
std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
size_before_merge += maps[i].size();
}
std::cerr << std::endl;
watch.restart();
typedef std::vector<Map *> Maps;
Maps maps_to_merge(num_threads);
for (size_t i = 0; i < num_threads; ++i)
maps_to_merge[i] = &maps[i];
size_t size = 0;
for (size_t i = 0; i < 100; ++i)
processMergedHashTables(maps_to_merge,
[] (Map::value_type & dst, const Map::value_type & src) { dst.second += src.second; },
[&] (const Map::value_type & dst) { ++size; });
/* size_t sum = 0;
for (size_t i = 0; i < num_threads; ++i)
for (HashTableMergeCursor<Map> it(&maps[i]); it.isValid(); it.next())
sum += it.get().second;
std::cerr << "sum: " << sum << std::endl;*/
watch.stop();
double time_merged = watch.elapsedSeconds();
std::cerr
<< "Merged in " << time_merged
<< " (" << size_before_merge / time_merged << " elem/sec.)"
<< std::endl;
double time_total = time_aggregated + time_merged;
std::cerr
<< "Total in " << time_total
<< " (" << n / time_total << " elem/sec.)"
<< std::endl;
std::cerr << "Size: " << size << std::endl << std::endl;
}
return 0;
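Variant 6's overall shape (per-thread independent aggregation, then a single-threaded merge) can be sketched with standard containers. The stand-in below is assumed code, not from the commit: std::unordered_map replaces the custom open-addressing table, and a plain merge into maps[0] replaces the same-key-order walk that processMergedHashTables performs.
#include <cstddef>
#include <cstdint>
#include <thread>
#include <unordered_map>
#include <vector>
typedef uint64_t Key;
typedef std::unordered_map<Key, uint64_t> Map;
int main()
{
    std::vector<Key> data(1000000);
    for (size_t i = 0; i < data.size(); ++i)
        data[i] = i % 10000;
    const size_t num_threads = 4;
    std::vector<Map> maps(num_threads);
    std::vector<std::thread> threads;
    /// Each thread aggregates its slice of the data into its own map.
    for (size_t t = 0; t < num_threads; ++t)
        threads.emplace_back([&maps, &data, num_threads, t]
        {
            size_t begin = data.size() * t / num_threads;
            size_t end = data.size() * (t + 1) / num_threads;
            for (size_t i = begin; i < end; ++i)
                ++maps[t][data[i]];
        });
    for (auto & thread : threads)
        thread.join();
    /// Single-threaded merge of all per-thread maps into maps[0].
    for (size_t t = 1; t < num_threads; ++t)
        for (const auto & kv : maps[t])
            maps[0][kv.first] += kv.second;
    return maps[0].size() == 10000 ? 0 : 1;
}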


@ -21,7 +21,67 @@
#include <DB/Interpreters/AggregationCommon.h>
typedef StringRef Key;
struct CompactStringRef
{
union
{
const char * data_mixed = nullptr;
struct
{
char dummy[6];
UInt16 size;
};
};
CompactStringRef(const char * data_, size_t size_)
{
data_mixed = data_;
size = size_;
}
CompactStringRef(const unsigned char * data_, size_t size_) : CompactStringRef(reinterpret_cast<const char *>(data_), size_) {}
CompactStringRef(const std::string & s) : CompactStringRef(s.data(), s.size()) {}
CompactStringRef() {}
const char * data() const { return reinterpret_cast<const char *>(reinterpret_cast<intptr_t>(data_mixed) & 0x0000FFFFFFFFFFFFULL); }
std::string toString() const { return std::string(data(), size); }
};
inline bool operator==(CompactStringRef lhs, CompactStringRef rhs)
{
if (lhs.size != rhs.size)
return false;
const char * lhs_data = lhs.data();
const char * rhs_data = rhs.data();
for (size_t pos = lhs.size - 1; pos < lhs.size; --pos)
if (lhs_data[pos] != rhs_data[pos])
return false;
return true;
}
namespace ZeroTraits
{
template <>
inline bool check<CompactStringRef>(CompactStringRef x) { return nullptr == x.data_mixed; }
template <>
inline void set<CompactStringRef>(CompactStringRef & x) { x.data_mixed = nullptr; }
};
template <>
struct DefaultHash<CompactStringRef>
{
size_t operator() (CompactStringRef x) const
{
return CityHash64(x.data(), x.size);
}
};
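CompactStringRef above relies on the fact that, on x86-64 Linux, user-space pointers fit in the lower 48 bits, so a 16-bit length can live in the top two bytes of the same 8-byte word. Here is a standalone sketch of that packing (assumed code, not from the commit; it also assumes a little-endian layout and strings no longer than 65535 bytes).
#include <cassert>
#include <cstdint>
#include <cstring>
static uint64_t pack(const char * data, uint16_t size)
{
    return (static_cast<uint64_t>(reinterpret_cast<uintptr_t>(data)) & 0x0000FFFFFFFFFFFFULL)
        | (static_cast<uint64_t>(size) << 48);
}
static const char * unpackData(uint64_t packed)
{
    return reinterpret_cast<const char *>(packed & 0x0000FFFFFFFFFFFFULL);
}
static uint16_t unpackSize(uint64_t packed)
{
    return static_cast<uint16_t>(packed >> 48);
}
int main()
{
    const char * s = "hello";
    uint64_t packed = pack(s, 5);
    assert(unpackData(packed) == s);
    assert(unpackSize(packed) == 5);
    assert(0 == memcmp(unpackData(packed), "hello", unpackSize(packed)));
}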
typedef CompactStringRef Key;
typedef UInt64 Value;
struct CellWithSavedHash : public HashMapCell<Key, Value, DefaultHash<Key> >
@ -32,35 +92,20 @@ struct CellWithSavedHash : public HashMapCell<Key, Value, DefaultHash<Key> >
CellWithSavedHash(const Key & key_, const State & state) : HashMapCell(key_, state) {}
CellWithSavedHash(const value_type & value_, const State & state) : HashMapCell(value_, state) {}
static bool equals(const StringRef & lhs, const StringRef & rhs)
/* static bool equals(const StringRef & lhs, const StringRef & rhs)
{
if (lhs.size != rhs.size)
return false;
/* for (size_t pos = lhs.size - 1; pos < lhs.size; --pos)
if (lhs.data[pos] != rhs.data[pos])
return false;*/
size_t pos = 0;
while (pos < lhs.size)
{
if (*reinterpret_cast<const UInt64 *>(&lhs.data[pos]) != *reinterpret_cast<const UInt64 *>(&rhs.data[pos]))
return false;
pos += 8;
}
/* while (pos < lhs.size)
{
for (size_t pos = lhs.size - 1; pos < lhs.size; --pos)
if (lhs.data[pos] != rhs.data[pos])
return false;
++pos;
}*/
return true;
}
}*/
bool keyEquals(const Key & key_) const { return value.first == key_; }
bool keyEquals(const CellWithSavedHash & other) const { return saved_hash == other.saved_hash && equals(value.first, other.value.first); }
bool keyEquals(const CellWithSavedHash & other) const { return saved_hash == other.saved_hash && value.first == other.value.first; }
void setHash(size_t hash_value) { saved_hash = hash_value; }
size_t getHash(const DefaultHash<Key> & hash) const { return saved_hash; }
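keyEquals above compares the cached saved_hash before the keys themselves, so most unequal keys are rejected with a single integer comparison, and getHash lets the table reuse the cached value on resize instead of rehashing the string. A minimal sketch of that pattern with a std::string key (assumed code, not from the commit):
#include <cassert>
#include <functional>
#include <string>
struct Cell
{
    std::string key;
    size_t saved_hash;
    bool keyEquals(const Cell & other) const
    {
        /// Cheap integer compare first; the (possibly long) key compare runs
        /// only when the cached hashes already match.
        return saved_hash == other.saved_hash && key == other.key;
    }
};
int main()
{
    std::hash<std::string> hasher;
    Cell a{"hello", hasher("hello")};
    Cell b{"hello", hasher("hello")};
    Cell c{"world", hasher("world")};
    assert(a.keyEquals(b));
    assert(!a.keyEquals(c));
}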
@ -125,7 +170,7 @@ int main(int argc, char ** argv)
for (size_t i = 0; i < n && !in2.eof(); ++i)
{
DB::readStringBinary(tmp, in2);
data[i] = StringRef(pool.insert(tmp.data(), tmp.size()), tmp.size());
data[i] = Key(pool.insert(tmp.data(), tmp.size()), tmp.size());
}
watch.stop();
@ -136,7 +181,7 @@ int main(int argc, char ** argv)
<< std::endl;
}
if (m == 1)
if (!m || m == 1)
{
Stopwatch watch;
@ -168,7 +213,7 @@ int main(int argc, char ** argv)
<< std::endl;
}
if (m == 2)
if (!m || m == 2)
{
Stopwatch watch;
@ -184,7 +229,7 @@ int main(int argc, char ** argv)
<< std::endl;
}
if (m == 3)
if (!m || m == 3)
{
Stopwatch watch;
@ -201,7 +246,7 @@ int main(int argc, char ** argv)
<< std::endl;
}
if (m == 4)
if (!m || m == 4)
{
Stopwatch watch;