apply double-phased squashing in all transformers, resize optimization

This commit is contained in:
yariks5s 2024-05-23 19:12:02 +00:00
parent 58000be1a7
commit f632636f21
6 changed files with 57 additions and 46 deletions

View File

@ -1,7 +1,6 @@
#include <vector> #include <vector>
#include <Interpreters/Squashing.h> #include <Interpreters/Squashing.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
@ -135,46 +134,52 @@ ApplySquashing::ApplySquashing(Block header_)
{ {
} }
Block ApplySquashing::add(Chunk && input_chunk) Chunk ApplySquashing::add(Chunk && input_chunk)
{ {
return addImpl(std::move(input_chunk)); return addImpl(std::move(input_chunk));
} }
Block ApplySquashing::addImpl(Chunk && input_chunk) Chunk ApplySquashing::addImpl(Chunk && input_chunk)
{ {
if (!input_chunk.hasChunkInfo()) if (!input_chunk.hasChunkInfo())
return Block(); return Chunk();
const auto *info = getInfoFromChunk(input_chunk); const auto *info = getInfoFromChunk(input_chunk);
for (auto & chunk : info->chunks) append(info->chunks);
append(chunk);
Block to_return; Block to_return;
std::swap(to_return, accumulated_block); std::swap(to_return, accumulated_block);
return to_return; return Chunk(to_return.getColumns(), to_return.rows());
} }
void ApplySquashing::append(Chunk & input_chunk) void ApplySquashing::append(const std::vector<Chunk> & input_chunks)
{ {
if (input_chunk.getNumColumns() == 0) std::vector<IColumn::MutablePtr> mutable_columns;
return; size_t rows = 0;
if (!accumulated_block) for (const Chunk & chunk : input_chunks)
rows += chunk.getNumRows();
for (const auto & input_chunk : input_chunks)
{ {
for (size_t i = 0; i < input_chunk.getNumColumns(); ++ i) if (!accumulated_block)
{ {
ColumnWithTypeAndName col = ColumnWithTypeAndName(input_chunk.getColumns()[i], header.getDataTypes()[i], header.getNames()[i]); for (size_t i = 0; i < input_chunks[0].getNumColumns(); ++i)
accumulated_block.insert(accumulated_block.columns(), col); { // We can put this part of code out of the cycle, but it will consume more memory
ColumnWithTypeAndName col = ColumnWithTypeAndName(input_chunks[0].getColumns()[i],header.getDataTypes()[i], header.getNames()[i]);
mutable_columns.push_back(IColumn::mutate(col.column));
mutable_columns[i]->reserve(rows);
accumulated_block.insert(col);
}
continue;
} }
return;
}
for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i) for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i)
{ {
const auto source_column = input_chunk.getColumns()[i]; const auto source_column = input_chunk.getColumns()[i];
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column)); mutable_columns[i]->insertRangeFrom(*source_column, 0, source_column->size());
mutable_column->insertRangeFrom(*source_column, 0, source_column->size()); accumulated_block.getByPosition(i).column = mutable_columns[i]->cloneFinalized();
accumulated_block.getByPosition(i).column = std::move(mutable_column); }
} }
} }
@ -283,7 +288,6 @@ bool PlanSquashing::isEnoughSize(const std::vector<Chunk> & chunks)
bool PlanSquashing::isEnoughSize(size_t rows, size_t bytes) const bool PlanSquashing::isEnoughSize(size_t rows, size_t bytes) const
{ {
LOG_TRACE(getLogger("Planning"), "rows: {}, bytes: {}", rows, bytes);
return (!min_block_size_rows && !min_block_size_bytes) return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows) || (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes); || (min_block_size_bytes && bytes >= min_block_size_bytes);

View File

@ -1,7 +1,5 @@
#pragma once #pragma once
#include <list>
#include <memory>
#include <vector> #include <vector>
#include <Core/Block.h> #include <Core/Block.h>
#include <Processors/Chunk.h> #include <Processors/Chunk.h>
@ -60,17 +58,17 @@ class ApplySquashing
public: public:
explicit ApplySquashing(Block header_); explicit ApplySquashing(Block header_);
Block add(Chunk && input_chunk); Chunk add(Chunk && input_chunk);
private: private:
Block accumulated_block; Block accumulated_block;
const Block header; const Block header;
Block addImpl(Chunk && chunk); Chunk addImpl(Chunk && chunk);
const ChunksToSquash * getInfoFromChunk(const Chunk & chunk); const ChunksToSquash * getInfoFromChunk(const Chunk & chunk);
void append(Chunk & input_chunk); void append(const std::vector<Chunk> & input_chunks);
bool isEnoughSize(const Block & block); bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const; bool isEnoughSize(size_t rows, size_t bytes) const;

View File

@ -37,8 +37,8 @@ public:
protected: protected:
void onConsume(Chunk chunk) override void onConsume(Chunk chunk) override
{ {
if (auto block = squashing.add(std::move(chunk))) if (auto res_chunk = squashing.add(std::move(chunk)))
cur_chunk.setColumns(block.getColumns(), block.rows()); cur_chunk.setColumns(res_chunk.getColumns(), res_chunk.getNumRows());
} }
GenerateResult onGenerate() override GenerateResult onGenerate() override
@ -50,8 +50,8 @@ protected:
} }
void onFinish() override void onFinish() override
{ {
auto block = squashing.add({}); auto chunk = squashing.add({});
finish_chunk.setColumns(block.getColumns(), block.rows()); finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows());
} }
private: private:

