Low cardinality group by signle column. [#CLICKHOUSE-3903]

This commit is contained in:
Nikolai Kochetov 2018-08-21 17:53:51 +03:00
parent a475bb1c9f
commit 787e814e4c
8 changed files with 263 additions and 57 deletions

View File

@ -442,7 +442,7 @@ typename ColumnVector<IndexType>::Container & ColumnWithDictionary::Index::getPo
}
template <typename IndexType>
typename const ColumnVector<IndexType>::Container & ColumnWithDictionary::Index::getPositionsData() const
const typename ColumnVector<IndexType>::Container & ColumnWithDictionary::Index::getPositionsData() const
{
const auto * positions_ptr = typeid_cast<const ColumnVector<IndexType> *>(positions.get());
if (!positions_ptr)
@ -595,6 +595,18 @@ void ColumnWithDictionary::Index::checkSizeOfType()
", but positions are " + positions->getName(), ErrorCodes::LOGICAL_ERROR);
}
void ColumnWithDictionary::Index::countKeys(ColumnUInt64::Container & counts) const
{
auto counter = [&](auto x)
{
using CurIndexType = decltype(x);
auto & data = getPositionsData<CurIndexType>();
for (auto pos : data)
++counts[pos];
};
callForType(std::move(counter), size_of_type);
}
ColumnWithDictionary::Dictionary::Dictionary(MutableColumnPtr && column_unique_)
: column_unique(std::move(column_unique_))
@ -639,17 +651,4 @@ void ColumnWithDictionary::Dictionary::compact(ColumnPtr & positions)
shared = false;
}
void ColumnWithDictionary::Dictionary::countKeys(ColumnUInt64::Container & counts) const
{
auto counter = [&](auto x)
{
using CurIndexType = decltype(x);
auto & data = getPositionsData<CurIndexType>();
for (auto pos : data)
++counts[pos];
};
callForType(std::move(counter), size_of_type);
}
}

View File

@ -205,7 +205,7 @@ public:
typename ColumnVector<IndexType>::Container & getPositionsData();
template <typename IndexType>
typename const ColumnVector<IndexType>::Container & getPositionsData() const;
const typename ColumnVector<IndexType>::Container & getPositionsData() const;
template <typename IndexType>
void convertPositions();

View File

@ -841,4 +841,12 @@ void registerDataTypeWithDictionary(DataTypeFactory & factory)
factory.registerDataType("LowCardinality", create);
}
DataTypePtr removeLowCardinality(const DataTypePtr & type)
{
if (auto * type_with_dictionary = typeid_cast<const DataTypeWithDictionary *>(type.get()))
return type_with_dictionary->getDictionaryType();
return type;
}
}

View File

@ -167,4 +167,6 @@ private:
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator);
};
DataTypePtr removeLowCardinality(const DataTypePtr & type);
}

View File

@ -459,6 +459,7 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar
{
bool has_low_cardinality = false;
size_t num_full_low_cardinality_columns = 0;
size_t num_full_ordinary_columns = 0;
ColumnsWithTypeAndName args_without_dictionary(arguments);
@ -476,9 +477,12 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar
if (!is_const)
++num_full_low_cardinality_columns;
}
else if (!is_const)
++num_full_ordinary_columns;
}
if (canBeExecutedOnLowCardinalityDictionary() && has_low_cardinality && num_full_low_cardinality_columns <= 1)
if (canBeExecutedOnLowCardinalityDictionary() && has_low_cardinality
&& num_full_low_cardinality_columns <= 1 && num_full_ordinary_columns == 0)
return std::make_shared<DataTypeWithDictionary>(getReturnTypeWithoutDictionary(args_without_dictionary));
else
return getReturnTypeWithoutDictionary(args_without_dictionary);

View File

