remove squashing

This commit is contained in:
yariks5s 2024-05-27 16:23:01 +00:00
parent 01a16fd8e2
commit 00b07bba14
6 changed files with 56 additions and 167 deletions

View File

@ -1,6 +1,7 @@
#include <vector>
#include <Interpreters/Squashing.h>
#include <Common/CurrentThread.h>
#include "Columns/IColumn.h"
namespace DB
@ -11,124 +12,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
Squashing::Squashing(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
{
}
Block Squashing::add(Block && input_block)
{
return addImpl<Block &&>(std::move(input_block));
}
Block Squashing::add(const Block & input_block)
{
return addImpl<const Block &>(input_block);
}
/*
* To minimize copying, accept two types of argument: const reference for output
* stream, and rvalue reference for input stream, and decide whether to copy
* inside this function. This allows us not to copy Block unless we absolutely
* have to.
*/
template <typename ReferenceType>
Block Squashing::addImpl(ReferenceType input_block)
{
/// End of input stream.
if (!input_block)
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
/// Just read block is already enough.
if (isEnoughSize(input_block))
{
/// If no accumulated data, return just read block.
if (!accumulated_block)
{
return std::move(input_block);
}
/// Return accumulated data (maybe it has small size) and place new block to accumulated data.
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
/// Accumulated block is already enough.
if (isEnoughSize(accumulated_block))
{
/// Return accumulated data and place new block to accumulated data.
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
append<ReferenceType>(std::move(input_block));
if (isEnoughSize(accumulated_block))
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
/// Squashed block is not ready.
return {};
}
template <typename ReferenceType>
void Squashing::append(ReferenceType input_block)
{
if (!accumulated_block)
{
accumulated_block = std::move(input_block);
return;
}
assert(blocksHaveEqualStructure(input_block, accumulated_block));
for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i)
{
const auto source_column = input_block.getByPosition(i).column;
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column));
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
accumulated_block.getByPosition(i).column = std::move(mutable_column);
}
}
bool Squashing::isEnoughSize(const Block & block)
{
size_t rows = 0;
size_t bytes = 0;
for (const auto & [column, type, name] : block)
{
if (!rows)
rows = column->size();
else if (rows != column->size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Sizes of columns doesn't match");
bytes += column->byteSize();
}
return isEnoughSize(rows, bytes);
}
bool Squashing::isEnoughSize(size_t rows, size_t bytes) const
{
return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
ApplySquashing::ApplySquashing(Block header_)
: header(std::move(header_))
{
@ -187,10 +70,9 @@ const ChunksToSquash* ApplySquashing::getInfoFromChunk(const Chunk & chunk)
return agg_info;
}
PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_)
PlanSquashing::PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
, header(std::move(header_))
{
}
@ -199,7 +81,7 @@ Chunk PlanSquashing::flush()
return convertToChunk(std::move(chunks_to_merge_vec));
}
Chunk PlanSquashing::add(Chunk & input_chunk)
Chunk PlanSquashing::add(Chunk && input_chunk)
{
if (!input_chunk)
return {};
@ -260,7 +142,8 @@ Chunk PlanSquashing::convertToChunk(std::vector<Chunk> && chunks)
chunks.clear();
return Chunk(header.cloneEmptyColumns(), 0, info);
Columns cols = {};
return Chunk(cols, 0, info);
}
void PlanSquashing::expandCurrentSize(size_t rows, size_t bytes)

View File

@ -25,33 +25,6 @@ struct ChunksToSquash : public ChunkInfo
*
* Order of data is kept.
*/
class Squashing
{
public:
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
Squashing(size_t min_block_size_rows_, size_t min_block_size_bytes_);
/** Add next block and possibly returns squashed block.
* At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true.
*/
Block add(Block && block);
Block add(const Block & block);
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
Block accumulated_block;
template <typename ReferenceType>
Block addImpl(ReferenceType block);
template <typename ReferenceType>
void append(ReferenceType block);
bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const;
};
class ApplySquashing
{
@ -75,9 +48,9 @@ private:
class PlanSquashing
{
public:
PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_);
PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_);
Chunk add(Chunk & input_chunk);
Chunk add(Chunk && input_chunk);
Chunk flush();
bool isDataLeft()
{
@ -95,7 +68,7 @@ private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
const Block header;
// const Block header;
CurrentSize accumulated_size;
void expandCurrentSize(size_t rows, size_t bytes);

View File

@ -11,7 +11,7 @@ namespace ErrorCodes
}
PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(header, min_block_size_rows, min_block_size_bytes)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(min_block_size_rows, min_block_size_bytes)
{
}
@ -134,7 +134,7 @@ IProcessor::Status PlanSquashingTransform::waitForDataIn()
void PlanSquashingTransform::transform(Chunk & chunk_)
{
Chunk res_chunk = balance.add(chunk_);
Chunk res_chunk = balance.add(std::move(chunk_));
std::swap(res_chunk, chunk_);
}

