mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Apply use_compact_format_in_distributed_parts_names for each INSERT (with internal_replication)
Before this patch use_compact_format_in_distributed_parts_names was applied only from default profile (at server start) for internal_replication=1, and was ignored on INSERT.
This commit is contained in:
parent
6eda689aa3
commit
04db0834bf
@ -46,6 +46,14 @@ inline bool isLocalImpl(const Cluster::Address & address, const Poco::Net::Socke
|
||||
return address.default_database.empty() && isLocalAddress(resolved_address, clickhouse_port);
|
||||
}
|
||||
|
||||
void concatInsertPath(std::string & insert_path, const std::string & dir_name)
|
||||
{
|
||||
if (insert_path.empty())
|
||||
insert_path = dir_name;
|
||||
else
|
||||
insert_path += "," + dir_name;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Implementation of Cluster::Address class
|
||||
@ -358,9 +366,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
|
||||
|
||||
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
|
||||
|
||||
/// In case of internal_replication we will be appending names to dir_name_for_internal_replication
|
||||
std::string dir_name_for_internal_replication;
|
||||
std::string dir_name_for_internal_replication_with_local;
|
||||
ShardInfoInsertPathForInternalReplication insert_paths;
|
||||
|
||||
for (const auto & replica_key : replica_keys)
|
||||
{
|
||||
@ -379,18 +385,20 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
|
||||
|
||||
if (internal_replication)
|
||||
{
|
||||
auto dir_name = replica_addresses.back().toFullString(settings.use_compact_format_in_distributed_parts_names);
|
||||
if (!replica_addresses.back().is_local)
|
||||
/// use_compact_format=0
|
||||
{
|
||||
if (dir_name_for_internal_replication.empty())
|
||||
dir_name_for_internal_replication = dir_name;
|
||||
else
|
||||
dir_name_for_internal_replication += "," + dir_name;
|
||||
auto dir_name = replica_addresses.back().toFullString(0 /* use_compact_format */);
|
||||
if (!replica_addresses.back().is_local)
|
||||
concatInsertPath(insert_paths.prefer_localhost_replica, dir_name);
|
||||
concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name);
|
||||
}
|
||||
/// use_compact_format=1
|
||||
{
|
||||
auto dir_name = replica_addresses.back().toFullString(1 /* use_compact_format */);
|
||||
if (!replica_addresses.back().is_local)
|
||||
concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name);
|
||||
concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name);
|
||||
}
|
||||
if (dir_name_for_internal_replication_with_local.empty())
|
||||
dir_name_for_internal_replication_with_local = dir_name;
|
||||
else
|
||||
dir_name_for_internal_replication_with_local += "," + dir_name;
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -425,8 +433,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
|
||||
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
|
||||
|
||||
shards_info.push_back({
|
||||
std::move(dir_name_for_internal_replication),
|
||||
std::move(dir_name_for_internal_replication_with_local),
|
||||
std::move(insert_paths),
|
||||
current_shard_num,
|
||||
weight,
|
||||
std::move(shard_local_addresses),
|
||||
@ -485,8 +492,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
|
||||
|
||||
slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
|
||||
shards_info.push_back({
|
||||
{}, // dir_name_for_internal_replication
|
||||
{}, // dir_name_for_internal_replication_with_local
|
||||
{}, // insert_path_for_internal_replication
|
||||
current_shard_num,
|
||||
default_weight,
|
||||
std::move(shard_local_addresses),
|
||||
@ -609,22 +615,25 @@ Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector
|
||||
initMisc();
|
||||
}
|
||||
|
||||
const std::string & Cluster::ShardInfo::pathForInsert(bool prefer_localhost_replica) const
|
||||
const std::string & Cluster::ShardInfo::insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const
|
||||
{
|
||||
if (!has_internal_replication)
|
||||
throw Exception("internal_replication is not set", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (prefer_localhost_replica)
|
||||
auto & paths = insert_path_for_internal_replication;
|
||||
if (!use_compact_format)
|
||||
{
|
||||
if (dir_name_for_internal_replication.empty())
|
||||
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
|
||||
return dir_name_for_internal_replication;
|
||||
if (prefer_localhost_replica)
|
||||
return paths.prefer_localhost_replica;
|
||||
else
|
||||
return paths.no_prefer_localhost_replica;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (dir_name_for_internal_replication_with_local.empty())
|
||||
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
|
||||
return dir_name_for_internal_replication_with_local;
|
||||
if (prefer_localhost_replica)
|
||||
return paths.prefer_localhost_replica_compact;
|
||||
else
|
||||
return paths.no_prefer_localhost_replica_compact;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,6 +133,27 @@ public:
|
||||
using Addresses = std::vector<Address>;
|
||||
using AddressesWithFailover = std::vector<Addresses>;
|
||||
|
||||
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
|
||||
///
|
||||
/// Contains different path for permutations of:
|
||||
/// - prefer_localhost_replica
|
||||
/// Notes with prefer_localhost_replica==0 will contains local nodes.
|
||||
/// - use_compact_format_in_distributed_parts_names
|
||||
/// See toFullString()
|
||||
///
|
||||
/// This is cached to avoid looping by replicas in insertPathForInternalReplication().
|
||||
struct ShardInfoInsertPathForInternalReplication
|
||||
{
|
||||
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=0
|
||||
std::string prefer_localhost_replica;
|
||||
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0
|
||||
std::string no_prefer_localhost_replica;
|
||||
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=1
|
||||
std::string prefer_localhost_replica_compact;
|
||||
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=1
|
||||
std::string no_prefer_localhost_replica_compact;
|
||||
};
|
||||
|
||||
struct ShardInfo
|
||||
{
|
||||
public:
|
||||
@ -141,13 +162,10 @@ public:
|
||||
size_t getLocalNodeCount() const { return local_addresses.size(); }
|
||||
bool hasInternalReplication() const { return has_internal_replication; }
|
||||
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
|
||||
const std::string & pathForInsert(bool prefer_localhost_replica) const;
|
||||
const std::string & insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const;
|
||||
|
||||
public:
|
||||
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && prefer_localhost_replica
|
||||
std::string dir_name_for_internal_replication;
|
||||
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && !prefer_localhost_replica
|
||||
std::string dir_name_for_internal_replication_with_local;
|
||||
ShardInfoInsertPathForInternalReplication insert_path_for_internal_replication;
|
||||
/// Number of the shard, the indexation begins with 1
|
||||
UInt32 shard_num = 0;
|
||||
UInt32 weight = 1;
|
||||
|
@ -523,7 +523,14 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
|
||||
/// Prefer insert into current instance directly
|
||||
writeToLocal(block, shard_info.getLocalNodeCount());
|
||||
else
|
||||
writeToShard(block, {shard_info.pathForInsert(settings.prefer_localhost_replica)});
|
||||
{
|
||||
const auto & path = shard_info.insertPathForInternalReplication(
|
||||
settings.prefer_localhost_replica,
|
||||
settings.use_compact_format_in_distributed_parts_names);
|
||||
if (path.empty())
|
||||
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
|
||||
writeToShard(block, {path});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user