@ -25,6 +25,7 @@
#if __has_include(<Interpreters/config_compile.h>)
#include <Interpreters/config_compile.h>
#include <Columns/ColumnWithDictionary.h>
#include <DataTypes/DataTypeWithDictionary.h>
#endif
@ -100,7 +101,15 @@ Block Aggregator::getHeader(bool final) const
if (params.src_header)
{
for (size_t i = 0; i < params.keys_size; ++i)
res.insert(params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty());
{
auto col = params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty();
// if (auto * col_with_dict = typeid_cast<const ColumnWithDictionary *>(col.column.get()))
// {
// col.column = col_with_dict->getDictionary().getNestedColumn()->cloneEmpty();
// col.type = removeLowCardinality(col.type);
// }
res.insert(std::move(col));
}
for (size_t i = 0; i < params.aggregates_size; ++i)
{
@ -374,18 +383,25 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
DataTypes types_removed_nullable;
types_removed_nullable.reserve(params.keys.size());
bool has_nullable_key = false;
bool has_low_cardinality = false;
for (const auto & pos : params.keys)
{
const auto & type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
DataTypePtr type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
if (type->withDictionary())
{
has_low_cardinality = true;
type = removeLowCardinality(type);
}
if (type->isNullable())
{
has_nullable_key = true;
types_removed_nullable.push_back(removeNullable(type));
type = removeNullable(type);
}
else
types_removed_nullable.push_back(type);
types_removed_nullable.push_back(type);
}
/** Returns ordinary (not two-level) methods, because we start from them.
@ -441,6 +457,19 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber())
{
size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();
if (has_low_cardinality)
{
if (size_of_field == 1)
return AggregatedDataVariants::Type::low_cardinality_key8;
if (size_of_field == 2)
return AggregatedDataVariants::Type::low_cardinality_key16;
if (size_of_field == 4)
return AggregatedDataVariants::Type::low_cardinality_key32;
if (size_of_field == 8)
return AggregatedDataVariants::Type::low_cardinality_key64;
}
if (size_of_field == 1)
return AggregatedDataVariants::Type::key8;
if (size_of_field == 2)
@ -455,7 +484,7 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
}
/// If all keys fits in N bits, will use hash table with all keys packed (placed contiguously) to single N-bit key.
if (params.keys_size == num_fixed_contiguous_keys)
if (params.keys_size == num_fixed_contiguous_keys && !has_low_cardinality)
{
if (keys_bytes <= 16)
return AggregatedDataVariants::Type::keys128;
@ -465,10 +494,20 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
/// If single string key - will use hash table with references to it. Strings itself are stored separately in Arena.
if (params.keys_size == 1 && types_removed_nullable[0]->isString())
return AggregatedDataVariants::Type::key_string;
{
if (has_low_cardinality)
return AggregatedDataVariants::Type::low_cardinality_key_string;
else
return AggregatedDataVariants::Type::key_string;
}
if (params.keys_size == 1 && types_removed_nullable[0]->isFixedString())
return AggregatedDataVariants::Type::key_fixed_string;
{
if (has_low_cardinality)
return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
else
return AggregatedDataVariants::Type::key_fixed_string;
}
/** If it is possible to use 'concat' method due to one-to-one correspondense. Otherwise the method will be 'serialized'.
*/
@ -785,6 +824,21 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
/// How to perform the aggregation?
if (result.empty())
{
result.init(method);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
if (params.compiler)
compileIfPossible(result.type);
}
if (isCancelled())
return true;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
@ -792,7 +846,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
* To make them work anyway, we materialize them.
*/
Columns materialized_columns;
ColumnRawPtrs key_counts;
// ColumnRawPtrs key_counts;
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
@ -807,16 +861,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
if (const auto * column_with_dictionary = typeid_cast<const ColumnWithDictionary *>(key_columns[i]))
{
if (params.keys_size == 1)
{
materialized_columns.push_back(column_with_dictionary->countKeys());
key_columns[i] = column_with_dictionary->getDictionary().getNestedColumn().get();
}
else
if (!result.isLowCardinality())
{
materialized_columns.push_back(column_with_dictionary->convertToFullColumn());
key_columns[i] = materialized_columns.back().get();
}
key_counts.push_back(materialized_columns.back().get());
//key_counts.push_back(materialized_columns.back().get());
}
}
@ -834,6 +884,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
materialized_columns.push_back(converted);
aggregate_columns[i][j] = materialized_columns.back().get();
}
if (auto * col_with_dict = typeid_cast<const ColumnWithDictionary *>(aggregate_columns[i][j]))
{
materialized_columns.push_back(col_with_dict->convertToFullColumn());
aggregate_columns[i][j] = materialized_columns.back().get();
}
}
aggregate_functions_instructions[i].that = aggregate_functions[i];
@ -847,21 +903,6 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
size_t rows = block.rows();
/// How to perform the aggregation?
if (result.empty())
{
result.init(method);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
if (params.compiler)
compileIfPossible(result.type);
}
if (isCancelled())
return true;
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
AggregateDataPtr place = result.aggregates_pool->alloc(total_size_of_aggregate_states);
@ -926,14 +967,14 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
/// When there is no dynamically compiled code.
else
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, &aggregate_functions_instructions[0], \
result.key_sizes, key, no_more_keys, overflow_row_ptr);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, &aggregate_functions_instructions[0], \
result.key_sizes, key, no_more_keys, overflow_row_ptr);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
}

View File

