fixes due to review

yariks5s 2024-06-10 18:09:07 +00:00
parent ac480084a5
commit 68e1d8701c
11 changed files with 128 additions and 181 deletions

src/Interpreters/InterpreterInsertQuery.cpp

@@ -632,7 +632,10 @@ BlockIO InterpreterInsertQuery::execute()
         pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
         {
-            return std::make_shared<ApplySquashingTransform>(in_header);
+            return std::make_shared<ApplySquashingTransform>(
+                in_header,
+                table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
+                table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
         });
     }
@@ -685,7 +688,10 @@ BlockIO InterpreterInsertQuery::execute()
     {
         bool table_prefers_large_blocks = table->prefersLargeBlocks();
-        auto squashing = std::make_shared<ApplySquashingTransform>(chain.getInputHeader());
+        auto squashing = std::make_shared<ApplySquashingTransform>(
+            chain.getInputHeader(),
+            table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
+            table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
         chain.addSource(std::move(squashing));
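
Both hunks pick the new thresholds the same way: a table that prefers large blocks gets the insert-time limits, anything else gets plain max_block_size with the byte cap disabled. In shorthand (a reading aid using the names from the hunks above, not part of the commit):

    size_t rows_limit  = table_prefers_large_blocks ? settings.min_insert_block_size_rows  : settings.max_block_size;
    size_t bytes_limit = table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL; /// 0 disables the byte threshold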

src/Interpreters/Squashing.cpp

@@ -10,77 +10,30 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
-ApplySquashing::ApplySquashing(Block header_)
+Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_)
     : header(header_)
+    , min_block_size_rows(min_block_size_rows_)
+    , min_block_size_bytes(min_block_size_bytes_)
 {
 }
 
-Chunk ApplySquashing::add(Chunk && input_chunk)
+Chunk Squashing::flush()
+{
+    return convertToChunk(std::move(chunks_to_merge_vec));
+}
+
+Chunk Squashing::squash(Chunk && input_chunk)
 {
     if (!input_chunk.hasChunkInfo())
         return Chunk();
 
     const auto *info = getInfoFromChunk(input_chunk);
-    append(info->chunks);
+    squash(info->chunks);
 
     return std::move(accumulated_chunk);
 }
 
-void ApplySquashing::append(std::vector<Chunk> & input_chunks)
-{
-    accumulated_chunk = {};
-    std::vector<IColumn::MutablePtr> mutable_columns = {};
-    size_t rows = 0;
-    for (const Chunk & chunk : input_chunks)
-        rows += chunk.getNumRows();
-
-    {
-        auto & first_chunk = input_chunks[0];
-        Columns columns = first_chunk.detachColumns();
-        for (size_t i = 0; i < columns.size(); ++i)
-        {
-            mutable_columns.push_back(IColumn::mutate(std::move(columns[i])));
-            mutable_columns[i]->reserve(rows);
-        }
-    }
-
-    for (size_t i = 1; i < input_chunks.size(); ++i) // We've already processed the first chunk above
-    {
-        Columns columns = input_chunks[i].detachColumns();
-        for (size_t j = 0, size = mutable_columns.size(); j < size; ++j)
-        {
-            const auto source_column = columns[j];
-            mutable_columns[j]->insertRangeFrom(*source_column, 0, source_column->size());
-        }
-    }
-    accumulated_chunk.setColumns(std::move(mutable_columns), rows);
-}
-
-const ChunksToSquash* ApplySquashing::getInfoFromChunk(const Chunk & chunk)
-{
-    const auto& info = chunk.getChunkInfo();
-    const auto * agg_info = typeid_cast<const ChunksToSquash *>(info.get());
-
-    if (!agg_info)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no ChunksToSquash in ChunkInfoPtr");
-
-    return agg_info;
-}
-
-PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_)
-    : min_block_size_rows(min_block_size_rows_)
-    , min_block_size_bytes(min_block_size_bytes_)
-    , header(header_)
-{
-}
-
-Chunk PlanSquashing::flush()
-{
-    return convertToChunk(std::move(chunks_to_merge_vec));
-}
-
-Chunk PlanSquashing::add(Chunk && input_chunk)
+Chunk Squashing::add(Chunk && input_chunk)
 {
     if (!input_chunk)
         return {};
@@ -131,7 +84,7 @@ Chunk PlanSquashing::add(Chunk && input_chunk)
     return {};
 }
 
-Chunk PlanSquashing::convertToChunk(std::vector<Chunk> && chunks)
+Chunk Squashing::convertToChunk(std::vector<Chunk> && chunks) const
 {
     if (chunks.empty())
         return {};
@@ -144,19 +97,61 @@ Chunk PlanSquashing::convertToChunk(std::vector<Chunk> && chunks)
     return Chunk(header.cloneEmptyColumns(), 0, info);
 }
 
-void PlanSquashing::expandCurrentSize(size_t rows, size_t bytes)
+void Squashing::squash(std::vector<Chunk> & input_chunks)
+{
+    accumulated_chunk = {};
+    std::vector<IColumn::MutablePtr> mutable_columns = {};
+    size_t rows = 0;
+    for (const Chunk & chunk : input_chunks)
+        rows += chunk.getNumRows();
+
+    {
+        auto & first_chunk = input_chunks[0];
+        Columns columns = first_chunk.detachColumns();
+        for (size_t i = 0; i < columns.size(); ++i)
+        {
+            mutable_columns.push_back(IColumn::mutate(std::move(columns[i])));
+            mutable_columns[i]->reserve(rows);
+        }
+    }
+
+    for (size_t i = 1; i < input_chunks.size(); ++i) // We've already processed the first chunk above
+    {
+        Columns columns = input_chunks[i].detachColumns();
+        for (size_t j = 0, size = mutable_columns.size(); j < size; ++j)
+        {
+            const auto source_column = columns[j];
+            mutable_columns[j]->insertRangeFrom(*source_column, 0, source_column->size());
+        }
+    }
+    accumulated_chunk.setColumns(std::move(mutable_columns), rows);
+}
+
+const ChunksToSquash* Squashing::getInfoFromChunk(const Chunk & chunk)
+{
+    const auto& info = chunk.getChunkInfo();
+    const auto * agg_info = typeid_cast<const ChunksToSquash *>(info.get());
+
+    if (!agg_info)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no ChunksToSquash in ChunkInfoPtr");
+
+    return agg_info;
+}
+
+void Squashing::expandCurrentSize(size_t rows, size_t bytes)
 {
     accumulated_size.rows += rows;
     accumulated_size.bytes += bytes;
 }
 
-void PlanSquashing::changeCurrentSize(size_t rows, size_t bytes)
+void Squashing::changeCurrentSize(size_t rows, size_t bytes)
 {
     accumulated_size.rows = rows;
     accumulated_size.bytes = bytes;
 }
 
-bool PlanSquashing::isEnoughSize(size_t rows, size_t bytes) const
+bool Squashing::isEnoughSize(size_t rows, size_t bytes) const
 {
     return (!min_block_size_rows && !min_block_size_bytes)
         || (min_block_size_rows && rows >= min_block_size_rows)
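
A note for reviewers, since the diff only shows it obliquely: add() never copies rows. convertToChunk() packs the buffered source chunks into a ChunksToSquash info carried by an empty, header-shaped chunk, and squash() unpacks that payload later. A condensed sketch of the round trip, assuming the definitions above (a reading aid, not part of the commit):

    /// Planning side (inside convertToChunk): ownership of the buffered chunks
    /// moves into the ChunkInfo; the carrier chunk itself has zero rows.
    auto info = std::make_shared<ChunksToSquash>();
    info->chunks = std::move(chunks);
    Chunk carrier(header.cloneEmptyColumns(), 0, info);

    /// Applying side (inside squash(Chunk &&)): getInfoFromChunk() recovers the payload,
    /// then squash(std::vector<Chunk> &) concatenates it via IColumn::mutate + insertRangeFrom.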

src/Interpreters/Squashing.h

@@ -26,39 +26,23 @@ struct ChunksToSquash : public ChunkInfo
  * Order of data is kept.
  */
-class ApplySquashing
+class Squashing
 {
 public:
-    explicit ApplySquashing(Block header_);
-
-    Chunk add(Chunk && input_chunk);
-
-    Block header;
-
-private:
-    Chunk accumulated_chunk;
-
-    const ChunksToSquash * getInfoFromChunk(const Chunk & chunk);
-
-    void append(std::vector<Chunk> & input_chunks);
-
-    bool isEnoughSize(const Block & block);
-    bool isEnoughSize(size_t rows, size_t bytes) const;
-};
-
-class PlanSquashing
-{
-public:
-    explicit PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_);
-    PlanSquashing(PlanSquashing && other) = default;
+    explicit Squashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_);
+    Squashing(Squashing && other) = default;
 
     Chunk add(Chunk && input_chunk);
+    Chunk squash(Chunk && input_chunk);
     Chunk flush();
 
     bool isDataLeft()
     {
         return !chunks_to_merge_vec.empty();
     }
 
+    Block header;
+
 private:
     struct CurrentSize
     {
@@ -70,14 +54,18 @@ private:
     size_t min_block_size_rows;
     size_t min_block_size_bytes;
 
-    const Block header;
-
     CurrentSize accumulated_size;
+    Chunk accumulated_chunk;
+
+    const ChunksToSquash * getInfoFromChunk(const Chunk & chunk);
+
+    void squash(std::vector<Chunk> & input_chunks);
 
     void expandCurrentSize(size_t rows, size_t bytes);
     void changeCurrentSize(size_t rows, size_t bytes);
     bool isEnoughSize(size_t rows, size_t bytes) const;
 
-    Chunk convertToChunk(std::vector<Chunk> && chunks);
+    Chunk convertToChunk(std::vector<Chunk> && chunks) const;
 };
 }
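
Taken together, the merged class keeps the old two-phase split behind one interface: add() plans, squash() materializes, flush() drains. A minimal usage sketch distilled from the call sites changed below in SquashingTransform.cpp and TCPHandler.cpp; pull() and consume() are hypothetical stand-ins for the surrounding pipeline:

    Squashing squashing(header, min_block_size_rows, min_block_size_bytes);

    while (Chunk chunk = pull())                            /// hypothetical source
    {
        Chunk planned = squashing.add(std::move(chunk));    /// buffers a reference, no row copying
        if (planned.hasChunkInfo())                         /// thresholds reached, plan is ready
            consume(squashing.squash(std::move(planned)));  /// merge into one large chunk
    }

    Chunk rest = squashing.flush();                         /// plan whatever is left over
    if (rest.hasChunkInfo())
        consume(squashing.squash(std::move(rest)));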

src/Processors/Transforms/ApplySquashingTransform.h

@@ -9,9 +9,9 @@ namespace DB
 class ApplySquashingTransform : public ExceptionKeepingTransform
 {
 public:
-    explicit ApplySquashingTransform(const Block & header)
+    explicit ApplySquashingTransform(const Block & header, const size_t min_block_size_rows, const size_t min_block_size_bytes)
         : ExceptionKeepingTransform(header, header, false)
-        , squashing(header)
+        , squashing(header, min_block_size_rows, min_block_size_bytes)
     {
     }
@@ -37,7 +37,7 @@ public:
 protected:
     void onConsume(Chunk chunk) override
     {
-        if (auto res_chunk = squashing.add(std::move(chunk)))
+        if (auto res_chunk = squashing.squash(std::move(chunk)))
             cur_chunk.setColumns(res_chunk.getColumns(), res_chunk.getNumRows());
     }
@@ -50,12 +50,12 @@ protected:
     }
     void onFinish() override
     {
-        auto chunk = squashing.add({});
+        auto chunk = squashing.squash({});
         finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows());
     }
 
private:
-    ApplySquashing squashing;
+    Squashing squashing;
     Chunk cur_chunk;
     Chunk finish_chunk;
 };

src/Processors/Transforms/PlanSquashingTransform.cpp

@@ -11,7 +11,7 @@ namespace ErrorCodes
 }
 
 PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports)
-    : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(header, min_block_size_rows, min_block_size_bytes)
+    : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), squashing(header, min_block_size_rows, min_block_size_bytes)
 {
 }
@@ -29,9 +29,6 @@ IProcessor::Status PlanSquashingTransform::prepare()
         case READ_IF_CAN:
            status = prepareConsume();
            break;
-        case WAIT_IN:
-            planning_status = PlanningStatus::READ_IF_CAN;
-            return Status::NeedData;
         case PUSH:
             return sendOrFlush();
         case FLUSH:
@@ -64,17 +61,21 @@ void PlanSquashingTransform::init()
 IProcessor::Status PlanSquashingTransform::prepareConsume()
 {
-    bool inputs_have_no_data = true, all_finished = true;
+    bool all_finished = true;
     for (auto & input : inputs)
     {
         if (!input.isFinished())
             all_finished = false;
+        else
+        {
+            input.setNeeded();
+            continue;
+        }
 
         if (input.hasData())
         {
-            inputs_have_no_data = false;
             chunk = input.pull();
-            transform(chunk);
+            chunk = transform(std::move(chunk));
 
             if (chunk.hasChunkInfo())
             {
@@ -86,62 +87,27 @@ IProcessor::Status PlanSquashingTransform::prepareConsume()
     if (all_finished) /// If all inputs are closed, we check if we have data in balancing
     {
-        if (balance.isDataLeft()) /// If we have data in balancing, we process this data
+        if (squashing.isDataLeft()) /// If we have data in balancing, we process this data
         {
             planning_status = PlanningStatus::FLUSH;
             flushChunk();
             return Status::Ready;
         }
         planning_status = PlanningStatus::PUSH;
         return Status::Ready;
     }
 
-    if (inputs_have_no_data)
-        planning_status = PlanningStatus::WAIT_IN;
-
-    return Status::Ready;
-}
-
-IProcessor::Status PlanSquashingTransform::waitForDataIn()
-{
-    bool all_finished = true;
-    bool inputs_have_no_data = true;
-    for (auto & input : inputs)
-    {
-        if (input.isFinished())
-            continue;
-
-        all_finished = false;
-
-        if (input.hasData())
-            inputs_have_no_data = false;
-    }
-
-    if (all_finished)
-    {
-        planning_status = PlanningStatus::READ_IF_CAN;
-        return Status::Ready;
-    }
-
-    if (!inputs_have_no_data)
-    {
-        planning_status = PlanningStatus::READ_IF_CAN;
-        return Status::Ready;
-    }
-
-    return Status::NeedData;
-}
+    planning_status = PlanningStatus::FINISH;
+    return Status::Ready;
+}
 
-void PlanSquashingTransform::transform(Chunk & chunk_)
+Chunk PlanSquashingTransform::transform(Chunk && chunk_)
 {
-    Chunk res_chunk = balance.add(std::move(chunk_));
-    std::swap(res_chunk, chunk_);
+    return squashing.add(std::move(chunk_));
 }
 
-void PlanSquashingTransform::flushChunk()
+Chunk PlanSquashingTransform::flushChunk()
 {
-    Chunk res_chunk = balance.flush();
-    std::swap(res_chunk, chunk);
+    return squashing.flush();
 }
 
 IProcessor::Status PlanSquashingTransform::sendOrFlush()

src/Processors/Transforms/PlanSquashingTransform.h

@@ -8,7 +8,6 @@ enum PlanningStatus
 {
     INIT,
     READ_IF_CAN,
-    WAIT_IN,
     PUSH,
     FLUSH,
     FINISH
@@ -36,12 +35,12 @@ public:
     Status waitForDataIn();
     Status finish();
 
-    void transform(Chunk & chunk);
-    void flushChunk();
+    Chunk transform(Chunk && chunk);
+    Chunk flushChunk();
 
 private:
     Chunk chunk;
-    PlanSquashing balance;
+    Squashing squashing;
     PlanningStatus planning_status = PlanningStatus::INIT;
 };
 }
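
With WAIT_IN gone, prepare() no longer bounces through a wait state; prepareConsume() parks finished inputs itself and moves the status strictly forward. A condensed paraphrase of the resulting dispatch, assuming the members above (a reading aid; the full body of prepare() is not shown in this diff):

    switch (planning_status)
    {
        case INIT:        init(); break;              /// choose the first state
        case READ_IF_CAN: status = prepareConsume();  /// may switch to PUSH, FLUSH or FINISH
                          break;
        case PUSH:
        case FLUSH:       return sendOrFlush();       /// forward planned or flushed chunks
        case FINISH:      return finish();            /// all inputs drained, nothing left
    }
    return status;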

src/Processors/Transforms/SquashingTransform.cpp

@@ -12,16 +12,15 @@ extern const int LOGICAL_ERROR;
 SquashingTransform::SquashingTransform(
     const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
     : ExceptionKeepingTransform(header, header, false)
-    , planSquashing(header, min_block_size_rows, min_block_size_bytes)
-    , applySquashing(header)
+    , squashing(header, min_block_size_rows, min_block_size_bytes)
 {
 }
 
 void SquashingTransform::onConsume(Chunk chunk)
 {
-    Chunk planned_chunk = planSquashing.add(std::move(chunk));
+    Chunk planned_chunk = squashing.add(std::move(chunk));
     if (planned_chunk.hasChunkInfo())
-        cur_chunk = applySquashing.add(std::move(planned_chunk));
+        cur_chunk = squashing.squash(std::move(planned_chunk));
 }
@@ -34,9 +33,9 @@ SquashingTransform::GenerateResult SquashingTransform::onGenerate()
 
 void SquashingTransform::onFinish()
 {
-    Chunk chunk = planSquashing.flush();
+    Chunk chunk = squashing.flush();
     if (chunk.hasChunkInfo())
-        chunk = applySquashing.add(std::move(chunk));
+        chunk = squashing.squash(std::move(chunk));
     finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows());
 }
@@ -60,8 +59,7 @@ void SquashingTransform::work()
 SimpleSquashingTransform::SimpleSquashingTransform(
     const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
     : ISimpleTransform(header, header, false)
-    , planSquashing(header, min_block_size_rows, min_block_size_bytes)
-    , applySquashing(header)
+    , squashing(header, min_block_size_rows, min_block_size_bytes)
 {
 }
@@ -69,18 +67,18 @@ void SimpleSquashingTransform::transform(Chunk & chunk)
 {
     if (!finished)
     {
-        Chunk planned_chunk = planSquashing.add(std::move(chunk));
+        Chunk planned_chunk = squashing.add(std::move(chunk));
         if (planned_chunk.hasChunkInfo())
-            chunk = applySquashing.add(std::move(planned_chunk));
+            chunk = squashing.squash(std::move(planned_chunk));
     }
     else
     {
         if (chunk.hasRows())
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
 
-        chunk = planSquashing.flush();
+        chunk = squashing.flush();
         if (chunk.hasChunkInfo())
-            chunk = applySquashing.add(std::move(chunk));
+            chunk = squashing.squash(std::move(chunk));
     }
 }

src/Processors/Transforms/SquashingTransform.h

@@ -24,8 +24,7 @@ protected:
     void onFinish() override;
 
 private:
-    PlanSquashing planSquashing;
-    ApplySquashing applySquashing;
+    Squashing squashing;
     Chunk cur_chunk;
     Chunk finish_chunk;
 };
@@ -44,8 +43,7 @@ protected:
     IProcessor::Status prepare() override;
 
 private:
-    PlanSquashing planSquashing;
-    ApplySquashing applySquashing;
+    Squashing squashing;
     bool finished = false;
 };

src/Server/TCPHandler.cpp

@@ -885,22 +885,21 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(AsynchronousInsertQueue & insert_queue)
     using PushResult = AsynchronousInsertQueue::PushResult;
 
     startInsertQuery();
-    PlanSquashing plan_squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size);
-    ApplySquashing apply_squashing(state.input_header);
+    Squashing squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size);
 
     while (readDataNext())
     {
-        apply_squashing.header = state.block_for_insert;
-        auto planned_chunk = plan_squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()});
+        squashing.header = state.block_for_insert;
+        auto planned_chunk = squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()});
         if (planned_chunk.hasChunkInfo())
         {
-            Chunk result_chunk = apply_squashing.add(std::move(planned_chunk));
+            Chunk result_chunk = squashing.squash(std::move(planned_chunk));
             ColumnsWithTypeAndName cols;
             if (result_chunk.hasColumns() && state.block_for_insert)
                 for (size_t j = 0; j < result_chunk.getNumColumns(); ++j)
                     cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j]));
             auto result = Block(cols);
-            apply_squashing.header = Block(state.block_for_insert);
+            squashing.header = Block(state.block_for_insert);
             return PushResult
             {
                 .status = PushResult::TOO_MUCH_DATA,
@@ -909,14 +908,14 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(AsynchronousInsertQueue & insert_queue)
         }
     }
 
-    auto planned_chunk = plan_squashing.flush();
+    auto planned_chunk = squashing.flush();
     Chunk result_chunk;
     if (planned_chunk.hasChunkInfo())
-        result_chunk = apply_squashing.add(std::move(planned_chunk));
+        result_chunk = squashing.squash(std::move(planned_chunk));
 
     ColumnsWithTypeAndName cols;
     if (result_chunk.hasColumns())
         for (size_t j = 0; j < result_chunk.getNumColumns(); ++j)
-            cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.header.getDataTypes()[j], apply_squashing.header.getNames()[j]));
+            cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], squashing.header.getDataTypes()[j], squashing.header.getNames()[j]));
 
     auto result = Block(cols);
     return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context);

src/Storages/MergeTree/MutateTask.cpp

@@ -1267,8 +1267,7 @@ private:
     ProjectionNameToItsBlocks projection_parts;
     std::move_iterator<ProjectionNameToItsBlocks::iterator> projection_parts_iterator;
 
-    std::vector<PlanSquashing> projection_squash_plannings;
-    std::vector<ApplySquashing> projection_squashes;
+    std::vector<Squashing> projection_squashes;
     const ProjectionsDescription & projections;
 
     ExecutableTaskPtr merge_projection_parts_task_ptr;
@@ -1286,10 +1285,9 @@ void PartMergerWriter::prepare()
     for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
     {
-        PlanSquashing plan_squashing(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
+        Squashing squashing(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
         // We split the materialization into multiple stages similar to the process of INSERT SELECT query.
-        projection_squash_plannings.emplace_back(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
-        projection_squashes.emplace_back(ctx->updated_header);
+        projection_squashes.emplace_back(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
     }
 
     existing_rows_count = 0;
@@ -1317,11 +1315,11 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
         ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
         Block block_to_squash = projection.calculate(cur_block, ctx->context);
         projection_squashes[i].header = block_to_squash;
-        Chunk planned_chunk = projection_squash_plannings[i].add({block_to_squash.getColumns(), block_to_squash.rows()});
+        Chunk planned_chunk = projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()});
 
         if (planned_chunk.hasChunkInfo())
         {
-            Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
+            Chunk projection_chunk = projection_squashes[i].squash(std::move(planned_chunk));
             ColumnsWithTypeAndName cols;
             if (projection_chunk.hasColumns())
                 for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)
@@ -1345,11 +1343,11 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
     for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
     {
         const auto & projection = *ctx->projections_to_build[i];
-        auto & projection_squash_plan = projection_squash_plannings[i];
+        auto & projection_squash_plan = projection_squashes[i];
         auto planned_chunk = projection_squash_plan.flush();
         if (planned_chunk.hasChunkInfo())
         {
-            Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
+            Chunk projection_chunk = projection_squashes[i].squash(std::move(planned_chunk));
             ColumnsWithTypeAndName cols;
             if (projection_chunk.hasColumns())
                 for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)

src/Storages/ProjectionDescription.cpp

@@ -313,7 +313,7 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
     // There should be only one output block after this transformation.
     builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0, 1));
-    builder.addTransform(std::make_shared<ApplySquashingTransform>(builder.getHeader()));
+    builder.addTransform(std::make_shared<ApplySquashingTransform>(builder.getHeader(), block.rows(), 0));
     auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
 
     PullingPipelineExecutor executor(pipeline);