Merge pull request #6873 from yandex/aku/internal-iteration

Use internal iteration over hash tables in Aggregator.
This commit is contained in:
akuzm 2019-09-10 22:02:53 +03:00 committed by GitHub
commit ab85d145a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 162 additions and 88 deletions

View File

@ -60,12 +60,6 @@ struct HashMethodOneNumber
/// Is used for default implementation in HashMethodBase.
FieldType getKeyHolder(size_t row, Arena &) const { return unalignedLoad<FieldType>(vec + row * sizeof(FieldType)); }
/// Get StringRef from value which can be inserted into column.
static StringRef getValueRef(const Value & value)
{
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
};
@ -102,8 +96,6 @@ struct HashMethodString
}
}
static StringRef getValueRef(const Value & value) { return value.first; }
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
};
@ -142,8 +134,6 @@ struct HashMethodFixedString
}
}
static StringRef getValueRef(const Value & value) { return value.first; }
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
};
@ -560,11 +550,6 @@ struct HashMethodHashed
{
return hash128(row, key_columns.size(), key_columns);
}
static ALWAYS_INLINE StringRef getValueRef(const Value & value)
{
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
};
}

View File

@ -36,7 +36,6 @@ struct FixedClearableHashMapCell
}
Key key;
FixedClearableHashMapCell * ptr;
Key & getFirstMutable() { return key; }
const Key & getFirst() const { return key; }
Mapped & getSecond() { return ptr->mapped; }
const Mapped & getSecond() const { return *ptr->mapped; }

View File

@ -23,7 +23,6 @@ struct FixedClearableHashTableCell
struct CellExt
{
Key key;
value_type & getValueMutable() { return key; }
const value_type & getValue() const { return key; }
void update(Key && key_, FixedClearableHashTableCell *) { key = key_; }
};

View File

