Merge pull request #66174 from ClickHouse/vdimir/fix_squashing_transform2

Fix SimpleSquashingTransform
This commit is contained in:
vdimir 2024-07-07 06:45:26 +00:00 committed by GitHub
commit a36673135a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 193 additions and 45 deletions

View File

@ -783,7 +783,7 @@ void AggregatingTransform::initGenerate()
{
/// Just a reasonable constant, matches default value for the setting `preferred_block_size_bytes`
static constexpr size_t oneMB = 1024 * 1024;
return std::make_shared<SimpleSquashingTransform>(header, params->params.max_block_size, oneMB);
return std::make_shared<SimpleSquashingChunksTransform>(header, params->params.max_block_size, oneMB);
});
}
/// AggregatingTransform::expandPipeline expects single output port.

View File

@ -7,6 +7,7 @@ namespace DB
/// Error codes used by the exceptions thrown in this file.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
SquashingTransform::SquashingTransform(
@ -56,53 +57,170 @@ void SquashingTransform::work()
}
}
/// NOTE(review): this span reads like an unmarked diff - two constructor names and two
/// initializer lists appear back to back. Presumably the `SimpleSquashingTransform` /
/// `ISimpleTransform` lines are the removed version and the `SimpleSquashingChunksTransform` /
/// `IInflatingTransform` lines are their replacement; confirm against the real file.
///
/// The `SimpleSquashingChunksTransform` variant passes `header` as both arguments of the
/// `IInflatingTransform` base and hands only the row and byte thresholds to `squashing`.
SimpleSquashingTransform::SimpleSquashingTransform(
SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, false)
, squashing(header, min_block_size_rows, min_block_size_bytes)
: IInflatingTransform(header, header), squashing(min_block_size_rows, min_block_size_bytes)
{
}
/// NOTE(review): this span reads like an unmarked diff. The first signature and the
/// `if (!finished) ... else ...` statement look like the removed `transform()` body, while the
/// `consume(Chunk chunk)` signature and the last two statements look like the new body;
/// confirm against the real file.
void SimpleSquashingTransform::transform(Chunk & chunk)
void SimpleSquashingChunksTransform::consume(Chunk chunk)
{
if (!finished)
{
Chunk planned_chunk = squashing.add(std::move(chunk));
if (planned_chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(planned_chunk));
}
else
{
if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
chunk = squashing.flush();
if (chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(chunk));
}
/// Detach the chunk's columns, wrap them into a Block shaped like the input port header
/// and feed that block to `squashing`.
Block current_block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
/// Store whatever add() returned in `squashed_chunk`. SquashingLegacy::addImpl returns an
/// empty Block when nothing is ready yet; presumably `squashed_chunk` then reports empty()
/// and canGenerate() stays false - TODO confirm.
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
}
/// NOTE(review): this span reads like an unmarked diff. The `prepare()` signature and the
/// `if (!finished && input.isFinished())` line look like leftovers of the removed
/// implementation; confirm against the real file.
///
/// generate() hands out the chunk buffered in `squashed_chunk`.
/// Throws LOGICAL_ERROR when `squashed_chunk` is empty.
IProcessor::Status SimpleSquashingTransform::prepare()
Chunk SimpleSquashingChunksTransform::generate()
{
if (!finished && input.isFinished())
if (squashed_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in SimpleSquashingChunksTransform");
/// `squashed_chunk` is a member, so an explicit move is needed to transfer its contents out.
return std::move(squashed_chunk);
}
/// Tells whether a squashed chunk is currently buffered in `squashed_chunk`.
bool SimpleSquashingChunksTransform::canGenerate()
{
    const bool nothing_buffered = squashed_chunk.empty();
    return !nothing_buffered;
}
/// Drains `squashing` by feeding it an empty block (which SquashingLegacy::addImpl treats
/// as end of input), stores the returned block's columns in `squashed_chunk`
/// and moves that chunk out to the caller.
Chunk SimpleSquashingChunksTransform::getRemaining()
{
    Block remainder = squashing.add({});
    const size_t remainder_rows = remainder.rows();
    squashed_chunk.setColumns(remainder.getColumns(), remainder_rows);
    return std::move(squashed_chunk);
}
SquashingLegacy::SquashingLegacy(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
{
}
/// Rvalue overload: the caller gives the block away, so it is passed on to addImpl as `Block &&`.
Block SquashingLegacy::add(Block && input_block)
{
    Block squashed = addImpl<Block &&>(std::move(input_block));
    return squashed;
}
/// Const-reference overload: the caller keeps the block; addImpl is instantiated with `const Block &`.
Block SquashingLegacy::add(const Block & input_block)
{
    Block squashed = addImpl<const Block &>(input_block);
    return squashed;
}
/*
* To minimize copying, two argument types are accepted: a const reference for the output
* stream and an rvalue reference for the input stream. Whether to copy is decided
* inside this function, so a Block is not copied unless it absolutely has to be.
*
* Returns a block that is ready to be emitted, or an empty Block while data is still
* being accumulated. Passing an empty block signals end of input and returns whatever
* has been accumulated so far.
*
* NOTE(review): this span reads like an unmarked diff. The statements that mention `output`,
* `has_output`, `output_data`, `finished`, `Status::...` and `ISimpleTransform::prepare()`
* do not fit a function returning Block and look like leftovers of a removed `prepare()`
* implementation; confirm against the real file.
*/
template <typename ReferenceType>
Block SquashingLegacy::addImpl(ReferenceType input_block)
{
/// End of input stream.
if (!input_block)
{
if (output.isFinished())
return Status::Finished;
/// Hand out everything accumulated so far and leave `accumulated_block` empty.
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
if (!output.canPush())
return Status::PortFull;
if (has_output)
/// Just read block is already enough.
if (isEnoughSize(input_block))
{
/// If no accumulated data, return just read block.
if (!accumulated_block)
{
output.pushData(std::move(output_data));
has_output = false;
return Status::PortFull;
return std::move(input_block);
}
finished = true;
/// On the next call to transform() we will return all data buffered in `squashing` (if any)
return Status::Ready;
/// Return accumulated data (maybe it has small size) and place new block to accumulated data.
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
return ISimpleTransform::prepare();
/// Accumulated block is already enough.
if (isEnoughSize(accumulated_block))
{
/// Return accumulated data and place new block to accumulated data.
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
/// Neither block is large enough on its own: merge the input into the accumulator.
append<ReferenceType>(std::move(input_block));
/// The merged accumulator may have reached a threshold now; if so, emit it.
if (isEnoughSize(accumulated_block))
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
/// Squashed block is not ready.
return {};
}
/// Appends the rows of `input_block` to `accumulated_block`, column by column.
/// When nothing is accumulated yet, the input block simply becomes the accumulator.
/// If appending throws, the accumulator is cleared and the exception is rethrown.
template <typename ReferenceType>
void SquashingLegacy::append(ReferenceType input_block)
{
    if (!accumulated_block)
    {
        /// First block since the last flush: adopt it as is.
        accumulated_block = std::move(input_block);
        return;
    }

    assert(blocksHaveEqualStructure(input_block, accumulated_block));

    try
    {
        const size_t num_columns = accumulated_block.columns();
        for (size_t pos = 0; pos != num_columns; ++pos)
        {
            const auto incoming = input_block.getByPosition(pos).column;
            auto & target_slot = accumulated_block.getByPosition(pos).column;

            /// Take the accumulated column out of its slot, extend it, then put it back.
            auto grown = IColumn::mutate(std::move(target_slot));
            grown->insertRangeFrom(*incoming, 0, incoming->size());
            target_slot = std::move(grown);
        }
    }
    catch (...)
    {
        /// add() may be invoked again after a failed add(), so the accumulator has to stay valid.
        /// Dropping the accumulated data is acceptable here: an exception is being propagated,
        /// and the caller is expected to treat this block and all earlier ones as potentially lost.
        accumulated_block.clear();
        throw;
    }
}
/// Sums up the row count and byte size of `block` and checks them against the thresholds.
/// Throws LOGICAL_ERROR when a column pointer is null and SIZES_OF_COLUMNS_DOESNT_MATCH when
/// columns disagree on their size. A row count of zero is treated as "not set yet", so a
/// zero-sized column followed by a non-empty one is not reported as a mismatch.
bool SquashingLegacy::isEnoughSize(const Block & block)
{
    size_t total_rows = 0;
    size_t total_bytes = 0;

    for (const auto & entry : block)
    {
        const auto & col = entry.column;
        if (!col)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid column in block.");

        const size_t col_rows = col->size();
        if (total_rows == 0)
            total_rows = col_rows;
        else if (total_rows != col_rows)
            throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Sizes of columns doesn't match");

        total_bytes += col->byteSize();
    }

    return isEnoughSize(total_rows, total_bytes);
}
/// Threshold check: a zero threshold is ignored, the remaining conditions are OR-ed.
bool SquashingLegacy::isEnoughSize(size_t rows, size_t bytes) const
{
    /// With both thresholds disabled every block counts as large enough.
    if (min_block_size_rows == 0 && min_block_size_bytes == 0)
        return true;

    if (min_block_size_rows != 0 && rows >= min_block_size_rows)
        return true;

    return min_block_size_bytes != 0 && bytes >= min_block_size_bytes;
}
}

View File

@ -2,6 +2,7 @@
#include <Interpreters/Squashing.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/IInflatingTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/ApplySquashingTransform.h>
@ -29,22 +30,51 @@ private:
Chunk finish_chunk;
};
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
/// NOTE(review): this span reads like an unmarked diff. The `class SimpleSquashingTransform`
/// header and its `explicit` constructor look like removed lines that precede their
/// `SquashingLegacy` replacements; confirm against the real file.
///
/// SquashingLegacy accumulates blocks in `accumulated_block` and hands them out once a
/// row or byte threshold is reached.
class SimpleSquashingTransform : public ISimpleTransform
class SquashingLegacy
{
public:
explicit SimpleSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
/// Conditions on rows and bytes are OR-ed. If one of them is zero, the corresponding condition is ignored.
SquashingLegacy(size_t min_block_size_rows_, size_t min_block_size_bytes_);
/** Adds the next block and possibly returns a squashed block.
* An empty Block is returned while the squashed block is not ready yet.
* At the end, pass an empty block: the call then returns whatever is still accumulated.
*/
Block add(Block && block);
Block add(const Block & block);
private:
/// Thresholds set by the constructor; a zero value disables the corresponding check.
size_t min_block_size_rows;
size_t min_block_size_bytes;
/// Data collected so far that has not been returned from add() yet.
Block accumulated_block;
/// Shared implementation of both add() overloads; ReferenceType is `Block &&` or `const Block &`.
template <typename ReferenceType>
Block addImpl(ReferenceType block);
/// Appends the rows of `block` to `accumulated_block`.
template <typename ReferenceType>
void append(ReferenceType block);
/// Checks the rows and bytes of a block (or explicit counts) against the thresholds.
bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const;
};
/// Transform that squashes incoming chunks with SquashingLegacy and emits the result
/// through the IInflatingTransform hooks (consume / canGenerate / generate / getRemaining).
///
/// NOTE(review): this span reads like an unmarked diff. `transform(Chunk &)`, `prepare()`,
/// `Squashing squashing;` and `bool finished` look like removed members of the previous
/// ISimpleTransform-based class (there are two declarations named `squashing`);
/// confirm against the real file.
class SimpleSquashingChunksTransform : public IInflatingTransform
{
public:
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
/// NOTE(review): the reported name is "SimpleSquashingTransform", not the class name -
/// presumably kept on purpose; confirm.
String getName() const override { return "SimpleSquashingTransform"; }
protected:
void transform(Chunk &) override;
IProcessor::Status prepare() override;
/// Feeds one input chunk into `squashing` and stores the returned block in `squashed_chunk`.
void consume(Chunk chunk) override;
/// True when `squashed_chunk` is not empty.
bool canGenerate() override;
/// Moves `squashed_chunk` out; throws LOGICAL_ERROR when it is empty.
Chunk generate() override;
/// Passes an empty block to `squashing` and returns the block it hands back as a chunk.
Chunk getRemaining() override;
private:
Squashing squashing;
bool finished = false;
SquashingLegacy squashing;
/// Result of the latest squashing.add() call, waiting to be handed out.
Chunk squashed_chunk;
};
}

View File

@ -290,7 +290,7 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
size_t min_block_size_rows = mutable_context->getSettingsRef().min_external_table_block_size_rows;
size_t min_block_size_bytes = mutable_context->getSettingsRef().min_external_table_block_size_bytes;
auto squashing = std::make_shared<SimpleSquashingTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
builder->resize(1);
builder->addTransform(std::move(squashing));