Merge pull request #3138 from yandex/low-cardinality-group-by

Low cardinality group by
This commit is contained in:
alexey-milovidov 2018-09-21 13:38:20 +03:00 committed by GitHub
commit 4a54a1c310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 935 additions and 89 deletions

View File

@ -95,6 +95,10 @@ public:
index.setColumn(getRawColumnPtr());
}
const UInt64 * tryGetSavedHash() const override { return index.tryGetSavedHash(); }
UInt128 getHash() const override { return hash.getHash(*getRawColumnPtr()); }
private:
ColumnPtr column_holder;
@ -105,6 +109,21 @@ private:
mutable ColumnPtr cached_null_mask;
mutable ColumnPtr cached_column_nullable;
/// Caches the 128-bit hash of the dictionary column so that repeated requests
/// for a column with an unchanged number of rows do not rehash it.
class IncrementalHash
{
public:
    IncrementalHash() : num_added_rows(0) {}

    /// Returns the hash of all rows of `column`, reusing the cached value
    /// when the cache was computed for the same number of rows.
    UInt128 getHash(const ColumnType & column);

private:
    /// Last computed hash. Read and written only while `mutex` is held.
    UInt128 hash;
    /// Number of rows the cached `hash` was computed for.
    std::atomic<size_t> num_added_rows;
    std::mutex mutex;
};
mutable IncrementalHash hash;
static size_t numSpecialValues(bool is_nullable) { return is_nullable ? 2 : 1; }
size_t numSpecialValues() const { return numSpecialValues(is_nullable); }
@ -513,4 +532,30 @@ IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWi
return indexes_with_overflow;
}
/// Returns the SipHash (128 bit) of every row of `column`.
/// The cached value is reused when the column has exactly as many rows as it had
/// when the cache was filled; otherwise all rows are rehashed from row 0 and the
/// cache is replaced. The cached value is always read and written under `mutex`.
template <typename ColumnType>
UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & column)
{
    const size_t rows = column.size();

    /// Cache hit: the stored hash was computed for this row count.
    if (rows == num_added_rows.load())
    {
        std::lock_guard lock(mutex);
        return hash;
    }

    /// Cache miss: hash the whole column outside of the lock.
    SipHash hasher;
    for (size_t row = 0; row != rows; ++row)
        column.updateHashWithValue(row, hasher);

    /// Publish the new hash first, then the row count it belongs to.
    std::lock_guard lock(mutex);
    hasher.get128(hash.low, hash.high);
    const UInt128 result = hash;
    num_added_rows.store(rows);
    return result;
}
}

View File

@ -275,7 +275,7 @@ void ColumnWithDictionary::getPermutation(bool reverse, size_t limit, int nan_di
size_t perm_size = std::min(indexes_size, limit);
res.resize(perm_size);
size_t perm_index = 0;
for (size_t row = 0; row < indexes_size && perm_index < perm_size; ++row)
for (size_t row = 0; row < unique_perm.size() && perm_index < perm_size; ++row)
{
const auto & row_indexes = indexes_per_row[unique_perm[row]];
for (auto row_index : row_indexes)
@ -356,6 +356,17 @@ ColumnWithDictionary::getMinimalDictionaryEncodedColumn(size_t offset, size_t li
return {std::move(sub_keys), std::move(sub_indexes)};
}
/// Returns a UInt64 column with one counter per row of the dictionary's nested column.
/// Counters start at zero and are filled in by Index::countKeys from the positions column.
ColumnPtr ColumnWithDictionary::countKeys() const
{
    const size_t num_keys = getDictionary().getNestedColumn()->size();

    auto counts = ColumnUInt64::create(num_keys, 0);
    idx.countKeys(counts->getData());

    /// Moved so that the mutable pointer converts into the returned ColumnPtr.
    return std::move(counts);
}
ColumnWithDictionary::Index::Index() : positions(ColumnUInt8::create()), size_of_type(sizeof(UInt8)) {}
@ -430,6 +441,18 @@ typename ColumnVector<IndexType>::Container & ColumnWithDictionary::Index::getPo
return positions_ptr->getData();
}
/// Read-only access to the positions container, viewed as ColumnVector<IndexType>.
/// Throws LOGICAL_ERROR if the positions column is not of that exact type.
template <typename IndexType>
const typename ColumnVector<IndexType>::Container & ColumnWithDictionary::Index::getPositionsData() const
{
    if (const auto * typed_positions = typeid_cast<const ColumnVector<IndexType> *>(positions.get()))
        return typed_positions->getData();

    throw Exception("Invalid indexes type for ColumnWithDictionary."
                    " Expected UInt" + toString(8 * sizeof(IndexType)) + ", got " + positions->getName(),
                    ErrorCodes::LOGICAL_ERROR);
}
template <typename IndexType>
void ColumnWithDictionary::Index::convertPositions()
{
@ -486,6 +509,19 @@ UInt64 ColumnWithDictionary::Index::getMaxPositionForCurrentType() const
return value;
}
/// Returns the position stored for `row`, reading the positions column
/// through the integer type selected by `size_of_type`.
size_t ColumnWithDictionary::Index::getPositionAt(size_t row) const
{
    size_t result;

    /// `type_tag` only carries the index type chosen by callForType.
    auto read_position = [&](auto type_tag)
    {
        using CurIndexType = decltype(type_tag);
        const auto & typed_positions = getPositionsData<CurIndexType>();
        result = typed_positions[row];
    };

    callForType(std::move(read_position), size_of_type);
    return result;
}
void ColumnWithDictionary::Index::insertPosition(UInt64 position)
{
while (position > getMaxPositionForCurrentType())
@ -572,6 +608,18 @@ void ColumnWithDictionary::Index::checkSizeOfType()
", but positions are " + positions->getName(), ErrorCodes::LOGICAL_ERROR);
}
/// Increments counts[p] once for every position p stored in the positions column.
/// `counts` is indexed directly by position, so it must already be large enough.
void ColumnWithDictionary::Index::countKeys(ColumnUInt64::Container & counts) const
{
    /// `type_tag` only carries the index type chosen by callForType.
    auto accumulate = [&](auto type_tag)
    {
        using CurIndexType = decltype(type_tag);
        const auto & typed_positions = getPositionsData<CurIndexType>();

        const size_t num_rows = typed_positions.size();
        for (size_t row = 0; row != num_rows; ++row)
            ++counts[typed_positions[row]];
    };

    callForType(std::move(accumulate), size_of_type);
}
ColumnWithDictionary::Dictionary::Dictionary(MutableColumnPtr && column_unique_)
: column_unique(std::move(column_unique_))

View File

