return multiple blocks

This commit is contained in:
Dmitry 2020-05-08 22:46:52 +03:00
parent 015a3555c6
commit 0286b60ed6
4 changed files with 113 additions and 100 deletions

View File

@ -1752,7 +1752,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_descr, group_by_descr);
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_descr, group_by_descr, settings.max_block_size);
});
pipeline.enableQuotaForCurrentStreams();

View File

@ -6,8 +6,9 @@ namespace DB
AggregatingInOrderTransform::AggregatingInOrderTransform(
Block header, AggregatingTransformParamsPtr params_, SortDescription & sort_description_,
SortDescription & group_by_description_)
SortDescription & group_by_description_, size_t max_block_size_)
: IProcessor({std::move(header)}, {params_->getHeader()})
, max_block_size(max_block_size_)
, params(std::move(params_))
, sort_description(sort_description_)
, group_by_description(group_by_description_)
@ -15,8 +16,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
, many_data(std::make_shared<ManyAggregatedData>(1))
, variants(*many_data->variants[0])
{
// std::cerr << "AggregatingInOrderTransform\n";
Block res_header = params->getHeader();
/// Replace column names to column position in description_sorted.
@ -28,18 +27,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
column_description.column_name.clear();
}
}
res_key_columns.resize(params->params.keys_size);
res_aggregate_columns.resize(params->params.aggregates_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
{
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
}
for (size_t i = 0; i < params->params.aggregates_size; ++i)
{
res_aggregate_columns[i] = params->aggregator.aggregate_functions[i]->getReturnType()->createColumn();
}
}
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;
@ -58,12 +45,9 @@ static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size
return false;
}
/// TODO maybe move all things inside the Aggregator?
void AggregatingInOrderTransform::consume(Chunk chunk)
{
// std::cerr << "\nchunk " << x++ << " of size " << chunk.getNumRows() << "\n";
// sz += chunk.getNumRows();
/// Find the position of last already read key in current chunk.
size_t rows = chunk.getNumRows();
@ -79,15 +63,25 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
}
Aggregator::AggregateFunctionInstructions aggregate_function_instructions;
params->aggregator.prepareAggregateInstructions(chunk.detachColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions);
params->aggregator.prepareAggregateInstructions(chunk.getColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions);
size_t key_end = 0;
size_t key_begin = 0;
if (!res_block_size)
{
// std::cerr << "Creating first state with key " << key_begin << "\n";
LOG_TRACE(log, "AggregatingInOrder");
res_key_columns.resize(params->params.keys_size);
res_aggregate_columns.resize(params->params.aggregates_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
{
res_key_columns[i] = params->getHeader().safeGetByPosition(i).type->createColumn();
}
for (size_t i = 0; i < params->params.aggregates_size; ++i)
{
res_aggregate_columns[i] = params->aggregator.aggregate_functions[i]->getReturnType()->createColumn();
}
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++res_block_size;
}
@ -98,27 +92,19 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
while (key_end != rows)
{
high = rows;
/// Find the first position of new key in current chunk
while (high - low > 1)
{
mid = (low + high) / 2;
// std::cerr << "Comparing last key and row " << mid << "\n";
if (!less(res_key_columns, key_columns, res_block_size - 1, mid, group_by_description))
{
low = mid;
}
else
{
high = mid;
}
}
key_end = high;
if (key_begin != key_end)
{
// std::cerr << "Executing from " << key_begin << " to " << key_end << "\n";
/// Add data to the state if segment is not empty (Empty when we were looking for last key in new block and haven't found it)
params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
}
@ -127,19 +113,98 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (key_begin != rows)
{
// std::cerr << "Finalizing the last state.\n";
/// We finalize last key aggregation states if a new key found (Not found if high == rows)
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
// std::cerr << "Creating state with key " << key_begin << "\n";
if (res_block_size == max_block_size) {
Columns source_columns = chunk.detachColumns();
for (auto & source_column : source_columns)
source_column = source_column->cut(key_begin, rows - key_begin);
current_chunk = Chunk(source_columns, rows - key_begin);
block_end_reached = true;
need_generate = true;
res_block_size = 0;
return;
}
/// We create a new state for the new key and update res_key_columns
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++res_block_size;
}
}
block_end_reached = false;
}
void AggregatingInOrderTransform::work()
{
if (is_consume_finished || need_generate)
{
generate();
}
else
{
consume(std::move(current_chunk));
}
}
/// TODO less complicated
IProcessor::Status AggregatingInOrderTransform::prepare()
{
auto & output = outputs.front();
auto & input = inputs.back();
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (block_end_reached)
{
if (need_generate)
{
return Status::Ready;
}
else
{
output.push(std::move(to_push_chunk));
return Status::Ready;
}
}
if (!block_end_reached)
{
if (is_consume_finished)
{
output.push(std::move(to_push_chunk));
output.finish();
return Status::Finished;
}
if (input.isFinished())
{
is_consume_finished = true;
return Status::Ready;
}
}
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull(!is_consume_finished);
return Status::Ready;
}
/// Convert block to chunk.
/// Adds additional info about aggregation.
Chunk convertToChunk(const Block & block)
@ -155,71 +220,10 @@ Chunk convertToChunk(const Block & block)
return chunk;
}
void AggregatingInOrderTransform::work()
{
if (is_consume_finished)
{
generate();
}
else
{
consume(std::move(current_chunk));
}
}
IProcessor::Status AggregatingInOrderTransform::prepare()
{
auto & output = outputs.front();
/// Last output is current. All other outputs should already be closed.
auto & input = inputs.back();
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
/// Get chunk from input.
if (input.isFinished() && !is_consume_finished)
{
is_consume_finished = true;
return Status::Ready;
}
if (is_consume_finished)
{
/// TODO many blocks
output.push(std::move(current_chunk));
output.finish();
return Status::Finished;
}
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull();
return Status::Ready;
}
void AggregatingInOrderTransform::generate()
{
// std::cerr << sz << "\n";
// std::cerr << "\nFinalizing the last state in generate().\n";
if (res_block_size)
if (res_block_size && is_consume_finished)
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
LOG_TRACE(log, "Aggregated");
@ -233,7 +237,8 @@ void AggregatingInOrderTransform::generate()
{
res.getByPosition(i + res_key_columns.size()).column = std::move(res_aggregate_columns[i]);
}
current_chunk = convertToChunk(res);
to_push_chunk = convertToChunk(res);
need_generate = false;
}
}

