Fix rows_before_limit_at_least for DelayedSource.

This commit is contained in:
Nikolai Kochetov 2023-08-31 12:53:58 +00:00
parent a1024813ec
commit 52a7206002
5 changed files with 33 additions and 2 deletions

View File

@ -133,6 +133,12 @@ void DelayedSource::work()
processors = Pipe::detachProcessors(std::move(pipe));
if (rows_before_limit)
{
for (auto & processor : processors)
processor->setRowsBeforeLimitCounter(rows_before_limit);
}
synchronizePorts(totals_output, totals, header, processors);
synchronizePorts(extremes_output, extremes, header, processors);
}

View File

@ -30,10 +30,13 @@ public:
OutputPort * getTotalsPort() { return totals; }
OutputPort * getExtremesPort() { return extremes; }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit.swap(counter); }
private:
QueryPlanResourceHolder resources;
Creator creator;
Processors processors;
RowsBeforeLimitCounterPtr rows_before_limit;
/// Outputs for DelayedSource.
OutputPort * main = nullptr;

View File

@ -12,6 +12,7 @@
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceFromChunks.h>
@ -164,7 +165,7 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
/// 5. Limit ... : Set counter on the input port of Limit
/// Case 1.
if (typeid_cast<RemoteSource *>(processor) && !limit_processor)
if ((typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor)) && !limit_processor)
{
processors.emplace_back(processor);
continue;
@ -199,7 +200,7 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
}
/// Case 4.
if (typeid_cast<RemoteSource *>(processor))
if (typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor))
{
processors.emplace_back(processor);
limit_candidates[limit_processor].push_back(limit_input_port);

View File

@ -5,3 +5,23 @@
3
3
1
{
"meta":
[
{
"name": "sum(a)",
"type": "Int64"
}
],
"data":
[
{
"sum(a)": "1"
}
],
"rows": 1,
"rows_before_limit_at_least": 2
}

View File

@ -10,6 +10,7 @@ SYSTEM ENABLE FAILPOINT use_delayed_remote_source;
SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') WITH TOTALS SETTINGS extremes = 1;
SELECT max(explain like '%Delayed%') FROM (EXPLAIN PIPELINE graph=1 SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') WITH TOTALS SETTINGS extremes = 1);
SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') GROUP BY a ORDER BY a LIMIT 1 FORMAT JSON settings output_format_write_statistics=0;
SYSTEM DISABLE FAILPOINT use_delayed_remote_source;