SummingSortedBlockInputStream: miscellaneous [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-11-28 05:14:50 +03:00
parent 36d4f89c7a
commit c05cf01a9b
2 changed files with 39 additions and 23 deletions

View File

@ -62,12 +62,10 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(ColumnPlainPtrs & m
}
catch (...)
{
desc.function->destroy(desc.state.data());
desc.created = false;
desc.destroyState();
throw;
}
desc.function->destroy(desc.state.data());
desc.created = false;
desc.destroyState();
}
else
desc.merged_column->insertDefault();
@ -123,8 +121,6 @@ Block SummingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (current_row.empty())
{
auto & factory = AggregateFunctionFactory::instance();
current_row.resize(num_columns);
next_key.columns.resize(description.size());
@ -178,12 +174,9 @@ Block SummingSortedBlockInputStream::readImpl()
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
// Create aggregator to sum this column
auto desc = AggregateDescription{};
AggregateDescription desc;
desc.column_numbers = {i};
desc.function = factory.get("sumWithOverflow", {column.type});
desc.function->setArguments({column.type});
desc.add_function = desc.function->getAddressOfAddFunction();
desc.state.resize(desc.function->sizeOfData());
desc.init("sumWithOverflow", {column.type});
columns_to_aggregate.emplace_back(std::move(desc));
}
else
@ -218,8 +211,8 @@ Block SummingSortedBlockInputStream::readImpl()
}
DataTypes argument_types = {};
auto desc = AggregateDescription{};
auto map_desc = MapDescription{};
AggregateDescription desc;
MapDescription map_desc;
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
@ -263,10 +256,7 @@ Block SummingSortedBlockInputStream::readImpl()
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.function = factory.get("sumMap", argument_types);
desc.function->setArguments(argument_types);
desc.add_function = desc.function->getAddressOfAddFunction();
desc.state.resize(desc.function->sizeOfData());
desc.init("sumMap", argument_types);
columns_to_aggregate.emplace_back(std::move(desc));
}
else
@ -345,10 +335,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
/// Reset aggregation states for next row
for (auto & desc : columns_to_aggregate)
{
desc.function->create(desc.state.data());
desc.created = true;
}
desc.createState();
// Start aggregations with current row
addRow(current_row, current);

View File

@ -4,6 +4,8 @@
#include <Core/ColumnNumbers.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
namespace DB
{
@ -79,12 +81,39 @@ private:
std::vector<char> state;
bool created = false;
void init(const char * function_name, const DataTypes & argument_types)
{
function = AggregateFunctionFactory::instance().get(function_name, argument_types);
function->setArguments(argument_types);
add_function = function->getAddressOfAddFunction();
state.resize(function->sizeOfData());
}
void createState()
{
if (created)
return;
function->create(state.data());
created = true;
}
void destroyState()
{
if (!created)
return;
function->destroy(state.data());
created = false;
}
/// Explicitly destroy aggregation state if the stream is terminated
~AggregateDescription()
{
if (created)
function->destroy(state.data());
destroyState();
}
AggregateDescription() = default;
AggregateDescription(AggregateDescription &&) = default;
AggregateDescription(const AggregateDescription &) = delete;
};
/// Stores numbers of key-columns and value-columns.