View File

@ -11,8 +11,8 @@ class AggregatingInOrderTransform : public IProcessor
{
public:
AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params,
SortDescription & sort_description, SortDescription & group_by_description);
AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params, SortDescription & sort_description,
SortDescription & group_by_description, size_t max_block_size);
~AggregatingInOrderTransform() override;
@ -29,7 +29,8 @@ private:
// size_t x = 1;
// size_t sz = 0;
size_t res_block_size{};
size_t max_block_size;
size_t res_block_size = 0;
MutableColumns res_key_columns;
MutableColumns res_aggregate_columns;
@ -44,9 +45,12 @@ private:
ManyAggregatedDataPtr many_data;
AggregatedDataVariants & variants;
bool need_generate = false;
bool block_end_reached = false;
bool is_consume_finished = false;
Chunk current_chunk;
Chunk to_push_chunk;
Logger * log = &Logger::get("AggregatingInOrderTransform");
};

View File

@ -55,6 +55,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
int read_direction = required_sort_description.at(0).direction;
size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size());
for (size_t i = 0; i < prefix_size; ++i)
{
if (forbidden_columns.count(required_sort_description[i].column_name))
@ -71,7 +72,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
bool found_function = false;
for (const auto & action : elements_actions[i]->getActions())
{
std::cerr << action.toString() << "\n";
if (action.type != ExpressionAction::APPLY_FUNCTION)
continue;
@ -82,6 +82,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
}
else
found_function = true;
if (action.argument_names.size() != 1 || action.argument_names.at(0) != sorting_key_columns[i])
{
current_direction = 0;
@ -94,6 +95,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
current_direction = 0;
break;
}
auto monotonicity = func.getMonotonicityForRange(*func.getArgumentTypes().at(0), {}, {});
if (!monotonicity.is_monotonic)
{
@ -106,8 +108,10 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
if (!found_function)
current_direction = 0;
if (!current_direction || (i > 0 && current_direction != read_direction))
break;
if (i == 0)
read_direction = current_direction;