diff --git a/dbms/src/Columns/ColumnWithDictionary.cpp b/dbms/src/Columns/ColumnWithDictionary.cpp index 2a0e211d1b2..431df3c490f 100644 --- a/dbms/src/Columns/ColumnWithDictionary.cpp +++ b/dbms/src/Columns/ColumnWithDictionary.cpp @@ -442,7 +442,7 @@ typename ColumnVector::Container & ColumnWithDictionary::Index::getPo } template -typename const ColumnVector::Container & ColumnWithDictionary::Index::getPositionsData() const +const typename ColumnVector::Container & ColumnWithDictionary::Index::getPositionsData() const { const auto * positions_ptr = typeid_cast *>(positions.get()); if (!positions_ptr) @@ -595,6 +595,18 @@ void ColumnWithDictionary::Index::checkSizeOfType() ", but positions are " + positions->getName(), ErrorCodes::LOGICAL_ERROR); } +void ColumnWithDictionary::Index::countKeys(ColumnUInt64::Container & counts) const +{ + auto counter = [&](auto x) + { + using CurIndexType = decltype(x); + auto & data = getPositionsData(); + for (auto pos : data) + ++counts[pos]; + }; + callForType(std::move(counter), size_of_type); +} + ColumnWithDictionary::Dictionary::Dictionary(MutableColumnPtr && column_unique_) : column_unique(std::move(column_unique_)) @@ -639,17 +651,4 @@ void ColumnWithDictionary::Dictionary::compact(ColumnPtr & positions) shared = false; } - -void ColumnWithDictionary::Dictionary::countKeys(ColumnUInt64::Container & counts) const -{ - auto counter = [&](auto x) - { - using CurIndexType = decltype(x); - auto & data = getPositionsData(); - for (auto pos : data) - ++counts[pos]; - }; - callForType(std::move(counter), size_of_type); -} - } diff --git a/dbms/src/Columns/ColumnWithDictionary.h b/dbms/src/Columns/ColumnWithDictionary.h index 84a13fb5b66..1f7eacb54d0 100644 --- a/dbms/src/Columns/ColumnWithDictionary.h +++ b/dbms/src/Columns/ColumnWithDictionary.h @@ -205,7 +205,7 @@ public: typename ColumnVector::Container & getPositionsData(); template - typename const ColumnVector::Container & getPositionsData() const; + const typename ColumnVector::Container & getPositionsData() const; template void convertPositions(); diff --git a/dbms/src/DataTypes/DataTypeWithDictionary.cpp b/dbms/src/DataTypes/DataTypeWithDictionary.cpp index 8877eb820f1..30d412ccc4f 100644 --- a/dbms/src/DataTypes/DataTypeWithDictionary.cpp +++ b/dbms/src/DataTypes/DataTypeWithDictionary.cpp @@ -841,4 +841,12 @@ void registerDataTypeWithDictionary(DataTypeFactory & factory) factory.registerDataType("LowCardinality", create); } + +DataTypePtr removeLowCardinality(const DataTypePtr & type) +{ + if (auto * type_with_dictionary = typeid_cast(type.get())) + return type_with_dictionary->getDictionaryType(); + return type; +} + } diff --git a/dbms/src/DataTypes/DataTypeWithDictionary.h b/dbms/src/DataTypes/DataTypeWithDictionary.h index b3b8bba6abe..765a09935ad 100644 --- a/dbms/src/DataTypes/DataTypeWithDictionary.h +++ b/dbms/src/DataTypes/DataTypeWithDictionary.h @@ -167,4 +167,6 @@ private: static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator); }; +DataTypePtr removeLowCardinality(const DataTypePtr & type); + } diff --git a/dbms/src/Functions/IFunction.cpp b/dbms/src/Functions/IFunction.cpp index f0735b189a2..f11bfd2e5d6 100644 --- a/dbms/src/Functions/IFunction.cpp +++ b/dbms/src/Functions/IFunction.cpp @@ -459,6 +459,7 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar { bool has_low_cardinality = false; size_t num_full_low_cardinality_columns = 0; + size_t num_full_ordinary_columns = 0; ColumnsWithTypeAndName args_without_dictionary(arguments); @@ -476,9 +477,12 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar if (!is_const) ++num_full_low_cardinality_columns; } + else if (!is_const) + ++num_full_ordinary_columns; } - if (canBeExecutedOnLowCardinalityDictionary() && has_low_cardinality && num_full_low_cardinality_columns <= 1) + if (canBeExecutedOnLowCardinalityDictionary() && has_low_cardinality + && num_full_low_cardinality_columns <= 1 && num_full_ordinary_columns == 0) return std::make_shared(getReturnTypeWithoutDictionary(args_without_dictionary)); else return getReturnTypeWithoutDictionary(args_without_dictionary); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index cb8bc9c3725..d84ccfbf3f5 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -25,6 +25,7 @@ #if __has_include() #include #include +#include #endif @@ -100,7 +101,15 @@ Block Aggregator::getHeader(bool final) const if (params.src_header) { for (size_t i = 0; i < params.keys_size; ++i) - res.insert(params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty()); + { + auto col = params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty(); +// if (auto * col_with_dict = typeid_cast(col.column.get())) +// { +// col.column = col_with_dict->getDictionary().getNestedColumn()->cloneEmpty(); +// col.type = removeLowCardinality(col.type); +// } + res.insert(std::move(col)); + } for (size_t i = 0; i < params.aggregates_size; ++i) { @@ -374,18 +383,25 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod() DataTypes types_removed_nullable; types_removed_nullable.reserve(params.keys.size()); bool has_nullable_key = false; + bool has_low_cardinality = false; for (const auto & pos : params.keys) { - const auto & type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type; + DataTypePtr type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type; + + if (type->withDictionary()) + { + has_low_cardinality = true; + type = removeLowCardinality(type); + } if (type->isNullable()) { has_nullable_key = true; - types_removed_nullable.push_back(removeNullable(type)); + type = removeNullable(type); } - else - types_removed_nullable.push_back(type); + + types_removed_nullable.push_back(type); } /** Returns ordinary (not two-level) methods, because we start from them. @@ -441,6 +457,19 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod() if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber()) { size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory(); + + if (has_low_cardinality) + { + if (size_of_field == 1) + return AggregatedDataVariants::Type::low_cardinality_key8; + if (size_of_field == 2) + return AggregatedDataVariants::Type::low_cardinality_key16; + if (size_of_field == 4) + return AggregatedDataVariants::Type::low_cardinality_key32; + if (size_of_field == 8) + return AggregatedDataVariants::Type::low_cardinality_key64; + } + if (size_of_field == 1) return AggregatedDataVariants::Type::key8; if (size_of_field == 2) @@ -455,7 +484,7 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod() } /// If all keys fits in N bits, will use hash table with all keys packed (placed contiguously) to single N-bit key. - if (params.keys_size == num_fixed_contiguous_keys) + if (params.keys_size == num_fixed_contiguous_keys && !has_low_cardinality) { if (keys_bytes <= 16) return AggregatedDataVariants::Type::keys128; @@ -465,10 +494,20 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod() /// If single string key - will use hash table with references to it. Strings itself are stored separately in Arena. if (params.keys_size == 1 && types_removed_nullable[0]->isString()) - return AggregatedDataVariants::Type::key_string; + { + if (has_low_cardinality) + return AggregatedDataVariants::Type::low_cardinality_key_string; + else + return AggregatedDataVariants::Type::key_string; + } if (params.keys_size == 1 && types_removed_nullable[0]->isFixedString()) - return AggregatedDataVariants::Type::key_fixed_string; + { + if (has_low_cardinality) + return AggregatedDataVariants::Type::low_cardinality_key_fixed_string; + else + return AggregatedDataVariants::Type::key_fixed_string; + } /** If it is possible to use 'concat' method due to one-to-one correspondense. Otherwise the method will be 'serialized'. */ @@ -785,6 +824,21 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re /// `result` will destroy the states of aggregate functions in the destructor result.aggregator = this; + /// How to perform the aggregation? + if (result.empty()) + { + result.init(method); + result.keys_size = params.keys_size; + result.key_sizes = key_sizes; + LOG_TRACE(log, "Aggregation method: " << result.getMethodName()); + + if (params.compiler) + compileIfPossible(result.type); + } + + if (isCancelled()) + return true; + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_columns[i].resize(params.aggregates[i].arguments.size()); @@ -792,7 +846,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re * To make them work anyway, we materialize them. */ Columns materialized_columns; - ColumnRawPtrs key_counts; + // ColumnRawPtrs key_counts; /// Remember the columns we will work with for (size_t i = 0; i < params.keys_size; ++i) @@ -807,16 +861,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re if (const auto * column_with_dictionary = typeid_cast(key_columns[i])) { - if (params.keys_size == 1) - { - materialized_columns.push_back(column_with_dictionary->countKeys()); - key_columns[i] = column_with_dictionary->getDictionary().getNestedColumn().get(); - } - else + if (!result.isLowCardinality()) { materialized_columns.push_back(column_with_dictionary->convertToFullColumn()); + key_columns[i] = materialized_columns.back().get(); } - key_counts.push_back(materialized_columns.back().get()); + //key_counts.push_back(materialized_columns.back().get()); } } @@ -834,6 +884,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re materialized_columns.push_back(converted); aggregate_columns[i][j] = materialized_columns.back().get(); } + + if (auto * col_with_dict = typeid_cast(aggregate_columns[i][j])) + { + materialized_columns.push_back(col_with_dict->convertToFullColumn()); + aggregate_columns[i][j] = materialized_columns.back().get(); + } } aggregate_functions_instructions[i].that = aggregate_functions[i]; @@ -847,21 +903,6 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re size_t rows = block.rows(); - /// How to perform the aggregation? - if (result.empty()) - { - result.init(method); - result.keys_size = params.keys_size; - result.key_sizes = key_sizes; - LOG_TRACE(log, "Aggregation method: " << result.getMethodName()); - - if (params.compiler) - compileIfPossible(result.type); - } - - if (isCancelled()) - return true; - if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key) { AggregateDataPtr place = result.aggregates_pool->alloc(total_size_of_aggregate_states); @@ -926,14 +967,14 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re /// When there is no dynamically compiled code. else { - #define M(NAME, IS_TWO_LEVEL) \ - else if (result.type == AggregatedDataVariants::Type::NAME) \ - executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, &aggregate_functions_instructions[0], \ - result.key_sizes, key, no_more_keys, overflow_row_ptr); + #define M(NAME, IS_TWO_LEVEL) \ + else if (result.type == AggregatedDataVariants::Type::NAME) \ + executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, &aggregate_functions_instructions[0], \ + result.key_sizes, key, no_more_keys, overflow_row_ptr); - if (false) {} - APPLY_FOR_AGGREGATED_VARIANTS(M) - #undef M + if (false) {} + APPLY_FOR_AGGREGATED_VARIANTS(M) + #undef M } } diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 68170a28025..38b3ea3b459 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -26,6 +26,7 @@ #include #include #include +#include namespace DB @@ -155,6 +156,11 @@ struct AggregationMethodOneNumber { static_cast *>(key_columns[0].get())->insertData(reinterpret_cast(&value.first), sizeof(value.first)); } + + static StringRef getRef(const typename Data::value_type & value) + { + return StringRef(reinterpret_cast(&value.first), sizeof(value.first)); + } }; @@ -214,6 +220,11 @@ struct AggregationMethodString static const bool no_consecutive_keys_optimization = false; + static StringRef getRef(const typename Data::value_type & value) + { + return StringRef(value.first.data, value.first.size); + } + static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &) { key_columns[0]->insertData(value.first.data, value.first.size); @@ -275,12 +286,94 @@ struct AggregationMethodFixedString static const bool no_consecutive_keys_optimization = false; + static StringRef getRef(const typename Data::value_type & value) + { + return StringRef(value.first.data, value.first.size); + } + static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &) { key_columns[0]->insertData(value.first.data, value.first.size); } }; + +/// Single low cardinality column. +template +struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod +{ + using Base = SingleColumnMethod; + using BaseState = typename Base::State; + + using Data = typename Base::Data; + using Key = typename Base::Key; + using Mapped = typename Base::Mapped; + using iterator = typename Base::iterator; + using const_iterator = typename Base::const_iterator; + + Data data; + + AggregationMethodSingleLowCardinalityColumn() = default; + + template + explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {} + + struct State : public BaseState + { + ColumnRawPtrs key; + const ColumnWithDictionary * column; + + /** Called at the start of each block processing. + * Sets the variables needed for the other methods called in inner loops. + */ + void init(ColumnRawPtrs & key_columns) + { + column = typeid_cast(key_columns[0]); + if (!column) + throw Exception("Invalid aggregation key type for AggregationMethodSingleLowCardinalityColumn method. " + "Excepted LowCardinality, got " + key_columns[0]->getName(), ErrorCodes::LOGICAL_ERROR); + key = {column->getDictionary().getNestedColumn().get()}; + + BaseState::init(key); + } + + /// Get the key from the key columns for insertion into the hash table. + Key getKey( + const ColumnRawPtrs & /*key_columns*/, + size_t /*keys_size*/, + size_t i, + const Sizes & key_sizes, + StringRefs & keys, + Arena & pool) const + { + size_t row = column->getIndexes().getUInt(i); + return BaseState::getKey(key, 1, row, key_sizes, keys, pool); + } + }; + + static AggregateDataPtr & getAggregateData(Mapped & value) { return Base::getAggregateData(value); } + static const AggregateDataPtr & getAggregateData(const Mapped & value) { return Base::getAggregateData(value); } + + static void onNewKey(typename Data::value_type & value, size_t keys_size, StringRefs & keys, Arena & pool) + { + return Base::onNewKey(value, keys_size, keys, pool); + } + + static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool) + { + return Base::onExistingKey(key, keys, pool); + } + + using Base::no_consecutive_keys_optimization; + + static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/) + { + auto ref = Base::getRef(value); + static_cast(key_columns[0].get())->insertData(ref.data, ref.size); + } +}; + + namespace aggregator_impl { @@ -757,6 +850,19 @@ struct AggregatedDataVariants : private boost::noncopyable std::unique_ptr> nullable_keys128_two_level; std::unique_ptr> nullable_keys256_two_level; + /// Support for low cardinality. + std::unique_ptr>> low_cardinality_key8; + std::unique_ptr>> low_cardinality_key16; + std::unique_ptr>> low_cardinality_key32; + std::unique_ptr>> low_cardinality_key64; + std::unique_ptr>> low_cardinality_key_string; + std::unique_ptr>> low_cardinality_key_fixed_string; + + std::unique_ptr>> low_cardinality_key32_two_level; + std::unique_ptr>> low_cardinality_key64_two_level; + std::unique_ptr>> low_cardinality_key_string_two_level; + std::unique_ptr>> low_cardinality_key_fixed_string_two_level; + /// In this and similar macros, the option without_key is not considered. #define APPLY_FOR_AGGREGATED_VARIANTS(M) \ M(key8, false) \ @@ -790,6 +896,16 @@ struct AggregatedDataVariants : private boost::noncopyable M(nullable_keys256, false) \ M(nullable_keys128_two_level, true) \ M(nullable_keys256_two_level, true) \ + M(low_cardinality_key8, false) \ + M(low_cardinality_key16, false) \ + M(low_cardinality_key32, false) \ + M(low_cardinality_key64, false) \ + M(low_cardinality_key_string, false) \ + M(low_cardinality_key_fixed_string, false) \ + M(low_cardinality_key32_two_level, true) \ + M(low_cardinality_key64_two_level, true) \ + M(low_cardinality_key_string_two_level, true) \ + M(low_cardinality_key_fixed_string_two_level, true) \ enum class Type { @@ -909,6 +1025,10 @@ struct AggregatedDataVariants : private boost::noncopyable M(serialized) \ M(nullable_keys128) \ M(nullable_keys256) \ + M(low_cardinality_key32) \ + M(low_cardinality_key64) \ + M(low_cardinality_key_string) \ + M(low_cardinality_key_fixed_string) \ #define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \ M(key8) \ @@ -920,6 +1040,8 @@ struct AggregatedDataVariants : private boost::noncopyable M(keys256_hash64) \ M(concat_hash64) \ M(serialized_hash64) \ + M(low_cardinality_key8) \ + M(low_cardinality_key16) \ #define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \ APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \ @@ -953,7 +1075,37 @@ struct AggregatedDataVariants : private boost::noncopyable M(concat_two_level) \ M(serialized_two_level) \ M(nullable_keys128_two_level) \ - M(nullable_keys256_two_level) + M(nullable_keys256_two_level) \ + M(low_cardinality_key32_two_level) \ + M(low_cardinality_key64_two_level) \ + M(low_cardinality_key_string_two_level) \ + M(low_cardinality_key_fixed_string_two_level) \ + + #define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \ + M(low_cardinality_key8) \ + M(low_cardinality_key16) \ + M(low_cardinality_key32) \ + M(low_cardinality_key64) \ + M(low_cardinality_key_string) \ + M(low_cardinality_key_fixed_string) \ + M(low_cardinality_key32_two_level) \ + M(low_cardinality_key64_two_level) \ + M(low_cardinality_key_string_two_level) \ + M(low_cardinality_key_fixed_string_two_level) \ + + bool isLowCardinality() + { + switch (type) + { + #define M(NAME) \ + case Type::NAME: return true; + + APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) + #undef M + default: + return false; + } + } }; using AggregatedDataVariantsPtr = std::shared_ptr; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 926a0be7b5f..606548ce491 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -830,8 +830,8 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre { pipeline.transform([&](auto & stream) { - stream = std::make_shared( - std::make_shared(stream, expression)); + stream = //std::make_shared( + std::make_shared(stream, expression); //); }); Names key_names;