From 331457990920387d13593cca91e9481c6d14baef Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 20 Jul 2015 20:09:43 +0300 Subject: [PATCH 1/6] Merge --- .../AggregateFunctionUniq.h | 43 +++ .../DB/Common/CombinedCardinalityEstimator.h | 251 ++++++++++++++++++ dbms/include/DB/Common/HashTable/HashTable.h | 4 +- .../AggregateFunctionFactory.cpp | 21 ++ 4 files changed, 317 insertions(+), 2 deletions(-) create mode 100644 dbms/include/DB/Common/CombinedCardinalityEstimator.h diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h index 9d063d8ea8b..146bb6a9394 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -117,6 +118,26 @@ struct AggregateFunctionUniqExactData }; +template +struct AggregateFunctionUniqCombinedData +{ + using Key = T; + using Set = CombinedCardinalityEstimator, HashTableGrower<4> >, 16, 16, 19>; + Set set; + + static String getName() { return "uniqCombined"; } +}; + +template <> +struct AggregateFunctionUniqCombinedData +{ + using Key = UInt64; + using Set = CombinedCardinalityEstimator, HashTableGrower<4> >, 16, 16, 19>; + Set set; + + static String getName() { return "uniqCombined"; } +}; + namespace detail { /** Структура для делегации работы по добавлению одного элемента в агрегатные функции uniq. @@ -166,6 +187,28 @@ namespace detail data.set.insert(key); } }; + + template + struct OneAdder > + { + static void addOne(AggregateFunctionUniqCombinedData & data, const IColumn & column, size_t row_num) + { + if (data.set.isMedium()) + data.set.insert(static_cast &>(column).getData()[row_num]); + else + data.set.insert(AggregateFunctionUniqTraits::hash(static_cast &>(column).getData()[row_num])); + } + }; + + template<> + struct OneAdder > + { + static void addOne(AggregateFunctionUniqCombinedData & data, const IColumn & column, size_t row_num) + { + StringRef value = column.getDataAt(row_num); + data.set.insert(CityHash64(value.data, value.size)); + } + }; } diff --git a/dbms/include/DB/Common/CombinedCardinalityEstimator.h b/dbms/include/DB/Common/CombinedCardinalityEstimator.h new file mode 100644 index 00000000000..a3d15a8c155 --- /dev/null +++ b/dbms/include/DB/Common/CombinedCardinalityEstimator.h @@ -0,0 +1,251 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + +namespace details +{ + +enum class ContainerType { SMALL, MEDIUM, LARGE }; + +ContainerType max(const ContainerType & lhs, const ContainerType & rhs) +{ + unsigned int res = std::max(static_cast(lhs), static_cast(rhs)); + return static_cast(res); +} + +} + +/** Для маленького количества ключей - массив фиксированного размера "на стеке". + * Для среднего - выделяется HashSet. + * Для большого - выделяется HyperLogLog. 
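+ * In other words: a fixed-size array kept inside the object while the number of keys is small,
+ * then a heap-allocated HashSet, and finally a HyperLogLog estimate. The state is only ever
+ * promoted upwards (SMALL -> MEDIUM -> LARGE); see toMedium() and toLarge() below.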
+ */ +template +class CombinedCardinalityEstimator +{ +public: + using Self = CombinedCardinalityEstimator; + +private: + using Small = SmallSet; + using Medium = HashContainer; + using Large = HyperLogLogWithSmallSetOptimization; + +public: + ~CombinedCardinalityEstimator() + { + if (container_type == details::ContainerType::MEDIUM) + { + delete medium; + + if (current_memory_tracker) + current_memory_tracker->free(sizeof(medium)); + } + else if (container_type == details::ContainerType::LARGE) + { + delete large; + + if (current_memory_tracker) + current_memory_tracker->free(sizeof(large)); + } + } + + void insert(Key value) + { + if (container_type == details::ContainerType::SMALL) + { + if (small.find(value) == small.end()) + { + if (!small.full()) + small.insert(value); + else + { + toMedium(); + medium->insert(value); + } + } + } + else if (container_type == details::ContainerType::MEDIUM) + { + if (medium->size() < medium_set_size_max) + medium->insert(value); + else + { + toLarge(); + large->insert(value); + } + } + else if (container_type == details::ContainerType::LARGE) + large->insert(value); + else + throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + } + + UInt32 size() const + { + if (container_type == details::ContainerType::SMALL) + return small.size(); + else if (container_type == details::ContainerType::MEDIUM) + return medium->size(); + else if (container_type == details::ContainerType::LARGE) + return large->size(); + else + throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + } + + void merge(const Self & rhs) + { + details::ContainerType max_container_type = details::max(container_type, rhs.container_type); + + if (container_type != max_container_type) + { + if (max_container_type == details::ContainerType::MEDIUM) + toMedium(); + else if (max_container_type == details::ContainerType::LARGE) + toLarge(); + } + + if (container_type == details::ContainerType::SMALL) + { + for (const auto & x : rhs.small) + insert(x); + } + else if (container_type == details::ContainerType::MEDIUM) + { + if (rhs.container_type == details::ContainerType::SMALL) + { + for (const auto & x : rhs.small) + insert(x); + } + else if (rhs.container_type == details::ContainerType::MEDIUM) + { + for (const auto & x : *rhs.medium) + insert(x); + } + } + else if (container_type == details::ContainerType::LARGE) + { + if (rhs.container_type == details::ContainerType::SMALL) + { + for (const auto & x : rhs.small) + insert(x); + } + else if (rhs.container_type == details::ContainerType::MEDIUM) + { + for (const auto & x : *rhs.medium) + insert(x); + } + else if (rhs.container_type == details::ContainerType::LARGE) + large->merge(*rhs.large); + } + else + throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + } + + /// Можно вызывать только для пустого объекта. 
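+	/// (Only valid on a freshly created, empty object.)
+	/// Serialized format, matching write() below: one UInt8 holding the container type,
+	/// followed by the state of the corresponding container (small set, hash set or HyperLogLog).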
+ void read(DB::ReadBuffer & in) + { + UInt8 v; + readBinary(v, in); + details::ContainerType t = static_cast(v); + + if (t == details::ContainerType::SMALL) + small.read(in); + else if (t == details::ContainerType::MEDIUM) + { + toMedium(); + medium->read(in); + } + else if (t == details::ContainerType::LARGE) + { + toLarge(); + large->read(in); + } + else + throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + } + + void readAndMerge(DB::ReadBuffer & in) + { + Self other; + other.read(in); + merge(other); + } + + void write(DB::WriteBuffer & out) const + { + UInt8 v = static_cast(container_type); + writeBinary(v, out); + + if (container_type == details::ContainerType::SMALL) + small.write(out); + else if (container_type == details::ContainerType::MEDIUM) + medium->write(out); + else if (container_type == details::ContainerType::LARGE) + large->write(out); + else + throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + } + + bool isMedium() const + { + return container_type == details::ContainerType::MEDIUM; + } + +private: + void toMedium() + { + if (container_type != details::ContainerType::SMALL) + throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + + container_type = details::ContainerType::MEDIUM; + + if (current_memory_tracker) + current_memory_tracker->alloc(sizeof(medium)); + + Medium * tmp_medium = new Medium; + + for (const auto & x : small) + tmp_medium->insert(x); + + medium = tmp_medium; + } + + void toLarge() + { + if ((container_type != details::ContainerType::SMALL) && (container_type != details::ContainerType::MEDIUM)) + throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + + container_type = details::ContainerType::LARGE; + + if (current_memory_tracker) + current_memory_tracker->alloc(sizeof(large)); + + Large * tmp_large = new Large; + + for (const auto & x : *medium) + tmp_large->insert(x); + + large = tmp_large; + + delete medium; + medium = nullptr; + + if (current_memory_tracker) + current_memory_tracker->free(sizeof(medium)); + } + +private: + Small small; + Medium * medium = nullptr; + Large * large = nullptr; + const UInt32 medium_set_size_max = 1UL << medium_set_power2_max; + details::ContainerType container_type = details::ContainerType::SMALL; +}; + +} diff --git a/dbms/include/DB/Common/HashTable/HashTable.h b/dbms/include/DB/Common/HashTable/HashTable.h index 1fdee83c54b..67196746ae5 100644 --- a/dbms/include/DB/Common/HashTable/HashTable.h +++ b/dbms/include/DB/Common/HashTable/HashTable.h @@ -757,7 +757,7 @@ public: { Cell x; x.read(rb); - insert(x); + insert(Cell::getKey(x.getValue())); } } @@ -781,7 +781,7 @@ public: Cell x; DB::assertString(",", rb); x.readText(rb); - insert(x); + insert(Cell::getKey(x.getValue())); } } diff --git a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp index 39464720135..4676d21bdda 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -351,6 +351,26 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da else throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } + else if (name == "uniqCombined") + { + if (argument_types.size() != 1) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + const IDataType & 
argument_type = *argument_types[0]; + + AggregateFunctionPtr res = createWithNumericType(*argument_types[0]); + + if (res) + return res; + else if (typeid_cast(&argument_type)) + return new AggregateFunctionUniq>; + else if (typeid_cast(&argument_type)) + return new AggregateFunctionUniq>; + else if (typeid_cast(&argument_type) || typeid_cast(&argument_type)) + return new AggregateFunctionUniq>; + else + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } else if (name == "uniqUpTo") { if (argument_types.size() != 1) @@ -706,6 +726,7 @@ const AggregateFunctionFactory::FunctionNames & AggregateFunctionFactory::getFun "uniq", "uniqHLL12", "uniqExact", + "uniqCombined", "uniqUpTo", "groupArray", "groupUniqArray", From 18e886588d24c5ed1d4da5122d8ed34ad9bc6efe Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 20 Jul 2015 20:29:59 +0300 Subject: [PATCH 2/6] dbms: Server: Fixed bug. [#METR-17276] --- .../DB/Common/CombinedCardinalityEstimator.h | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/dbms/include/DB/Common/CombinedCardinalityEstimator.h b/dbms/include/DB/Common/CombinedCardinalityEstimator.h index a3d15a8c155..43c11380668 100644 --- a/dbms/include/DB/Common/CombinedCardinalityEstimator.h +++ b/dbms/include/DB/Common/CombinedCardinalityEstimator.h @@ -203,8 +203,6 @@ private: if (container_type != details::ContainerType::SMALL) throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); - container_type = details::ContainerType::MEDIUM; - if (current_memory_tracker) current_memory_tracker->alloc(sizeof(medium)); @@ -214,6 +212,8 @@ private: tmp_medium->insert(x); medium = tmp_medium; + + container_type = details::ContainerType::MEDIUM; } void toLarge() @@ -221,23 +221,34 @@ private: if ((container_type != details::ContainerType::SMALL) && (container_type != details::ContainerType::MEDIUM)) throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR); - container_type = details::ContainerType::LARGE; - if (current_memory_tracker) current_memory_tracker->alloc(sizeof(large)); Large * tmp_large = new Large; - for (const auto & x : *medium) - tmp_large->insert(x); + if (container_type == details::ContainerType::SMALL) + { + for (const auto & x : small) + tmp_large->insert(x); + } + else if (container_type == details::ContainerType::MEDIUM) + { + for (const auto & x : *medium) + tmp_large->insert(x); + } large = tmp_large; - delete medium; - medium = nullptr; + if (container_type == details::ContainerType::MEDIUM) + { + delete medium; + medium = nullptr; - if (current_memory_tracker) - current_memory_tracker->free(sizeof(medium)); + if (current_memory_tracker) + current_memory_tracker->free(sizeof(medium)); + } + + container_type = details::ContainerType::LARGE; } private: From c956fff1d07d43c37f4f222a0a27a9c439ad70b9 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 20 Jul 2015 20:41:51 +0300 Subject: [PATCH 3/6] dbms: add FarmHash64 and two variations of MetroHash64 to hash_map_string_3 text [#METR-15838] --- .../Interpreters/tests/hash_map_string_3.cpp | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/dbms/src/Interpreters/tests/hash_map_string_3.cpp b/dbms/src/Interpreters/tests/hash_map_string_3.cpp index e25600d9858..1e1a52509d1 100644 --- a/dbms/src/Interpreters/tests/hash_map_string_3.cpp +++ b/dbms/src/Interpreters/tests/hash_map_string_3.cpp @@ -4,6 +4,9 @@ #include +#include 
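+// (third-party hash implementations used by the FarmHash64 / MetroHash64 functors added below)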
+#include + #define DBMS_HASH_MAP_COUNT_COLLISIONS #define DBMS_HASH_MAP_DEBUG_RESIZES @@ -32,8 +35,8 @@ for file in MobilePhoneModel PageCharset Params URLDomain UTMSource Referer URL if [[ $TOTAL_ELEMS -gt 25000000 ]]; then break; fi ./hash_map_string_3 $size $method < ${file}.bin 2>&1 | grep HashMap | grep -oE '[0-9\.]+ elem'; - done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' | tee /tmp/hash_map_string_2_res; - CUR_RESULT=$(cat /tmp/hash_map_string_2_res | tr -d '.') + done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' | tee /tmp/hash_map_string_3_res; + CUR_RESULT=$(cat /tmp/hash_map_string_3_res | tr -d '.') if [[ $CUR_RESULT -gt $BEST_RESULT ]]; then BEST_METHOD=$method BEST_RESULT=$CUR_RESULT @@ -277,6 +280,32 @@ struct VerySimpleHash }; +struct FarmHash64 +{ + size_t operator() (StringRef x) const + { + return farmhash::Hash64(x.data, x.size); + } +}; + + +template +struct MetroHash64 +{ + size_t operator() (StringRef x) const + { + union { + std::uint64_t u64; + std::uint8_t u8[sizeof(u64)]; + }; + + metrohash64(reinterpret_cast(x.data), x.size, 0, u8); + + return u64; + } +}; + + /*struct CRC32Hash { size_t operator() (StringRef x) const @@ -426,6 +455,9 @@ int main(int argc, char ** argv) if (!m || m == 5) bench (data, "StringRef_CRC32Hash"); if (!m || m == 6) bench (data, "StringRef_CRC32ILPHash"); if (!m || m == 7) bench(data, "StringRef_VerySimpleHash"); + if (!m || m == 8) bench(data, "StringRef_FarmHash64"); + if (!m || m == 9) bench>(data, "StringRef_MetroHash64_1"); + if (!m || m == 10) bench>(data, "StringRef_MetroHash64_2"); return 0; } From 9364b2a13aac79b9733476869d65b31cb07e975e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 20 Jul 2015 23:05:34 +0300 Subject: [PATCH 4/6] dbms: fixed error in split functions [#METR-17374]. --- dbms/include/DB/Functions/FunctionsStringArray.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dbms/include/DB/Functions/FunctionsStringArray.h b/dbms/include/DB/Functions/FunctionsStringArray.h index 30c1b532f65..80f6a77b4c7 100644 --- a/dbms/include/DB/Functions/FunctionsStringArray.h +++ b/dbms/include/DB/Functions/FunctionsStringArray.h @@ -132,7 +132,8 @@ public: const ColumnConstString * col = typeid_cast(&*block.getByPosition(arguments[0]).column); if (!col) - throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". Must be constant string.", + throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + + " of first argument of function " + getName() + ". Must be constant string.", ErrorCodes::ILLEGAL_COLUMN); const String & sep_str = col->getData(); @@ -198,7 +199,8 @@ public: const ColumnConstString * col = typeid_cast(&*block.getByPosition(arguments[0]).column); if (!col) - throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". Must be constant string.", + throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + + " of first argument of function " + getName() + ". Must be constant string.", ErrorCodes::ILLEGAL_COLUMN); sep = col->getData(); @@ -264,7 +266,8 @@ public: const ColumnConstString * col = typeid_cast(&*block.getByPosition(arguments[1]).column); if (!col) - throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". 
Must be constant string.", + throw Exception("Illegal column " + block.getByPosition(arguments[1]).column->getName() + + " of first argument of function " + getName() + ". Must be constant string.", ErrorCodes::ILLEGAL_COLUMN); re = Regexps::get(col->getData()); From 45bfe8e36105a978310457daac8a6c39dbe55874 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 21 Jul 2015 00:43:53 +0300 Subject: [PATCH 5/6] dbms: fixed error with DISTINCT [#METR-17364]. --- dbms/src/Interpreters/InterpreterSelectQuery.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index d520dd275cd..8e41ba968dd 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -522,6 +522,9 @@ void InterpreterSelectQuery::executeSingleQuery() if (has_order_by) executeOrder(streams); + if (has_order_by && query.limit_length) + executeDistinct(streams, false, selected_columns); + if (query.limit_length) executePreLimit(streams); } From 291fb2ad3e8e15c272f7cc8b714c693f9e1b5f0e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 21 Jul 2015 04:09:59 +0300 Subject: [PATCH 6/6] dbms: added test [#METR-17364]. --- ...00200_distinct_order_by_limit_distributed.reference | 10 ++++++++++ .../00200_distinct_order_by_limit_distributed.sql | 5 +++++ 2 files changed, 15 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.reference create mode 100644 dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.sql diff --git a/dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.reference b/dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.reference new file mode 100644 index 00000000000..8b1acc12b63 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.reference @@ -0,0 +1,10 @@ +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 diff --git a/dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.sql b/dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.sql new file mode 100644 index 00000000000..85a7c434f16 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00200_distinct_order_by_limit_distributed.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS numbers_memory; +CREATE TABLE numbers_memory AS system.numbers ENGINE = Memory; +INSERT INTO numbers_memory SELECT number FROM system.numbers LIMIT 100; +SELECT DISTINCT number FROM remote('127.0.0.{2,3}', default.numbers_memory) ORDER BY number LIMIT 10; +DROP TABLE numbers_memory;
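Regarding [PATCH 5/6] and [PATCH 6/6]: the test reads the same 100-row table through
remote('127.0.0.{2,3}', ...), so every value reaches the initiating server twice. The extra
executeDistinct() call after executeOrder() and before executePreLimit() removes those duplicates
again after the final merge-sort, which is what makes LIMIT 10 return the ten distinct values
0..9 of the reference file rather than a result distorted by duplicates.

Usage sketch for the uniqCombined aggregate function introduced in [PATCH 1/6] (illustrative only;
the table and column names below are made up). It takes exactly one argument and, per the
estimator above, counts exactly at low cardinality and falls back to a HyperLogLog approximation
at high cardinality:

    SELECT uniqCombined(UserID) FROM hits;

    -- side by side with the other variants registered in AggregateFunctionFactory:
    SELECT uniqExact(UserID), uniqHLL12(UserID), uniqCombined(UserID) FROM hits;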