Merge pull request #66380 from ClickHouse/pr-cleanup

PR cleanup: remove redundant code
This commit is contained in:
Igor Nikonov 2024-07-17 21:45:10 +00:00 committed by GitHub
commit 9810a658e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 14 additions and 26 deletions

View File

@ -54,8 +54,6 @@ public:
struct ReplicaInfo
{
bool collaborate_with_initiator{false};
size_t all_replicas_count{0};
size_t number_of_current_replica{0};
};

View File

@ -142,13 +142,12 @@ void MultiplexedConnections::sendQuery(
modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0;
}
}
if (replica_info)
{
client_info.collaborate_with_initiator = true;
client_info.count_participating_replicas = replica_info->all_replicas_count;
client_info.number_of_current_replica = replica_info->number_of_current_replica;
}
if (replica_info)
{
client_info.collaborate_with_initiator = true;
client_info.number_of_current_replica = replica_info->number_of_current_replica;
}
/// FIXME: Remove once we will make `allow_experimental_analyzer` obsolete setting.

View File

@ -95,7 +95,7 @@ void ClientInfo::write(WriteBuffer & out, UInt64 server_protocol_revision) const
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
{
writeVarUInt(static_cast<UInt64>(collaborate_with_initiator), out);
writeVarUInt(count_participating_replicas, out);
writeVarUInt(obsolete_count_participating_replicas, out);
writeVarUInt(number_of_current_replica, out);
}
}
@ -185,7 +185,7 @@ void ClientInfo::read(ReadBuffer & in, UInt64 client_protocol_revision)
UInt64 value;
readVarUInt(value, in);
collaborate_with_initiator = static_cast<bool>(value);
readVarUInt(count_participating_replicas, in);
readVarUInt(obsolete_count_participating_replicas, in);
readVarUInt(number_of_current_replica, in);
}
}

View File

@ -127,7 +127,7 @@ public:
/// For parallel processing on replicas
bool collaborate_with_initiator{false};
UInt64 count_participating_replicas{0};
UInt64 obsolete_count_participating_replicas{0};
UInt64 number_of_current_replica{0};
enum class BackgroundOperationType : uint8_t

View File

@ -5001,13 +5001,6 @@ void Context::setConnectionClientVersion(UInt64 client_version_major, UInt64 cli
client_info.connection_tcp_protocol_version = client_tcp_protocol_version;
}
void Context::setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica)
{
client_info.collaborate_with_initiator = collaborate_with_initiator;
client_info.count_participating_replicas = all_replicas_count;
client_info.number_of_current_replica = number_of_current_replica;
}
void Context::increaseDistributedDepth()
{
++client_info.distributed_depth;

View File

@ -699,7 +699,6 @@ public:
void setInitialQueryStartTime(std::chrono::time_point<std::chrono::system_clock> initial_query_start_time);
void setQuotaClientKey(const String & quota_key);
void setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version);
void setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica);
void increaseDistributedDepth();
const OpenTelemetry::TracingContext & getClientTraceContext() const { return client_info.client_trace_context; }
OpenTelemetry::TracingContext & getClientTraceContext() { return client_info.client_trace_context; }

View File

@ -412,8 +412,8 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
const auto & shard = cluster->getShardsInfo().at(0);
size_t all_replicas_count = current_settings.max_parallel_replicas;
if (all_replicas_count > shard.getAllNodeCount())
size_t max_replicas_to_use = current_settings.max_parallel_replicas;
if (max_replicas_to_use > shard.getAllNodeCount())
{
LOG_INFO(
getLogger("ReadFromParallelRemoteReplicasStep"),
@ -421,14 +421,14 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
"Will use the latter number to execute the query.",
current_settings.max_parallel_replicas,
shard.getAllNodeCount());
all_replicas_count = shard.getAllNodeCount();
max_replicas_to_use = shard.getAllNodeCount();
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (all_replicas_count < shard.getAllNodeCount())
if (max_replicas_to_use < shard.getAllNodeCount())
{
shuffled_pool = shard.pool->getShuffledPools(current_settings);
shuffled_pool.resize(all_replicas_count);
shuffled_pool.resize(max_replicas_to_use);
}
else
{
@ -438,11 +438,10 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
}
for (size_t i=0; i < all_replicas_count; ++i)
for (size_t i=0; i < max_replicas_to_use; ++i)
{
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = all_replicas_count,
/// we should use this number specifically because efficiency of data distribution by consistent hash depends on it.
.number_of_current_replica = i,
};