@ -39,7 +39,6 @@ struct FixedHashMapCell
Key key;
FixedHashMapCell * ptr;
Key & getFirstMutable() { return key; }
const Key & getFirst() const { return key; }
Mapped & getSecond() { return ptr->mapped; }
const Mapped & getSecond() const { return ptr->mapped; }
@ -53,12 +52,53 @@ class FixedHashMap : public FixedHashTable<Key, FixedHashMapCell<Key, Mapped>, A
{
public:
using Base = FixedHashTable<Key, FixedHashMapCell<Key, Mapped>, Allocator>;
using Self = FixedHashMap;
using key_type = Key;
using mapped_type = Mapped;
using value_type = typename Base::cell_type::value_type;
using Cell = typename Base::cell_type;
using value_type = typename Cell::value_type;
using Base::Base;
template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
for (auto it = this->begin(), end = this->end(); it != end; ++it)
{
decltype(it) res_it;
bool inserted;
that.emplace(it->getFirst(), res_it, inserted, it.getHash());
func(res_it->getSecond(), it->getSecond(), inserted);
}
}
template <typename Func>
void ALWAYS_INLINE mergeToViaFind(Self & that, Func && func)
{
for (auto it = this->begin(), end = this->end(); it != end; ++it)
{
decltype(it) res_it = that.find(it->getFirst(), it.getHash());
if (res_it == that.end())
func(it->getSecond(), it->getSecond(), false);
else
func(res_it->getSecond(), it->getSecond(), true);
}
}
template <typename Func>
void forEachValue(Func && func)
{
for (auto & v : *this)
func(v.getFirst(), v.getSecond());
}
template <typename Func>
void forEachMapped(Func && func)
{
for (auto & v : *this)
func(v.getSecond());
}
mapped_type & ALWAYS_INLINE operator[](Key x)
{
typename Base::iterator it;

View File

@ -28,7 +28,6 @@ struct FixedHashTableCell
{
Key key;
value_type & getValueMutable() { return key; }
const value_type & getValue() const { return key; }
void update(Key && key_, FixedHashTableCell *) { key = key_; }
};

View File

@ -49,12 +49,10 @@ struct HashMapCell
HashMapCell(const Key & key_, const State &) : value(key_, NoInitTag()) {}
HashMapCell(const value_type & value_, const State &) : value(value_) {}
Key & getFirstMutable() { return value.first; }
const Key & getFirst() const { return value.first; }
Mapped & getSecond() { return value.second; }
const Mapped & getSecond() const { return value.second; }
value_type & getValueMutable() { return value; }
const value_type & getValue() const { return value; }
static const Key & getKey(const value_type & value) { return value.first; }
@ -137,12 +135,65 @@ template <
class HashMapTable : public HashTable<Key, Cell, Hash, Grower, Allocator>
{
public:
using Self = HashMapTable;
using key_type = Key;
using mapped_type = typename Cell::Mapped;
using value_type = typename Cell::value_type;
using HashTable<Key, Cell, Hash, Grower, Allocator>::HashTable;
/// Merge every cell's value of current map into the destination map via emplace.
/// Func should have signature void(Mapped & dst, Mapped & src, bool emplaced).
/// Each filled cell in current map will invoke func once. If that map doesn't
/// have a key equals to the given cell, a new cell gets emplaced into that map,
/// and func is invoked with the third argument emplaced set to true. Otherwise
/// emplaced is set to false.
template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
for (auto it = this->begin(), end = this->end(); it != end; ++it)
{
decltype(it) res_it;
bool inserted;
that.emplace(it->getFirst(), res_it, inserted, it.getHash());
func(res_it->getSecond(), it->getSecond(), inserted);
}
}
/// Merge every cell's value of current map into the destination map via find.
/// Func should have signature void(Mapped & dst, Mapped & src, bool exist).
/// Each filled cell in current map will invoke func once. If that map doesn't
/// have a key equals to the given cell, func is invoked with the third argument
/// exist set to false. Otherwise exist is set to true.
template <typename Func>
void ALWAYS_INLINE mergeToViaFind(Self & that, Func && func)
{
for (auto it = this->begin(), end = this->end(); it != end; ++it)
{
decltype(it) res_it = that.find(it->getFirst(), it.getHash());
if (res_it == that.end())
func(it->getSecond(), it->getSecond(), false);
else
func(res_it->getSecond(), it->getSecond(), true);
}
}
/// Call func(const Key &, Mapped &) for each hash map element.
template <typename Func>
void forEachValue(Func && func)
{
for (auto & v : *this)
func(v.getFirst(), v.getSecond());
}
/// Call func(Mapped &) for each hash map element.
template <typename Func>
void forEachMapped(Func && func)
{
for (auto & v : *this)
func(v.getSecond());
}
mapped_type & ALWAYS_INLINE operator[](Key x)
{
typename HashMapTable::iterator it;

View File

@ -98,7 +98,6 @@ struct HashTableCell
HashTableCell(const Key & key_, const State &) : key(key_) {}
/// Get what the value_type of the container will be.
value_type & getValueMutable() { return key; }
const value_type & getValue() const { return key; }
/// Get the key.

View File

@ -22,6 +22,13 @@ public:
using TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>>::TwoLevelHashTable;
template <typename Func>
void ALWAYS_INLINE forEachMapped(Func && func)
{
for (auto i = 0u; i < this->NUM_BUCKETS; ++i)
this->impls[i].forEachMapped(func);
}
mapped_type & ALWAYS_INLINE operator[](Key x)
{
typename TwoLevelHashMapTable::iterator it;

View File

@ -901,15 +901,15 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
}
}
for (const auto & value : data)
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(value.getValue(), key_columns, key_sizes);
method.insertKeyIntoColumns(key, key_columns, key_sizes);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
value.getSecond() + offsets_of_aggregate_states[i],
mapped + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
});
destroyImpl<Method>(data);
}
@ -932,16 +932,16 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
}
}
for (auto & value : data)
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(value.getValue(), key_columns, key_sizes);
method.insertKeyIntoColumns(key, key_columns, key_sizes);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(value.getSecond() + offsets_of_aggregate_states[i]);
aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);
value.getSecond() = nullptr;
}
mapped = nullptr;
});
}
@ -1274,32 +1274,27 @@ void NO_INLINE Aggregator::mergeDataImpl(
if constexpr (Method::low_cardinality_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
for (auto it = table_src.begin(), end = table_src.end(); it != end; ++it)
table_src.mergeToViaEmplace(table_dst,
[&](AggregateDataPtr & dst, AggregateDataPtr & src, bool inserted)
{
typename Table::iterator res_it;
bool inserted;
table_dst.emplace(it->getFirst(), res_it, inserted, it.getHash());
if (!inserted)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_it->getSecond() + offsets_of_aggregate_states[i],
it->getSecond() + offsets_of_aggregate_states[i],
dst + offsets_of_aggregate_states[i],
src + offsets_of_aggregate_states[i],
arena);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
it->getSecond() + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
}
else
{
res_it->getSecond() = it->getSecond();
}
it->getSecond() = nullptr;
dst = src;
}
src = nullptr;
});
table_src.clearAndShrink();
}
@ -1315,26 +1310,21 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
if constexpr (Method::low_cardinality_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
for (auto it = table_src.begin(), end = table_src.end(); it != end; ++it)
table_src.mergeToViaFind(table_dst, [&](AggregateDataPtr dst, AggregateDataPtr & src, bool found)
{
typename Table::iterator res_it = table_dst.find(it->getFirst(), it.getHash());
AggregateDataPtr res_data = table_dst.end() == res_it
? overflows
: res_it->getSecond();
AggregateDataPtr res_data = found ? dst : overflows;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
it->getSecond() + offsets_of_aggregate_states[i],
src + offsets_of_aggregate_states[i],
arena);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(it->getSecond() + offsets_of_aggregate_states[i]);
it->getSecond() = nullptr;
}
aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
src = nullptr;
});
table_src.clearAndShrink();
}
@ -1348,27 +1338,23 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
if constexpr (Method::low_cardinality_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
for (auto it = table_src.begin(); it != table_src.end(); ++it)
table_src.mergeToViaFind(table_dst,
[&](AggregateDataPtr dst, AggregateDataPtr & src, bool found)
{
decltype(it) res_it = table_dst.find(it->getFirst(), it.getHash());
if (table_dst.end() == res_it)
continue;
AggregateDataPtr res_data = res_it->getSecond();
if (!found)
return;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
it->getSecond() + offsets_of_aggregate_states[i],
dst + offsets_of_aggregate_states[i],
src + offsets_of_aggregate_states[i],
arena);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(it->getSecond() + offsets_of_aggregate_states[i]);
it->getSecond() = nullptr;
}
aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
src = nullptr;
});
table_src.clearAndShrink();
}
@ -2217,23 +2203,21 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
template <typename Method, typename Table>
void NO_INLINE Aggregator::destroyImpl(Table & table) const
{
for (auto elem : table)
table.forEachMapped([&](AggregateDataPtr & data)
{
AggregateDataPtr & data = elem.getSecond();
/** If an exception (usually a lack of memory, the MemoryTracker throws) arose
* after inserting the key into a hash table, but before creating all states of aggregate functions,
* then data will be equal nullptr.
*/
if (nullptr == data)
continue;
return;
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
data = nullptr;
}
});
}