@ -140,17 +140,34 @@ public:
bool withDictionary() const override { return true; }
const IColumnUnique & getDictionary() const { return dictionary.getColumnUnique(); }
const ColumnPtr & getDictionaryPtr() const { return dictionary.getColumnUniquePtr(); }
/// IColumnUnique & getUnique() { return static_cast<IColumnUnique &>(*column_unique->assumeMutable()); }
/// ColumnPtr getUniquePtr() const { return column_unique; }
/// IColumn & getIndexes() { return idx.getPositions()->assumeMutableRef(); }
const IColumn & getIndexes() const { return *idx.getPositions(); }
const ColumnPtr & getIndexesPtr() const { return idx.getPositions(); }
size_t getSizeOfIndexType() const { return idx.getSizeOfIndexType(); }
/// Returns the dictionary index stored for `row`, picking the concrete
/// positions column type from the current size of the index type.
/// Throws LOGICAL_ERROR if that size is not 1, 2, 4 or 8 bytes.
ALWAYS_INLINE size_t getIndexAt(size_t row) const
{
    const IColumn & positions_column = getIndexes();
    const size_t index_type_size = idx.getSizeOfIndexType();

    if (index_type_size == sizeof(UInt8))
        return static_cast<const ColumnUInt8 &>(positions_column).getElement(row);
    if (index_type_size == sizeof(UInt16))
        return static_cast<const ColumnUInt16 &>(positions_column).getElement(row);
    if (index_type_size == sizeof(UInt32))
        return static_cast<const ColumnUInt32 &>(positions_column).getElement(row);
    if (index_type_size == sizeof(UInt64))
        return static_cast<const ColumnUInt64 &>(positions_column).getElement(row);

    throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
}
///void setIndexes(MutableColumnPtr && indexes_) { indexes = std::move(indexes_); }
/// Set shared ColumnUnique for empty column with dictionary.
void setSharedDictionary(const ColumnPtr & column_unique);
bool isSharedDictionary() const { return dictionary.isShared(); }
/// Create a column with a new dictionary that contains only the keys mentioned in the index.
MutablePtr compact();
@ -166,6 +183,8 @@ public:
DictionaryEncodedColumn getMinimalDictionaryEncodedColumn(size_t offset, size_t limit) const;
ColumnPtr countKeys() const;
class Index
{
public:
@ -176,6 +195,7 @@ public:
const ColumnPtr & getPositions() const { return positions; }
ColumnPtr & getPositionsPtr() { return positions; }
size_t getPositionAt(size_t row) const;
void insertPosition(UInt64 position);
void insertPositionsRange(const IColumn & column, size_t offset, size_t limit);
@ -185,6 +205,7 @@ public:
UInt64 getMaxPositionForCurrentType() const;
static size_t getSizeOfIndexType(const IColumn & column, size_t hint);
size_t getSizeOfIndexType() const { return size_of_type; }
void check(size_t max_dictionary_size);
void checkSizeOfType();
@ -192,6 +213,8 @@ public:
ColumnPtr detachPositions() { return std::move(positions); }
void attachPositions(ColumnPtr positions_);
void countKeys(ColumnUInt64::Container & counts) const;
private:
ColumnPtr positions;
size_t size_of_type = 0;
@ -202,6 +225,9 @@ public:
template <typename IndexType>
typename ColumnVector<IndexType>::Container & getPositionsData();
template <typename IndexType>
const typename ColumnVector<IndexType>::Container & getPositionsData() const;
template <typename IndexType>
void convertPositions();

View File

@ -1,5 +1,7 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/ColumnsNumber.h>
#include <Common/UInt128.h>
namespace DB
{
@ -16,6 +18,11 @@ public:
/// The same as getNestedColumn, but removes null map if nested column is nullable.
virtual const ColumnPtr & getNestedNotNullableColumn() const = 0;
/// Returns array with StringRefHash calculated for each row of getNestedNotNullableColumn() column.
/// Returns nullptr if nested column doesn't contain strings. Otherwise calculates the hash (if it wasn't calculated yet).
/// Uses thread-safe cache.
virtual const UInt64 * tryGetSavedHash() const = 0;
size_t size() const override { return getNestedColumn()->size(); }
/// Appends new value at the end of column (column's size is increased by 1).
@ -50,6 +57,9 @@ public:
virtual size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) = 0;
/// Returns dictionary hash, which is SipHash applied to each row of the nested column.
virtual UInt128 getHash() const = 0;
const char * getFamilyName() const override { return "ColumnUnique"; }
void insert(const Field &) override

View File

@ -246,7 +246,7 @@ class ReverseIndex
{
public:
explicit ReverseIndex(UInt64 num_prefix_rows_to_skip, UInt64 base_index)
: num_prefix_rows_to_skip(num_prefix_rows_to_skip), base_index(base_index) {}
: num_prefix_rows_to_skip(num_prefix_rows_to_skip), base_index(base_index), saved_hash_ptr(nullptr) {}
void setColumn(ColumnType * column_);
@ -261,6 +261,26 @@ public:
ColumnType * getColumn() const { return column; }
size_t size() const;
/// Returns a pointer to the per-row hash array, computing it on first use.
/// Returns nullptr when `use_saved_hash` is false.
/// When several threads compute the hashes at once, the first one to swap its
/// pointer into `saved_hash_ptr` wins and keeps its column alive in `saved_hash`;
/// the others drop their result and return the winner's pointer.
const UInt64 * tryGetSavedHash() const
{
    if (!use_saved_hash)
        return nullptr;

    if (UInt64 * published = saved_hash_ptr.load())
        return published;

    auto computed = calcHashes();
    UInt64 * candidate = &computed->getData()[0];

    UInt64 * expected = nullptr;
    if (!saved_hash_ptr.compare_exchange_strong(expected, candidate))
        return expected; /// Lost the race: `expected` now holds the published pointer.

    saved_hash = std::move(computed);
    return candidate;
}
size_t allocatedBytes() const { return index ? index->getBufferSizeInBytes() : 0; }
private:
@ -272,7 +292,8 @@ private:
/// Lazy initialized.
std::unique_ptr<IndexMapType> index;
ColumnUInt64::MutablePtr saved_hash;
mutable ColumnUInt64::MutablePtr saved_hash;
mutable std::atomic<UInt64 *> saved_hash_ptr;
void buildIndex();
@ -287,15 +308,19 @@ private:
else
return StringRefHash()(ref);
}
};
ColumnUInt64::MutablePtr calcHashes() const;
};
/// Binds the reverse index to `column_`.
/// If the column differs from the current one, everything derived from the old
/// column is dropped: the lazily built index, the saved hashes, and the raw
/// pointer to those hashes that tryGetSavedHash() hands out.
template <typename IndexType, typename ColumnType>
void ReverseIndex<IndexType, ColumnType>::setColumn(ColumnType * column_)
{
    if (column != column_)
    {
        index = nullptr;

        /// Clear the published pointer before releasing the buffer it points into.
        /// Otherwise tryGetSavedHash() would see a non-null pointer and return
        /// memory that has already been freed.
        saved_hash_ptr.store(nullptr);
        saved_hash = nullptr;
    }

    column = column_;
}
@ -322,7 +347,7 @@ void ReverseIndex<IndexType, ColumnType>::buildIndex()
index = std::make_unique<IndexMapType>(size);
if constexpr (use_saved_hash)
saved_hash = ColumnUInt64::create(size);
saved_hash = calcHashes();
auto & state = index->getState();
state.index_column = column;
@ -336,10 +361,11 @@ void ReverseIndex<IndexType, ColumnType>::buildIndex()
for (auto row : ext::range(num_prefix_rows_to_skip, size))
{
auto hash = getHash(column->getDataAt(row));
UInt64 hash;
if constexpr (use_saved_hash)
saved_hash->getElement(row) = hash;
hash = saved_hash->getElement(row);
else
hash = getHash(column->getDataAt(row));
index->emplace(row + base_index, iterator, inserted, hash);
@ -348,6 +374,21 @@ void ReverseIndex<IndexType, ColumnType>::buildIndex()
}
}
/// Computes getHash() for every row of the index column, starting from row 0,
/// and returns the values as a new UInt64 column with one element per row.
/// Throws LOGICAL_ERROR if no column has been set.
template <typename IndexType, typename ColumnType>
ColumnUInt64::MutablePtr ReverseIndex<IndexType, ColumnType>::calcHashes() const
{
    if (!column)
        throw Exception("ReverseIndex can't build index because index column wasn't set.", ErrorCodes::LOGICAL_ERROR);

    auto size = column->size();
    auto hash = ColumnUInt64::create(size);

    for (auto row : ext::range(0, size))
        hash->getElement(row) = getHash(column->getDataAt(row));

    /// The local has exactly the return type, so a plain return is moved or elided;
    /// wrapping it in std::move would only prevent the elision.
    return hash;
}
template <typename IndexType, typename ColumnType>
UInt64 ReverseIndex<IndexType, ColumnType>::insert(UInt64 from_position)
{

View File

@ -252,9 +252,8 @@ public:
}
iterator ALWAYS_INLINE find(Key x)
iterator ALWAYS_INLINE find(Key x, size_t hash_value)
{
size_t hash_value = hash(x);
size_t buck = getBucketFromHash(hash_value);
typename Impl::iterator found = impls[buck].find(x, hash_value);
@ -264,9 +263,8 @@ public:
}
const_iterator ALWAYS_INLINE find(Key x) const
const_iterator ALWAYS_INLINE find(Key x, size_t hash_value) const
{
size_t hash_value = hash(x);
size_t buck = getBucketFromHash(hash_value);
typename Impl::const_iterator found = impls[buck].find(x, hash_value);
@ -276,6 +274,10 @@ public:
}
iterator ALWAYS_INLINE find(Key x) { return find(x, hash(x)); }
const_iterator ALWAYS_INLINE find(Key x) const { return find(x, hash(x)); }
void write(DB::WriteBuffer & wb) const
{
for (size_t i = 0; i < NUM_BUCKETS; ++i)

View File

@ -844,4 +844,12 @@ void registerDataTypeWithDictionary(DataTypeFactory & factory)
factory.registerDataType("LowCardinality", create);
}
/// Returns the dictionary type when `type` is a DataTypeWithDictionary,
/// and `type` itself otherwise.
DataTypePtr removeLowCardinality(const DataTypePtr & type)
{
    const auto * low_cardinality_type = typeid_cast<const DataTypeWithDictionary *>(type.get());
    if (!low_cardinality_type)
        return type;

    return low_cardinality_type->getDictionaryType();
}
}

