remove code duplicates

This commit is contained in:
CurtizJ 2018-09-07 17:18:15 +03:00
parent 41ded9273e
commit 3e854df5a5
4 changed files with 42 additions and 40 deletions

View File

@ -1,4 +1,5 @@
#include <DataStreams/RollupBlockInputStream.h>
#include <DataStreams/finalizeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/FilterDescription.h>
@ -6,23 +7,7 @@
namespace DB
{
static void finalize(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.getByPosition(i);
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_type)
{
current.type = unfinalized_type->getReturnType();
if (current.column)
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
}
}
}
RollupBlockInputStream::RollupBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
keys(params_.keys)
@ -36,7 +21,7 @@ RollupBlockInputStream::RollupBlockInputStream(
Block RollupBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalize(res);
finalizeBlock(res);
return res;
}
@ -58,7 +43,7 @@ Block RollupBlockInputStream::readImpl()
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
Block finalized = rollup_block;
finalize(finalized);
finalizeBlock(finalized);
return finalized;
}
@ -66,7 +51,7 @@ Block RollupBlockInputStream::readImpl()
current_key = keys.size() - 1;
rollup_block = block;
finalize(block);
finalizeBlock(block);
return block;
}

View File

@ -1,4 +1,5 @@
#include <DataStreams/TotalsHavingBlockInputStream.h>
#include <DataStreams/finalizeBlock.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
@ -53,23 +54,6 @@ TotalsHavingBlockInputStream::TotalsHavingBlockInputStream(
}
static void finalize(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.getByPosition(i);
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_type)
{
current.type = unfinalized_type->getReturnType();
if (current.column)
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
}
}
}
Block TotalsHavingBlockInputStream::getTotals()
{
if (!totals)
@ -87,7 +71,7 @@ Block TotalsHavingBlockInputStream::getTotals()
}
totals = children.at(0)->getHeader().cloneWithColumns(std::move(current_totals));
finalize(totals);
finalizeBlock(totals);
}
if (totals && expression)
@ -101,7 +85,7 @@ Block TotalsHavingBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
if (final)
finalize(res);
finalizeBlock(res);
if (expression)
expression->execute(res);
return res;
@ -129,7 +113,7 @@ Block TotalsHavingBlockInputStream::readImpl()
finalized = block;
if (final)
finalize(finalized);
finalizeBlock(finalized);
total_keys += finalized.rows();

View File

@ -0,0 +1,24 @@
#include <DataStreams/finalizeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/typeid_cast.h>
namespace DB
{
void finalizeBlock(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.getByPosition(i);
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_type)
{
current.type = unfinalized_type->getReturnType();
if (current.column)
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
}
}
}
}

View File

@ -0,0 +1,9 @@
#pragma once
#include <Core/Block.h>
namespace DB
{
/// Converts aggregate function columns with non-finalized states to final values
void finalizeBlock(Block & block);
}