diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 42d4b9d05a1..2fb5d7afcbd 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -179,7 +179,7 @@ String Cluster::Address::toFullString(bool use_compact_format) const // shard_num/replica_num like in system.clusters table throw Exception("shard_num/replica_num cannot be zero", ErrorCodes::LOGICAL_ERROR); - return "shard" + std::to_string(shard_index) + "_replica" + std::to_string(replica_index); + return fmt::format("shard{}_replica{}", shard_index, replica_index); } else { @@ -199,7 +199,7 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string) const char * user_pw_end = strchr(full_string.data(), '@'); - /// parsing with the new [shard{shard_index}[_replica{replica_index}]] format + /// parsing with the new shard{shard_index}[_replica{replica_index}] format if (!user_pw_end && startsWith(full_string, "shard")) { const char * underscore = strchr(full_string.data(), '_'); @@ -401,6 +401,9 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false); ShardInfoInsertPathForInternalReplication insert_paths; + /// "_all_replicas" is a marker that will be replaced with all replicas + /// (for creating connections in the Distributed engine) + insert_paths.compact = fmt::format("shard{}_all_replicas", current_shard_num); for (const auto & replica_key : replica_keys) { @@ -419,20 +422,10 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, if (internal_replication) { - /// use_compact_format=0 - { - auto dir_name = replica_addresses.back().toFullString(false /* use_compact_format */); - if (!replica_addresses.back().is_local) - concatInsertPath(insert_paths.prefer_localhost_replica, dir_name); - concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name); - } - /// use_compact_format=1 - { - auto dir_name = replica_addresses.back().toFullString(true /* use_compact_format */); - if (!replica_addresses.back().is_local) - concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name); - concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name); - } + auto dir_name = replica_addresses.back().toFullString(/* use_compact_format= */ false); + if (!replica_addresses.back().is_local) + concatInsertPath(insert_paths.prefer_localhost_replica, dir_name); + concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name); } } else @@ -660,17 +653,17 @@ const std::string & Cluster::ShardInfo::insertPathForInternalReplication(bool pr const auto & paths = insert_path_for_internal_replication; if (!use_compact_format) { - if (prefer_localhost_replica) - return paths.prefer_localhost_replica; - else - return paths.no_prefer_localhost_replica; + const auto & path = prefer_localhost_replica ? paths.prefer_localhost_replica : paths.no_prefer_localhost_replica; + if (path.size() > NAME_MAX) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Path '{}' for async distributed INSERT is too long (exceed {} limit)", path, NAME_MAX); + } + return path; } else { - if (prefer_localhost_replica) - return paths.prefer_localhost_replica_compact; - else - return paths.no_prefer_localhost_replica_compact; + return paths.compact; } } diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index 4ca5cbaa9f2..0afc43b85b2 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -166,10 +166,8 @@ public: std::string prefer_localhost_replica; /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0 std::string no_prefer_localhost_replica; - /// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=1 - std::string prefer_localhost_replica_compact; - /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=1 - std::string no_prefer_localhost_replica_compact; + /// use_compact_format_in_distributed_parts_names=1 + std::string compact; }; struct ShardInfo diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index e8835132f8f..fe2e11c7b65 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -60,13 +60,12 @@ namespace constexpr const std::chrono::minutes decrease_error_count_period{5}; template - ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, Poco::Logger * log) + ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log) { ConnectionPoolPtrs pools; - for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it) + auto make_connection = [&](const Cluster::Address & address) { - Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range(*it)); try { pools.emplace_back(factory(address)); @@ -76,10 +75,35 @@ namespace if (e.code() == ErrorCodes::INCORRECT_FILE_NAME) { tryLogCurrentException(log); - continue; + return; } throw; } + }; + + for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it) + { + const std::string & dirname = boost::copy_range(*it); + Cluster::Address address = Cluster::Address::fromFullString(dirname); + if (address.shard_index && dirname.ends_with("_all_replicas")) + { + if (address.shard_index > shards_info.size()) + { + LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name); + continue; + } + + const auto & shard_info = shards_info[address.shard_index - 1]; + size_t replicas = shard_info.per_replica_pools.size(); + + for (size_t replica_index = 1; replica_index <= replicas; ++replica_index) + { + address.replica_index = replica_index; + make_connection(address); + } + } + else + make_connection(address); } return pools; @@ -420,13 +444,13 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri const auto & shards_info = cluster->getShardsInfo(); const auto & shards_addresses = cluster->getShardsAddresses(); - /// check new format shard{shard_index}_number{replica_index} + /// check new format shard{shard_index}_replica{replica_index} /// (shard_index and replica_index starts from 1) if (address.shard_index != 0) { if (!address.replica_index) throw Exception(ErrorCodes::INCORRECT_FILE_NAME, - "Wrong replica_index ({})", address.replica_index, name); + "Wrong replica_index={} ({})", address.replica_index, name); if (address.shard_index > shards_info.size()) throw Exception(ErrorCodes::INCORRECT_FILE_NAME, @@ -475,7 +499,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri address.secure); }; - auto pools = createPoolsForAddresses(name, pool_factory, storage.log); + auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log); const auto settings = storage.getContext()->getSettings(); return pools.size() == 1 ? pools.front() : std::make_shared(pools, diff --git a/tests/integration/test_distributed_format/configs/remote_servers.xml b/tests/integration/test_distributed_format/configs/remote_servers.xml index 5c86713bd78..87eaea50a8b 100644 --- a/tests/integration/test_distributed_format/configs/remote_servers.xml +++ b/tests/integration/test_distributed_format/configs/remote_servers.xml @@ -1,19 +1,20 @@ - + + true not_existing 9000 - + - + not_existing 9000 - + diff --git a/tests/integration/test_distributed_format/test.py b/tests/integration/test_distributed_format/test.py index 22054077544..d6e1cc03fa8 100644 --- a/tests/integration/test_distributed_format/test.py +++ b/tests/integration/test_distributed_format/test.py @@ -1,16 +1,27 @@ -import pytest +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=line-too-long +import pytest from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) node = cluster.add_instance('node', main_configs=['configs/remote_servers.xml']) cluster_param = pytest.mark.parametrize("cluster", [ - ('test_cluster'), - ('test_cluster_2'), + ('test_cluster_internal_replication'), + ('test_cluster_no_internal_replication'), ]) +def get_dist_path(cluster, table, dist_format): + if dist_format == 0: + return f'/var/lib/clickhouse/data/test/{table}/default@not_existing:9000' + if cluster == 'test_cluster_internal_replication': + return f'/var/lib/clickhouse/data/test/{table}/shard1_all_replicas' + return f'/var/lib/clickhouse/data/test/{table}/shard1_replica1' + + @pytest.fixture(scope="module") def started_cluster(): try: @@ -29,13 +40,16 @@ def test_single_file(started_cluster, cluster): node.query("insert into test.distr_1 values (1, 'a'), (2, 'bb'), (3, 'ccc')", settings={"use_compact_format_in_distributed_parts_names": "1"}) - query = "select * from file('/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin', 'Distributed')" + path = get_dist_path(cluster, 'distr_1', 1) + query = f"select * from file('{path}/1.bin', 'Distributed')" out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) assert out == '1\ta\n2\tbb\n3\tccc\n' - query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin');" \ - "select * from t" + query = f""" + create table t (x UInt64, s String) engine = File('Distributed', '{path}/1.bin'); + select * from t; + """ out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) assert out == '1\ta\n2\tbb\n3\tccc\n' @@ -54,13 +68,16 @@ def test_two_files(started_cluster, cluster): "use_compact_format_in_distributed_parts_names": "1", }) - query = "select * from file('/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin', 'Distributed') order by x" + path = get_dist_path(cluster, 'distr_2', 1) + query = f"select * from file('{path}/{{1,2,3,4}}.bin', 'Distributed') order by x" out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n' - query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin');" \ - "select * from t order by x" + query = f""" + create table t (x UInt64, s String) engine = File('Distributed', '{path}/{{1,2,3,4}}.bin'); + select * from t order by x; + """ out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n' @@ -76,13 +93,16 @@ def test_single_file_old(started_cluster, cluster): "use_compact_format_in_distributed_parts_names": "0", }) - query = "select * from file('/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin', 'Distributed')" + path = get_dist_path(cluster, 'distr_3', 0) + query = f"select * from file('{path}/1.bin', 'Distributed')" out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) assert out == '1\ta\n2\tbb\n3\tccc\n' - query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin');" \ - "select * from t" + query = f""" + create table t (x UInt64, s String) engine = File('Distributed', '{path}/1.bin'); + select * from t; + """ out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) assert out == '1\ta\n2\tbb\n3\tccc\n'