Better semantic of sharing columns: development [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-12-15 05:04:28 +03:00
parent 6756b37925
commit a92055c768
2 changed files with 31 additions and 22 deletions

View File

@ -43,7 +43,8 @@ static void finalize(Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.safeGetByPosition(i);
ColumnAggregateFunction * unfinalized_column = typeid_cast<ColumnAggregateFunction *>(&*current.column);
const ColumnAggregateFunction * unfinalized_column = typeid_cast<const ColumnAggregateFunction *>(current.column.get());
if (unfinalized_column)
{
current.type = unfinalized_column->getAggregateFunction()->getReturnType();
@ -69,8 +70,8 @@ const Block & TotalsHavingBlockInputStream::getTotals()
addToTotals(current_totals, overflow_aggregates, nullptr);
}
finalize(current_totals);
totals = current_totals;
totals = header.cloneWithColumns(std::move(current_totals));
finalize(totals);
}
if (totals && expression)
@ -89,6 +90,9 @@ Block TotalsHavingBlockInputStream::readImpl()
{
block = children[0]->read();
if (!header)
header = block.cloneEmpty();
/// Block with values not included in `max_rows_to_group_by`. We'll postpone it.
if (overflow_row && block && block.info.is_overflows)
{
@ -119,7 +123,7 @@ Block TotalsHavingBlockInputStream::readImpl()
if (ColumnPtr materialized = filter_column_ptr->convertToFullColumnIfConst())
filter_column_ptr = materialized;
FilterDescription filter_description(filter_column_ptr);
FilterDescription filter_description(*filter_column_ptr);
/// Add values to `totals` (if it was not already done).
if (totals_mode == TotalsMode::BEFORE_HAVING)
@ -150,26 +154,27 @@ Block TotalsHavingBlockInputStream::readImpl()
}
}
void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter)
void TotalsHavingBlockInputStream::addToTotals(MutableColumns & totals, const Block & block, const IColumn::Filter * filter)
{
bool init = !totals;
bool need_init = totals.empty();
ArenaPtr arena;
if (init)
if (need_init)
arena = std::make_shared<Arena>();
for (size_t i = 0; i < block.columns(); ++i)
for (size_t i = 0, num_columns = block.columns(); i < num_columns; ++i)
{
const ColumnWithTypeAndName & current = block.safeGetByPosition(i);
const ColumnAggregateFunction * column = typeid_cast<const ColumnAggregateFunction *>(&*current.column);
const ColumnAggregateFunction * column = typeid_cast<const ColumnAggregateFunction *>(current.column.get());
if (!column)
{
if (init)
if (need_init)
{
ColumnPtr new_column = current.type->createColumn();
MutableColumnPtr new_column = current.type->createColumn();
current.type->insertDefaultInto(*new_column);
totals.insert(ColumnWithTypeAndName(new_column, current.type, current.name));
totals.emplace_back(std::move(new_column));
}
continue;
}
@ -177,26 +182,28 @@ void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, co
IAggregateFunction * function;
AggregateDataPtr data;
if (init)
/// Create ColumnAggregateFunction with one value.
if (need_init)
{
function = column->getAggregateFunction().get();
auto target = ColumnAggregateFunction::create(column->getAggregateFunction(), Arenas(1, arena));
totals.insert(ColumnWithTypeAndName(target, current.type, current.name));
data = arena->alloc(function->sizeOfData());
function->create(data);
target->getData().push_back(data);
totals.emplace_back(std::move(target));
}
else
{
auto target = typeid_cast<ColumnAggregateFunction *>(totals.safeGetByPosition(i).column.get());
if (!target)
throw Exception("Unexpected type of column: " + totals.safeGetByPosition(i).column->getName(),
ErrorCodes::ILLEGAL_COLUMN);
function = target->getAggregateFunction().get();
data = target->getData()[0];
auto & target = typeid_cast<ColumnAggregateFunction &>(*totals[i]);
function = target.getAggregateFunction().get();
data = target.getData()[0];
}
/// Accumulate all aggregate states into that value.
const ColumnAggregateFunction::Container_t & vec = column->getData();
size_t size = vec.size();

View File

@ -42,16 +42,18 @@ private:
size_t passed_keys = 0;
size_t total_keys = 0;
Block header;
/** Here are the values that did not pass max_rows_to_group_by.
* They are added or not added to the current_totals, depending on the totals_mode.
*/
Block overflow_aggregates;
/// Here, total values are accumulated. After the work is finished, they will be placed in IProfilingBlockInputStream::totals.
Block current_totals;
MutableColumns current_totals;
/// If filter == nullptr - add all rows. Otherwise, only the rows that pass the filter (HAVING).
void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter);
void addToTotals(MutableColumns & totals, const Block & block, const IColumn::Filter * filter);
};
}