View File

@ -12,14 +12,14 @@ extern const int LOGICAL_ERROR;
SquashingTransform::SquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ExceptionKeepingTransform(header, header, false)
, planSquashing(header, min_block_size_rows, min_block_size_bytes)
, planSquashing(min_block_size_rows, min_block_size_bytes)
, applySquashing(header)
{
}
void SquashingTransform::onConsume(Chunk chunk)
{
Chunk planned_chunk = planSquashing.add(chunk);
Chunk planned_chunk = planSquashing.add(std::move(chunk));
if (planned_chunk.hasChunkInfo())
cur_chunk = applySquashing.add(std::move(planned_chunk));
}
@ -60,7 +60,7 @@ void SquashingTransform::work()
SimpleSquashingTransform::SimpleSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, false)
, planSquashing(header, min_block_size_rows, min_block_size_bytes)
, planSquashing(min_block_size_rows, min_block_size_bytes)
, applySquashing(header)
{
}
@ -69,7 +69,7 @@ void SimpleSquashingTransform::transform(Chunk & chunk)
{
if (!finished)
{
Chunk planned_chunk = planSquashing.add(chunk);
Chunk planned_chunk = planSquashing.add(std::move(chunk));
if (planned_chunk.hasChunkInfo())
chunk = applySquashing.add(std::move(planned_chunk));
}

View File

@ -885,13 +885,21 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
using PushResult = AsynchronousInsertQueue::PushResult;
startInsertQuery();
Squashing squashing(0, query_context->getSettingsRef().async_insert_max_data_size);
PlanSquashing plan_squashing(0, query_context->getSettingsRef().async_insert_max_data_size);
ApplySquashing apply_squashing(state.input_header);
while (readDataNext())
{
auto result = squashing.add(std::move(state.block_for_insert));
if (result)
auto planned_chunk = plan_squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()});
Chunk result_chunk;
if (planned_chunk.hasChunkInfo())
result_chunk = apply_squashing.add(std::move(planned_chunk));
if (result_chunk)
{
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j)
cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j]));
auto result = Block(cols);
return PushResult
{
.status = PushResult::TOO_MUCH_DATA,
@ -900,7 +908,14 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
}
}
auto result = squashing.add({});
auto planned_chunk = plan_squashing.flush();
Chunk result_chunk;
if (planned_chunk.hasChunkInfo())
result_chunk = apply_squashing.add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j)
cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j]));
auto result = Block(cols);
return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context);
}

View File

@ -28,6 +28,7 @@
#include <DataTypes/DataTypeVariant.h>
#include <boost/algorithm/string/replace.hpp>
#include <Common/ProfileEventsScope.h>
#include <Core/ColumnsWithTypeAndName.h>
namespace ProfileEvents
@ -1266,7 +1267,8 @@ private:
ProjectionNameToItsBlocks projection_parts;
std::move_iterator<ProjectionNameToItsBlocks::iterator> projection_parts_iterator;
std::vector<Squashing> projection_squashes;
std::vector<PlanSquashing> projection_squash_plannings;
std::vector<ApplySquashing> projection_squashes;
const ProjectionsDescription & projections;
ExecutableTaskPtr merge_projection_parts_task_ptr;
@ -1285,7 +1287,8 @@ void PartMergerWriter::prepare()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
// We split the materialization into multiple stages similar to the process of INSERT SELECT query.
projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
projection_squash_plannings.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
projection_squashes.emplace_back(ctx->updated_header);
}
existing_rows_count = 0;
@ -1313,7 +1316,15 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
Block projection_block;
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context));
Block to_plan = projection.calculate(cur_block, ctx->context);
Chunk planned_chunk = projection_squash_plannings[i].add({to_plan.getColumns(), to_plan.rows()});
Chunk projection_chunk;
if (planned_chunk.hasChunkInfo())
projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)
cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j]));
projection_block = Block(cols);
}
if (projection_block)
@ -1337,8 +1348,15 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
auto & projection_squash = projection_squashes[i];
auto projection_block = projection_squash.add({});
auto & projection_squash_plan = projection_squash_plannings[i];
auto planned_chunk = projection_squash_plan.flush();
Chunk projection_chunk;
if (planned_chunk.hasChunkInfo())
projection_chunk = projection_squashes[i].add(std::move(planned_chunk));
ColumnsWithTypeAndName cols;
for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j)
cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j]));
auto projection_block = Block(cols);
if (projection_block)
{
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(