Handle prefer_localhost_replica on INSERT into Distributed

Right now it will issue remote send even if finally the local replica
will be selected - not good I guess.

This should also fix load_balancing.
This commit is contained in:
Azat Khuzhin 2020-05-08 01:31:31 +03:00
parent c811e1f0d0
commit 085bafad05

View File

@ -275,12 +275,12 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
}
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context.getSettingsRef();
if (!job.is_local_job)
if (!job.is_local_job || !settings.prefer_localhost_replica)
{
if (!job.stream)
{
const Settings & settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
if (shard_info.hasInternalReplication())
{
@ -318,7 +318,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
job.stream->write(shard_block);
}
else
else // local
{
if (!job.stream)
{
@ -507,10 +507,11 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context.getSettingsRef();
if (shard_info.hasInternalReplication())
{
if (shard_info.getLocalNodeCount() > 0)
if (shard_info.getLocalNodeCount() > 0 && settings.prefer_localhost_replica)
{
/// Prefer insert into current instance directly
writeToLocal(block, shard_info.getLocalNodeCount());
@ -531,7 +532,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
if (!address.is_local)
dir_names.push_back(address.toFullString(context.getSettingsRef().use_compact_format_in_distributed_parts_names));
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
if (!dir_names.empty())
writeToShard(block, dir_names);