View File

@ -161,4 +161,7 @@ private:
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator);
};
/// Returns dictionary type if type is DataTypeWithDictionary, type otherwise.
DataTypePtr removeLowCardinality(const DataTypePtr & type);
}

View File

@ -459,6 +459,7 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar
{
bool has_low_cardinality = false;
size_t num_full_low_cardinality_columns = 0;
size_t num_full_ordinary_columns = 0;
ColumnsWithTypeAndName args_without_dictionary(arguments);
@ -476,9 +477,12 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar
if (!is_const)
++num_full_low_cardinality_columns;
}
else if (!is_const)
++num_full_ordinary_columns;
}
if (canBeExecutedOnLowCardinalityDictionary() && has_low_cardinality && num_full_low_cardinality_columns <= 1)
if (canBeExecutedOnLowCardinalityDictionary() && has_low_cardinality
&& num_full_low_cardinality_columns <= 1 && num_full_ordinary_columns == 0)
return std::make_shared<DataTypeWithDictionary>(getReturnTypeWithoutDictionary(args_without_dictionary));
else
return getReturnTypeWithoutDictionary(args_without_dictionary);

View File

@ -12,6 +12,7 @@
#include <Columns/IColumn.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnWithDictionary.h>
template <>
@ -65,9 +66,11 @@ using KeysNullMap = std::array<UInt8, getBitmapSize<T>()>;
/// Pack into a binary blob of type T a set of fixed-size keys. Granted that all the keys fit into the
/// binary blob, they are placed in it consecutively.
template <typename T>
template <typename T, bool has_low_cardinality = false>
static inline T ALWAYS_INLINE packFixed(
size_t i, size_t keys_size, const ColumnRawPtrs & key_columns, const Sizes & key_sizes)
size_t i, size_t keys_size, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
const ColumnRawPtrs * low_cardinality_positions [[maybe_unused]] = nullptr,
const Sizes * low_cardinality_sizes [[maybe_unused]] = nullptr)
{
union
{
@ -79,26 +82,43 @@ static inline T ALWAYS_INLINE packFixed(
for (size_t j = 0; j < keys_size; ++j)
{
size_t index = i;
const IColumn * column = key_columns[j];
if constexpr (has_low_cardinality)
{
if (const IColumn * positions = (*low_cardinality_positions)[j])
{
switch ((*low_cardinality_sizes)[j])
{
case sizeof(UInt8): index = static_cast<const ColumnUInt8 *>(positions)->getElement(i); break;
case sizeof(UInt16): index = static_cast<const ColumnUInt16 *>(positions)->getElement(i); break;
case sizeof(UInt32): index = static_cast<const ColumnUInt32 *>(positions)->getElement(i); break;
case sizeof(UInt64): index = static_cast<const ColumnUInt64 *>(positions)->getElement(i); break;
default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
}
}
}
switch (key_sizes[j])
{
case 1:
memcpy(bytes + offset, &static_cast<const ColumnUInt8 *>(key_columns[j])->getData()[i], 1);
memcpy(bytes + offset, &static_cast<const ColumnUInt8 *>(column)->getData()[index], 1);
offset += 1;
break;
case 2:
memcpy(bytes + offset, &static_cast<const ColumnUInt16 *>(key_columns[j])->getData()[i], 2);
memcpy(bytes + offset, &static_cast<const ColumnUInt16 *>(column)->getData()[index], 2);
offset += 2;
break;
case 4:
memcpy(bytes + offset, &static_cast<const ColumnUInt32 *>(key_columns[j])->getData()[i], 4);
memcpy(bytes + offset, &static_cast<const ColumnUInt32 *>(column)->getData()[index], 4);
offset += 4;
break;
case 8:
memcpy(bytes + offset, &static_cast<const ColumnUInt64 *>(key_columns[j])->getData()[i], 8);
memcpy(bytes + offset, &static_cast<const ColumnUInt64 *>(column)->getData()[index], 8);
offset += 8;
break;
default:
memcpy(bytes + offset, &static_cast<const ColumnFixedString *>(key_columns[j])->getChars()[i * key_sizes[j]], key_sizes[j]);
memcpy(bytes + offset, &static_cast<const ColumnFixedString *>(column)->getChars()[index * key_sizes[j]], key_sizes[j]);
offset += key_sizes[j];
}
}

View File

@ -24,6 +24,9 @@
#include <common/demangle.h>
#if __has_include(<Interpreters/config_compile.h>)
#include <Interpreters/config_compile.h>
#include <Columns/ColumnWithDictionary.h>
#include <DataTypes/DataTypeWithDictionary.h>
#endif
@ -185,6 +188,9 @@ Aggregator::Aggregator(const Params & params_)
}
method_chosen = chooseAggregationMethod();
AggregationStateCache::Settings cache_settings;
cache_settings.max_threads = params.max_threads;
aggregation_state_cache = AggregatedDataVariants::createCache(method_chosen, cache_settings);
}
@ -393,18 +399,25 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
DataTypes types_removed_nullable;
types_removed_nullable.reserve(params.keys.size());
bool has_nullable_key = false;
bool has_low_cardinality = false;
for (const auto & pos : params.keys)
{
const auto & type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
DataTypePtr type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
if (type->withDictionary())
{
has_low_cardinality = true;
type = removeLowCardinality(type);
}
if (type->isNullable())
{
has_nullable_key = true;
types_removed_nullable.push_back(removeNullable(type));
type = removeNullable(type);
}
else
types_removed_nullable.push_back(type);
types_removed_nullable.push_back(type);
}
/** Returns ordinary (not two-level) methods, because we start from them.
@ -430,7 +443,7 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
if (has_nullable_key)
{
if (params.keys_size == num_fixed_contiguous_keys)
if (params.keys_size == num_fixed_contiguous_keys && !has_low_cardinality)
{
/// Pack if possible all the keys along with information about which key values are nulls
/// into a fixed 16- or 32-byte blob.
@ -450,6 +463,19 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber())
{
size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();
if (has_low_cardinality)
{
if (size_of_field == 1)
return AggregatedDataVariants::Type::low_cardinality_key8;
if (size_of_field == 2)
return AggregatedDataVariants::Type::low_cardinality_key16;
if (size_of_field == 4)
return AggregatedDataVariants::Type::low_cardinality_key32;
if (size_of_field == 8)
return AggregatedDataVariants::Type::low_cardinality_key64;
}
if (size_of_field == 1)
return AggregatedDataVariants::Type::key8;
if (size_of_field == 2)
@ -466,6 +492,14 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
/// If all keys fit in N bits, will use hash table with all keys packed (placed contiguously) into a single N-bit key.
if (params.keys_size == num_fixed_contiguous_keys)
{
if (has_low_cardinality)
{
if (keys_bytes <= 16)
return AggregatedDataVariants::Type::low_cardinality_keys128;
if (keys_bytes <= 32)
return AggregatedDataVariants::Type::low_cardinality_keys256;
}
if (keys_bytes <= 16)
return AggregatedDataVariants::Type::keys128;
if (keys_bytes <= 32)
@ -474,10 +508,20 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
/// If single string key - will use hash table with references to it. The strings themselves are stored separately in Arena.
if (params.keys_size == 1 && isString(types_removed_nullable[0]))
return AggregatedDataVariants::Type::key_string;
{
if (has_low_cardinality)
return AggregatedDataVariants::Type::low_cardinality_key_string;
else
return AggregatedDataVariants::Type::key_string;
}
if (params.keys_size == 1 && isFixedString(types_removed_nullable[0]))
return AggregatedDataVariants::Type::key_fixed_string;
{
if (has_low_cardinality)
return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
else
return AggregatedDataVariants::Type::key_fixed_string;
}
return AggregatedDataVariants::Type::serialized;
}
@ -522,7 +566,10 @@ void NO_INLINE Aggregator::executeImpl(
AggregateDataPtr overflow_row) const
{
typename Method::State state;
state.init(key_columns);
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
if (!no_more_keys)
executeImplCase<false>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row);
@ -547,17 +594,22 @@ void NO_INLINE Aggregator::executeImplCase(
AggregateDataPtr overflow_row) const
{
/// NOTE When editing this code, also pay attention to SpecializedAggregator.h.
/// TODO for low cardinality optimization.
/// For all rows.
typename Method::iterator it;
typename Method::Key prev_key;
AggregateDataPtr value = nullptr;
for (size_t i = 0; i < rows; ++i)
{
bool inserted; /// Inserted a new key, or was this key already?
bool overflow = false; /// The new key did not fit in the hash table because of no_more_keys.
bool inserted = false; /// Inserted a new key, or was this key already?
/// Get the key to insert into the hash table.
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
typename Method::Key key;
if constexpr (!Method::low_cardinality_optimization)
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
AggregateDataPtr * aggregate_data = nullptr;
typename Method::iterator it; /// Is not used if Method::low_cardinality_optimization
if (!no_more_keys) /// Insert.
{
@ -567,7 +619,6 @@ void NO_INLINE Aggregator::executeImplCase(
if (i != 0 && key == prev_key)
{
/// Add values to the aggregate functions.
AggregateDataPtr value = Method::getAggregateData(it->second);
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
@ -578,19 +629,32 @@ void NO_INLINE Aggregator::executeImplCase(
prev_key = key;
}
method.data.emplace(key, it, inserted);
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.emplaceKeyFromRow(method.data, i, inserted, params.keys_size, keys, *aggregates_pool);
else
{
method.data.emplace(key, it, inserted);
aggregate_data = &Method::getAggregateData(it->second);
}
}
else
{
/// Add only if the key already exists.
inserted = false;
it = method.data.find(key);
if (method.data.end() == it)
overflow = true;
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.findFromRow(method.data, i);
else
{
it = method.data.find(key);
if (method.data.end() != it)
aggregate_data = &Method::getAggregateData(it->second);
}
}
/// aggregate_data == nullptr means that the new key did not fit in the hash table because of no_more_keys.
/// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do.
if (no_more_keys && overflow && !overflow_row)
if (!aggregate_data && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue;
@ -599,21 +663,23 @@ void NO_INLINE Aggregator::executeImplCase(
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (inserted)
{
AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
aggregate_data = nullptr;
*aggregate_data = nullptr;
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
if constexpr (!Method::low_cardinality_optimization)
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
*aggregate_data = place;
if constexpr (Method::low_cardinality_optimization)
state.cacheAggregateData(i, place);
}
else
method.onExistingKey(key, keys, *aggregates_pool);
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;
value = aggregate_data ? *aggregate_data : overflow_row;
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
@ -660,6 +726,21 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
/// How to perform the aggregation?
if (result.empty())
{
result.init(method_chosen);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
if (params.compiler)
compileIfPossible(result.type);
}
if (isCancelled())
return true;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
@ -667,6 +748,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
* To make them work anyway, we materialize them.
*/
Columns materialized_columns;
// ColumnRawPtrs key_counts;
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
@ -678,6 +760,16 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
materialized_columns.push_back(converted);
key_columns[i] = materialized_columns.back().get();
}
if (const auto * column_with_dictionary = typeid_cast<const ColumnWithDictionary *>(key_columns[i]))
{
if (!result.isLowCardinality())
{
materialized_columns.push_back(column_with_dictionary->convertToFullColumn());
key_columns[i] = materialized_columns.back().get();
}
//key_counts.push_back(materialized_columns.back().get());
}
}
AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
@ -694,6 +786,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
materialized_columns.push_back(converted);
aggregate_columns[i][j] = materialized_columns.back().get();
}
if (auto * col_with_dict = typeid_cast<const ColumnWithDictionary *>(aggregate_columns[i][j]))
{
materialized_columns.push_back(col_with_dict->convertToFullColumn());
aggregate_columns[i][j] = materialized_columns.back().get();
}
}
aggregate_functions_instructions[i].that = aggregate_functions[i];
@ -707,21 +805,6 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
size_t rows = block.rows();
/// How to perform the aggregation?
if (result.empty())
{
result.init(method_chosen);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
if (params.compiler)
compileIfPossible(result.type);
}
if (isCancelled())
return true;
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
@ -1843,7 +1926,10 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();
typename Method::State state;
state.init(key_columns);
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
/// For all rows.
StringRefs keys(params.keys_size);
@ -1851,27 +1937,41 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
for (size_t i = 0; i < rows; ++i)
{
typename Table::iterator it;
AggregateDataPtr * aggregate_data = nullptr;
bool inserted; /// Inserted a new key, or was this key already?
bool overflow = false; /// The new key did not fit in the hash table because of no_more_keys.
bool inserted = false; /// Inserted a new key, or was this key already?
/// Get the key to insert into the hash table.
auto key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
typename Method::Key key;
if constexpr (!Method::low_cardinality_optimization)
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
if (!no_more_keys)
{
data.emplace(key, it, inserted);
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.emplaceKeyFromRow(data, i, inserted, params.keys_size, keys, *aggregates_pool);
else
{
data.emplace(key, it, inserted);
aggregate_data = &Method::getAggregateData(it->second);
}
}
else
{
inserted = false;
it = data.find(key);
if (data.end() == it)
overflow = true;
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.findFromRow(data, i);
else
{
it = data.find(key);
if (data.end() != it)
aggregate_data = &Method::getAggregateData(it->second);
}
}
/// aggregate_data == nullptr means that the new key did not fit in the hash table because of no_more_keys.
/// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
if (no_more_keys && overflow && !overflow_row)
if (!aggregate_data && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue;
@ -1880,19 +1980,22 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (inserted)
{
AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
aggregate_data = nullptr;
*aggregate_data = nullptr;
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
if constexpr (!Method::low_cardinality_optimization)
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
*aggregate_data = place;
if constexpr (Method::low_cardinality_optimization)
state.cacheAggregateData(i, place);
}
else
method.onExistingKey(key, keys, *aggregates_pool);
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;
AggregateDataPtr value = aggregate_data ? *aggregate_data : overflow_row;
/// Merge state of aggregate functions.
for (size_t j = 0; j < params.aggregates_size; ++j)
@ -2221,7 +2324,10 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
std::vector<Block> & destinations) const
{
typename Method::State state;
state.init(key_columns);
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
size_t rows = source.rows();
size_t columns = source.columns();

View File

@ -13,6 +13,8 @@
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
#include <common/ThreadPool.h>
#include <Common/UInt128.h>
#include <Common/LRUCache.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/SizeLimits.h>
@ -26,6 +28,7 @@
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnWithDictionary.h>
namespace DB
@ -85,6 +88,18 @@ using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, Aggreg
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
/// Base type for a cache that the states of aggregation methods may use.
/// One object is shared between all threads.
struct AggregationStateCache
{
    /// Parameters handed to the code that creates a cache.
    struct Settings
    {
        size_t max_threads;
    };

    virtual ~AggregationStateCache() = default;
};

using AggregationStateCachePtr = std::shared_ptr<AggregationStateCache>;
/// For the case where there is one numeric key.
template <typename FieldType, typename TData> /// UInt8/16/32/64 for any type with corresponding bit width.
@ -146,6 +161,8 @@ struct AggregationMethodOneNumber
/** Do not use optimization for consecutive keys.
*/
static const bool no_consecutive_keys_optimization = false;
/// Use optimization for low cardinality.
static const bool low_cardinality_optimization = false;
/** Insert the key from the hash table into columns.
*/
@ -153,6 +170,14 @@ struct AggregationMethodOneNumber
{
static_cast<ColumnVector<FieldType> *>(key_columns[0].get())->insertData(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
/// Returns a StringRef over the raw bytes of the key stored in the hash table cell;
/// the reference can be passed to a column's insertData.
static StringRef getValueRef(const typename Data::value_type & value)
{
    const auto & stored_key = value.first;
    return StringRef(reinterpret_cast<const char *>(&stored_key), sizeof(stored_key));
}
/// This method does not use an aggregation state cache: always returns an empty pointer.
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/)
{
    return AggregationStateCachePtr{};
}
};
@ -211,11 +236,19 @@ struct AggregationMethodString
static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {}
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false;
/// Returns a StringRef over the string key stored in the hash table cell.
static StringRef getValueRef(const typename Data::value_type & value)
{
    const auto & stored_key = value.first;
    return StringRef(stored_key.data, stored_key.size);
}
/// Appends the string key stored in the hash table cell to the first (and only) key column.
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
{
    const auto & stored_key = value.first;
    key_columns[0]->insertData(stored_key.data, stored_key.size);
}
/// This method does not use an aggregation state cache: always returns an empty pointer.
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/)
{
    return AggregationStateCachePtr{};
}
};
@ -272,13 +305,289 @@ struct AggregationMethodFixedString
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false;
/// Returns a StringRef over the fixed string key stored in the hash table cell.
static StringRef getValueRef(const typename Data::value_type & value)
{
    const auto & stored_key = value.first;
    return StringRef(stored_key.data, stored_key.size);
}
/// Appends the fixed string key stored in the hash table cell to the first (and only) key column.
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
{
    const auto & stored_key = value.first;
    key_columns[0]->insertData(stored_key.data, stored_key.size);
}
/// This method does not use an aggregation state cache: always returns an empty pointer.
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/)
{
    return AggregationStateCachePtr{};
}
};
/// Cache stores dictionaries and saved_hash per dictionary key.
class LowCardinalityDictionaryCache : public AggregationStateCache
{
public:
/// Will assume that dictionaries with same hash has the same keys.
/// Just in case, check that they have also the same size.
struct DictionaryKey
{
UInt128 hash;
UInt64 size;
bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
};
struct DictionaryKeyHash
{
size_t operator()(const DictionaryKey & key) const
{
SipHash hash;
hash.update(key.hash.low);
hash.update(key.hash.high);
hash.update(key.size);
return hash.get64();
}
};
struct CachedValues
{
/// Store ptr to dictionary to be sure it won't be deleted.
ColumnPtr dictionary_holder;
/// Hashes for dictionary keys.
const UInt64 * saved_hash = nullptr;
};
using CachedValuesPtr = std::shared_ptr<CachedValues>;
explicit LowCardinalityDictionaryCache(const AggregationStateCache::Settings & settings) : cache(settings.max_threads) {}
CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
private:
using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
Cache cache;
};
/// Single low cardinality column.
/// Wraps a single-column aggregation method (SingleColumnMethod) so that the key of a row is taken
/// from the dictionary of a ColumnWithDictionary: a row is first mapped to its position in the dictionary,
/// and the aggregate data pointer found for each position is remembered for the current column
/// in order to decrease the number of hash table lookups.
template <typename SingleColumnMethod>
struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
{
    using Base = SingleColumnMethod;
    using BaseState = typename Base::State;

    using Data = typename Base::Data;
    using Key = typename Base::Key;
    using Mapped = typename Base::Mapped;
    using iterator = typename Base::iterator;
    using const_iterator = typename Base::const_iterator;

    using Base::data;

    /// Creates the dictionary cache which State::init expects to receive.
    static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & settings)
    {
        return std::make_shared<LowCardinalityDictionaryCache>(settings);
    }

    AggregationMethodSingleLowCardinalityColumn() = default;

    template <typename Other>
    explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}

    struct State : public BaseState
    {
        /// Single-element list with the nested column of the dictionary. It is passed to BaseState instead of key columns.
        ColumnRawPtrs key;
        /// Column with positions in the dictionary for every row of the current key column.
        const IColumn * positions = nullptr;
        /// Size in bytes of the integer type stored in `positions`.
        size_t size_of_index_type = 0;

        /// saved hash is from current column or from cache.
        const UInt64 * saved_hash = nullptr;
        /// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted.
        ColumnPtr dictionary_holder;

        /// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages.
        /// It has one slot per dictionary position; an empty slot is nullptr.
        PaddedPODArray<AggregateDataPtr> aggregate_data;
        /// Points to `aggregate_data` after init. Initialized to nullptr so that it never holds an indeterminate value.
        PaddedPODArray<AggregateDataPtr> * aggregate_data_cache = nullptr;

        /// This method cannot be initialized without a cache: always throws LOGICAL_ERROR.
        void init(ColumnRawPtrs &)
        {
            throw Exception("Expected cache for AggregationMethodSingleLowCardinalityColumn::init", ErrorCodes::LOGICAL_ERROR);
        }

        /// Prepares the state for the block whose single key column is key_columns[0].
        /// Throws LOGICAL_ERROR if the column is not a ColumnWithDictionary, if cache_ptr is empty
        /// or if the cache is not a LowCardinalityDictionaryCache.
        void init(ColumnRawPtrs & key_columns, const AggregationStateCachePtr & cache_ptr)
        {
            auto column = typeid_cast<const ColumnWithDictionary *>(key_columns[0]);
            if (!column)
                throw Exception("Invalid aggregation key type for AggregationMethodSingleLowCardinalityColumn method. "
                                "Expected LowCardinality, got " + key_columns[0]->getName(), ErrorCodes::LOGICAL_ERROR);

            if (!cache_ptr)
                throw Exception("Cache wasn't created for AggregationMethodSingleLowCardinalityColumn", ErrorCodes::LOGICAL_ERROR);

            auto cache = typeid_cast<LowCardinalityDictionaryCache *>(cache_ptr.get());
            if (!cache)
            {
                const auto & cached_val = *cache_ptr;
                throw Exception("Invalid type for AggregationMethodSingleLowCardinalityColumn cache: "
                                + demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
            }

            auto * dict = column->getDictionary().getNestedColumn().get();
            key = {dict};

            /// The shared cache is consulted only for shared dictionaries.
            bool is_shared_dict = column->isSharedDictionary();

            typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key;
            typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values;

            if (is_shared_dict)
            {
                dictionary_key = {column->getDictionary().getHash(), dict->size()};
                cached_values = cache->get(dictionary_key);
            }

            if (cached_values)
            {
                saved_hash = cached_values->saved_hash;
                dictionary_holder = cached_values->dictionary_holder;
            }
            else
            {
                saved_hash = column->getDictionary().tryGetSavedHash();
                dictionary_holder = column->getDictionaryPtr();

                if (is_shared_dict)
                {
                    cached_values = std::make_shared<typename LowCardinalityDictionaryCache::CachedValues>();
                    cached_values->saved_hash = saved_hash;
                    cached_values->dictionary_holder = dictionary_holder;

                    cache->set(dictionary_key, cached_values);
                }
            }

            /// One empty slot per value of the dictionary.
            AggregateDataPtr default_data = nullptr;
            aggregate_data.assign(key[0]->size(), default_data);
            aggregate_data_cache = &aggregate_data;

            size_of_index_type = column->getSizeOfIndexType();
            positions = column->getIndexesPtr().get();

            BaseState::init(key);
        }

        /// Returns the position in the dictionary for the given row of the key column.
        ALWAYS_INLINE size_t getIndexAt(size_t row) const
        {
            switch (size_of_index_type)
            {
                case sizeof(UInt8): return static_cast<const ColumnUInt8 *>(positions)->getElement(row);
                case sizeof(UInt16): return static_cast<const ColumnUInt16 *>(positions)->getElement(row);
                case sizeof(UInt32): return static_cast<const ColumnUInt32 *>(positions)->getElement(row);
                case sizeof(UInt64): return static_cast<const ColumnUInt64 *>(positions)->getElement(row);
                default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
            }
        }

        /// Get the key from the key columns for insertion into the hash table.
        /// The passed key columns are ignored: the key is read from the dictionary at the position of row i.
        ALWAYS_INLINE Key getKey(
            const ColumnRawPtrs & /*key_columns*/,
            size_t /*keys_size*/,
            size_t i,
            const Sizes & key_sizes,
            StringRefs & keys,
            Arena & pool) const
        {
            size_t row = getIndexAt(i);
            return BaseState::getKey(key, 1, row, key_sizes, keys, pool);
        }

        /// Finds or inserts the key of row i and returns a pointer to its aggregate data pointer.
        /// If the slot for the dictionary position is already filled, the hash table is not touched.
        /// For a newly inserted key the slot is not filled here: the aggregate state does not exist yet,
        /// so it has to be registered later with cacheAggregateData.
        template <typename D>
        ALWAYS_INLINE AggregateDataPtr * emplaceKeyFromRow(
            D & data,
            size_t i,
            bool & inserted,
            size_t keys_size,
            StringRefs & keys,
            Arena & pool)
        {
            size_t row = getIndexAt(i);
            if ((*aggregate_data_cache)[row])
            {
                inserted = false;
                return &(*aggregate_data_cache)[row];
            }
            else
            {
                ColumnRawPtrs key_columns;
                Sizes key_sizes;
                auto key = getKey(key_columns, 0, i, key_sizes, keys, pool);

                typename D::iterator it;
                if (saved_hash)
                    data.emplace(key, it, inserted, saved_hash[row]);
                else
                    data.emplace(key, it, inserted);

                if (inserted)
                    Base::onNewKey(*it, keys_size, keys, pool);
                else
                    (*aggregate_data_cache)[row] = Base::getAggregateData(it->second);

                return &Base::getAggregateData(it->second);
            }
        }

        /// Remembers the aggregate data pointer for the dictionary position of row i.
        ALWAYS_INLINE void cacheAggregateData(size_t i, AggregateDataPtr data)
        {
            size_t row = getIndexAt(i);
            (*aggregate_data_cache)[row] = data;
        }

        /// Looks up the key of row i without inserting it.
        /// Returns a pointer to the slot with its aggregate data pointer, or nullptr if the key is not in `data`.
        /// A null result (rather than a pointer to an empty slot) is required by callers which test
        /// the returned pointer itself to decide that the key is missing.
        template <typename D>
        ALWAYS_INLINE AggregateDataPtr * findFromRow(D & data, size_t i)
        {
            size_t row = getIndexAt(i);
            if (!(*aggregate_data_cache)[row])
            {
                /// Scratch objects which are needed only to satisfy the signature of getKey.
                ColumnRawPtrs key_columns;
                Sizes key_sizes;
                StringRefs keys;
                Arena pool;
                auto key = getKey(key_columns, 0, i, key_sizes, keys, pool);

                typename D::iterator it;
                if (saved_hash)
                    it = data.find(key, saved_hash[row]);
                else
                    it = data.find(key);

                if (it != data.end())
                    (*aggregate_data_cache)[row] = Base::getAggregateData(it->second);
            }

            if (!(*aggregate_data_cache)[row])
                return nullptr;

            return &(*aggregate_data_cache)[row];
        }
    };

    static AggregateDataPtr & getAggregateData(Mapped & value) { return Base::getAggregateData(value); }
    static const AggregateDataPtr & getAggregateData(const Mapped & value) { return Base::getAggregateData(value); }

    static void onNewKey(typename Data::value_type & value, size_t keys_size, StringRefs & keys, Arena & pool)
    {
        return Base::onNewKey(value, keys_size, keys, pool);
    }

    static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool)
    {
        return Base::onExistingKey(key, keys, pool);
    }

    static const bool no_consecutive_keys_optimization = true;
    static const bool low_cardinality_optimization = true;

    /// Inserts the key from the hash table cell into the (single) low cardinality key column.
    static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
    {
        auto ref = Base::getValueRef(value);
        static_cast<ColumnWithDictionary *>(key_columns[0].get())->insertData(ref.data, ref.size);
    }
};
namespace aggregator_impl
{
@ -376,8 +685,20 @@ protected:
}
/// Optional per-key information about low cardinality columns.
/// The primary template holds the data; the specialization for `false` is empty.
template <bool has_low_cardinality>
struct LowCardinalityKeys
{
/// Per key: columns from which key values are read.
ColumnRawPtrs nested_columns;
/// Per key: column with positions in the dictionary.
ColumnRawPtrs positions;
/// Per key: size of the index type of the positions column.
Sizes position_sizes;
};
/// Nothing is stored when there are no low cardinality keys.
template <>
struct LowCardinalityKeys<false> {};
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
template <typename TData, bool has_nullable_keys_ = false>
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false>
struct AggregationMethodKeysFixed
{
using Data = TData;
@ -386,6 +707,7 @@ struct AggregationMethodKeysFixed
using iterator = typename Data::iterator;
using const_iterator = typename Data::const_iterator;
static constexpr bool has_nullable_keys = has_nullable_keys_;
static constexpr bool has_low_cardinality = has_low_cardinality_;
Data data;
@ -396,11 +718,31 @@ struct AggregationMethodKeysFixed
class State final : private aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys>
{
LowCardinalityKeys<has_low_cardinality> low_cardinality_keys;
public:
using Base = aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys>;
/// Prepares the state for a block with the given key columns.
/// With low cardinality keys, for every key it records the column to read values from:
/// the nested column of the dictionary (together with the positions column and the size of its index type)
/// for a ColumnWithDictionary, or the key column itself otherwise (its positions entry stays nullptr).
void init(ColumnRawPtrs & key_columns)
{
    if constexpr (has_low_cardinality)
    {
        const size_t num_keys = key_columns.size();
        auto & lc_keys = low_cardinality_keys;

        lc_keys.nested_columns.resize(num_keys);
        lc_keys.positions.assign(num_keys, nullptr);
        lc_keys.position_sizes.resize(num_keys);

        for (size_t pos = 0; pos < num_keys; ++pos)
        {
            const auto * dict_column = typeid_cast<const ColumnWithDictionary *>(key_columns[pos]);
            if (!dict_column)
            {
                /// Ordinary column: values are read directly from it.
                lc_keys.nested_columns[pos] = key_columns[pos];
                continue;
            }

            lc_keys.nested_columns[pos] = dict_column->getDictionary().getNestedColumn().get();
            lc_keys.positions[pos] = &dict_column->getIndexes();
            lc_keys.position_sizes[pos] = dict_column->getSizeOfIndexType();
        }
    }

    if (has_nullable_keys)
        Base::init(key_columns);
}
@ -419,7 +761,13 @@ struct AggregationMethodKeysFixed
return packFixed<Key>(i, keys_size, Base::getActualColumns(), key_sizes, bitmap);
}
else
{
if constexpr (has_low_cardinality)
return packFixed<Key, true>(i, keys_size, low_cardinality_keys.nested_columns, key_sizes,
&low_cardinality_keys.positions, &low_cardinality_keys.position_sizes);
return packFixed<Key>(i, keys_size, key_columns, key_sizes);
}
}
};
@ -433,6 +781,7 @@ struct AggregationMethodKeysFixed
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes & key_sizes)
{
@ -482,6 +831,8 @@ struct AggregationMethodKeysFixed
}
}
}
/// This method does not use an aggregation state cache: always returns an empty pointer.
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/)
{
    return AggregationStateCachePtr{};
}
};
@ -538,6 +889,7 @@ struct AggregationMethodSerialized
/// If the key already was, it is removed from the pool (overwritten), and the next key can not be compared with it.
static const bool no_consecutive_keys_optimization = true;
static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes &)
{
@ -545,6 +897,8 @@ struct AggregationMethodSerialized
for (size_t i = 0; i < keys_size; ++i)
pos = key_columns[i]->deserializeAndInsertFromArena(pos);
}
/// This method does not use an aggregation state cache: always returns an empty pointer.
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/)
{
    return AggregationStateCachePtr{};
}
};
@ -614,6 +968,24 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>> nullable_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;
/// Support for low cardinality.
/// Single low cardinality key: a single-column method wrapped into AggregationMethodSingleLowCardinalityColumn.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>>> low_cardinality_key8;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>>> low_cardinality_key16;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>>> low_cardinality_key32;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>> low_cardinality_key64;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKey>>> low_cardinality_key_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKey>>> low_cardinality_key_fixed_string;
/// The same with two-level hash tables. There are no two-level variants for key8 and key16.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
/// Fixed-size keys packed into 128 or 256 bits; the third template argument (has_low_cardinality_) is true.
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>> low_cardinality_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>> low_cardinality_keys256;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>> low_cardinality_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>> low_cardinality_keys256_two_level;
/// In this and similar macros, the option without_key is not considered.
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \
M(key8, false) \
@ -642,6 +1014,20 @@ struct AggregatedDataVariants : private boost::noncopyable
M(nullable_keys256, false) \
M(nullable_keys128_two_level, true) \
M(nullable_keys256_two_level, true) \
M(low_cardinality_key8, false) \
M(low_cardinality_key16, false) \
M(low_cardinality_key32, false) \
M(low_cardinality_key64, false) \
M(low_cardinality_keys128, false) \
M(low_cardinality_keys256, false) \
M(low_cardinality_key_string, false) \
M(low_cardinality_key_fixed_string, false) \
M(low_cardinality_key32_two_level, true) \
M(low_cardinality_key64_two_level, true) \
M(low_cardinality_keys128_two_level, true) \
M(low_cardinality_keys256_two_level, true) \
M(low_cardinality_key_string_two_level, true) \
M(low_cardinality_key_fixed_string_two_level, true) \
enum class Type
{
@ -759,6 +1145,12 @@ struct AggregatedDataVariants : private boost::noncopyable
M(serialized) \
M(nullable_keys128) \
M(nullable_keys256) \
M(low_cardinality_key32) \
M(low_cardinality_key64) \
M(low_cardinality_keys128) \
M(low_cardinality_keys256) \
M(low_cardinality_key_string) \
M(low_cardinality_key_fixed_string) \
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
M(key8) \
@ -769,6 +1161,8 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys128_hash64) \
M(keys256_hash64) \
M(serialized_hash64) \
M(low_cardinality_key8) \
M(low_cardinality_key16) \
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
@ -800,7 +1194,65 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys256_two_level) \
M(serialized_two_level) \
M(nullable_keys128_two_level) \
M(nullable_keys256_two_level)
M(nullable_keys256_two_level) \
M(low_cardinality_key32_two_level) \
M(low_cardinality_key64_two_level) \
M(low_cardinality_keys128_two_level) \
M(low_cardinality_keys256_two_level) \
M(low_cardinality_key_string_two_level) \
M(low_cardinality_key_fixed_string_two_level) \
/// Applies M(NAME) to every aggregation variant that supports low cardinality keys.
/// The last entry has no trailing line continuation, so the macro ends at that line
/// regardless of what follows it in the file.
#define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
    M(low_cardinality_key8) \
    M(low_cardinality_key16) \
    M(low_cardinality_key32) \
    M(low_cardinality_key64) \
    M(low_cardinality_keys128) \
    M(low_cardinality_keys256) \
    M(low_cardinality_key_string) \
    M(low_cardinality_key_fixed_string) \
    M(low_cardinality_key32_two_level) \
    M(low_cardinality_key64_two_level) \
    M(low_cardinality_keys128_two_level) \
    M(low_cardinality_keys256_two_level) \
    M(low_cardinality_key_string_two_level) \
    M(low_cardinality_key_fixed_string_two_level)
