diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 9c2766ae7d6..61ad4258b90 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -46,6 +46,14 @@ inline bool isLocalImpl(const Cluster::Address & address, const Poco::Net::Socke return address.default_database.empty() && isLocalAddress(resolved_address, clickhouse_port); } +void concatInsertPath(std::string & insert_path, const std::string & dir_name) +{ + if (insert_path.empty()) + insert_path = dir_name; + else + insert_path += "," + dir_name; +} + } /// Implementation of Cluster::Address class @@ -358,9 +366,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false); - /// In case of internal_replication we will be appending names to dir_name_for_internal_replication - std::string dir_name_for_internal_replication; - std::string dir_name_for_internal_replication_with_local; + ShardInfoInsertPathForInternalReplication insert_paths; for (const auto & replica_key : replica_keys) { @@ -379,18 +385,20 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, if (internal_replication) { - auto dir_name = replica_addresses.back().toFullString(settings.use_compact_format_in_distributed_parts_names); - if (!replica_addresses.back().is_local) + /// use_compact_format=0 { - if (dir_name_for_internal_replication.empty()) - dir_name_for_internal_replication = dir_name; - else - dir_name_for_internal_replication += "," + dir_name; + auto dir_name = replica_addresses.back().toFullString(0 /* use_compact_format */); + if (!replica_addresses.back().is_local) + concatInsertPath(insert_paths.prefer_localhost_replica, dir_name); + concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name); + } + /// use_compact_format=1 + { + auto dir_name = replica_addresses.back().toFullString(1 /* use_compact_format */); + if (!replica_addresses.back().is_local) + concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name); + concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name); } - if (dir_name_for_internal_replication_with_local.empty()) - dir_name_for_internal_replication_with_local = dir_name; - else - dir_name_for_internal_replication_with_local += "," + dir_name; } } else @@ -425,8 +433,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size()); shards_info.push_back({ - std::move(dir_name_for_internal_replication), - std::move(dir_name_for_internal_replication_with_local), + std::move(insert_paths), current_shard_num, weight, std::move(shard_local_addresses), @@ -485,8 +492,7 @@ Cluster::Cluster(const Settings & settings, const std::vector; using AddressesWithFailover = std::vector; + /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication + /// + /// Contains different path for permutations of: + /// - prefer_localhost_replica + /// Notes with prefer_localhost_replica==0 will contains local nodes. + /// - use_compact_format_in_distributed_parts_names + /// See toFullString() + /// + /// This is cached to avoid looping by replicas in insertPathForInternalReplication(). + struct ShardInfoInsertPathForInternalReplication + { + /// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=0 + std::string prefer_localhost_replica; + /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0 + std::string no_prefer_localhost_replica; + /// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=1 + std::string prefer_localhost_replica_compact; + /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=1 + std::string no_prefer_localhost_replica_compact; + }; + struct ShardInfo { public: @@ -141,13 +162,10 @@ public: size_t getLocalNodeCount() const { return local_addresses.size(); } bool hasInternalReplication() const { return has_internal_replication; } /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication - const std::string & pathForInsert(bool prefer_localhost_replica) const; + const std::string & insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const; public: - /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && prefer_localhost_replica - std::string dir_name_for_internal_replication; - /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && !prefer_localhost_replica - std::string dir_name_for_internal_replication_with_local; + ShardInfoInsertPathForInternalReplication insert_path_for_internal_replication; /// Number of the shard, the indexation begins with 1 UInt32 shard_num = 0; UInt32 weight = 1; diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index f08cdf76cbf..d524d0354ae 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -523,7 +523,14 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz /// Prefer insert into current instance directly writeToLocal(block, shard_info.getLocalNodeCount()); else - writeToShard(block, {shard_info.pathForInsert(settings.prefer_localhost_replica)}); + { + const auto & path = shard_info.insertPathForInternalReplication( + settings.prefer_localhost_replica, + settings.use_compact_format_in_distributed_parts_names); + if (path.empty()) + throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR); + writeToShard(block, {path}); + } } else {