Merge pull request #53644 from ClickHouse/fix-delayed-source-with-totals

Correctly handle totals and extremes with `DelayedSource`
This commit is contained in:
Antonio Andelic 2023-08-22 12:24:16 +02:00 committed by GitHub
commit e0af6e5879
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 1 deletions

View File

@ -33,6 +33,7 @@ static struct InitFiu
#define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \ #define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \
ONCE(replicated_merge_tree_commit_zk_fail_after_op) \ ONCE(replicated_merge_tree_commit_zk_fail_after_op) \
REGULAR(use_delayed_remote_source) \
REGULAR(dummy_failpoint) \ REGULAR(dummy_failpoint) \
PAUSEABLE_ONCE(dummy_pausable_failpoint_once) \ PAUSEABLE_ONCE(dummy_pausable_failpoint_once) \
PAUSEABLE(dummy_pausable_failpoint) PAUSEABLE(dummy_pausable_failpoint)

View File

@ -14,6 +14,7 @@
#include <Client/IConnections.h> #include <Client/IConnections.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/FailPoint.h>
#include <Processors/QueryPlan/QueryPlan.h> #include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h> #include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ExpressionStep.h>
@ -35,6 +36,11 @@ namespace ErrorCodes
extern const int ALL_REPLICAS_ARE_STALE; extern const int ALL_REPLICAS_ARE_STALE;
} }
namespace FailPoints
{
extern const char use_delayed_remote_source[];
}
namespace ClusterProxy namespace ClusterProxy
{ {
@ -134,6 +140,12 @@ void SelectStreamFactory::createForShard(
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
fiu_do_on(FailPoints::use_delayed_remote_source,
{
emplace_remote_stream(/*lazy=*/true, /*local_delay=*/999999);
return;
});
if (settings.prefer_localhost_replica && shard_info.isLocal()) if (settings.prefer_localhost_replica && shard_info.isLocal())
{ {
StoragePtr main_table_storage; StoragePtr main_table_storage;

View File

@ -148,6 +148,8 @@ Processors DelayedSource::expandPipeline()
inputs.emplace_back(outputs.front().getHeader(), this); inputs.emplace_back(outputs.front().getHeader(), this);
/// Connect checks that header is same for ports. /// Connect checks that header is same for ports.
connect(*output, inputs.back()); connect(*output, inputs.back());
if (output == main_output)
inputs.back().setNeeded(); inputs.back().setNeeded();
} }

View File

@ -0,0 +1,16 @@
-- Tags: no-parallel
-- Tag no-parallel: failpoint is used which can force DelayedSource on other tests
DROP TABLE IF EXISTS 02863_delayed_source;
CREATE TABLE 02863_delayed_source(a Int64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/02863_delayed_source/{replica}', 'r1') ORDER BY a;
INSERT INTO 02863_delayed_source VALUES (1), (2);
SYSTEM ENABLE FAILPOINT use_delayed_remote_source;
SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') WITH TOTALS SETTINGS extremes = 1;
SELECT max(explain like '%Delayed%') FROM (EXPLAIN PIPELINE graph=1 SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') WITH TOTALS SETTINGS extremes = 1);
SYSTEM DISABLE FAILPOINT use_delayed_remote_source;
DROP TABLE 02863_delayed_source;