bool isLowCardinality()
{
switch (type)
{
#define M(NAME) \
case Type::NAME: return true;
APPLY_FOR_LOW_CARDINALITY_VARIANTS(M)
#undef M
default:
return false;
}
}
/// Creates the aggregation state cache for the method which corresponds to the given variant type
/// by calling the static createCache of that method.
/// Returns nullptr for without_key; throws UNKNOWN_AGGREGATED_DATA_VARIANT for a type
/// that is not listed in APPLY_FOR_AGGREGATED_VARIANTS.
static AggregationStateCachePtr createCache(Type type, const AggregationStateCache::Settings & settings)
{
    switch (type)
    {
        case Type::without_key:
            return nullptr;

    /// The method type is the element type of the corresponding std::unique_ptr member.
    #define M(NAME, IS_TWO_LEVEL) \
        case Type::NAME: \
            return decltype(AggregatedDataVariants::NAME)::element_type::createCache(settings);

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M

        default:
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
    }
}
};
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
@ -862,6 +1314,9 @@ public:
const std::string tmp_path;
/// This setting is used only to determine the size of the aggregation state cache. No threads are created.
size_t max_threads;
Params(
const Block & src_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
@ -870,7 +1325,7 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
const std::string & tmp_path_)
const std::string & tmp_path_, size_t max_threads_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
@ -878,14 +1333,14 @@ public:
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_path(tmp_path_)
tmp_path(tmp_path_), max_threads(max_threads_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "")
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "", max_threads_)
{
intermediate_header = intermediate_header_;
}
@ -978,6 +1433,8 @@ protected:
AggregatedDataVariants::Type method_chosen;
Sizes key_sizes;
AggregationStateCachePtr aggregation_state_cache;
AggregateFunctionsPlainPtrs aggregate_functions;
/** This array serves two purposes.

View File

@ -962,8 +962,8 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ConvertColumnWithDictionaryToFullBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(stream, expression));
stream = //std::make_shared<ConvertColumnWithDictionaryToFullBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(stream, expression); //);
});
Names key_names;
@ -993,7 +993,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath());
context.getTemporaryPath(), settings.max_threads);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
@ -1053,10 +1053,10 @@ void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool ov
* but it can work more slowly.
*/
Aggregator::Params params(header, keys, aggregates, overflow_row);
const Settings & settings = context.getSettingsRef();
Aggregator::Params params(header, keys, aggregates, overflow_row, settings.max_threads);
if (!settings.distributed_aggregation_memory_efficient)
{
/// We union several sources into one, parallelizing the work.
@ -1120,7 +1120,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath());
context.getTemporaryPath(), settings.max_threads);
if (modificator == Modificator::ROLLUP)
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);