@ -26,6 +26,7 @@
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnWithDictionary.h>
namespace DB
@ -155,6 +156,11 @@ struct AggregationMethodOneNumber
{
static_cast<ColumnVector<FieldType> *>(key_columns[0].get())->insertData(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
static StringRef getRef(const typename Data::value_type & value)
{
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
};
@ -214,6 +220,11 @@ struct AggregationMethodString
static const bool no_consecutive_keys_optimization = false;
static StringRef getRef(const typename Data::value_type & value)
{
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
@ -275,12 +286,94 @@ struct AggregationMethodFixedString
static const bool no_consecutive_keys_optimization = false;
static StringRef getRef(const typename Data::value_type & value)
{
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
{
key_columns[0]->insertData(value.first.data, value.first.size);
}
};
/// Single low cardinality column.
template <typename SingleColumnMethod>
struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
{
using Base = SingleColumnMethod;
using BaseState = typename Base::State;
using Data = typename Base::Data;
using Key = typename Base::Key;
using Mapped = typename Base::Mapped;
using iterator = typename Base::iterator;
using const_iterator = typename Base::const_iterator;
Data data;
AggregationMethodSingleLowCardinalityColumn() = default;
template <typename Other>
explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}
struct State : public BaseState
{
ColumnRawPtrs key;
const ColumnWithDictionary * column;
/** Called at the start of each block processing.
* Sets the variables needed for the other methods called in inner loops.
*/
void init(ColumnRawPtrs & key_columns)
{
column = typeid_cast<const ColumnWithDictionary *>(key_columns[0]);
if (!column)
throw Exception("Invalid aggregation key type for AggregationMethodSingleLowCardinalityColumn method. "
"Excepted LowCardinality, got " + key_columns[0]->getName(), ErrorCodes::LOGICAL_ERROR);
key = {column->getDictionary().getNestedColumn().get()};
BaseState::init(key);
}
/// Get the key from the key columns for insertion into the hash table.
Key getKey(
const ColumnRawPtrs & /*key_columns*/,
size_t /*keys_size*/,
size_t i,
const Sizes & key_sizes,
StringRefs & keys,
Arena & pool) const
{
size_t row = column->getIndexes().getUInt(i);
return BaseState::getKey(key, 1, row, key_sizes, keys, pool);
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return Base::getAggregateData(value); }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return Base::getAggregateData(value); }
static void onNewKey(typename Data::value_type & value, size_t keys_size, StringRefs & keys, Arena & pool)
{
return Base::onNewKey(value, keys_size, keys, pool);
}
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool)
{
return Base::onExistingKey(key, keys, pool);
}
using Base::no_consecutive_keys_optimization;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
{
auto ref = Base::getRef(value);
static_cast<ColumnWithDictionary *>(key_columns[0].get())->insertData(ref.data, ref.size);
}
};
namespace aggregator_impl
{
@ -757,6 +850,19 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>> nullable_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;
/// Support for low cardinality.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>>> low_cardinality_key8;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>>> low_cardinality_key16;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>>> low_cardinality_key32;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>> low_cardinality_key64;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKey>>> low_cardinality_key_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKey>>> low_cardinality_key_fixed_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
/// In this and similar macros, the option without_key is not considered.
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \
M(key8, false) \
@ -790,6 +896,16 @@ struct AggregatedDataVariants : private boost::noncopyable
M(nullable_keys256, false) \
M(nullable_keys128_two_level, true) \
M(nullable_keys256_two_level, true) \
M(low_cardinality_key8, false) \
M(low_cardinality_key16, false) \
M(low_cardinality_key32, false) \
M(low_cardinality_key64, false) \
M(low_cardinality_key_string, false) \
M(low_cardinality_key_fixed_string, false) \
M(low_cardinality_key32_two_level, true) \
M(low_cardinality_key64_two_level, true) \
M(low_cardinality_key_string_two_level, true) \
M(low_cardinality_key_fixed_string_two_level, true) \
enum class Type
{
@ -909,6 +1025,10 @@ struct AggregatedDataVariants : private boost::noncopyable
M(serialized) \
M(nullable_keys128) \
M(nullable_keys256) \
M(low_cardinality_key32) \
M(low_cardinality_key64) \
M(low_cardinality_key_string) \
M(low_cardinality_key_fixed_string) \
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
M(key8) \
@ -920,6 +1040,8 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys256_hash64) \
M(concat_hash64) \
M(serialized_hash64) \
M(low_cardinality_key8) \
M(low_cardinality_key16) \
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
@ -953,7 +1075,37 @@ struct AggregatedDataVariants : private boost::noncopyable
M(concat_two_level) \
M(serialized_two_level) \
M(nullable_keys128_two_level) \
M(nullable_keys256_two_level)
M(nullable_keys256_two_level) \
M(low_cardinality_key32_two_level) \
M(low_cardinality_key64_two_level) \
M(low_cardinality_key_string_two_level) \
M(low_cardinality_key_fixed_string_two_level) \
#define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
M(low_cardinality_key8) \
M(low_cardinality_key16) \
M(low_cardinality_key32) \
M(low_cardinality_key64) \
M(low_cardinality_key_string) \
M(low_cardinality_key_fixed_string) \
M(low_cardinality_key32_two_level) \
M(low_cardinality_key64_two_level) \
M(low_cardinality_key_string_two_level) \
M(low_cardinality_key_fixed_string_two_level) \
bool isLowCardinality()
{
switch (type)
{
#define M(NAME) \
case Type::NAME: return true;
APPLY_FOR_LOW_CARDINALITY_VARIANTS(M)
#undef M
default:
return false;
}
}
};
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;

View File

@ -830,8 +830,8 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ConvertColumnWithDictionaryToFullBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(stream, expression));
stream = //std::make_shared<ConvertColumnWithDictionaryToFullBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(stream, expression); //);
});
Names key_names;