View File

@ -182,9 +182,11 @@ struct AggregationMethodOneNumber
static const bool low_cardinality_optimization = false;
// Insert the key from the hash table into columns.
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
static void insertKeyIntoColumns(const Key & key, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
{
static_cast<ColumnVectorHelper *>(key_columns[0].get())->insertRawData<sizeof(FieldType)>(reinterpret_cast<const char *>(&value.first));
auto key_holder = reinterpret_cast<const char *>(&key);
auto column = static_cast<ColumnVectorHelper *>(key_columns[0].get());
column->insertRawData<sizeof(FieldType)>(key_holder);
}
};
@ -208,9 +210,9 @@ struct AggregationMethodString
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
key_columns[0]->insertData(key.data, key.size);
}
};
@ -234,9 +236,9 @@ struct AggregationMethodFixedString
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
key_columns[0]->insertData(key.data, key.size);
}
};
@ -262,10 +264,19 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
static const bool low_cardinality_optimization = true;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
static void insertKeyIntoColumns(const Key & key,
MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
{
auto ref = BaseState::getValueRef(value);
assert_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get())->insertData(ref.data, ref.size);
auto col = assert_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get());
if constexpr (std::is_same_v<Key, StringRef>)
{
col->insertData(key.data, key.size);
}
else
{
col->insertData(reinterpret_cast<const char *>(&key), sizeof(key));
}
}
};
@ -293,7 +304,7 @@ struct AggregationMethodKeysFixed
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & key_sizes)
static void insertKeyIntoColumns(const Key & key, MutableColumns & key_columns, const Sizes & key_sizes)
{
size_t keys_size = key_columns.size();
@ -330,7 +341,7 @@ struct AggregationMethodKeysFixed
/// corresponding key is nullable. Update the null map accordingly.
size_t bucket = i / 8;
size_t offset = i % 8;
UInt8 val = (reinterpret_cast<const UInt8 *>(&value.first)[bucket] >> offset) & 1;
UInt8 val = (reinterpret_cast<const UInt8 *>(&key)[bucket] >> offset) & 1;
null_map->insertValue(val);
is_null = val == 1;
}
@ -340,7 +351,7 @@ struct AggregationMethodKeysFixed
else
{
size_t size = key_sizes[i];
observed_column->insertData(reinterpret_cast<const char *>(&value.first) + pos, size);
observed_column->insertData(reinterpret_cast<const char *>(&key) + pos, size);
pos += size;
}
}
@ -371,9 +382,9 @@ struct AggregationMethodSerialized
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
{
auto pos = value.first.data;
auto pos = key.data;
for (auto & column : key_columns)
pos = column->deserializeAndInsertFromArena(pos);
}