Adding Info in FinalizingSimpleTransform

This commit is contained in:
Dmitry 2020-05-21 23:34:53 +03:00
parent a597e62d3b
commit ee22a3ad30
4 changed files with 28 additions and 26 deletions

View File

@ -1749,7 +1749,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
/// Forget about current totals and extremes. They will be calculated again after aggregation if needed.
pipeline.dropTotalsAndExtremes();
/// TODO better case determination
if (group_by_info && settings.optimize_aggregation_in_order)
{
auto & query = getSelectQuery();
@ -1771,7 +1770,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_descr, settings.max_block_size, many_data, counter++);
});
/// TODO remove code duplication
for (auto & column_description : group_by_descr)
{
if (!column_description.column_name.empty())
@ -1797,13 +1795,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
});
}
if (final)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinalizingSimpleTransform>(header, transform_params);
});
}
pipeline.enableQuotaForCurrentStreams();
return;

View File

@ -16,7 +16,7 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
Block header, AggregatingTransformParamsPtr params_,
const SortDescription & group_by_description_, size_t res_block_size_,
ManyAggregatedDataPtr many_data_, size_t current_variant)
: IProcessor({std::move(header)}, {params_->getHeader(false)})
: IProcessor({std::move(header)}, {params_->getCustomHeader(false)})
, res_block_size(res_block_size_)
, params(std::move(params_))
, group_by_description(group_by_description_)
@ -24,7 +24,8 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
, many_data(std::move(many_data_))
, variants(*many_data->variants[current_variant])
{
res_header = params->getHeader(false);
/// We won't finalize states in order to merge same states (generated due to multi-thread execution) in AggregatingSortedTransform
res_header = params->getCustomHeader(false);
/// Replace column names to column position in description_sorted.
for (auto & column_description : group_by_description)
@ -56,7 +57,6 @@ static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size
void AggregatingInOrderTransform::consume(Chunk chunk)
{
/// Find the position of last already read key in current chunk.
size_t rows = chunk.getNumRows();
if (rows == 0)
return;
@ -75,7 +75,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
size_t key_end = 0;
size_t key_begin = 0;
/// If we don't have a block we create it and fill with first key
if (!cur_block_size)
{
res_key_columns.resize(params->params.keys_size);
@ -85,7 +85,6 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
{
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
}
for (size_t i = 0; i < params->params.aggregates_size; ++i)
{
res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn();
@ -96,11 +95,11 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
size_t mid = 0;
size_t high = 0;
size_t low = -1;
/// Will split block into segments with the same key
while (key_end != rows)
{
high = rows;
/// Find the first position of new key in current chunk
/// Find the first position of new (not current) key in current chunk
while (high - low > 1)
{
mid = (low + high) / 2;
@ -110,20 +109,18 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
high = mid;
}
key_end = high;
/// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block.
if (key_begin != key_end)
{
/// Add data to the state if segment is not empty (Empty when we were looking for last key in new block and haven't found it)
params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
}
low = key_begin = key_end;
/// We finalize last key aggregation state if a new key found.
if (key_begin != rows)
{
/// We finalize last key aggregation states if a new key found (Not found if high == rows)
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
/// If res_block_size is reached we have to stop consuming and generate the block. Save the extra rows into new chunk.
if (cur_block_size == res_block_size)
{
Columns source_columns = chunk.detachColumns();
@ -159,7 +156,7 @@ void AggregatingInOrderTransform::work()
}
}
/// TODO simplify prepare
IProcessor::Status AggregatingInOrderTransform::prepare()
{
auto & output = outputs.front();
@ -196,6 +193,7 @@ IProcessor::Status AggregatingInOrderTransform::prepare()
{
output.push(std::move(to_push_chunk));
output.finish();
LOG_TRACE(log, "Aggregated");
return Status::Finished;
}
if (input.isFinished())

View File

@ -63,15 +63,25 @@ private:
class FinalizingSimpleTransform : public ISimpleTransform
{
public:
FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params)
: ISimpleTransform({std::move(header)}, {params->getHeader(true)}, true) {}
FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params_)
: ISimpleTransform({std::move(header)}, {params_->getHeader()}, true)
, params(params_) {}
void transform(Chunk & chunk) override
{
if (!chunk.getChunkInfo())
{
auto info = std::make_shared<AggregatedChunkInfo>();
chunk.setChunkInfo(std::move(info));
}
if (params->final)
finalizeChunk(chunk);
}
String getName() const override { return "FinalizingSimpleTransform"; }
private:
AggregatingTransformParamsPtr params;
};

View File

@ -29,8 +29,7 @@ struct AggregatingTransformParams
Block getHeader() const { return aggregator.getHeader(final); }
/// TODO remove that logic
Block getHeader(bool final_) const { return aggregator.getHeader(final_); }
Block getCustomHeader(bool final_) const { return aggregator.getHeader(final_); }
};
struct ManyAggregatedData