Backport #66174 to 24.6: Fix SimpleSquashingTransform

This commit is contained in:
robot-clickhouse 2024-07-07 07:08:58 +00:00
parent 80ef33fe72
commit d63399c516
4 changed files with 193 additions and 45 deletions

View File

@ -783,7 +783,7 @@ void AggregatingTransform::initGenerate()
{ {
/// Just a reasonable constant, matches default value for the setting `preferred_block_size_bytes` /// Just a reasonable constant, matches default value for the setting `preferred_block_size_bytes`
static constexpr size_t oneMB = 1024 * 1024; static constexpr size_t oneMB = 1024 * 1024;
return std::make_shared<SimpleSquashingTransform>(header, params->params.max_block_size, oneMB); return std::make_shared<SimpleSquashingChunksTransform>(header, params->params.max_block_size, oneMB);
}); });
} }
/// AggregatingTransform::expandPipeline expects single output port. /// AggregatingTransform::expandPipeline expects single output port.

View File

@ -7,6 +7,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
} }
SquashingTransform::SquashingTransform( SquashingTransform::SquashingTransform(
@ -56,53 +57,170 @@ void SquashingTransform::work()
} }
} }
SimpleSquashingTransform::SimpleSquashingTransform( SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, false) : IInflatingTransform(header, header), squashing(min_block_size_rows, min_block_size_bytes)
, squashing(header, min_block_size_rows, min_block_size_bytes)
{ {
} }
void SimpleSquashingTransform::transform(Chunk & chunk) void SimpleSquashingChunksTransform::consume(Chunk chunk)
{ {
if (!finished) Block current_block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
{ squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
Chunk planned_chunk = squashing.add(std::move(chunk));
if (planned_chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(planned_chunk));
}
else
{
if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
chunk = squashing.flush();
if (chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(chunk));
}
} }
IProcessor::Status SimpleSquashingTransform::prepare() Chunk SimpleSquashingChunksTransform::generate()
{ {
if (!finished && input.isFinished()) if (squashed_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in SimpleSquashingChunksTransform");
return std::move(squashed_chunk);
}
bool SimpleSquashingChunksTransform::canGenerate()
{
return !squashed_chunk.empty();
}
Chunk SimpleSquashingChunksTransform::getRemaining()
{
Block current_block = squashing.add({});
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
return std::move(squashed_chunk);
}
SquashingLegacy::SquashingLegacy(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
{
}
Block SquashingLegacy::add(Block && input_block)
{
return addImpl<Block &&>(std::move(input_block));
}
Block SquashingLegacy::add(const Block & input_block)
{
return addImpl<const Block &>(input_block);
}
/*
* To minimize copying, accept two types of argument: const reference for output
* stream, and rvalue reference for input stream, and decide whether to copy
* inside this function. This allows us not to copy Block unless we absolutely
* have to.
*/
template <typename ReferenceType>
Block SquashingLegacy::addImpl(ReferenceType input_block)
{
/// End of input stream.
if (!input_block)
{ {
if (output.isFinished()) Block to_return;
return Status::Finished; std::swap(to_return, accumulated_block);
return to_return;
}
if (!output.canPush()) /// Just read block is already enough.
return Status::PortFull; if (isEnoughSize(input_block))
{
if (has_output) /// If no accumulated data, return just read block.
if (!accumulated_block)
{ {
output.pushData(std::move(output_data)); return std::move(input_block);
has_output = false;
return Status::PortFull;
} }
finished = true; /// Return accumulated data (maybe it has small size) and place new block to accumulated data.
/// On the next call to transform() we will return all data buffered in `squashing` (if any) Block to_return = std::move(input_block);
return Status::Ready; std::swap(to_return, accumulated_block);
return to_return;
} }
return ISimpleTransform::prepare();
/// Accumulated block is already enough.
if (isEnoughSize(accumulated_block))
{
/// Return accumulated data and place new block to accumulated data.
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
append<ReferenceType>(std::move(input_block));
if (isEnoughSize(accumulated_block))
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
/// Squashed block is not ready.
return {};
} }
template <typename ReferenceType>
void SquashingLegacy::append(ReferenceType input_block)
{
if (!accumulated_block)
{
accumulated_block = std::move(input_block);
return;
}
assert(blocksHaveEqualStructure(input_block, accumulated_block));
try
{
for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i)
{
const auto source_column = input_block.getByPosition(i).column;
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column));
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
accumulated_block.getByPosition(i).column = std::move(mutable_column);
}
}
catch (...)
{
/// add() may be called again even after a previous add() threw an exception.
/// Keep accumulated_block in a valid state.
/// Seems ok to discard accumulated data because we're throwing an exception, which the caller will
/// hopefully interpret to mean "this block and all *previous* blocks are potentially lost".
accumulated_block.clear();
throw;
}
}
bool SquashingLegacy::isEnoughSize(const Block & block)
{
size_t rows = 0;
size_t bytes = 0;
for (const auto & [column, type, name] : block)
{
if (!column)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid column in block.");
if (!rows)
rows = column->size();
else if (rows != column->size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Sizes of columns doesn't match");
bytes += column->byteSize();
}
return isEnoughSize(rows, bytes);
}
bool SquashingLegacy::isEnoughSize(size_t rows, size_t bytes) const
{
return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
} }

View File

@ -2,6 +2,7 @@
#include <Interpreters/Squashing.h> #include <Interpreters/Squashing.h>
#include <Processors/ISimpleTransform.h> #include <Processors/ISimpleTransform.h>
#include <Processors/IInflatingTransform.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/ApplySquashingTransform.h> #include <Processors/Transforms/ApplySquashingTransform.h>
@ -29,22 +30,51 @@ private:
Chunk finish_chunk; Chunk finish_chunk;
}; };
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
class SimpleSquashingTransform : public ISimpleTransform class SquashingLegacy
{ {
public: public:
explicit SimpleSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes); /// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
SquashingLegacy(size_t min_block_size_rows_, size_t min_block_size_bytes_);
/** Add next block and possibly returns squashed block.
* At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true.
*/
Block add(Block && block);
Block add(const Block & block);
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
Block accumulated_block;
template <typename ReferenceType>
Block addImpl(ReferenceType block);
template <typename ReferenceType>
void append(ReferenceType block);
bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const;
};
class SimpleSquashingChunksTransform : public IInflatingTransform
{
public:
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "SimpleSquashingTransform"; } String getName() const override { return "SimpleSquashingTransform"; }
protected: protected:
void transform(Chunk &) override; void consume(Chunk chunk) override;
bool canGenerate() override;
IProcessor::Status prepare() override; Chunk generate() override;
Chunk getRemaining() override;
private: private:
Squashing squashing; SquashingLegacy squashing;
Chunk squashed_chunk;
bool finished = false;
}; };
} }

View File

@ -290,7 +290,7 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
size_t min_block_size_rows = mutable_context->getSettingsRef().min_external_table_block_size_rows; size_t min_block_size_rows = mutable_context->getSettingsRef().min_external_table_block_size_rows;
size_t min_block_size_bytes = mutable_context->getSettingsRef().min_external_table_block_size_bytes; size_t min_block_size_bytes = mutable_context->getSettingsRef().min_external_table_block_size_bytes;
auto squashing = std::make_shared<SimpleSquashingTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes); auto squashing = std::make_shared<SimpleSquashingChunksTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
builder->resize(1); builder->resize(1);
builder->addTransform(std::move(squashing)); builder->addTransform(std::move(squashing));