dbms: more scalable aggregator: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-12-27 09:29:20 +03:00
parent 5b6ce478cf
commit ef160102f6
2 changed files with 12 additions and 38 deletions

View File

@ -4,7 +4,7 @@
/** Двухуровневая хэш-таблица. /** Двухуровневая хэш-таблица.
* Представляет собой 256 маленьких хэш-таблиц (bucket-ов первого уровня). * Представляет собой 256 (или 1 << BITS_FOR_BUCKET) маленьких хэш-таблиц (bucket-ов первого уровня).
* Для определения, какую из них использовать, берётся один из байтов хэш-функции. * Для определения, какую из них использовать, берётся один из байтов хэш-функции.
* *
* Обычно работает чуть-чуть медленнее простой хэш-таблицы. * Обычно работает чуть-чуть медленнее простой хэш-таблицы.
@ -31,7 +31,8 @@ template
typename Hash, typename Hash,
typename Grower, typename Grower,
typename Allocator, /// TODO WithStackMemory typename Allocator, /// TODO WithStackMemory
typename ImplTable = HashTable<Key, Cell, Hash, Grower, Allocator> typename ImplTable = HashTable<Key, Cell, Hash, Grower, Allocator>,
size_t BITS_FOR_BUCKET = 8
> >
class TwoLevelHashTable : class TwoLevelHashTable :
private boost::noncopyable, private boost::noncopyable,
@ -46,8 +47,13 @@ protected:
public: public:
typedef ImplTable Impl; typedef ImplTable Impl;
/// Number of first-level buckets: 2 to the power of BITS_FOR_BUCKET.
static constexpr size_t NUM_BUCKETS = 1 << BITS_FOR_BUCKET;
/// Largest valid bucket index; also serves as the bit mask applied in getBucketFromHash.
static constexpr size_t MAX_BUCKET = NUM_BUCKETS - 1;
size_t hash(const Key & x) const { return Hash::operator()(x); } size_t hash(const Key & x) const { return Hash::operator()(x); }
size_t getBucketFromHash(size_t hash_value) const { return (hash_value >> 24) & 0xFF; } /// NOTE Плохо для хэш-таблиц больше чем на 2^32 ячеек.
/// NOTE Bad for hash tables with more than 2^32 cells.
/// Maps a hash value to a first-level bucket index: shifts the hash right by
/// (32 - BITS_FOR_BUCKET) and masks with MAX_BUCKET, i.e. uses the
/// BITS_FOR_BUCKET bits located just below bit 32 of the hash value.
size_t getBucketFromHash(size_t hash_value) const { return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET; }
protected: protected:
typename Impl::iterator beginOfNextNonEmptyBucket(size_t & bucket) typename Impl::iterator beginOfNextNonEmptyBucket(size_t & bucket)
@ -78,8 +84,6 @@ public:
typedef typename Impl::key_type key_type; typedef typename Impl::key_type key_type;
typedef typename Impl::value_type value_type; typedef typename Impl::value_type value_type;
static constexpr size_t NUM_BUCKETS = 256;
static constexpr size_t MAX_BUCKET = NUM_BUCKETS - 1;
Impl impls[NUM_BUCKETS]; Impl impls[NUM_BUCKETS];

View File

@ -21,7 +21,6 @@
typedef UInt64 Key; typedef UInt64 Key;
typedef UInt64 Value; typedef UInt64 Value;
typedef std::vector<Key> Source; typedef std::vector<Key> Source;
@ -136,7 +135,7 @@ struct MergeSequential
}; };
template <typename Map> template <typename Map>
struct MergeSequentialTransposed struct MergeSequentialTransposed /// На практике не лучше обычного.
{ {
template <typename Merger> template <typename Merger>
static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map, static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
@ -250,9 +249,8 @@ struct Work
}; };
typedef HashMap<Key, Value, HashCRC32<Key>> Map;
typedef HashMap<Key, Value> Map; typedef TwoLevelHashMap<Key, Value, HashCRC32<Key>> MapTwoLevel;
typedef TwoLevelHashMap<Key, Value> MapTwoLevel;
typedef Poco::FastMutex Mutex; typedef Poco::FastMutex Mutex;
@ -379,20 +377,6 @@ int main(int argc, char ** argv)
MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>> MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>>
>::execute(data, num_threads, creator, updater, merger, pool); >::execute(data, num_threads, creator, updater, merger, pool);
if (!method || method == 11)
Work<
MapTwoLevel,
AggregateIndependent<MapTwoLevel>,
MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>>
>::execute(data, num_threads, creator, updater, merger, pool);
if (!method || method == 12)
Work<
MapTwoLevel,
AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>,
MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>>
>::execute(data, num_threads, creator, updater, merger, pool);
if (!method || method == 13) if (!method || method == 13)
Work< Work<
MapTwoLevel, MapTwoLevel,
@ -407,19 +391,5 @@ int main(int argc, char ** argv)
MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>> MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>>
>::execute(data, num_threads, creator, updater, merger, pool); >::execute(data, num_threads, creator, updater, merger, pool);
if (!method || method == 15)
Work<
MapTwoLevel,
AggregateIndependent<MapTwoLevel>,
MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>>
>::execute(data, num_threads, creator, updater, merger, pool);
if (!method || method == 16)
Work<
MapTwoLevel,
AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>,
MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>>
>::execute(data, num_threads, creator, updater, merger, pool);
return 0; return 0;
} }