Added a move constructor and removed unused parts

yariks5s 2024-05-30 16:33:48 +00:00
parent d0d5b6d0cb
commit 826bec2575
7 changed files with 47 additions and 48 deletions

View File

@@ -1,7 +1,6 @@
#include <vector>
#include <Interpreters/Squashing.h>
#include <Common/CurrentThread.h>
#include "Columns/IColumn.h"
namespace DB
@@ -69,9 +68,10 @@ const ChunksToSquash* ApplySquashing::getInfoFromChunk(const Chunk & chunk)
return agg_info;
}
PlanSquashing::PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_)
PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
, header(header_)
{
}
@@ -141,8 +141,7 @@ Chunk PlanSquashing::convertToChunk(std::vector<Chunk> && chunks)
chunks.clear();
Columns cols = {};
return Chunk(cols, 0, info);
return Chunk(header.cloneEmptyColumns(), 0, info);
}
void PlanSquashing::expandCurrentSize(size_t rows, size_t bytes)
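
Why this matters: PlanSquashing previously planned chunks without knowing their structure, so convertToChunk() returned a chunk with no columns at all; with the header passed into the constructor, the planned chunk now carries correctly typed empty columns (header.cloneEmptyColumns()) plus the ChunksToSquash info. Below is a minimal sketch of the resulting two-step usage, assuming only the interface visible in this diff (the function and variable names are illustrative, not part of the commit):

#include <Interpreters/Squashing.h>

using namespace DB;

Chunk squashingSketch(const Block & sample_header, size_t min_rows, size_t min_bytes)
{
    // The planner is told up front what its chunks look like.
    PlanSquashing planner(sample_header, min_rows, min_bytes);

    // flush() hands back whatever is still buffered as a "planned" chunk:
    // empty columns cloned from sample_header plus the ChunksToSquash info.
    Chunk planned = planner.flush();
    if (planned.hasChunkInfo())
    {
        // ApplySquashing materializes the plan into real, squashed columns.
        ApplySquashing squasher(sample_header);
        return squasher.add(std::move(planned));
    }
    return {};
}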

View File

@@ -33,9 +33,10 @@ public:
Chunk add(Chunk && input_chunk);
Block header;
private:
Chunk accumulated_chunk;
const Block header;
const ChunksToSquash * getInfoFromChunk(const Chunk & chunk);
@@ -48,7 +49,8 @@ private:
class PlanSquashing
{
public:
PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_);
explicit PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_);
PlanSquashing(PlanSquashing && other) = default;
Chunk add(Chunk && input_chunk);
Chunk flush();
@@ -68,7 +70,7 @@ private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
// const Block header;
const Block header;
CurrentSize accumulated_size;
void expandCurrentSize(size_t rows, size_t bytes);
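
Two interface changes are visible in this header. ApplySquashing::header becomes a public, non-const member, which lets callers re-point it at the block actually being squashed (see PartMergerWriter below). PlanSquashing gains an explicit header-taking constructor plus an explicitly defaulted move constructor; the planners are now kept in a container (projection_squash_plannings in PartMergerWriter), and the declaration makes it clear the type stays movable even though the const Block member deletes move assignment. A minimal sketch of that container usage, assuming the class layout shown here (the function name is illustrative):

#include <vector>
#include <Interpreters/Squashing.h>

using namespace DB;

void plannerContainerSketch(const Block & header, size_t min_rows, size_t min_bytes)
{
    std::vector<PlanSquashing> planners;

    // emplace_back constructs each planner in place; when the vector grows it
    // relocates elements through PlanSquashing(PlanSquashing &&). The const
    // Block member is copied during that move, which is fine for a header.
    planners.emplace_back(header, min_rows, min_bytes);
    planners.emplace_back(header, min_rows, min_bytes);
}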

View File

@@ -11,7 +11,7 @@ namespace ErrorCodes
}
PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(min_block_size_rows, min_block_size_bytes)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(header, min_block_size_rows, min_block_size_bytes)
{
}

View File

@@ -12,7 +12,7 @@ extern const int LOGICAL_ERROR;
SquashingTransform::SquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ExceptionKeepingTransform(header, header, false)
, planSquashing(min_block_size_rows, min_block_size_bytes)
, planSquashing(header, min_block_size_rows, min_block_size_bytes)
, applySquashing(header)
{
}
@@ -60,7 +60,7 @@ void SquashingTransform::work()
SimpleSquashingTransform::SimpleSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, false)
, planSquashing(min_block_size_rows, min_block_size_bytes)
, planSquashing(header, min_block_size_rows, min_block_size_bytes)
, applySquashing(header)
{
}

View File

@@ -367,13 +367,16 @@ std::optional<Chain> generateViewChain(
bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type;
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
bool table_prefers_large_blocks = inner_table->prefersLargeBlocks();
const auto & settings = insert_context->getSettingsRef();
if (interpreter.shouldAddSquashingFroStorage(inner_table))
{
bool table_prefers_large_blocks = inner_table->prefersLargeBlocks();
const auto & settings = insert_context->getSettingsRef();
out.addSource(std::make_shared<SquashingTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
out.addSource(std::make_shared<SquashingTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), current_thread, insert_context->getQuota());
counting->setProcessListElement(insert_context->getProcessListElement());
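
Here the squashing source is added only when interpreter.shouldAddSquashingFroStorage() asks for it, and the table_prefers_large_blocks / settings lookups move inside that branch. The threshold rule itself is unchanged; a small self-contained sketch of it follows, with struct and function names that are purely illustrative:

#include <cstddef>

// Tables that prefer large blocks squash up to the insert minimums; other
// tables squash by row count only, capped at max_block_size, with no byte limit.
struct SquashLimits
{
    size_t min_rows = 0;
    size_t min_bytes = 0;
};

static SquashLimits chooseSquashLimits(
    bool table_prefers_large_blocks,
    size_t min_insert_block_size_rows,
    size_t min_insert_block_size_bytes,
    size_t max_block_size)
{
    if (table_prefers_large_blocks)
        return {min_insert_block_size_rows, min_insert_block_size_bytes};
    return {max_block_size, 0};
}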

View File

@@ -885,20 +885,19 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
using PushResult = AsynchronousInsertQueue::PushResult;
startInsertQuery();
PlanSquashing plan_squashing(0, query_context->getSettingsRef().async_insert_max_data_size);
PlanSquashing plan_squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size);
ApplySquashing apply_squashing(state.input_header);
while (readDataNext())
{
auto planned_chunk = plan_squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()});
Chunk result_chunk;
if (planned_chunk.hasChunkInfo())
result_chunk = apply_squashing.add(std::move(planned_chunk));
if (result_chunk)
{
Chunk result_chunk = apply_squashing.add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j)
cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j]));
if (result_chunk.hasColumns() && state.block_for_insert)
for (size_t j = 0; j < result_chunk.getNumColumns(); ++j)
cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j]));
auto result = Block(cols);
return PushResult
{
@@ -913,8 +912,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
if (planned_chunk.hasChunkInfo())
result_chunk = apply_squashing.add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j)
cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j]));
if (result_chunk.hasColumns())
for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j)
cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j]));
auto result = Block(cols);
return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context);
}
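
In the async-insert path the planner is now built from state.input_header, and the squashed chunk is converted back into a Block using the names and types of the block that was actually read (block_for_insert inside the loop, input_header on the final flush). The chunk-to-block conversion is the same few lines in both places; a self-contained sketch of it, assuming only the Block/Chunk interfaces used in this diff (the helper name is illustrative):

#include <Core/Block.h>
#include <Processors/Chunk.h>

using namespace DB;

// Rebuild a Block from a squashed Chunk, taking column types and names from
// the header that describes it (block_for_insert / input_header above).
static Block chunkToBlock(const Chunk & chunk, const Block & header)
{
    ColumnsWithTypeAndName cols;
    for (size_t j = 0; j < chunk.getNumColumns(); ++j)
        cols.push_back(ColumnWithTypeAndName(
            chunk.getColumns()[j], header.getDataTypes()[j], header.getNames()[j]));
    return Block(cols);
}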

View File

@@ -1287,7 +1287,7 @@ void PartMergerWriter::prepare()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
// We split the materialization into multiple stages similar to the process of INSERT SELECT query.
projection_squash_plannings.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
projection_squash_plannings.emplace_back(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
projection_squashes.emplace_back(ctx->updated_header);
}
@@ -1313,24 +1313,20 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
{
const auto & projection = *ctx->projections_to_build[i];
Block projection_block;
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
Block to_plan = projection.calculate(cur_block, ctx->context);
Chunk planned_chunk = projection_squash_plannings[i].add({to_plan.getColumns(), to_plan.rows()});
Chunk projection_chunk;
if (planned_chunk.hasChunkInfo())
projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)
cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j]));
projection_block = Block(cols);
}
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
Block block_to_squash = projection.calculate(cur_block, ctx->context);
projection_squashes[i].header = block_to_squash;
Chunk planned_chunk = projection_squash_plannings[i].add({block_to_squash.getColumns(), block_to_squash.rows()});
if (projection_block)
if (planned_chunk.hasChunkInfo())
{
Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
if (projection_chunk.hasColumns())
for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)
cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], block_to_squash.getDataTypes()[j], block_to_squash.getNames()[j]));
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
*ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num);
tmp_part.finalize();
tmp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
@@ -1350,17 +1346,16 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
const auto & projection = *ctx->projections_to_build[i];
auto & projection_squash_plan = projection_squash_plannings[i];
auto planned_chunk = projection_squash_plan.flush();
Chunk projection_chunk;
if (planned_chunk.hasChunkInfo())
projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)
cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j]));
auto projection_block = Block(cols);
if (projection_block)
{
Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
if (projection_chunk.hasColumns())
for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)
cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].header.getDataTypes()[j], projection_squashes[i].header.getNames()[j]));
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
*ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num);
temp_part.finalize();
temp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(temp_part.part));
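
For projections, each planner is now constructed with ctx->updated_header, while the applier's public header is re-pointed at block_to_squash, the block the projection actually produced; the rebuilt columns therefore use the projection block's own types and names rather than the mutation header's. The flush loop at the end mirrors the per-block loop: drain the planner, materialize, and wrap the columns into the Block passed to writeTempProjectionPart. A sketch of that drain step as a standalone helper, assuming the PlanSquashing/ApplySquashing interfaces shown in this diff and that the two vectors are kept in step (names here are illustrative):

#include <vector>
#include <Core/Block.h>
#include <Interpreters/Squashing.h>

using namespace DB;

// Drain every planner and materialize the leftover rows, the way the final
// loop above does. Each applier's `header` is expected to already point at
// the block its projection produced.
static std::vector<Block> flushProjectionSquashes(
    std::vector<PlanSquashing> & planners, std::vector<ApplySquashing> & appliers)
{
    std::vector<Block> blocks;
    for (size_t i = 0; i < planners.size(); ++i)
    {
        Chunk planned = planners[i].flush();
        if (!planned.hasChunkInfo())
            continue;

        Chunk squashed = appliers[i].add(std::move(planned));
        const Block & header = appliers[i].header;

        ColumnsWithTypeAndName cols;
        for (size_t j = 0; j < squashed.getNumColumns(); ++j)
            cols.push_back(ColumnWithTypeAndName(
                squashed.getColumns()[j], header.getDataTypes()[j], header.getNames()[j]));
        blocks.push_back(Block(cols));
    }
    return blocks;
}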