diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index e632886778f..d735fb8a55c 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -632,7 +632,10 @@ BlockIO InterpreterInsertQuery::execute() pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr { - return std::make_shared(in_header); + return std::make_shared( + in_header, + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL); }); } @@ -685,7 +688,10 @@ BlockIO InterpreterInsertQuery::execute() { bool table_prefers_large_blocks = table->prefersLargeBlocks(); - auto squashing = std::make_shared(chain.getInputHeader()); + auto squashing = std::make_shared( + chain.getInputHeader(), + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL); chain.addSource(std::move(squashing)); diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 47add495421..a05c5853ce3 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -10,77 +10,30 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -ApplySquashing::ApplySquashing(Block header_) +Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_) : header(header_) + , min_block_size_rows(min_block_size_rows_) + , min_block_size_bytes(min_block_size_bytes_) { } -Chunk ApplySquashing::add(Chunk && input_chunk) +Chunk Squashing::flush() +{ + return convertToChunk(std::move(chunks_to_merge_vec)); +} + +Chunk Squashing::squash(Chunk && input_chunk) { if (!input_chunk.hasChunkInfo()) return Chunk(); const auto *info = getInfoFromChunk(input_chunk); - append(info->chunks); + squash(info->chunks); return std::move(accumulated_chunk); } -void ApplySquashing::append(std::vector & input_chunks) -{ - accumulated_chunk = {}; - std::vector mutable_columns = {}; - size_t rows = 0; - for (const Chunk & chunk : input_chunks) - rows += chunk.getNumRows(); - - { - auto & first_chunk = input_chunks[0]; - Columns columns = first_chunk.detachColumns(); - for (size_t i = 0; i < columns.size(); ++i) - { - mutable_columns.push_back(IColumn::mutate(std::move(columns[i]))); - mutable_columns[i]->reserve(rows); - } - } - - for (size_t i = 1; i < input_chunks.size(); ++i) // We've already processed the first chunk above - { - Columns columns = input_chunks[i].detachColumns(); - for (size_t j = 0, size = mutable_columns.size(); j < size; ++j) - { - const auto source_column = columns[j]; - - mutable_columns[j]->insertRangeFrom(*source_column, 0, source_column->size()); - } - } - accumulated_chunk.setColumns(std::move(mutable_columns), rows); -} - -const ChunksToSquash* ApplySquashing::getInfoFromChunk(const Chunk & chunk) -{ - const auto& info = chunk.getChunkInfo(); - const auto * agg_info = typeid_cast(info.get()); - - if (!agg_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no ChunksToSquash in ChunkInfoPtr"); - - return agg_info; -} - -PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_) - : min_block_size_rows(min_block_size_rows_) - , min_block_size_bytes(min_block_size_bytes_) - , header(header_) -{ -} - -Chunk PlanSquashing::flush() -{ - return convertToChunk(std::move(chunks_to_merge_vec)); -} - -Chunk PlanSquashing::add(Chunk && input_chunk) +Chunk Squashing::add(Chunk && input_chunk) { if (!input_chunk) return {}; @@ -131,7 +84,7 @@ Chunk PlanSquashing::add(Chunk && input_chunk) return {}; } -Chunk PlanSquashing::convertToChunk(std::vector && chunks) +Chunk Squashing::convertToChunk(std::vector && chunks) const { if (chunks.empty()) return {}; @@ -144,19 +97,61 @@ Chunk PlanSquashing::convertToChunk(std::vector && chunks) return Chunk(header.cloneEmptyColumns(), 0, info); } -void PlanSquashing::expandCurrentSize(size_t rows, size_t bytes) +void Squashing::squash(std::vector & input_chunks) +{ + accumulated_chunk = {}; + std::vector mutable_columns = {}; + size_t rows = 0; + for (const Chunk & chunk : input_chunks) + rows += chunk.getNumRows(); + + { + auto & first_chunk = input_chunks[0]; + Columns columns = first_chunk.detachColumns(); + for (size_t i = 0; i < columns.size(); ++i) + { + mutable_columns.push_back(IColumn::mutate(std::move(columns[i]))); + mutable_columns[i]->reserve(rows); + } + } + + for (size_t i = 1; i < input_chunks.size(); ++i) // We've already processed the first chunk above + { + Columns columns = input_chunks[i].detachColumns(); + for (size_t j = 0, size = mutable_columns.size(); j < size; ++j) + { + const auto source_column = columns[j]; + + mutable_columns[j]->insertRangeFrom(*source_column, 0, source_column->size()); + } + } + accumulated_chunk.setColumns(std::move(mutable_columns), rows); +} + +const ChunksToSquash* Squashing::getInfoFromChunk(const Chunk & chunk) +{ + const auto& info = chunk.getChunkInfo(); + const auto * agg_info = typeid_cast(info.get()); + + if (!agg_info) + throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no ChunksToSquash in ChunkInfoPtr"); + + return agg_info; +} + +void Squashing::expandCurrentSize(size_t rows, size_t bytes) { accumulated_size.rows += rows; accumulated_size.bytes += bytes; } -void PlanSquashing::changeCurrentSize(size_t rows, size_t bytes) +void Squashing::changeCurrentSize(size_t rows, size_t bytes) { accumulated_size.rows = rows; accumulated_size.bytes = bytes; } -bool PlanSquashing::isEnoughSize(size_t rows, size_t bytes) const +bool Squashing::isEnoughSize(size_t rows, size_t bytes) const { return (!min_block_size_rows && !min_block_size_bytes) || (min_block_size_rows && rows >= min_block_size_rows) diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h index 77191e63050..760b7d7475f 100644 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -26,39 +26,23 @@ struct ChunksToSquash : public ChunkInfo * Order of data is kept. */ -class ApplySquashing +class Squashing { public: - explicit ApplySquashing(Block header_); - - Chunk add(Chunk && input_chunk); - - Block header; - -private: - Chunk accumulated_chunk; - - const ChunksToSquash * getInfoFromChunk(const Chunk & chunk); - - void append(std::vector & input_chunks); - - bool isEnoughSize(const Block & block); - bool isEnoughSize(size_t rows, size_t bytes) const; -}; - -class PlanSquashing -{ -public: - explicit PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_); - PlanSquashing(PlanSquashing && other) = default; + explicit Squashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_); + Squashing(Squashing && other) = default; Chunk add(Chunk && input_chunk); + Chunk squash(Chunk && input_chunk); Chunk flush(); + bool isDataLeft() { return !chunks_to_merge_vec.empty(); } + Block header; + private: struct CurrentSize { @@ -70,14 +54,18 @@ private: size_t min_block_size_rows; size_t min_block_size_bytes; - const Block header; CurrentSize accumulated_size; + Chunk accumulated_chunk; + + const ChunksToSquash * getInfoFromChunk(const Chunk & chunk); + + void squash(std::vector & input_chunks); void expandCurrentSize(size_t rows, size_t bytes); void changeCurrentSize(size_t rows, size_t bytes); bool isEnoughSize(size_t rows, size_t bytes) const; - Chunk convertToChunk(std::vector && chunks); + Chunk convertToChunk(std::vector && chunks) const; }; } diff --git a/src/Processors/Transforms/ApplySquashingTransform.h b/src/Processors/Transforms/ApplySquashingTransform.h index e63691fcc6a..7bf1f32340b 100644 --- a/src/Processors/Transforms/ApplySquashingTransform.h +++ b/src/Processors/Transforms/ApplySquashingTransform.h @@ -9,9 +9,9 @@ namespace DB class ApplySquashingTransform : public ExceptionKeepingTransform { public: - explicit ApplySquashingTransform(const Block & header) + explicit ApplySquashingTransform(const Block & header, const size_t min_block_size_rows, const size_t min_block_size_bytes) : ExceptionKeepingTransform(header, header, false) - , squashing(header) + , squashing(header, min_block_size_rows, min_block_size_bytes) { } @@ -37,7 +37,7 @@ public: protected: void onConsume(Chunk chunk) override { - if (auto res_chunk = squashing.add(std::move(chunk))) + if (auto res_chunk = squashing.squash(std::move(chunk))) cur_chunk.setColumns(res_chunk.getColumns(), res_chunk.getNumRows()); } @@ -50,12 +50,12 @@ protected: } void onFinish() override { - auto chunk = squashing.add({}); + auto chunk = squashing.squash({}); finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows()); } private: - ApplySquashing squashing; + Squashing squashing; Chunk cur_chunk; Chunk finish_chunk; }; diff --git a/src/Processors/Transforms/PlanSquashingTransform.cpp b/src/Processors/Transforms/PlanSquashingTransform.cpp index 7945bd97e04..f8d5143493f 100644 --- a/src/Processors/Transforms/PlanSquashingTransform.cpp +++ b/src/Processors/Transforms/PlanSquashingTransform.cpp @@ -11,7 +11,7 @@ namespace ErrorCodes } PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports) - : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(header, min_block_size_rows, min_block_size_bytes) + : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), squashing(header, min_block_size_rows, min_block_size_bytes) { } @@ -29,9 +29,6 @@ IProcessor::Status PlanSquashingTransform::prepare() case READ_IF_CAN: status = prepareConsume(); break; - case WAIT_IN: - planning_status = PlanningStatus::READ_IF_CAN; - return Status::NeedData; case PUSH: return sendOrFlush(); case FLUSH: @@ -64,17 +61,21 @@ void PlanSquashingTransform::init() IProcessor::Status PlanSquashingTransform::prepareConsume() { - bool inputs_have_no_data = true, all_finished = true; + bool all_finished = true; for (auto & input : inputs) { if (!input.isFinished()) all_finished = false; + else + { + input.setNeeded(); + continue; + } if (input.hasData()) { - inputs_have_no_data = false; chunk = input.pull(); - transform(chunk); + chunk = transform(std::move(chunk)); if (chunk.hasChunkInfo()) { @@ -86,62 +87,27 @@ IProcessor::Status PlanSquashingTransform::prepareConsume() if (all_finished) /// If all inputs are closed, we check if we have data in balancing { - if (balance.isDataLeft()) /// If we have data in balancing, we process this data + if (squashing.isDataLeft()) /// If we have data in balancing, we process this data { planning_status = PlanningStatus::FLUSH; flushChunk(); return Status::Ready; } - planning_status = PlanningStatus::PUSH; - return Status::Ready; - } - - if (inputs_have_no_data) - planning_status = PlanningStatus::WAIT_IN; - - return Status::Ready; -} - -IProcessor::Status PlanSquashingTransform::waitForDataIn() -{ - bool all_finished = true; - bool inputs_have_no_data = true; - for (auto & input : inputs) - { - if (input.isFinished()) - continue; - - all_finished = false; - - if (input.hasData()) - inputs_have_no_data = false; - - } - if (all_finished) - { - planning_status = PlanningStatus::READ_IF_CAN; - return Status::Ready; - } - - if (!inputs_have_no_data) - { - planning_status = PlanningStatus::READ_IF_CAN; + planning_status = PlanningStatus::FINISH; return Status::Ready; } return Status::NeedData; } -void PlanSquashingTransform::transform(Chunk & chunk_) +Chunk PlanSquashingTransform::transform(Chunk && chunk_) { - Chunk res_chunk = balance.add(std::move(chunk_)); - std::swap(res_chunk, chunk_); + return squashing.add(std::move(chunk_)); } -void PlanSquashingTransform::flushChunk() +Chunk PlanSquashingTransform::flushChunk() { - Chunk res_chunk = balance.flush(); - std::swap(res_chunk, chunk); + return squashing.flush(); } IProcessor::Status PlanSquashingTransform::sendOrFlush() diff --git a/src/Processors/Transforms/PlanSquashingTransform.h b/src/Processors/Transforms/PlanSquashingTransform.h index 7afc942a7f2..a9152d9dbe9 100644 --- a/src/Processors/Transforms/PlanSquashingTransform.h +++ b/src/Processors/Transforms/PlanSquashingTransform.h @@ -8,7 +8,6 @@ enum PlanningStatus { INIT, READ_IF_CAN, - WAIT_IN, PUSH, FLUSH, FINISH @@ -36,12 +35,12 @@ public: Status waitForDataIn(); Status finish(); - void transform(Chunk & chunk); - void flushChunk(); + Chunk transform(Chunk && chunk); + Chunk flushChunk(); private: Chunk chunk; - PlanSquashing balance; + Squashing squashing; PlanningStatus planning_status = PlanningStatus::INIT; }; } diff --git a/src/Processors/Transforms/SquashingTransform.cpp b/src/Processors/Transforms/SquashingTransform.cpp index a516811bf45..c1f8a9f2513 100644 --- a/src/Processors/Transforms/SquashingTransform.cpp +++ b/src/Processors/Transforms/SquashingTransform.cpp @@ -12,16 +12,15 @@ extern const int LOGICAL_ERROR; SquashingTransform::SquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) : ExceptionKeepingTransform(header, header, false) - , planSquashing(header, min_block_size_rows, min_block_size_bytes) - , applySquashing(header) + , squashing(header, min_block_size_rows, min_block_size_bytes) { } void SquashingTransform::onConsume(Chunk chunk) { - Chunk planned_chunk = planSquashing.add(std::move(chunk)); + Chunk planned_chunk = squashing.add(std::move(chunk)); if (planned_chunk.hasChunkInfo()) - cur_chunk = applySquashing.add(std::move(planned_chunk)); + cur_chunk = squashing.squash(std::move(planned_chunk)); } SquashingTransform::GenerateResult SquashingTransform::onGenerate() @@ -34,9 +33,9 @@ SquashingTransform::GenerateResult SquashingTransform::onGenerate() void SquashingTransform::onFinish() { - Chunk chunk = planSquashing.flush(); + Chunk chunk = squashing.flush(); if (chunk.hasChunkInfo()) - chunk = applySquashing.add(std::move(chunk)); + chunk = squashing.squash(std::move(chunk)); finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows()); } @@ -60,8 +59,7 @@ void SquashingTransform::work() SimpleSquashingTransform::SimpleSquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) : ISimpleTransform(header, header, false) - , planSquashing(header, min_block_size_rows, min_block_size_bytes) - , applySquashing(header) + , squashing(header, min_block_size_rows, min_block_size_bytes) { } @@ -69,18 +67,18 @@ void SimpleSquashingTransform::transform(Chunk & chunk) { if (!finished) { - Chunk planned_chunk = planSquashing.add(std::move(chunk)); + Chunk planned_chunk = squashing.add(std::move(chunk)); if (planned_chunk.hasChunkInfo()) - chunk = applySquashing.add(std::move(planned_chunk)); + chunk = squashing.squash(std::move(planned_chunk)); } else { if (chunk.hasRows()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost"); - chunk = planSquashing.flush(); + chunk = squashing.flush(); if (chunk.hasChunkInfo()) - chunk = applySquashing.add(std::move(chunk)); + chunk = squashing.squash(std::move(chunk)); } } diff --git a/src/Processors/Transforms/SquashingTransform.h b/src/Processors/Transforms/SquashingTransform.h index b5b3c6616d2..c5b727ac6ec 100644 --- a/src/Processors/Transforms/SquashingTransform.h +++ b/src/Processors/Transforms/SquashingTransform.h @@ -24,8 +24,7 @@ protected: void onFinish() override; private: - PlanSquashing planSquashing; - ApplySquashing applySquashing; + Squashing squashing; Chunk cur_chunk; Chunk finish_chunk; }; @@ -44,8 +43,7 @@ protected: IProcessor::Status prepare() override; private: - PlanSquashing planSquashing; - ApplySquashing applySquashing; + Squashing squashing; bool finished = false; }; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 1dd99796754..2be4e8d5665 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -885,22 +885,21 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro using PushResult = AsynchronousInsertQueue::PushResult; startInsertQuery(); - PlanSquashing plan_squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size); - ApplySquashing apply_squashing(state.input_header); + Squashing squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size); while (readDataNext()) { - apply_squashing.header = state.block_for_insert; - auto planned_chunk = plan_squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}); + squashing.header = state.block_for_insert; + auto planned_chunk = squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}); if (planned_chunk.hasChunkInfo()) { - Chunk result_chunk = apply_squashing.add(std::move(planned_chunk)); + Chunk result_chunk = squashing.squash(std::move(planned_chunk)); ColumnsWithTypeAndName cols; if (result_chunk.hasColumns() && state.block_for_insert) for (size_t j = 0; j < result_chunk.getNumColumns(); ++j) cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j])); auto result = Block(cols); - apply_squashing.header = Block(state.block_for_insert); + squashing.header = Block(state.block_for_insert); return PushResult { .status = PushResult::TOO_MUCH_DATA, @@ -909,14 +908,14 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro } } - auto planned_chunk = plan_squashing.flush(); + auto planned_chunk = squashing.flush(); Chunk result_chunk; if (planned_chunk.hasChunkInfo()) - result_chunk = apply_squashing.add(std::move(planned_chunk)); + result_chunk = squashing.squash(std::move(planned_chunk)); ColumnsWithTypeAndName cols; if (result_chunk.hasColumns()) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.header.getDataTypes()[j], apply_squashing.header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], squashing.header.getDataTypes()[j], squashing.header.getNames()[j])); auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 0d1fc46ec76..fad195d6a36 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1267,8 +1267,7 @@ private: ProjectionNameToItsBlocks projection_parts; std::move_iterator projection_parts_iterator; - std::vector projection_squash_plannings; - std::vector projection_squashes; + std::vector projection_squashes; const ProjectionsDescription & projections; ExecutableTaskPtr merge_projection_parts_task_ptr; @@ -1286,10 +1285,9 @@ void PartMergerWriter::prepare() for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) { - PlanSquashing plan_squashing(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + Squashing squashing(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); // We split the materialization into multiple stages similar to the process of INSERT SELECT query. - projection_squash_plannings.emplace_back(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); - projection_squashes.emplace_back(ctx->updated_header); + projection_squashes.emplace_back(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); } existing_rows_count = 0; @@ -1317,11 +1315,11 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); Block block_to_squash = projection.calculate(cur_block, ctx->context); projection_squashes[i].header = block_to_squash; - Chunk planned_chunk = projection_squash_plannings[i].add({block_to_squash.getColumns(), block_to_squash.rows()}); + Chunk planned_chunk = projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}); if (planned_chunk.hasChunkInfo()) { - Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); + Chunk projection_chunk = projection_squashes[i].squash(std::move(planned_chunk)); ColumnsWithTypeAndName cols; if (projection_chunk.hasColumns()) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) @@ -1345,11 +1343,11 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) { const auto & projection = *ctx->projections_to_build[i]; - auto & projection_squash_plan = projection_squash_plannings[i]; + auto & projection_squash_plan = projection_squashes[i]; auto planned_chunk = projection_squash_plan.flush(); if (planned_chunk.hasChunkInfo()) { - Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); + Chunk projection_chunk = projection_squashes[i].squash(std::move(planned_chunk)); ColumnsWithTypeAndName cols; if (projection_chunk.hasColumns()) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index c88582a8a1a..37ea3f274b6 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -313,7 +313,7 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context) // There should be only one output block after this transformation. builder.addTransform(std::make_shared(builder.getHeader(), block.rows(), 0, 1)); - builder.addTransform(std::make_shared(builder.getHeader())); + builder.addTransform(std::make_shared(builder.getHeader(), block.rows(), 0)); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); PullingPipelineExecutor executor(pipeline);