Merge pull request #53005 from ClickHouse/fix-parallel-replicas-multiply-result

Parallel reading from replicas over Distributed
Igor Nikonov 2023-08-21 01:10:53 +02:00 committed by GitHub
commit b074e44c1b
12 changed files with 385 additions and 19 deletions
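
The change makes parallel reading from replicas usable through a Distributed table: the query fans out to every shard of the Distributed cluster, and within each shard the replicas are read in parallel and coordinated by a single coordinator per shard. A minimal usage sketch, mirroring the stateless test added at the end of this commit (table, cluster and setting names are taken from that test):

CREATE TABLE test (id UInt64, date Date) ENGINE = MergeTree ORDER BY id;

CREATE TABLE test_d AS test
    ENGINE = Distributed(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), test);

INSERT INTO test SELECT *, today() FROM numbers(100);

SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1,
         max_parallel_replicas = 3,
         prefer_localhost_replica = 0,
         parallel_replicas_for_non_replicated_merge_tree = 1,
         use_hedged_requests = 0;
-- expected: 100  0  99  49.5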


@@ -273,6 +273,8 @@ public:
/// Are distributed DDL Queries (ON CLUSTER Clause) allowed for this cluster
bool areDistributedDDLQueriesAllowed() const { return allow_distributed_ddl_queries; }
const String & getName() const { return name; }
private:
SlotToShard slot_to_shard;


@@ -28,7 +28,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
}
namespace ClusterProxy
@@ -234,7 +234,8 @@ void executeQuery(
std::move(external_tables),
log,
shards,
query_info.storage_limits);
query_info.storage_limits,
query_info.getCluster()->getName());
read_from_remote->setStepDescription("Read from remote replica");
plan->addStep(std::move(read_from_remote));
@@ -266,20 +267,57 @@ void executeQueryWithParallelReplicas(
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ClusterPtr & not_optimized_cluster)
{
if (not_optimized_cluster->getShardsInfo().size() != 1)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Cluster for parallel replicas should consist only from one shard");
auto shard_info = not_optimized_cluster->getShardsInfo().front();
const auto & settings = context->getSettingsRef();
ClusterPtr new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings);
auto new_context = Context::createCopy(context);
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
UInt64 shard_num = 0; /// shard_num is 1-based, so 0 - no shard specified
const auto it = scalars.find("_shard_num");
if (it != scalars.end())
{
const Block & block = it->second;
const auto & column = block.safeGetByPosition(0).column;
shard_num = column->getUInt(0);
}
size_t all_replicas_count = 0;
ClusterPtr new_cluster;
/// If a valid shard_num was received from the query initiator, then the parallel replicas scope is the specified shard.
/// Shards are numbered in order of appearance in the cluster config.
if (shard_num > 0)
{
const auto shard_count = not_optimized_cluster->getShardCount();
if (shard_num > shard_count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Shard number is greater than shard count: shard_num={} shard_count={} cluster={}",
shard_num,
shard_count,
not_optimized_cluster->getName());
chassert(shard_count == not_optimized_cluster->getShardsAddresses().size());
LOG_DEBUG(&Poco::Logger::get("executeQueryWithParallelReplicas"), "Parallel replicas query in shard scope: shard_num={} cluster={}",
shard_num, not_optimized_cluster->getName());
const auto shard_replicas_num = not_optimized_cluster->getShardsAddresses()[shard_num - 1].size();
all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), shard_replicas_num);
/// shard_num is 1-based, but getClusterWithSingleShard expects 0-based index
new_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1);
}
else
{
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings);
all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
}
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
auto remote_plan = std::make_unique<QueryPlan>();
/// This is a little bit weird, but we construct an "empty" coordinator without
/// any specified reading/coordination method (like Default, InOrder, InReverseOrder)
@@ -288,8 +326,6 @@ void executeQueryWithParallelReplicas(
/// to then tell it about the reading method we chose.
query_info.coordinator = coordinator;
auto new_context = Context::createCopy(context);
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(

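In short: when the remote node finds the _shard_num scalar sent by the initiator, parallel replicas are coordinated only among the replicas of that shard, and the number of participating replicas is min(max_parallel_replicas, replicas in the shard); without the scalar the whole cluster is turned into replicas-as-shards, as before. A hedged sketch of how this plays out for the two-shard, three-replica test cluster added later in this commit (the system.clusters query is just one way to see the shard numbering the code above relies on):

-- with max_parallel_replicas = 10 and 3 replicas per shard,
-- all_replicas_count per shard = min(10, 3) = 3
SELECT cluster, shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = 'test_cluster_two_shard_three_replicas_localhost'
ORDER BY shard_num, replica_num;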

@@ -103,7 +103,8 @@ ReadFromRemote::ReadFromRemote(
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_)
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_)
: ISourceStep(DataStream{.header = std::move(header_)})
, shards(std::move(shards_))
, stage(stage_)
@@ -116,6 +117,7 @@ ReadFromRemote::ReadFromRemote(
, storage_limits(std::move(storage_limits_))
, log(log_)
, shard_count(shard_count_)
, cluster_name(cluster_name_)
{
}
@@ -234,13 +236,37 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::READ_TASKS)
{
if (context->getSettingsRef().cluster_for_parallel_replicas.changed)
{
const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas;
if (cluster_for_parallel_replicas != cluster_name)
LOG_INFO(log, "cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is used: {}",
cluster_for_parallel_replicas, cluster_name);
}
context->setSetting("cluster_for_parallel_replicas", cluster_name);
}
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::READ_TASKS)
{
// When doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
// establish a connection to one replica on the shard; that replica will instantiate a coordinator
// to manage parallel reading from the replicas of the shard, and the coordinator will return the query result for the shard.
// Only one coordinator per shard is necessary, therefore PoolMode::GET_ONE is used to establish only one connection per shard.
// Using PoolMode::GET_MANY in this mode can lead to several coordinators being instantiated (depending on the max_parallel_replicas setting),
// each executing parallel reading from replicas, so the query result would be multiplied by the number of created coordinators.
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
}
else
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);

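A hedged illustration of the behaviour this hunk addresses, using the table and query from the integration test added below (6003 rows in test_table_d): with PoolMode::GET_MANY several coordinators could be spawned per shard and each would return the shard's result, so aggregates could come back multiplied; with PoolMode::GET_ONE exactly one coordinator per shard answers.

SELECT count(), min(key), max(key), sum(key)
FROM test_table_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 2,
         max_parallel_replicas = 3,
         prefer_localhost_replica = 0,
         use_hedged_requests = 0;
-- expected (and, with this fix, actual): 6003  -1999  1999  3
-- before the fix the aggregates could be a multiple of these values,
-- one extra factor per additional coordinator created on a shard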

@@ -35,7 +35,8 @@ public:
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_);
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_);
String getName() const override { return "ReadFromRemote"; }
@@ -55,8 +56,9 @@ private:
Tables external_tables;
std::shared_ptr<const StorageLimitsList> storage_limits;
Poco::Logger * log;
UInt32 shard_count;
String cluster_name;
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
};


@@ -220,7 +220,8 @@ void StorageMergeTree::read(
local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
auto cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
auto cluster = local_context->getCluster(cluster_for_parallel_replicas);
Block header;


@@ -5157,7 +5157,9 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
{
auto table_id = getStorageID();
auto parallel_replicas_cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
auto scalars = local_context->hasQueryContext() ? local_context->getQueryContext()->getScalars() : Scalars{};
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
auto parallel_replicas_cluster = local_context->getCluster(cluster_for_parallel_replicas);
ASTPtr modified_query_ast;
Block header;


@@ -176,6 +176,38 @@
</replica>
</shard-->
</test_cluster_one_shard_three_replicas_localhost>
<test_cluster_two_shard_three_replicas_localhost>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.4</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.5</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.6</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shard_three_replicas_localhost>
<test_cluster_two_shards_localhost>
<shard>
<replica>


@@ -0,0 +1,58 @@
<clickhouse>
<remote_servers>
<test_multiple_shards_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
<replica>
<host>n5</host>
<port>9000</port>
</replica>
<replica>
<host>n6</host>
<port>9000</port>
</replica>
</shard>
</test_multiple_shards_multiple_replicas>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>


@@ -0,0 +1,154 @@
import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

nodes = [
    cluster.add_instance(
        f"n{i}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
    )
    for i in (1, 2, 3, 4, 5, 6)
]


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def create_tables(cluster, table_name):
    # create replicated tables
    for node in nodes:
        node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")

    if cluster == "test_single_shard_multiple_replicas":
        nodes[0].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
        )
        nodes[1].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2') ORDER BY (key)"
        )
        nodes[2].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)"
        )
        nodes[3].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r4') ORDER BY (key)"
        )
    elif cluster == "test_multiple_shards_multiple_replicas":
        # shard 1
        nodes[0].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
        )
        nodes[1].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2') ORDER BY (key)"
        )
        nodes[2].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)"
        )
        # shard 2
        nodes[3].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r1') ORDER BY (key)"
        )
        nodes[4].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r2') ORDER BY (key)"
        )
        nodes[5].query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r3') ORDER BY (key)"
        )
    else:
        raise Exception(f"Unexpected cluster: {cluster}")

    # create distributed table
    nodes[0].query(f"DROP TABLE IF EXISTS {table_name}_d SYNC")
    nodes[0].query(
        f"""
        CREATE TABLE {table_name}_d AS {table_name}
        Engine=Distributed(
            {cluster},
            currentDatabase(),
            {table_name},
            key
        )
        """
    )

    # populate data
    nodes[0].query(
        f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(1000)",
        settings={"insert_distributed_sync": 1},
    )
    nodes[0].query(
        f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(2000)",
        settings={"insert_distributed_sync": 1},
    )
    nodes[0].query(
        f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(1000)",
        settings={"insert_distributed_sync": 1},
    )
    nodes[0].query(
        f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(2000)",
        settings={"insert_distributed_sync": 1},
    )
    nodes[0].query(
        f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(3)",
        settings={"insert_distributed_sync": 1},
    )
@pytest.mark.parametrize(
    "cluster,max_parallel_replicas,prefer_localhost_replica",
    [
        # prefer_localhost_replica=0
        pytest.param("test_single_shard_multiple_replicas", 2, 0),
        pytest.param("test_single_shard_multiple_replicas", 3, 0),
        pytest.param("test_single_shard_multiple_replicas", 4, 0),
        pytest.param("test_single_shard_multiple_replicas", 10, 0),
        # prefer_localhost_replica=1
        pytest.param("test_single_shard_multiple_replicas", 2, 1),
        pytest.param("test_single_shard_multiple_replicas", 3, 1),
        pytest.param("test_single_shard_multiple_replicas", 4, 1),
        pytest.param("test_single_shard_multiple_replicas", 10, 1),
        # prefer_localhost_replica=0
        pytest.param("test_multiple_shards_multiple_replicas", 2, 0),
        pytest.param("test_multiple_shards_multiple_replicas", 3, 0),
        pytest.param("test_multiple_shards_multiple_replicas", 4, 0),
        pytest.param("test_multiple_shards_multiple_replicas", 10, 0),
        # prefer_localhost_replica=1
        pytest.param("test_multiple_shards_multiple_replicas", 2, 1),
        pytest.param("test_multiple_shards_multiple_replicas", 3, 1),
        pytest.param("test_multiple_shards_multiple_replicas", 4, 1),
        pytest.param("test_multiple_shards_multiple_replicas", 10, 1),
    ],
)
def test_parallel_replicas_over_distributed(
    start_cluster, cluster, max_parallel_replicas, prefer_localhost_replica
):
    table_name = "test_table"
    create_tables(cluster, table_name)

    node = nodes[0]
    expected_result = f"6003\t-1999\t1999\t3\n"

    # w/o parallel replicas
    assert (
        node.query(f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d")
        == expected_result
    )

    # parallel replicas
    assert (
        node.query(
            f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
            settings={
                "allow_experimental_parallel_reading_from_replicas": 2,
                "prefer_localhost_replica": prefer_localhost_replica,
                "max_parallel_replicas": max_parallel_replicas,
                "use_hedged_requests": 0,
            },
        )
        == expected_result
    )


@@ -0,0 +1,6 @@
-- 1 shard, 3 replicas
100 0 99 49.5
200 0 99 49.5
-- 2 shards, 3 replicas each
200 0 99 49.5
400 0 99 49.5
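
The doubling in the two-shard block is expected rather than a regression: all replicas of the localhost test clusters point at the same server and the underlying tables are plain MergeTree, so every shard of the Distributed table reads the same local data. A short walk-through of the counts, assuming the inserts from the .sql test below:

-- 1 shard : first INSERT of 100 rows -> 1 x 100 = 100; second INSERT -> 1 x 200 = 200
-- 2 shards: first INSERT of 100 rows -> 2 x 100 = 200; second INSERT -> 2 x 200 = 400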


@@ -0,0 +1,47 @@
-- 1 shard
SELECT '-- 1 shard, 3 replicas';
DROP TABLE IF EXISTS test_d;
DROP TABLE IF EXISTS test;
CREATE TABLE test (id UInt64, date Date)
ENGINE = MergeTree
ORDER BY id;
CREATE TABLE IF NOT EXISTS test_d as test
ENGINE = Distributed(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), test);
insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;
insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;
-- 2 shards
SELECT '-- 2 shards, 3 replicas each';
DROP TABLE IF EXISTS test2_d;
DROP TABLE IF EXISTS test2;
CREATE TABLE test2 (id UInt64, date Date)
ENGINE = MergeTree
ORDER BY id;
CREATE TABLE IF NOT EXISTS test2_d as test2
ENGINE = Distributed(test_cluster_two_shard_three_replicas_localhost, currentDatabase(), test2, id);
insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;
insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;