From 479f1fc346a1931b7c50b39a01beb1def1320749 Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Mon, 7 Dec 2015 19:06:18 +0300
Subject: [PATCH 01/27] dbms: fix typo and absent support of Float64 for MySQL dictionaries [#METR-18946]

---
 dbms/include/DB/Dictionaries/MySQLBlockInputStream.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h
index c66f0a19497..5021345ff55 100644
--- a/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h
+++ b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h
@@ -77,7 +77,7 @@ public:
 				types.push_back(value_type_t::Int64);
 			else if (typeid_cast<const DataTypeFloat32 *>(type))
 				types.push_back(value_type_t::Float32);
-			else if (typeid_cast<const DataTypeFloat32 *>(type))
+			else if (typeid_cast<const DataTypeFloat64 *>(type))
 				types.push_back(value_type_t::Float64);
 			else if (typeid_cast<const DataTypeString *>(type))
 				types.push_back(value_type_t::String);

From cf2c86956e129eb63bfae612269c67129139f049 Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Mon, 7 Dec 2015 19:07:02 +0300
Subject: [PATCH 02/27] dbms: SmallObjectPool: remove minimum size check [#METR-18946]

---
 dbms/include/DB/Common/SmallObjectPool.h | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/dbms/include/DB/Common/SmallObjectPool.h b/dbms/include/DB/Common/SmallObjectPool.h
index 61c07393252..16faf5f879d 100644
--- a/dbms/include/DB/Common/SmallObjectPool.h
+++ b/dbms/include/DB/Common/SmallObjectPool.h
@@ -14,10 +14,13 @@ namespace DB
 {
 
+/** Can allocate memory objects of fixed size with deletion support.
+  * For `object_size` less than `min_allocation_size` still allocates `min_allocation_size` bytes. */
 class SmallObjectPool
 {
 private:
 	struct Block { Block * next; };
+	static constexpr auto min_allocation_size = sizeof(Block);
 
 	const std::size_t object_size;
 	Arena pool;
@@ -25,16 +28,11 @@ private:
 
 public:
 	SmallObjectPool(
-		const std::size_t object_size, const std::size_t initial_size = 4096, const std::size_t growth_factor = 2,
+		const std::size_t object_size_, const std::size_t initial_size = 4096, const std::size_t growth_factor = 2,
 		const std::size_t linear_growth_threshold = 128 * 1024 * 1024)
-		: object_size{object_size}, pool{initial_size, growth_factor, linear_growth_threshold}
+		: object_size{std::max(object_size_, min_allocation_size)},
+		  pool{initial_size, growth_factor, linear_growth_threshold}
 	{
-		if (object_size < sizeof(Block))
-			throw Exception{
-				"Can't make allocations smaller than sizeof(Block) = " + std::to_string(sizeof(Block)),
-				ErrorCodes::LOGICAL_ERROR
-			};
-
 		if (pool.size() < object_size)
 			return;

From 78b560c846c4fb68b83097889afc79bdf4b7ec56 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 7 Dec 2015 22:30:50 +0300
Subject: [PATCH 03/27] dbms: fixed error [#METR-19283].
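The SmallObjectPool modified by PATCH 02 above is a free-list allocator: a freed object's own bytes are reused as a Block whose next pointer threads it into a singly linked list, which is why an allocation can never be smaller than sizeof(Block). The patch replaces the explicit size check with a silent std::max. A minimal sketch of the idea (TinyPool is a hypothetical simplification; the real class allocates from an Arena rather than malloc and owns its memory):

    #include <algorithm>
    #include <cstddef>
    #include <cstdlib>

    /// Simplified free-list pool: fixed-size objects, O(1) alloc/free.
    class TinyPool
    {
        struct Block { Block * next; };    /// lives inside freed memory
        const std::size_t object_size;
        Block * free_list = nullptr;

    public:
        explicit TinyPool(std::size_t object_size_)
            /// Same trick as the patch: round the size up instead of throwing.
            : object_size(std::max(object_size_, sizeof(Block))) {}

        char * alloc()
        {
            if (free_list)
            {
                char * res = reinterpret_cast<char *>(free_list);
                free_list = free_list->next;    /// pop from the free list
                return res;
            }
            /// The real pool carves this out of its Arena, which also frees
            /// everything at once on destruction; malloc is just a stand-in.
            return static_cast<char *>(std::malloc(object_size));
        }

        void free(char * ptr)
        {
            /// Push the freed chunk onto the free list, reusing its own bytes.
            auto block = reinterpret_cast<Block *>(ptr);
            block->next = free_list;
            free_list = block;
        }
    };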
--- dbms/src/Storages/MergeTree/PKCondition.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dbms/src/Storages/MergeTree/PKCondition.cpp b/dbms/src/Storages/MergeTree/PKCondition.cpp index 0211b053c42..aae687a90d1 100644 --- a/dbms/src/Storages/MergeTree/PKCondition.cpp +++ b/dbms/src/Storages/MergeTree/PKCondition.cpp @@ -457,6 +457,13 @@ bool PKCondition::mayBeTrueInRange(const Field * left_pk, const Field * right_pk applyFunction(func, current_type, key_range_transformed.left, new_type, key_range_transformed.left); if (!key_range_transformed.right.isNull()) applyFunction(func, current_type, key_range_transformed.right, new_type, key_range_transformed.right); + + if (!new_type) + { + evaluation_is_not_possible = true; + break; + } + current_type.swap(new_type); if (!monotonicity.is_positive) From 35db0d65b9d7f1c4421c0a139f0c1ebc6eb67b17 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 7 Dec 2015 23:08:00 +0300 Subject: [PATCH 04/27] dbms: fixed error [#METR-19288]. --- .../MergingAggregatedMemoryEfficientBlockInputStream.cpp | 9 +++++++-- dbms/src/Interpreters/Aggregator.cpp | 8 ++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 7407e03908e..d6b42497774 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -62,7 +62,6 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start() auto memory_tracker = current_memory_tracker; task = std::packaged_task([&child, memory_tracker] { - /// memory_tracker и имя потока устанавливается здесь. Далее для всех задач в reading_pool это уже не требуется. current_memory_tracker = memory_tracker; setThreadName("MergeAggReadThr"); child->readPrefix(); @@ -276,7 +275,13 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate { if (need_that_input(input)) { - tasks.emplace_back([&input, &read_from_input] { read_from_input(input); }); + auto memory_tracker = current_memory_tracker; + tasks.emplace_back([&input, &read_from_input, memory_tracker] + { + current_memory_tracker = memory_tracker; + setThreadName("MergeAggReadThr"); + read_from_input(input); + }); auto & task = tasks.back(); reading_pool->schedule([&task] { task(); }); } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 915bff295e3..c504f224e17 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -90,7 +90,8 @@ void Aggregator::initialize(const Block & block) initialized = true; - memory_usage_before_aggregation = current_memory_tracker->get(); + if (current_memory_tracker) + memory_usage_before_aggregation = current_memory_tracker->get(); aggregate_functions.resize(params.aggregates_size); for (size_t i = 0; i < params.aggregates_size; ++i) @@ -732,7 +733,10 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, } size_t result_size = result.sizeWithoutOverflowRow(); - auto current_memory_usage = current_memory_tracker->get(); + Int64 current_memory_usage = 0; + if (current_memory_tracker) + current_memory_usage = current_memory_tracker->get(); + auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Здесь учитываются все результаты в сумме, из разных потоков. 
bool worth_convert_to_two_level From dc7372ab71f20cd062e3b58e92835a1c5395d329 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 8 Dec 2015 04:17:57 +0300 Subject: [PATCH 05/27] dbms: fixed error [#METR-19271]. --- dbms/include/DB/Columns/ColumnConst.h | 29 +++++++++++++++++-- .../00287_column_const_with_nan.reference | 1 + .../00287_column_const_with_nan.sql | 1 + 3 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00287_column_const_with_nan.reference create mode 100644 dbms/tests/queries/0_stateless/00287_column_const_with_nan.sql diff --git a/dbms/include/DB/Columns/ColumnConst.h b/dbms/include/DB/Columns/ColumnConst.h index b6bfa226ada..125d1f2e644 100644 --- a/dbms/include/DB/Columns/ColumnConst.h +++ b/dbms/include/DB/Columns/ColumnConst.h @@ -26,6 +26,29 @@ public: }; +namespace ColumnConstDetails +{ + template + inline bool equals(const T & x, const T & y) + { + return x == y; + } + + /// Проверяет побитовую идентичность элементов, даже если они являются NaN-ами. + template <> + inline bool equals(const Float32 & x, const Float32 & y) + { + return 0 == memcmp(&x, &y, sizeof(x)); + } + + template <> + inline bool equals(const Float64 & x, const Float64 & y) + { + return 0 == memcmp(&x, &y, sizeof(x)); + } +} + + /** Столбец-константа может содержать внутри себя само значение, * или, в случае массивов, SharedPtr от значения-массива, * чтобы избежать проблем производительности при копировании очень больших массивов. @@ -65,7 +88,7 @@ public: void insertRangeFrom(const IColumn & src, size_t start, size_t length) override { - if (getDataFromHolder() != static_cast(src).getDataFromHolder()) + if (!ColumnConstDetails::equals(getDataFromHolder(), static_cast(src).getDataFromHolder())) throw Exception("Cannot insert different element into constant column " + getName(), ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); @@ -74,7 +97,7 @@ public: void insert(const Field & x) override { - if (x.get() != FieldType(getDataFromHolder())) + if (!ColumnConstDetails::equals(x.get(), FieldType(getDataFromHolder()))) throw Exception("Cannot insert different element into constant column " + getName(), ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); ++s; @@ -87,7 +110,7 @@ public: void insertFrom(const IColumn & src, size_t n) override { - if (getDataFromHolder() != static_cast(src).getDataFromHolder()) + if (!ColumnConstDetails::equals(getDataFromHolder(), static_cast(src).getDataFromHolder())) throw Exception("Cannot insert different element into constant column " + getName(), ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); ++s; diff --git a/dbms/tests/queries/0_stateless/00287_column_const_with_nan.reference b/dbms/tests/queries/0_stateless/00287_column_const_with_nan.reference new file mode 100644 index 00000000000..946573052ce --- /dev/null +++ b/dbms/tests/queries/0_stateless/00287_column_const_with_nan.reference @@ -0,0 +1 @@ +nan 1 diff --git a/dbms/tests/queries/0_stateless/00287_column_const_with_nan.sql b/dbms/tests/queries/0_stateless/00287_column_const_with_nan.sql new file mode 100644 index 00000000000..67931511ac2 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00287_column_const_with_nan.sql @@ -0,0 +1 @@ +SELECT * FROM (SELECT nan, number FROM system.numbers) WHERE number % 100 = 1 LIMIT 1; From 18c3aa441aa5900eb5cbf473c986b0022b3641a2 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 8 Dec 2015 04:43:39 +0300 Subject: [PATCH 06/27] dbms: removing old temporary files on startup [#METR-17000]. 
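PATCH 05 above works around the IEEE 754 rule that NaN compares unequal to everything, including itself: inserting a NaN constant into a ColumnConst used to fail the operator== identity check, so equality is now decided bitwise with memcmp for Float32 and Float64. A standalone illustration of the difference (only standard headers assumed):

    #include <cassert>
    #include <cmath>
    #include <cstring>

    template <typename T>
    bool bitwiseEquals(const T & x, const T & y)
    {
        return 0 == std::memcmp(&x, &y, sizeof(x));    /// identical bit patterns
    }

    int main()
    {
        double nan = std::nan("");
        assert(!(nan == nan));              /// IEEE 754: NaN is unequal to itself
        assert(bitwiseEquals(nan, nan));    /// but it is bitwise identical to itself
    }

The same caveat carries over to ColumnConstDetails::equals: two NaNs with different payload bits still compare unequal, which is acceptable here because the compared values originate from copies of one constant.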
--- dbms/src/Server/Server.cpp | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 134b5e8e6d8..5c623b4fe05 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -584,10 +584,22 @@ int Server::main(const std::vector & args) global_context->setPath(path); /// Директория для временных файлов при обработке тяжёлых запросов. - std::string tmp_path = config().getString("tmp_path", path + "tmp/"); - global_context->setTemporaryPath(tmp_path); - Poco::File(tmp_path).createDirectories(); - /// TODO Очистка временных файлов. Проверка, что директория с временными файлами не совпадает и не содержит в себе основной path. + { + std::string tmp_path = config().getString("tmp_path", path + "tmp/"); + global_context->setTemporaryPath(tmp_path); + Poco::File(tmp_path).createDirectories(); + + /// Очистка временных файлов. + Poco::DirectoryIterator dir_end; + for (Poco::DirectoryIterator it(tmp_path); it != dir_end; ++it) + { + if (it->isFile() && 0 == it.name().compare(0, 3, "tmp")) + { + LOG_DEBUG(log, "Removing old temporary file " << it->path()); + it->remove(); + } + } + } bool has_zookeeper = false; if (config().has("zookeeper")) From 50d463d08b2da5e430321d1dcdb469d2f8ef5686 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 8 Dec 2015 05:01:46 +0300 Subject: [PATCH 07/27] dbms: fixed error [#METR-17000]. --- dbms/include/DB/Interpreters/Aggregator.h | 9 +++------ dbms/src/Interpreters/Aggregator.cpp | 17 ++++++++++------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 84aa515c060..fe9d37f7df4 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -849,18 +849,15 @@ public: * которые могут быть затем объединены с другими состояниями (для распределённой обработки запроса). * Если final = true, то в качестве столбцов-агрегатов создаются столбцы с готовыми значениями. */ - BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads); + BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const; /** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.) - * После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить, - * пока не будет вызвана функция convertToBlocks. - * Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации. */ - AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads); + AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) const; /** Объединить несколько структур данных агрегации и выдать результат в виде потока блоков. */ - std::unique_ptr mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads); + std::unique_ptr mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const; /** Объединить поток частично агрегированных блоков в одну структуру данных. * (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.) 
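The sweep added to Server.cpp in PATCH 06 above relies on a naming convention: spill files of heavy queries are created inside tmp_path with a "tmp" name prefix, so anything matching that prefix at startup is garbage from a previous run. As a standalone helper it looks roughly like this (a sketch using only the Poco calls visible in the diff; clearOldTemporaryFiles is a hypothetical name, and logging is omitted):

    #include <string>
    #include <Poco/File.h>
    #include <Poco/DirectoryIterator.h>

    /// Sweep tmp_path for spill files left over by a previous run.
    void clearOldTemporaryFiles(const std::string & tmp_path)
    {
        Poco::File(tmp_path).createDirectories();

        Poco::DirectoryIterator dir_end;
        for (Poco::DirectoryIterator it(tmp_path); it != dir_end; ++it)
        {
            /// Only plain files whose name starts with "tmp" are ours to delete.
            if (it->isFile() && 0 == it.name().compare(0, 3, "tmp"))
                it->remove();
        }
    }

Note that the TODO the patch deletes, verifying that tmp_path neither equals nor is contained in the main data path, still matters: pointing tmp_path at the data directory would let this loop delete live files whose names happen to start with "tmp".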
diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index c504f224e17..c3dcb6a78d3 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1269,7 +1269,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl( } -BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) +BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const { if (isCancelled()) return BlocksList(); @@ -1618,7 +1618,7 @@ void NO_INLINE Aggregator::mergeTwoLevelDataImpl( } -AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) +AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) const { if (data_variants.empty()) throw Exception("Empty data passed to Aggregator::merge.", ErrorCodes::EMPTY_DATA_PASSED); @@ -1743,7 +1743,7 @@ public: /** На вход подаётся набор непустых множеств частично агрегированных данных, * которые все либо являются одноуровневыми, либо являются двухуровневыми. */ - MergingAndConvertingBlockInputStream(Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_) + MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_) : aggregator(aggregator_), data(data_), final(final_), threads(threads_) {} String getName() const override { return "MergingAndConverting"; } @@ -1849,7 +1849,7 @@ protected: } private: - Aggregator & aggregator; + const Aggregator & aggregator; ManyAggregatedDataVariants data; bool final; size_t threads; @@ -1919,7 +1919,7 @@ private: }; -std::unique_ptr Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) +std::unique_ptr Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const { if (data_variants.empty()) throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED); @@ -2306,8 +2306,11 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) Block empty_block; initialize(empty_block); - if (!sample) - sample = blocks.front().cloneEmpty(); + { + std::lock_guard lock(mutex); + if (!sample) + sample = blocks.front().cloneEmpty(); + } /// Каким способом выполнять агрегацию? 
 		for (size_t i = 0; i < params.keys_size; ++i)

From a57f9967700fc1cfc48a8867bf17cec39fc5eabf Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Tue, 8 Dec 2015 12:16:09 +0300
Subject: [PATCH 08/27] dbms: fix transform() accepting non-constant second argument

---
 dbms/include/DB/Functions/FunctionsTransform.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbms/include/DB/Functions/FunctionsTransform.h b/dbms/include/DB/Functions/FunctionsTransform.h
index 62650874989..353cb61a7e9 100644
--- a/dbms/include/DB/Functions/FunctionsTransform.h
+++ b/dbms/include/DB/Functions/FunctionsTransform.h
@@ -129,7 +129,7 @@ public:
 		const ColumnConstArray * array_from = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[1]).column);
 		const ColumnConstArray * array_to = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[2]).column);
 
-		if (!array_from && !array_to)
+		if (!array_from || !array_to)
 			throw Exception("Second and third arguments of function " + getName() + " must be constant arrays.", ErrorCodes::ILLEGAL_COLUMN);
 
 		prepare(array_from->getData(), array_to->getData(), block, arguments);

From c8e3d9053636707ffb9d02dedbda455032af850b Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Tue, 8 Dec 2015 12:16:33 +0300
Subject: [PATCH 09/27] dbms: SmallObjectPool: fix odr-use in debug build

---
 dbms/include/DB/Common/SmallObjectPool.h | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dbms/include/DB/Common/SmallObjectPool.h b/dbms/include/DB/Common/SmallObjectPool.h
index 16faf5f879d..7948ae42626 100644
--- a/dbms/include/DB/Common/SmallObjectPool.h
+++ b/dbms/include/DB/Common/SmallObjectPool.h
@@ -15,12 +15,12 @@ namespace DB
 
 
 /** Can allocate memory objects of fixed size with deletion support.
-  * For `object_size` less than `min_allocation_size` still allocates `min_allocation_size` bytes. */
+  * For small `object_size`s, no less than getMinAllocationSize() bytes are allocated. */
 class SmallObjectPool
 {
 private:
 	struct Block { Block * next; };
-	static constexpr auto min_allocation_size = sizeof(Block);
+	static constexpr auto getMinAllocationSize() { return sizeof(Block); }
 
 	const std::size_t object_size;
 	Arena pool;
@@ -30,7 +30,7 @@ public:
 	SmallObjectPool(
 		const std::size_t object_size_, const std::size_t initial_size = 4096, const std::size_t growth_factor = 2,
 		const std::size_t linear_growth_threshold = 128 * 1024 * 1024)
-		: object_size{std::max(object_size_, min_allocation_size)},
+		: object_size{std::max(object_size_, getMinAllocationSize())},
 		  pool{initial_size, growth_factor, linear_growth_threshold}
 	{
 		if (pool.size() < object_size)
 			return;

From f6973a32c4959634a15f15eff9caf25d4304bdbd Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 8 Dec 2015 23:04:11 +0300
Subject: [PATCH 10/27] dbms: added support for empty StripeLog tables [#METR-19298].
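The odr-use fixed by PATCH 09 above is a classic C++11/14 pitfall: std::max takes its arguments by const reference, so std::max(object_size_, min_allocation_size) odr-uses the static member, and an in-class static constexpr initializer is only a declaration. An optimized build folds the reference away; a debug build emits a relocation and the linker fails to find the definition. Two conforming ways out, sketched side by side (struct A is illustrative):

    #include <algorithm>
    #include <cstddef>

    struct A
    {
        struct Block { Block * next; };

        /// Variant 1 (what the patch does): a constexpr function returns a prvalue,
        /// so binding std::max's const reference never odr-uses a static member.
        static constexpr std::size_t getMinAllocationSize() { return sizeof(Block); }

        /// Variant 2: keep the member, but give it an out-of-line definition below.
        static constexpr std::size_t min_allocation_size = sizeof(Block);

        std::size_t object_size;

        explicit A(std::size_t n)
            : object_size(std::max(n, getMinAllocationSize())) {}
    };

    /// Required for variant 2 under C++11/14 (implicit since C++17 inline variables):
    constexpr std::size_t A::min_allocation_size;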
--- dbms/src/Storages/StorageStripeLog.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp index 074a5d611d6..acb2f062c0b 100644 --- a/dbms/src/Storages/StorageStripeLog.cpp +++ b/dbms/src/Storages/StorageStripeLog.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -232,6 +233,9 @@ BlockInputStreams StorageStripeLog::read( NameSet column_names_set(column_names.begin(), column_names.end()); + if (!Poco::File(full_path() + "index.mrk").exists()) + return { new NullBlockInputStream }; + CompressedReadBufferFromFile index_in(full_path() + "index.mrk", 0, 0, INDEX_BUFFER_SIZE); std::shared_ptr index{std::make_shared(index_in, column_names_set)}; From 3b9466ee0831fcb71c4e572f15a4546a0bd52c52 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 8 Dec 2015 23:05:49 +0300 Subject: [PATCH 11/27] dbms: added test [#METR-19298]. --- .../queries/0_stateless/00288_empty_stripelog.reference | 2 ++ dbms/tests/queries/0_stateless/00288_empty_stripelog.sql | 8 ++++++++ 2 files changed, 10 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/00288_empty_stripelog.reference create mode 100644 dbms/tests/queries/0_stateless/00288_empty_stripelog.sql diff --git a/dbms/tests/queries/0_stateless/00288_empty_stripelog.reference b/dbms/tests/queries/0_stateless/00288_empty_stripelog.reference new file mode 100644 index 00000000000..1191247b6d9 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00288_empty_stripelog.reference @@ -0,0 +1,2 @@ +1 +2 diff --git a/dbms/tests/queries/0_stateless/00288_empty_stripelog.sql b/dbms/tests/queries/0_stateless/00288_empty_stripelog.sql new file mode 100644 index 00000000000..fddbbedaac2 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00288_empty_stripelog.sql @@ -0,0 +1,8 @@ +DROP TABLE IF EXISTS test.stripelog; +CREATE TABLE test.stripelog (x UInt8) ENGINE = StripeLog; + +SELECT * FROM test.stripelog ORDER BY x; +INSERT INTO test.stripelog VALUES (1), (2); +SELECT * FROM test.stripelog ORDER BY x; + +DROP TABLE test.stripelog; From ccd51123bf4d4ca186da2fc5c8b01a9cf87ee274 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 00:29:38 +0300 Subject: [PATCH 12/27] dbms: fixed error [#METR-19316]. --- .../MergingAggregatedMemoryEfficientBlockInputStream.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index d6b42497774..5c0fb5ec08f 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -128,6 +128,12 @@ MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEffici if (parallel_merge_data) { LOG_TRACE((&Logger::get("MergingAggregatedMemoryEfficientBlockInputStream")), "Waiting for threads to finish"); + + { + std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); + parallel_merge_data->exhausted = true; + } + parallel_merge_data->result_queue.clear(); parallel_merge_data->pool.wait(); } From d852ef480bcb77c31031e4e22e7d0dd10494c79e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 04:29:01 +0300 Subject: [PATCH 13/27] dbms: external aggregation: fixed error [#METR-19316]. 
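PATCH 12 above, together with PATCH 13 whose diff follows, untangles a shutdown hazard in the memory-efficient merge: a worker thread can sit blocked in result_queue.push() while the destructor waits on the pool. The order that avoids the hang is: raise a stop flag under the workers' mutex, drain the queue so blocked pushes complete, then wait for the threads. A self-contained sketch of that order (BoundedQueue is a stand-in for ConcurrentBoundedQueue; as in the original result_queue(max_threads), the queue capacity equals the worker count, so at most one post-flag push per worker can never wedge):

    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <mutex>
    #include <thread>
    #include <vector>

    /// Stand-in queue: push() blocks while full, clear() drops queued items
    /// and wakes any blocked pushers.
    template <typename T>
    class BoundedQueue
    {
        std::mutex m;
        std::condition_variable cv;
        std::deque<T> q;
        size_t capacity;

    public:
        explicit BoundedQueue(size_t capacity_) : capacity(capacity_) {}

        void push(T x)
        {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [this] { return q.size() < capacity; });
            q.push_back(std::move(x));
        }

        void clear()
        {
            {
                std::lock_guard<std::mutex> lock(m);
                q.clear();
            }
            cv.notify_all();
        }
    };

    struct Merger
    {
        std::mutex mutex;           /// get_next_blocks_mutex in the original
        bool finish = false;        /// set by the destructor, checked by workers
        BoundedQueue<int> queue;    /// result_queue; int stands in for a Block
        std::vector<std::thread> workers;

        explicit Merger(size_t n) : queue(n)
        {
            for (size_t i = 0; i < n; ++i)
                workers.emplace_back([this]
                {
                    while (true)
                    {
                        {
                            std::lock_guard<std::mutex> lock(mutex);
                            if (finish)          /// re-checked on every iteration
                                return;
                        }
                        queue.push(42);          /// a "merged block"
                    }
                });
        }

        ~Merger()
        {
            {
                std::lock_guard<std::mutex> lock(mutex);
                finish = true;                   /// PATCH 12: flag first...
            }
            queue.clear();                       /// ...then unblock pushers...
            for (auto & t : workers)
                t.join();                        /// ...then pool.wait()
        }
    };

    int main() { Merger merger(4); }    /// destroyed immediately: must not hang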
--- ...rgingAggregatedMemoryEfficientBlockInputStream.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 5c0fb5ec08f..f3b5013bb7f 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -173,7 +173,16 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker } } - parallel_merge_data->result_queue.push(aggregator.mergeBlocks(*blocks_to_merge, final)); + Block res = aggregator.mergeBlocks(*blocks_to_merge, final); + + { + std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); + + if (parallel_merge_data->exhausted) + break; + + parallel_merge_data->result_queue.push(OutputData(std::move(res))); + } } } catch (...) From 69942f38d47622f8592a92f572f5e9056435ebf5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 05:27:45 +0300 Subject: [PATCH 14/27] dbms: fixed error with ParserAlterQuery [#METR-13097]. --- dbms/src/Parsers/ParserAlterQuery.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/Parsers/ParserAlterQuery.cpp b/dbms/src/Parsers/ParserAlterQuery.cpp index cf933805184..49cf3fc57e2 100644 --- a/dbms/src/Parsers/ParserAlterQuery.cpp +++ b/dbms/src/Parsers/ParserAlterQuery.cpp @@ -90,7 +90,8 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa return false; ws.ignore(pos, end); - parser_col_decl.parse(pos, end, params.col_decl, max_parsed_pos, expected); + if (!parser_col_decl.parse(pos, end, params.col_decl, max_parsed_pos, expected)) + return false; ws.ignore(pos, end); if (s_after.ignore(pos, end, max_parsed_pos, expected)) From 7560351942b94a1ce7aab860a8f82707a9649083 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 05:55:35 +0300 Subject: [PATCH 15/27] dbms: removed old code [#METR-17000]. --- dbms/include/DB/Interpreters/Aggregator.h | 15 -- .../AggregatingBlockInputStream.cpp | 12 +- dbms/src/Interpreters/Aggregator.cpp | 214 ------------------ 3 files changed, 6 insertions(+), 235 deletions(-) diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index fe9d37f7df4..907faa78f84 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -851,10 +851,6 @@ public: */ BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const; - /** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.) - */ - AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) const; - /** Объединить несколько структур данных агрегации и выдать результат в виде потока блоков. */ std::unique_ptr mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const; @@ -1081,12 +1077,6 @@ protected: Table & table_dst, Table & table_src) const; - /// Слить все ключи, оставшиеся после предыдущего метода, в overflows. 
- template - void mergeDataRemainingKeysToOverflowsImpl( - AggregatedDataWithoutKey & overflows, - Table & table_src) const; - void mergeWithoutKeyDataImpl( ManyAggregatedDataVariants & non_empty_data) const; @@ -1094,11 +1084,6 @@ protected: void mergeSingleLevelDataImpl( ManyAggregatedDataVariants & non_empty_data) const; - template - void mergeTwoLevelDataImpl( - ManyAggregatedDataVariants & many_data, - boost::threadpool::pool * thread_pool) const; - template void convertToBlockImpl( Method & method, diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp index 618f2488499..bfd146d8e5e 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp @@ -12,17 +12,17 @@ Block AggregatingBlockInputStream::readImpl() if (!executed) { executed = true; - AggregatedDataVariants data_variants; + AggregatedDataVariantsPtr data_variants = new AggregatedDataVariants; Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); }; aggregator.setCancellationHook(hook); - aggregator.execute(children.back(), data_variants); + aggregator.execute(children.back(), *data_variants); if (!aggregator.hasTemporaryFiles()) { - impl.reset(new BlocksListBlockInputStream( - aggregator.convertToBlocks(data_variants, final, 1))); + ManyAggregatedDataVariants many_data { data_variants }; + impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1); } else { @@ -35,9 +35,9 @@ Block AggregatingBlockInputStream::readImpl() if (!isCancelled()) { /// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще. - size_t rows = data_variants.sizeWithoutOverflowRow(); + size_t rows = data_variants->sizeWithoutOverflowRow(); if (rows) - aggregator.writeToTemporaryFile(data_variants, rows); + aggregator.writeToTemporaryFile(*data_variants, rows); } const auto & files = aggregator.getTemporaryFiles(); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index c3dcb6a78d3..4da3dd7905c 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1442,31 +1442,6 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl( } } -template -void NO_INLINE Aggregator::mergeDataRemainingKeysToOverflowsImpl( - AggregatedDataWithoutKey & overflows, - Table & table_src) const -{ - for (auto it = table_src.begin(); it != table_src.end(); ++it) - { - if (Method::getAggregateData(it->second) == nullptr) - continue; - - AggregateDataPtr res_data = overflows; - - for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->merge( - res_data + offsets_of_aggregate_states[i], - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); - - for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->destroy( - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); - - Method::getAggregateData(it->second) = nullptr; - } -} - void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( ManyAggregatedDataVariants & non_empty_data) const @@ -1526,193 +1501,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl( } -template -void NO_INLINE Aggregator::mergeTwoLevelDataImpl( - ManyAggregatedDataVariants & non_empty_data, - boost::threadpool::pool * thread_pool) const -{ - AggregatedDataVariantsPtr & res = non_empty_data[0]; - - /// В данном случае, no_more_keys будет выставлено, только если в первом (самом большом) состоянии достаточно много строк. 
- bool no_more_keys = false; - if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys)) - return; - - /// Слияние распараллеливается по корзинам - первому уровню TwoLevelHashMap. - auto merge_bucket = [&non_empty_data, &res, no_more_keys, this](size_t bucket, MemoryTracker * memory_tracker) - { - current_memory_tracker = memory_tracker; - - /// Все результаты агрегации соединяем с первым. - for (size_t i = 1, size = non_empty_data.size(); i < size; ++i) - { - AggregatedDataVariants & current = *non_empty_data[i]; - - if (!no_more_keys) - { - mergeDataImpl( - getDataVariant(*res).data.impls[bucket], - getDataVariant(current).data.impls[bucket]); - - getDataVariant(current).data.impls[bucket].clearAndShrink(); - } - else - { - mergeDataOnlyExistingKeysImpl( - getDataVariant(*res).data.impls[bucket], - getDataVariant(current).data.impls[bucket]); - } - } - }; - - /// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток. - - std::vector> tasks(Method::Data::NUM_BUCKETS); - - try - { - for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket) - { - tasks[bucket] = std::packaged_task(std::bind(merge_bucket, bucket, current_memory_tracker)); - - if (thread_pool) - thread_pool->schedule([bucket, &tasks] { tasks[bucket](); }); - else - tasks[bucket](); - } - } - catch (...) - { - /// Если этого не делать, то в случае исключения, tasks уничтожится раньше завершения потоков, и будет плохо. - if (thread_pool) - thread_pool->wait(); - - throw; - } - - if (thread_pool) - thread_pool->wait(); - - for (auto & task : tasks) - if (task.valid()) - task.get_future().get(); - - if (no_more_keys && params.overflow_row) - { - for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket) - { - for (size_t i = 1, size = non_empty_data.size(); i < size; ++i) - { - AggregatedDataVariants & current = *non_empty_data[i]; - - mergeDataRemainingKeysToOverflowsImpl( - res->without_key, - getDataVariant(current).data.impls[bucket]); - } - } - } - - /// aggregator не будет уничтожать состояния агрегатных функций в деструкторе - for (size_t i = 1, size = non_empty_data.size(); i < size; ++i) - non_empty_data[i]->aggregator = nullptr; -} - - -AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) const -{ - if (data_variants.empty()) - throw Exception("Empty data passed to Aggregator::merge.", ErrorCodes::EMPTY_DATA_PASSED); - - LOG_TRACE(log, "Merging aggregated data"); - - Stopwatch watch; - - ManyAggregatedDataVariants non_empty_data; - non_empty_data.reserve(data_variants.size()); - for (auto & data : data_variants) - if (!data->empty()) - non_empty_data.push_back(data); - - if (non_empty_data.empty()) - return data_variants[0]; - - if (non_empty_data.size() == 1) - return non_empty_data[0]; - - /// Отсортируем состояния по убыванию размера, чтобы мердж был более эффективным (так как все состояния мерджатся в первое). - std::sort(non_empty_data.begin(), non_empty_data.end(), - [](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs) - { - return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow(); - }); - - /// Если хотя бы один из вариантов двухуровневый, то переконвертируем все варианты в двухуровневые, если есть не такие. - /// Замечание - возможно, было бы более оптимально не конвертировать одноуровневые варианты перед мерджем, а мерджить их отдельно, в конце. 
- - bool has_at_least_one_two_level = false; - for (const auto & variant : non_empty_data) - { - if (variant->isTwoLevel()) - { - has_at_least_one_two_level = true; - break; - } - } - - if (has_at_least_one_two_level) - for (auto & variant : non_empty_data) - if (!variant->isTwoLevel()) - variant->convertToTwoLevel(); - - AggregatedDataVariantsPtr & res = non_empty_data[0]; - - size_t rows = res->size(); - for (size_t i = 1, size = non_empty_data.size(); i < size; ++i) - { - rows += non_empty_data[i]->size(); - AggregatedDataVariants & current = *non_empty_data[i]; - - if (res->type != current.type) - throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS); - - res->aggregates_pools.insert(res->aggregates_pools.end(), current.aggregates_pools.begin(), current.aggregates_pools.end()); - } - - /// В какой структуре данных агрегированы данные? - if (res->type == AggregatedDataVariants::Type::without_key || params.overflow_row) - mergeWithoutKeyDataImpl(non_empty_data); - - std::unique_ptr thread_pool; - if (max_threads > 1 && res->isTwoLevel()) - thread_pool.reset(new boost::threadpool::pool(max_threads)); - - if (false) {} -#define M(NAME) \ - else if (res->type == AggregatedDataVariants::Type::NAME) \ - mergeSingleLevelDataImplNAME)::element_type>(non_empty_data); - APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) -#undef M -#define M(NAME) \ - else if (res->type == AggregatedDataVariants::Type::NAME) \ - mergeTwoLevelDataImplNAME)::element_type>(non_empty_data, thread_pool.get()); - APPLY_FOR_VARIANTS_TWO_LEVEL(M) -#undef M - else if (res->type != AggregatedDataVariants::Type::without_key) - throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); - - double elapsed_seconds = watch.elapsedSeconds(); - size_t res_rows = res->size(); - - LOG_TRACE(log, std::fixed << std::setprecision(3) - << "Merged aggregated data. " - << "From " << rows << " to " << res_rows << " rows (efficiency: " << static_cast(rows) / res_rows << ")" - << " in " << elapsed_seconds << " sec." - << " (" << rows / elapsed_seconds << " rows/sec.)"); - - return res; -} - - template void NO_INLINE Aggregator::mergeBucketImpl( ManyAggregatedDataVariants & data, Int32 bucket) const @@ -1734,8 +1522,6 @@ void NO_INLINE Aggregator::mergeBucketImpl( * Если состояния агрегации двухуровневые, то выдаёт блоки строго по порядку bucket_num. * (Это важно при распределённой обработке.) * При этом, может обрабатывать разные bucket-ы параллельно, используя до threads потоков. - * - * TODO Удалить обычную функцию Aggregator::merge и связанные с ней, в случае невостребованности. */ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream { From a43272c9b81cddaa44be30de1be1af1a1e3fe93c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 05:56:18 +0300 Subject: [PATCH 16/27] dbms: fixed error in memory-efficient merging of aggregated states [#METR-17000]. 
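A detail worth keeping from the code deleted in PATCH 15 above: the per-bucket merge wrapped each job in a std::packaged_task precisely so that an exception thrown inside a worker is stored in the task's shared state and rethrown in the main thread by future::get(), instead of terminating the process; and the pool had to be waited on before the tasks were destroyed. The idiom reduced to its core (plain std::thread stands in for boost::threadpool):

    #include <cstddef>
    #include <cstdio>
    #include <future>
    #include <stdexcept>
    #include <thread>
    #include <vector>

    int main()
    {
        const size_t num_buckets = 4;

        /// One task per bucket, as in the removed mergeTwoLevelDataImpl.
        std::vector<std::packaged_task<void()>> tasks(num_buckets);
        std::vector<std::thread> pool;

        for (size_t bucket = 0; bucket < num_buckets; ++bucket)
        {
            tasks[bucket] = std::packaged_task<void()>([bucket]
            {
                if (bucket == 2)
                    throw std::runtime_error("merge failed");  /// stored, not terminate()
            });
            pool.emplace_back([bucket, &tasks] { tasks[bucket](); });
        }

        /// Wait for everything first (thread_pool->wait() in the original):
        /// the tasks must outlive the threads even when an exception is stored.
        for (auto & t : pool)
            t.join();

        /// Now collect results; a stored exception is rethrown in this thread.
        try
        {
            for (auto & task : tasks)
                if (task.valid())
                    task.get_future().get();
        }
        catch (const std::exception & e)
        {
            std::fprintf(stderr, "%s\n", e.what());  /// handled in the main thread
        }
    }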
--- dbms/include/DB/Interpreters/Aggregator.h | 7 ++++- dbms/src/Interpreters/Aggregator.cpp | 32 +++++++++++------------ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 907faa78f84..cf7c8f3310a 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -968,10 +968,15 @@ protected: TemporaryFiles temporary_files; /** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов. - * Сформировать блок - пример результата. + * Сформировать блок - пример результата. Он используется в методах convertToBlocks, mergeAndConvertToBlocks. */ void initialize(const Block & block); + /** Установить блок - пример результата, + * только если он ещё не был установлен. + */ + void setSampleBlock(const Block & block); + /** Выбрать способ агрегации на основе количества и типов ключей. */ AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 4da3dd7905c..6c66b5a658e 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -155,6 +155,15 @@ void Aggregator::initialize(const Block & block) } +void Aggregator::setSampleBlock(const Block & block) +{ + std::lock_guard lock(mutex); + + if (!sample) + sample = block.cloneEmpty(); +} + + void Aggregator::compileIfPossible(AggregatedDataVariants::Type type) { std::lock_guard lock(mutex); @@ -1895,8 +1904,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants AggregateColumnsData aggregate_columns(params.aggregates_size); - Block empty_block; - initialize(empty_block); + initialize({}); if (isCancelled()) return; @@ -1929,8 +1937,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants if (bucket_to_blocks.empty()) return; - if (!sample) - sample = bucket_to_blocks.begin()->second.front().cloneEmpty(); + setSampleBlock(bucket_to_blocks.begin()->second.front()); /// Каким способом выполнять агрегацию? for (size_t i = 0; i < params.keys_size; ++i) @@ -2089,14 +2096,8 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) AggregateColumnsData aggregate_columns(params.aggregates_size); - Block empty_block; - initialize(empty_block); - - { - std::lock_guard lock(mutex); - if (!sample) - sample = blocks.front().cloneEmpty(); - } + initialize({}); + setSampleBlock(blocks.front()); /// Каким способом выполнять агрегацию? for (size_t i = 0; i < params.keys_size; ++i) @@ -2261,11 +2262,8 @@ std::vector Aggregator::convertBlockToTwoLevel(const Block & block) if (!block) return {}; - Block empty_block; - initialize(empty_block); - - if (!sample) - sample = block.cloneEmpty(); + initialize({}); + setSampleBlock(block); AggregatedDataVariants data; From d1fc6175f33fb445521827f4add69a418a176cf2 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 06:30:54 +0300 Subject: [PATCH 17/27] dbms: added revision to query_log table [#METR-19319]. 
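The setSampleBlock() introduced by PATCH 16 above turns an unsynchronised "if (!sample) sample = block.cloneEmpty();" into a set-once initialisation: once the merge entry points became const and shareable in PATCH 07, several threads could reach that branch at once. The shape of the idiom (a sketch; Block is reduced to a stub and the mutable mutex placement is illustrative):

    #include <mutex>

    /// Toy stand-in for DB::Block, just enough for the idiom.
    struct Block
    {
        int columns = 0;
        explicit operator bool() const { return columns != 0; }
        Block cloneEmpty() const { return *this; }   /// structure without rows
    };

    class Aggregator
    {
        mutable std::mutex mutex;   /// const merge methods may lock it concurrently
        Block sample;

    public:
        void setSampleBlock(const Block & block)
        {
            std::lock_guard<std::mutex> lock(mutex);

            if (!sample)                      /// set-once: the first caller wins
                sample = block.cloneEmpty();
        }
    };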
---
 dbms/src/Interpreters/QueryLog.cpp | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/dbms/src/Interpreters/QueryLog.cpp b/dbms/src/Interpreters/QueryLog.cpp
index e491e5138ae..28851ef93a6 100644
--- a/dbms/src/Interpreters/QueryLog.cpp
+++ b/dbms/src/Interpreters/QueryLog.cpp
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include
 
 
 namespace DB
@@ -203,6 +204,7 @@ Block QueryLog::createBlock()
 		{new ColumnFixedString(16),	new DataTypeFixedString(16),	"ip_address"},
 		{new ColumnString,	new DataTypeString,	"user"},
 		{new ColumnString,	new DataTypeString,	"query_id"},
+		{new ColumnUInt32,	new DataTypeUInt32,	"revision"},
 	};
 }
 
@@ -262,6 +264,8 @@ void QueryLog::flush()
 		block.unsafeGetByPosition(i++).column.get()->insertData(elem.user.data(), elem.user.size());
 		block.unsafeGetByPosition(i++).column.get()->insertData(elem.query_id.data(), elem.query_id.size());
+
+		block.unsafeGetByPosition(i++).column.get()->insert(static_cast<UInt64>(Revision::get()));
 	}
 
 	BlockOutputStreamPtr stream = table->write({}, {});

From e348105481391e0d96758e25c543e116995ee967 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Wed, 9 Dec 2015 07:06:44 +0300
Subject: [PATCH 18/27] dbms: get rid of bad idea [#METR-19056].

---
 dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
index e44da4f3c0a..9e6efe6fa02 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
@@ -53,11 +53,10 @@ public:
 	/** Cancels all merges. All currently running mergeParts calls will soon throw an exception.
 	  * All new calls will throw exceptions until uncancel() is called.
-	  * Counts the number of such calls, to support several cancellations stacked on top of each other.
 	  */
-	void cancel() { ++cancelled; }
-	void uncancel() { --cancelled; }
-	bool isCancelled() const { return cancelled > 0; }
+	void cancel() { cancelled = true; }
+	void uncancel() { cancelled = false; }
+	bool isCancelled() const { return cancelled; }
 
 private:
 	MergeTreeData & data;
@@ -67,7 +66,7 @@ private:
 	/// The last time we wrote to the log that disk space had run out (so as not to log it too often).
 	time_t disk_space_warning_time = 0;
 
-	std::atomic<int> cancelled {0};
+	std::atomic<bool> cancelled {false};
 };

From 03f091c01c852fddf2b6d0d120394be5830490df Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Wed, 9 Dec 2015 07:28:01 +0300
Subject: [PATCH 19/27] dbms: fixed error [#METR-19316].

---
 .../MergingAggregatedMemoryEfficientBlockInputStream.h   | 3 ++-
 .../MergingAggregatedMemoryEfficientBlockInputStream.cpp | 6 +++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h
index e5875c874c9..1d989db265f 100644
--- a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h
@@ -83,7 +83,8 @@ private:
 		boost::threadpool::pool pool;
 		std::mutex get_next_blocks_mutex;
 		ConcurrentBoundedQueue<OutputData> result_queue;
-		bool exhausted = false;
+		bool exhausted = false;	/// There is no more data.
+		bool finish = false;	/// We need to finish earlier than the data is exhausted.
std::atomic active_threads; ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {} diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index f3b5013bb7f..44e6ad4ccd0 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -131,7 +131,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEffici { std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); - parallel_merge_data->exhausted = true; + parallel_merge_data->finish = true; } parallel_merge_data->result_queue.clear(); @@ -161,7 +161,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker { std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); - if (parallel_merge_data->exhausted) + if (parallel_merge_data->exhausted || parallel_merge_data->finish) break; blocks_to_merge = getNextBlocksToMerge(); @@ -178,7 +178,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker { std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); - if (parallel_merge_data->exhausted) + if (parallel_merge_data->finish) break; parallel_merge_data->result_queue.push(OutputData(std::move(res))); From e0653fda4ebb905afde952a47fdd70246ee61056 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 07:41:46 +0300 Subject: [PATCH 20/27] dbms: little better [#METR-19172]. --- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 762b84ba5af..6785968ef49 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3054,10 +3054,10 @@ void StorageReplicatedMergeTree::drop() if (is_readonly) throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY); - auto zookeeper = getZooKeeper(); - shutdown(); + auto zookeeper = getZooKeeper(); + LOG_INFO(log, "Removing replica " << replica_path); replica_is_active_node = nullptr; zookeeper->tryRemoveRecursive(replica_path); From 6260ac34a36623cc75a3d728f8849f9d8ba3aa70 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 08:05:42 +0300 Subject: [PATCH 21/27] dbms: fixed high CPU usage on shutdown [#METR-19056]. --- dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index f82a694b811..33f481ef328 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -118,6 +118,9 @@ void BackgroundProcessingPool::threadFunction() /// O(n), n - число задач. По сути, количество таблиц. Обычно их мало. 
for (const auto & handle : tasks) { + if (handle->removed) + continue; + time_t next_time_to_execute = handle->next_time_to_execute; if (next_time_to_execute < min_time) @@ -144,9 +147,6 @@ void BackgroundProcessingPool::threadFunction() continue; } - if (task->removed) - continue; - /// Лучшей задачи не нашлось, а эта задача в прошлый раз ничего не сделала, и поэтому ей назначено некоторое время спать. time_t current_time = time(0); if (min_time > current_time) From 6eb47b0db47d0817d8589914db59dfd534a78ab7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 09:04:00 +0300 Subject: [PATCH 22/27] dbms: fixed error with StorageBuffer [#METR-19249]. --- dbms/src/Storages/StorageBuffer.cpp | 17 ++++---- .../0_stateless/00289_buffer_test.reference | 2 + .../queries/0_stateless/00289_buffer_test.sh | 41 +++++++++++++++++++ 3 files changed, 53 insertions(+), 7 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00289_buffer_test.reference create mode 100755 dbms/tests/queries/0_stateless/00289_buffer_test.sh diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index b9ac246c68b..158325c973e 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -141,6 +141,9 @@ BlockInputStreams StorageBuffer::read( static void appendBlock(const Block & from, Block & to) { + if (!to) + throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR); + size_t rows = from.rows(); for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no) { @@ -243,13 +246,13 @@ private: buffer.first_write_time = time(0); buffer.data = sorted_block.cloneEmpty(); } - - /** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер. - * Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу, - * будет выкинуто исключение, а новые данные не будут добавлены в буфер. - */ - if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes())) + else if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes())) { + /** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер. + * Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу, + * будет выкинуто исключение, а новые данные не будут добавлены в буфер. + */ + lock.unlock(); storage.flushBuffer(buffer, false); lock.lock(); @@ -321,7 +324,7 @@ void StorageBuffer::flushAllBuffers(const bool check_thresholds) void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds) { - Block block_to_write; + Block block_to_write = buffer.data.cloneEmpty(); time_t current_time = check_thresholds ? time(0) : 0; /** Довольно много проблем из-за того, что хотим блокировать буфер лишь на короткое время. 
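PATCH 22 above restructures insertIntoBuffer() so that the flush of a full buffer happens with the buffer unlocked: flushBuffer() takes buffer.mutex itself, so flushing while still holding the lock would self-deadlock, and holding it across a write to the destination table would stall every other insert. The lock juggling in isolation (a sketch; Buffer, FlushFn and AppendFn are stand-ins for the real types):

    #include <mutex>

    struct Buffer
    {
        std::mutex mutex;
        int data = 0;    /// stand-in for the buffered Block
    };

    /// Shape of insertIntoBuffer() after the patch: the expensive, possibly
    /// throwing flush to the destination table runs with the buffer unlocked.
    template <typename FlushFn, typename AppendFn>
    void insertIntoBuffer(Buffer & buffer, bool over_threshold, FlushFn flush, AppendFn append)
    {
        std::unique_lock<std::mutex> lock(buffer.mutex);

        if (over_threshold)
        {
            lock.unlock();    /// let other inserts and reads proceed meanwhile
            flush(buffer);    /// writes to the destination table; may throw
            lock.lock();      /// re-acquire before touching buffer.data again
        }

        append(buffer);       /// under the lock again
    }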
diff --git a/dbms/tests/queries/0_stateless/00289_buffer_test.reference b/dbms/tests/queries/0_stateless/00289_buffer_test.reference new file mode 100644 index 00000000000..dc546d0f000 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00289_buffer_test.reference @@ -0,0 +1,2 @@ +20000 1 20000 200010000 20000 +20000 1 20000 200010000 20000 diff --git a/dbms/tests/queries/0_stateless/00289_buffer_test.sh b/dbms/tests/queries/0_stateless/00289_buffer_test.sh new file mode 100755 index 00000000000..240b3295914 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00289_buffer_test.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +clickhouse-client -n --query=" + DROP TABLE IF EXISTS test.dst; + DROP TABLE IF EXISTS test.buffer; + + CREATE TABLE test.dst (x UInt64, d Date DEFAULT today()) ENGINE = MergeTree(d, x, 8192); + CREATE TABLE test.buffer (x UInt64, d Date DEFAULT today()) ENGINE = Buffer(test, dst, 16, 1, 100, 10000, 10, 1000, 100000); + "; + +seq 1 1000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 1001 2000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 2001 3000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 3001 4000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 4001 5000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 5001 6000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 6001 7000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 7001 8000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 8001 9000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 9001 10000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 10001 11000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 11001 12000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 12001 13000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 13001 14000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 14001 15000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 15001 16000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 16001 17000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 17001 18000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 18001 19000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & +seq 19001 20000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n & + +wait + +clickhouse-client --query="SELECT count(), min(x), max(x), sum(x), uniqExact(x) FROM test.buffer;"; +clickhouse-client --query="OPTIMIZE TABLE test.buffer;"; +clickhouse-client --query="SELECT count(), min(x), max(x), sum(x), uniqExact(x) FROM test.dst;"; + +clickhouse-client -n --query=" + DROP TABLE test.dst; + DROP TABLE test.buffer; + "; From 9e33ab55193043bf109774822539de271f3cfc8e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 9 Dec 2015 09:10:13 +0300 
Subject: [PATCH 23/27] dbms: fixed error with StorageBuffer [#METR-19249].

---
 dbms/src/Storages/StorageBuffer.cpp | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp
index 158325c973e..5e8450d3234 100644
--- a/dbms/src/Storages/StorageBuffer.cpp
+++ b/dbms/src/Storages/StorageBuffer.cpp
@@ -243,7 +243,6 @@ private:
 
 		if (!buffer.data)
 		{
-			buffer.first_write_time = time(0);
 			buffer.data = sorted_block.cloneEmpty();
 		}
 		else if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
 		{
@@ -258,6 +257,9 @@ private:
 			lock.lock();
 		}
 
+		if (!buffer.first_write_time)
+			buffer.first_write_time = time(0);
+
 		appendBlock(sorted_block, buffer.data);
 	}
 };

From e47383477010eb50a19b078ad8a637caf1ff09e7 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Wed, 9 Dec 2015 09:16:24 +0300
Subject: [PATCH 24/27] dbms: StorageBuffer: write correct info about block flushed [#METR-19249].

---
 dbms/src/Storages/StorageBuffer.cpp | 20 +++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp
index 5e8450d3234..95a79b52f28 100644
--- a/dbms/src/Storages/StorageBuffer.cpp
+++ b/dbms/src/Storages/StorageBuffer.cpp
@@ -306,14 +306,9 @@ bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t
 	size_t rows = buffer.data.rowsInFirstColumn() + additional_rows;
 	size_t bytes = buffer.data.bytes() + additional_bytes;
 
-	bool res =
+	return
 		(time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
 		|| (time_passed > max_thresholds.time || rows > max_thresholds.rows || bytes > max_thresholds.bytes);
-
-	if (res)
-		LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
-
-	return res;
 }
 
 
@@ -329,6 +324,10 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
 	Block block_to_write = buffer.data.cloneEmpty();
 	time_t current_time = check_thresholds ? time(0) : 0;
 
+	size_t rows = 0;
+	size_t bytes = 0;
+	time_t time_passed = 0;
+
 	/** Quite a few problems arise because we want to lock the buffer only for a short time.
 	  * Under the lock, we take the block out of the buffer and replace it with a new empty one.
 	  * Then we try to write the taken block to the destination table.
@@ -338,6 +337,11 @@
 	{
 		std::lock_guard<std::mutex> lock(buffer.mutex);
 
+		rows = buffer.data.rowsInFirstColumn();
+		bytes = buffer.data.bytes();
+		if (buffer.first_write_time)
+			time_passed = current_time - buffer.first_write_time;
+
 		if (check_thresholds)
 		{
 			if (!checkThresholds(buffer, current_time))
@@ -345,7 +349,7 @@
 		}
 		else
 		{
-			if (buffer.data.rowsInFirstColumn() == 0)
+			if (rows == 0)
 				return;
 		}
 
@@ -353,6 +357,8 @@
 		buffer.first_write_time = 0;
 	}
 
+	LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
+
 	if (no_destination)
 		return;

From db8d82f13e03e7fe33aecd6d08423d09054e1199 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Wed, 9 Dec 2015 09:55:49 +0300
Subject: [PATCH 25/27] dbms: Buffer: better [#METR-19249].
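Two things happen in PATCH 24 above: the rows, bytes and age that get logged are captured under the buffer lock, so the trace line describes the block actually flushed, and the threshold predicate becomes a pure function of those three values (PATCH 25, whose diff follows, completes this by extracting checkThresholdsImpl). The predicate is asymmetric and worth spelling out; here it is standalone, with the thresholds passed explicitly rather than read from the storage's members:

    #include <cstddef>
    #include <ctime>

    struct Thresholds { time_t time; size_t rows; size_t bytes; };

    /// min_thresholds are conjunctive: the buffer must be old enough AND have
    /// enough rows AND enough bytes before a regular flush.
    /// max_thresholds are disjunctive: crossing ANY hard limit forces a flush.
    bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed,
                             const Thresholds & min_thresholds,
                             const Thresholds & max_thresholds)
    {
        return (time_passed > min_thresholds.time
                    && rows > min_thresholds.rows
                    && bytes > min_thresholds.bytes)
            || (time_passed > max_thresholds.time
                    || rows > max_thresholds.rows
                    || bytes > max_thresholds.bytes);
    }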
--- dbms/include/DB/Storages/StorageBuffer.h | 3 ++- dbms/src/Storages/StorageBuffer.cpp | 18 +++++++++++++----- .../queries/0_stateless/00289_buffer_test.sh | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/dbms/include/DB/Storages/StorageBuffer.h b/dbms/include/DB/Storages/StorageBuffer.h index 0ec8d04b79b..a601dd7302f 100644 --- a/dbms/include/DB/Storages/StorageBuffer.h +++ b/dbms/include/DB/Storages/StorageBuffer.h @@ -128,7 +128,8 @@ private: void flushAllBuffers(bool check_thresholds = true); /// Сбросить буфер. Если выставлено check_thresholds - сбрасывает только если превышены пороги. void flushBuffer(Buffer & buffer, bool check_thresholds); - bool checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0); + bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const; + bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const; /// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у. void writeBlockToDestination(const Block & block, StoragePtr table); diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 95a79b52f28..65455caf7b0 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -238,6 +238,8 @@ private: void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock && lock) { + time_t current_time = time(0); + /// Сортируем столбцы в блоке. Это нужно, чтобы было проще потом конкатенировать блоки. Block sorted_block = block.sortColumns(); @@ -245,7 +247,7 @@ private: { buffer.data = sorted_block.cloneEmpty(); } - else if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes())) + else if (storage.checkThresholds(buffer, current_time, sorted_block.rowsInFirstColumn(), sorted_block.bytes())) { /** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер. 
From e7c435c27eaf251cfd14bec66a7954eb3655c3c1 Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Tue, 8 Dec 2015 14:45:00 +0300
Subject: [PATCH 26/27] dbms: fix typo and absent support of Float64 for
 MongoDB dictionaries [#METR-18946]

---
 dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h b/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h
index 6f36e6e2a1f..da20222c57e 100644
--- a/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h
+++ b/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h
@@ -76,7 +76,7 @@ public:
 			types.push_back(value_type_t::Int64);
 		else if (typeid_cast<const DataTypeFloat32 *>(type))
 			types.push_back(value_type_t::Float32);
-		else if (typeid_cast<const DataTypeFloat32 *>(type))
+		else if (typeid_cast<const DataTypeFloat64 *>(type))
 			types.push_back(value_type_t::Float64);
 		else if (typeid_cast<const DataTypeString *>(type))
 			types.push_back(value_type_t::String);
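Note: this one-line diff repairs a copy-pasted dispatch branch: both floating-point branches tested for DataTypeFloat32, so a Float64 attribute could never match and fell through as an unsupported type. A toy model of the failure mode, where plain dynamic_cast stands in for ClickHouse's typeid_cast and the DataType structs are illustrative stubs, not the real classes:

    #include <iostream>

    /// Stand-in polymorphic type tags modelling ClickHouse's IDataType hierarchy.
    struct IDataType { virtual ~IDataType() = default; };
    struct DataTypeFloat32 : IDataType {};
    struct DataTypeFloat64 : IDataType {};

    const char * dispatch(const IDataType * type)
    {
        if (dynamic_cast<const DataTypeFloat32 *>(type))
            return "Float32";
        /// The typo repeated DataTypeFloat32 on this branch too, so a Float64
        /// value never matched here and fell through to "unsupported".
        else if (dynamic_cast<const DataTypeFloat64 *>(type))
            return "Float64";
        return "unsupported";
    }

    int main()
    {
        DataTypeFloat64 f64;
        std::cout << dispatch(&f64) << '\n';   // "Float64" after the fix; "unsupported" before
    }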
From b7f496eb7b356a473ec4abf58eed64e81ffcbd9b Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Wed, 9 Dec 2015 15:30:06 +0300
Subject: [PATCH 27/27] dbms: fix MongoDB dictionary source for UInt8
 [#METR-18946]

---
 .../DB/Dictionaries/MongoDBBlockInputStream.h | 55 ++++++++++++-------
 1 file changed, 35 insertions(+), 20 deletions(-)

diff --git a/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h b/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h
index da20222c57e..3c309c5b5e6 100644
--- a/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h
+++ b/dbms/include/DB/Dictionaries/MongoDBBlockInputStream.h
@@ -127,9 +127,10 @@ private:
 
 		for (const auto idx : ext::range(0, size))
 		{
-			const auto value = row[names[idx]];
+			const auto & name = names[idx];
+			const auto value = row[name];
 			if (value.ok())
-				insertValue(columns[idx], types[idx], value);
+				insertValue(columns[idx], types[idx], value, name);
 			else
 				insertDefaultValue(columns[idx], *sample_columns[idx]);
 		}
@@ -142,26 +143,29 @@
 		return block;
 	}
 
-	static void insertValue(IColumn * const column, const value_type_t type, const mongo::BSONElement & value)
+	static void insertValue(
+		IColumn * const column, const value_type_t type, const mongo::BSONElement & value, const mongo::StringData & name)
 	{
 		switch (type)
 		{
 			case value_type_t::UInt8:
 			{
-				if (value.type() != mongo::Bool)
+				if (!value.isNumber() && value.type() != mongo::Bool)
 					throw Exception{
-						"Type mismatch, expected Bool, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number or Bool, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
-				static_cast<ColumnUInt8 *>(column)->insert(value.boolean());
+				static_cast<ColumnUInt8 *>(column)->insert(value.isNumber() ? value.numberInt() : value.boolean());
 				break;
 			}
 			case value_type_t::UInt16:
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -172,7 +176,8 @@
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -183,7 +188,8 @@
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -192,20 +198,22 @@
 				break;
 			}
 			case value_type_t::Int8:
 			{
-				if (!value.isNumber())
+				if (!value.isNumber() && value.type() != mongo::Bool)
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number or Bool, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
-				static_cast<ColumnInt8 *>(column)->insert(value.numberInt());
+				static_cast<ColumnInt8 *>(column)->insert(value.isNumber() ? value.numberInt() : value.boolean());
 				break;
 			}
 			case value_type_t::Int16:
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -216,7 +224,8 @@
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -227,7 +236,8 @@
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -238,7 +248,8 @@
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -249,7 +260,8 @@
 			{
 				if (!value.isNumber())
 					throw Exception{
-						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -260,7 +272,8 @@
 			{
 				if (value.type() != mongo::String)
 					throw Exception{
-						"Type mismatch, expected String, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected String, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -272,7 +285,8 @@
 			{
 				if (value.type() != mongo::Date)
 					throw Exception{
-						"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};
 
@@ -284,7 +298,8 @@
 			{
 				if (value.type() != mongo::Date)
 					throw Exception{
-						"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())},
+						"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())} +
+							" for column " + name.toString(),
 						ErrorCodes::TYPE_MISMATCH
 					};