Fix SimpleSquashingTransform

This commit is contained in:
vdimir 2024-07-06 15:07:35 +00:00
parent 4eeb59ec3f
commit 9dc52217f4
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
4 changed files with 35 additions and 50 deletions

View File

@ -783,7 +783,7 @@ void AggregatingTransform::initGenerate()
{
/// Just a reasonable constant, matches default value for the setting `preferred_block_size_bytes`
static constexpr size_t oneMB = 1024 * 1024;
return std::make_shared<SimpleSquashingTransform>(header, params->params.max_block_size, oneMB);
return std::make_shared<SimpleSquashingChunksTransform>(header, params->params.max_block_size, oneMB);
});
}
/// AggregatingTransform::expandPipeline expects single output port.

View File

@ -56,53 +56,37 @@ void SquashingTransform::work()
}
}
SimpleSquashingTransform::SimpleSquashingTransform(
SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, false)
, squashing(header, min_block_size_rows, min_block_size_bytes)
: IInflatingTransform(header, header), squashing(min_block_size_rows, min_block_size_bytes)
{
}
void SimpleSquashingTransform::transform(Chunk & chunk)
void SimpleSquashingChunksTransform::consume(Chunk chunk)
{
if (!finished)
{
Chunk planned_chunk = squashing.add(std::move(chunk));
if (planned_chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(planned_chunk));
}
else
{
if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
chunk = squashing.flush();
if (chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(chunk));
}
Block current_block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
}
IProcessor::Status SimpleSquashingTransform::prepare()
Chunk SimpleSquashingChunksTransform::generate()
{
if (!finished && input.isFinished())
{
if (output.isFinished())
return Status::Finished;
if (squashed_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in SimpleSquashingChunksTransform");
if (!output.canPush())
return Status::PortFull;
if (has_output)
{
output.pushData(std::move(output_data));
has_output = false;
return Status::PortFull;
}
finished = true;
/// On the next call to transform() we will return all data buffered in `squashing` (if any)
return Status::Ready;
}
return ISimpleTransform::prepare();
return std::move(squashed_chunk);
}
bool SimpleSquashingChunksTransform::canGenerate()
{
return !squashed_chunk.empty();
}
Chunk SimpleSquashingChunksTransform::getRemaining()
{
Block current_block = squashing.add({});
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
return std::move(squashed_chunk);
}
}

View File

@ -2,6 +2,7 @@
#include <Interpreters/Squashing.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/IInflatingTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/ApplySquashingTransform.h>
@ -29,22 +30,22 @@ private:
Chunk finish_chunk;
};
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
class SimpleSquashingTransform : public ISimpleTransform
class SimpleSquashingChunksTransform : public IInflatingTransform
{
public:
explicit SimpleSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "SimpleSquashingTransform"; }
protected:
void transform(Chunk &) override;
IProcessor::Status prepare() override;
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
Chunk getRemaining() override;
private:
Squashing squashing;
bool finished = false;
SquashingTransform squashing;
Chunk squashed_chunk;
};
}

View File

@ -290,7 +290,7 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
size_t min_block_size_rows = mutable_context->getSettingsRef().min_external_table_block_size_rows;
size_t min_block_size_bytes = mutable_context->getSettingsRef().min_external_table_block_size_bytes;
auto squashing = std::make_shared<SimpleSquashingTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
builder->resize(1);
builder->addTransform(std::move(squashing));