diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 9c2766ae7d6..218502e7f43 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -46,6 +46,14 @@ inline bool isLocalImpl(const Cluster::Address & address, const Poco::Net::Socke return address.default_database.empty() && isLocalAddress(resolved_address, clickhouse_port); } +void concatInsertPath(std::string & insert_path, const std::string & dir_name) +{ + if (insert_path.empty()) + insert_path = dir_name; + else + insert_path += "," + dir_name; +} + } /// Implementation of Cluster::Address class @@ -358,9 +366,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false); - /// In case of internal_replication we will be appending names to dir_name_for_internal_replication - std::string dir_name_for_internal_replication; - std::string dir_name_for_internal_replication_with_local; + ShardInfoInsertPathForInternalReplication insert_paths; for (const auto & replica_key : replica_keys) { @@ -379,18 +385,20 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, if (internal_replication) { - auto dir_name = replica_addresses.back().toFullString(settings.use_compact_format_in_distributed_parts_names); - if (!replica_addresses.back().is_local) + /// use_compact_format=0 { - if (dir_name_for_internal_replication.empty()) - dir_name_for_internal_replication = dir_name; - else - dir_name_for_internal_replication += "," + dir_name; + auto dir_name = replica_addresses.back().toFullString(false /* use_compact_format */); + if (!replica_addresses.back().is_local) + concatInsertPath(insert_paths.prefer_localhost_replica, dir_name); + concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name); + } + /// use_compact_format=1 + { + auto dir_name = replica_addresses.back().toFullString(true /* use_compact_format */); + if (!replica_addresses.back().is_local) 
+ concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name); + concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name); } - if (dir_name_for_internal_replication_with_local.empty()) - dir_name_for_internal_replication_with_local = dir_name; - else - dir_name_for_internal_replication_with_local += "," + dir_name; } } else @@ -425,8 +433,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size()); shards_info.push_back({ - std::move(dir_name_for_internal_replication), - std::move(dir_name_for_internal_replication_with_local), + std::move(insert_paths), current_shard_num, weight, std::move(shard_local_addresses), @@ -485,8 +492,7 @@ Cluster::Cluster(const Settings & settings, const std::vector; using AddressesWithFailover = std::vector; + /// Names of the directories for asynchronous writes to StorageDistributed if has_internal_replication + /// + /// Contains a separate path for each combination of: + /// - prefer_localhost_replica + /// Note: with prefer_localhost_replica==0 the path also contains local nodes. + /// - use_compact_format_in_distributed_parts_names + /// See toFullString() + /// + /// This is cached to avoid looping over replicas in insertPathForInternalReplication(). 
+ struct ShardInfoInsertPathForInternalReplication + { + /// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=0 + std::string prefer_localhost_replica; + /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0 + std::string no_prefer_localhost_replica; + /// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=1 + std::string prefer_localhost_replica_compact; + /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=1 + std::string no_prefer_localhost_replica_compact; + }; + struct ShardInfo { public: @@ -141,13 +162,10 @@ public: size_t getLocalNodeCount() const { return local_addresses.size(); } bool hasInternalReplication() const { return has_internal_replication; } /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication - const std::string & pathForInsert(bool prefer_localhost_replica) const; + const std::string & insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const; public: - /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && prefer_localhost_replica - std::string dir_name_for_internal_replication; - /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && !prefer_localhost_replica - std::string dir_name_for_internal_replication_with_local; + ShardInfoInsertPathForInternalReplication insert_path_for_internal_replication; /// Number of the shard, the indexation begins with 1 UInt32 shard_num = 0; UInt32 weight = 1; diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index c9f4ffe8b6a..ef7c02163bb 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -524,7 +524,14 @@ void DistributedBlockOutputStream::writeAsyncImpl(const 
Block & block, const siz /// Prefer insert into current instance directly writeToLocal(block, shard_info.getLocalNodeCount()); else - writeToShard(block, {shard_info.pathForInsert(settings.prefer_localhost_replica)}); + { + const auto & path = shard_info.insertPathForInternalReplication( + settings.prefer_localhost_replica, + settings.use_compact_format_in_distributed_parts_names); + if (path.empty()) + throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR); + writeToShard(block, {path}); + } } else { diff --git a/tests/queries/0_stateless/01555_system_distribution_queue_mask.reference b/tests/queries/0_stateless/01555_system_distribution_queue_mask.reference new file mode 100644 index 00000000000..bd0eac10816 --- /dev/null +++ b/tests/queries/0_stateless/01555_system_distribution_queue_mask.reference @@ -0,0 +1,4 @@ +masked +3,"default:*@127%2E0%2E0%2E1:9000,default:*@127%2E0%2E0%2E2:9000" +no masking +1,"default@localhost:9000" diff --git a/tests/queries/0_stateless/01555_system_distribution_queue_mask.sql b/tests/queries/0_stateless/01555_system_distribution_queue_mask.sql new file mode 100644 index 00000000000..0143b8e46ed --- /dev/null +++ b/tests/queries/0_stateless/01555_system_distribution_queue_mask.sql @@ -0,0 +1,36 @@ +-- force data path with the user/pass in it +set use_compact_format_in_distributed_parts_names=0; +-- use async send even for localhost +set prefer_localhost_replica=0; + +drop table if exists dist_01555; +drop table if exists data_01555; +create table data_01555 (key Int) Engine=Null(); + +-- +-- masked +-- +SELECT 'masked'; +create table dist_01555 (key Int) Engine=Distributed(test_cluster_with_incorrect_pw, currentDatabase(), data_01555, key); + +insert into dist_01555 values (1)(2); +-- since test_cluster_with_incorrect_pw contains incorrect password ignore error +system flush distributed dist_01555; -- { serverError 516; } +select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, 
'^.*/([^/]*)/' , '\\1') from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV; + +drop table dist_01555; + +-- +-- no masking +-- +SELECT 'no masking'; +create table dist_01555 (key Int) Engine=Distributed(test_shard_localhost, currentDatabase(), data_01555, key); + +insert into dist_01555 values (1)(2); +-- this section uses test_shard_localhost, not test_cluster_with_incorrect_pw, so no error is ignored here +system flush distributed dist_01555; +select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, '^.*/([^/]*)/' , '\\1') from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV; + +-- cleanup +drop table dist_01555; +drop table data_01555;