View File

@ -3,7 +3,6 @@
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Interpreters/Squashing.h> #include <Interpreters/Squashing.h>
#include "Processors/Port.h"
enum PlanningStatus enum PlanningStatus
{ {

View File

@ -1,5 +1,5 @@
#include <Processors/Transforms/SquashingTransform.h> #include <Processors/Transforms/SquashingTransform.h>
#include <Common/logger_useful.h> #include <Interpreters/Squashing.h>
namespace DB namespace DB
{ {
@ -12,14 +12,16 @@ extern const int LOGICAL_ERROR;
SquashingTransform::SquashingTransform( SquashingTransform::SquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ExceptionKeepingTransform(header, header, false) : ExceptionKeepingTransform(header, header, false)
, squashing(min_block_size_rows, min_block_size_bytes) , planSquashing(header, min_block_size_rows, min_block_size_bytes)
, applySquashing(header)
{ {
} }
void SquashingTransform::onConsume(Chunk chunk) void SquashingTransform::onConsume(Chunk chunk)
{ {
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) Chunk planned_chunk = planSquashing.add(std::move(chunk));
cur_chunk.setColumns(block.getColumns(), block.rows()); if (planned_chunk.hasChunkInfo())
cur_chunk = applySquashing.add(std::move(planned_chunk));
} }
SquashingTransform::GenerateResult SquashingTransform::onGenerate() SquashingTransform::GenerateResult SquashingTransform::onGenerate()
@ -32,8 +34,10 @@ SquashingTransform::GenerateResult SquashingTransform::onGenerate()
void SquashingTransform::onFinish() void SquashingTransform::onFinish()
{ {
auto block = squashing.add({}); Chunk chunk = planSquashing.flush();
finish_chunk.setColumns(block.getColumns(), block.rows()); if (chunk.hasChunkInfo())
chunk = applySquashing.add(std::move(chunk));
finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows());
} }
void SquashingTransform::work() void SquashingTransform::work()
@ -55,7 +59,9 @@ void SquashingTransform::work()
SimpleSquashingTransform::SimpleSquashingTransform( SimpleSquashingTransform::SimpleSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, false), squashing(min_block_size_rows, min_block_size_bytes) : ISimpleTransform(header, header, false)
, planSquashing(header, min_block_size_rows, min_block_size_bytes)
, applySquashing(header)
{ {
} }
@ -63,16 +69,18 @@ void SimpleSquashingTransform::transform(Chunk & chunk)
{ {
if (!finished) if (!finished)
{ {
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) Chunk planned_chunk = planSquashing.add(std::move(chunk));
chunk.setColumns(block.getColumns(), block.rows()); if (planned_chunk.hasChunkInfo())
chunk = applySquashing.add(std::move(planned_chunk));
} }
else else
{ {
if (chunk.hasRows()) if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
auto block = squashing.add({}); chunk = planSquashing.flush();
chunk.setColumns(block.getColumns(), block.rows()); if (chunk.hasChunkInfo())
chunk = applySquashing.add(std::move(chunk));
} }
} }

View File

@ -24,7 +24,8 @@ protected:
void onFinish() override; void onFinish() override;
private: private:
Squashing squashing; PlanSquashing planSquashing;
ApplySquashing applySquashing;
Chunk cur_chunk; Chunk cur_chunk;
Chunk finish_chunk; Chunk finish_chunk;
}; };
@ -43,7 +44,8 @@ protected:
IProcessor::Status prepare() override; IProcessor::Status prepare() override;
private: private:
Squashing squashing; PlanSquashing planSquashing;
ApplySquashing applySquashing;
bool finished = false; bool finished = false;
}; };