From 085bafad0523ccc1ba18a99235dae3de007543b6 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 8 May 2020 01:31:31 +0300 Subject: [PATCH] Handle prefer_localhost_replica on INSERT into Distributed Right now it will issue remote send even if finally the local replica will be selected - not good I guess. This should also fix load_balancing. --- .../Distributed/DistributedBlockOutputStream.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index ae0af5f9cf4..e8c45826a35 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -275,12 +275,12 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp } const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block; + const Settings & settings = context.getSettingsRef(); - if (!job.is_local_job) + if (!job.is_local_job || !settings.prefer_localhost_replica) { if (!job.stream) { - const Settings & settings = context.getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); if (shard_info.hasInternalReplication()) { @@ -318,7 +318,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; job.stream->write(shard_block); } - else + else // local { if (!job.stream) { @@ -507,10 +507,11 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block) void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id) { const auto & shard_info = cluster->getShardsInfo()[shard_id]; + const auto & settings = context.getSettingsRef(); if (shard_info.hasInternalReplication()) { - if (shard_info.getLocalNodeCount() > 0) + if (shard_info.getLocalNodeCount() > 0 && settings.prefer_localhost_replica) { /// Prefer insert into current instance directly writeToLocal(block, shard_info.getLocalNodeCount()); @@ -531,7 +532,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz std::vector dir_names; for (const auto & address : cluster->getShardsAddresses()[shard_id]) if (!address.is_local) - dir_names.push_back(address.toFullString(context.getSettingsRef().use_compact_format_in_distributed_parts_names)); + dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names)); if (!dir_names.empty()) writeToShard(block, dir_names);