View File

@ -79,7 +79,7 @@ int main(int argc, char ** argv)
Aggregator::Params params(
stream->getHeader(), {0, 1}, aggregate_descriptions,
false, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "");
false, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "", 1);
Aggregator aggregator(params);

View File

@ -0,0 +1,26 @@
2
a_1
a_2
b_1
b_2
3
3 b_1
3 b_2
3 a_1
3 a_2
a_1 b_1
a_2 b_2
3
3
3
3
3
3
3
3
a_1 3
a_2 3
b_1 3
b_2 3
a_1 b_1 3
a_2 b_2 3

View File

@ -0,0 +1,24 @@
SET allow_experimental_low_cardinality_type = 1;
DROP TABLE IF EXISTS test.tab;
CREATE TABLE test.tab (a String, b StringWithDictionary) ENGINE = MergeTree ORDER BY a;
INSERT INTO test.tab VALUES ('a_1', 'b_1'), ('a_2', 'b_2');
-- Sanity check and grouping by a single key (ordinary, then low cardinality).
SELECT count() FROM test.tab;
SELECT a FROM test.tab GROUP BY a ORDER BY a;
SELECT b FROM test.tab GROUP BY b ORDER BY b;
SELECT length(b) AS l FROM test.tab GROUP BY l;
-- Aggregate functions together with ordinary and low cardinality keys.
SELECT sum(length(a)), b FROM test.tab GROUP BY b ORDER BY b;
SELECT sum(length(b)), a FROM test.tab GROUP BY a ORDER BY a;
SELECT a, b FROM test.tab GROUP BY a, b ORDER BY a, b;
SELECT sum(length(a)) FROM test.tab GROUP BY b, b || '_';
-- A growing number of numeric keys computed from the low cardinality column.
SELECT length(b) AS l FROM test.tab GROUP BY l;
SELECT length(b) AS l FROM test.tab GROUP BY l, l + 1;
SELECT length(b) AS l FROM test.tab GROUP BY l, l + 1, l + 2;
SELECT length(b) AS l FROM test.tab GROUP BY l, l + 1, l + 2, l + 3;
SELECT length(b) AS l FROM test.tab GROUP BY l, l + 1, l + 2, l + 3, l + 4;
SELECT length(b) AS l FROM test.tab GROUP BY l, l + 1, l + 2, l + 3, l + 4, l + 5;
-- String keys mixed with numeric keys.
SELECT a, length(b) AS l FROM test.tab GROUP BY a, l, l + 1 ORDER BY a;
SELECT b, length(b) AS l FROM test.tab GROUP BY b, l, l + 1 ORDER BY b;
SELECT a, b, length(b) AS l FROM test.tab GROUP BY a, b, l, l + 1 ORDER BY a, b;
DROP TABLE IF EXISTS test.tab;

