This commit is contained in:
Amos Bird 2024-09-19 10:44:56 +02:00 committed by GitHub
commit 9f8de81dde
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 93 additions and 3 deletions

View File

@ -19,9 +19,13 @@
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/MemoryBoundMerging.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/StreamInQueryCacheTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
@ -171,8 +175,9 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
/// 1. Remote: Set counter on Remote
/// 2. Limit ... PartialSorting: Set counter on PartialSorting
/// 3. Limit ... TotalsHaving(with filter) ... Remote: Set counter on the input port of Limit
/// 4. Limit ... Remote: Set counter on Remote
/// 5. Limit ... : Set counter on the input port of Limit
/// 4. Limit ... MergingAggregated ... Remote: Set counter on the input port of Limit
/// 5. Limit ... Remote: Set counter on Remote
/// 6. Limit ... : Set counter on the input port of Limit
/// Case 1.
if ((typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor)) && !limit_processor)
@ -210,6 +215,14 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
}
/// Case 4.
if (typeid_cast<MergingAggregatedTransform *>(processor) || typeid_cast<MergingAggregatedBucketTransform *>(processor)
|| typeid_cast<SortingAggregatedTransform *>(processor)
|| typeid_cast<SortingAggregatedForMemoryBoundMergingTransform *>(processor))
{
continue;
}
/// Case 5.
if (typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor))
{
processors.emplace_back(processor);
@ -228,6 +241,10 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
continue;
}
/// Skip CreatingSetsTransform
if (typeid_cast<CreatingSetsTransform *>(processor))
continue;
if (limit_processor == processor)
{
ssize_t i = 0;
@ -250,7 +267,7 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
}
}
/// Case 5.
/// Case 6.
for (auto && [limit, ports] : limit_candidates)
{
/// If there are some input ports which don't have the counter, add it to LimitTransform.

View File

@ -0,0 +1,52 @@
{
"meta":
[
{
"name": "age",
"type": "Int16"
}
],
"data":
[
{
"age": 33
},
{
"age": 48
},
{
"age": 50
}
],
"rows": 3,
"rows_before_limit_at_least": 3
}
{
"meta":
[
{
"name": "service_name",
"type": "String"
}
],
"data":
[
{
"service_name": "service1"
},
{
"service_name": "service2"
},
{
"service_name": "service3"
}
],
"rows": 3,
"rows_before_limit_at_least": 3
}

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS users;
CREATE TABLE users (uid Int16, name String, age Int16) ENGINE=MergeTree ORDER BY uid;
INSERT INTO users VALUES (1231, 'John', 33),(6666, 'John', 48), (8888, 'John', 50);
SELECT age FROM remote('127.0.0.{2,3}', currentDatabase(), users) GROUP BY age LIMIT 20 FORMAT JSON SETTINGS output_format_write_statistics=0;
DROP TABLE users;
DROP TABLE IF EXISTS test_rows_count_bug_local;
CREATE TABLE test_rows_count_bug_local (id UUID DEFAULT generateUUIDv4(), service_name String, path String) ENGINE=MergeTree ORDER BY tuple();
INSERT INTO test_rows_count_bug_local (service_name, path) VALUES ('service1', '/foo/1'), ('service1', '/foo/2'), ('service2', '/foo/3'), ('service2', '/foo/4'), ('service3', '/foo/5');
SELECT service_name FROM test_rows_count_bug_local
WHERE id global in (select id from remote('127.0.0.{2,3}', currentDatabase(), test_rows_count_bug_local))
GROUP BY service_name ORDER BY service_name limit 20 FORMAT JSON SETTINGS output_format_write_statistics=0;
DROP TABLE test_rows_count_bug_local;