From c05cf01a9b250455efcfc2e8f3d68e1ec94de9b7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 28 Nov 2017 05:14:50 +0300 Subject: [PATCH] SummingSortedBlockInputStream: miscellaneous [#CLICKHOUSE-2]. --- .../SummingSortedBlockInputStream.cpp | 29 +++++----------- .../SummingSortedBlockInputStream.h | 33 +++++++++++++++++-- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 64a182b814e..226e8bf9409 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -62,12 +62,10 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(ColumnPlainPtrs & m } catch (...) { - desc.function->destroy(desc.state.data()); - desc.created = false; + desc.destroyState(); throw; } - desc.function->destroy(desc.state.data()); - desc.created = false; + desc.destroyState(); } else desc.merged_column->insertDefault(); @@ -123,8 +121,6 @@ Block SummingSortedBlockInputStream::readImpl() /// Additional initialization. if (current_row.empty()) { - auto & factory = AggregateFunctionFactory::instance(); - current_row.resize(num_columns); next_key.columns.resize(description.size()); @@ -178,12 +174,9 @@ Block SummingSortedBlockInputStream::readImpl() std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) { // Create aggregator to sum this column - auto desc = AggregateDescription{}; + AggregateDescription desc; desc.column_numbers = {i}; - desc.function = factory.get("sumWithOverflow", {column.type}); - desc.function->setArguments({column.type}); - desc.add_function = desc.function->getAddressOfAddFunction(); - desc.state.resize(desc.function->sizeOfData()); + desc.init("sumWithOverflow", {column.type}); columns_to_aggregate.emplace_back(std::move(desc)); } else @@ -218,8 +211,8 @@ Block SummingSortedBlockInputStream::readImpl() } DataTypes argument_types = {}; - auto desc = AggregateDescription{}; - auto map_desc = MapDescription{}; + AggregateDescription desc; + MapDescription map_desc; column_num_it = map.second.begin(); for (; column_num_it != map.second.end(); ++column_num_it) @@ -263,10 +256,7 @@ Block SummingSortedBlockInputStream::readImpl() if (map_desc.key_col_nums.size() == 1) { // Create summation for all value columns in the map - desc.function = factory.get("sumMap", argument_types); - desc.function->setArguments(argument_types); - desc.add_function = desc.function->getAddressOfAddFunction(); - desc.state.resize(desc.function->sizeOfData()); + desc.init("sumMap", argument_types); columns_to_aggregate.emplace_back(std::move(desc)); } else @@ -345,10 +335,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std: /// Reset aggregation states for next row for (auto & desc : columns_to_aggregate) - { - desc.function->create(desc.state.data()); - desc.created = true; - } + desc.createState(); // Start aggregations with current row addRow(current_row, current); diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index e4207daf88d..fba61bccd3d 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -4,6 +4,8 @@ #include #include #include +#include + namespace DB { @@ -79,12 +81,39 @@ private: std::vector state; bool created = false; + void init(const char * function_name, const DataTypes & argument_types) + { + function = AggregateFunctionFactory::instance().get(function_name, argument_types); + function->setArguments(argument_types); + add_function = function->getAddressOfAddFunction(); + state.resize(function->sizeOfData()); + } + + void createState() + { + if (created) + return; + function->create(state.data()); + created = true; + } + + void destroyState() + { + if (!created) + return; + function->destroy(state.data()); + created = false; + } + /// Explicitly destroy aggregation state if the stream is terminated ~AggregateDescription() { - if (created) - function->destroy(state.data()); + destroyState(); } + + AggregateDescription() = default; + AggregateDescription(AggregateDescription &&) = default; + AggregateDescription(const AggregateDescription &) = delete; }; /// Stores numbers of key-columns and value-columns.