View File

@ -0,0 +1,7 @@
a 1 LowCardinality(UInt32)
a 1 UInt32
a 1 LowCardinality(UInt32)
a 1 LowCardinality(String)
a 1 LowCardinality(UInt32)
a 1 String
a 1 LowCardinality(UInt32)

View File

@ -0,0 +1,19 @@
-- Converts column b back and forth between LowCardinality and ordinary types with ALTER
-- and prints the row together with the type of b after every step.
set allow_experimental_low_cardinality_type = 1;
-- Remove a table which could be left by a previous (failed) run, otherwise create table fails.
drop table if exists test.tab;
create table test.tab (a String, b LowCardinality(UInt32)) engine = MergeTree order by a;
insert into test.tab values ('a', 1);
select *, toTypeName(b) from test.tab;
alter table test.tab modify column b UInt32;
select *, toTypeName(b) from test.tab;
alter table test.tab modify column b LowCardinality(UInt32);
select *, toTypeName(b) from test.tab;
alter table test.tab modify column b StringWithDictionary;
select *, toTypeName(b) from test.tab;
alter table test.tab modify column b LowCardinality(UInt32);
select *, toTypeName(b) from test.tab;
alter table test.tab modify column b String;
select *, toTypeName(b) from test.tab;
alter table test.tab modify column b LowCardinality(UInt32);
select *, toTypeName(b) from test.tab;
drop table if exists test.tab;