Merge pull request #62451 from nickitat/fix_simple_squashing_transform

Fix SimpleSquashingChunksTransform
This commit is contained in:
Nikita Taranov 2024-04-12 12:56:06 +00:00 committed by GitHub
commit cab7e760e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 30 additions and 0 deletions

View File

@ -3,6 +3,11 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
SquashingChunksTransform::SquashingChunksTransform( SquashingChunksTransform::SquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ExceptionKeepingTransform(header, header, false) : ExceptionKeepingTransform(header, header, false)
@ -64,6 +69,9 @@ void SimpleSquashingChunksTransform::transform(Chunk & chunk)
} }
else else
{ {
if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
auto block = squashing.add({}); auto block = squashing.add({});
chunk.setColumns(block.getColumns(), block.rows()); chunk.setColumns(block.getColumns(), block.rows());
} }
@ -73,7 +81,21 @@ IProcessor::Status SimpleSquashingChunksTransform::prepare()
{ {
if (!finished && input.isFinished()) if (!finished && input.isFinished())
{ {
if (output.isFinished())
return Status::Finished;
if (!output.canPush())
return Status::PortFull;
if (has_output)
{
output.pushData(std::move(output_data));
has_output = false;
return Status::PortFull;
}
finished = true; finished = true;
/// On the next call to transform() we will return all data buffered in `squashing` (if any)
return Status::Ready; return Status::Ready;
} }
return ISimpleTransform::prepare(); return ISimpleTransform::prepare();

View File

@ -0,0 +1,2 @@
17747796
17747796

View File

@ -0,0 +1,6 @@
-- Tags: global
set allow_prefetched_read_pool_for_remote_filesystem=0, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0, max_threads=2, max_block_size=65387;
SELECT sum(UserID GLOBAL IN (SELECT UserID FROM remote('127.0.0.{1,2}', test.hits))) FROM remote('127.0.0.{1,2}', test.hits);
SELECT sum(UserID GLOBAL IN (SELECT UserID FROM test.hits)) FROM remote('127.0.0.{1,2}', test.hits);