Merge pull request #16788 from azat/fix-use_compact_format_in_distributed_parts_names

Apply use_compact_format_in_distributed_parts_names for each INSERT (with internal_replication)
This commit is contained in:
alexey-milovidov 2020-11-08 23:23:10 +03:00 committed by GitHub
commit f39457bc77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 31 deletions

View File

@ -46,6 +46,14 @@ inline bool isLocalImpl(const Cluster::Address & address, const Poco::Net::Socke
return address.default_database.empty() && isLocalAddress(resolved_address, clickhouse_port);
}
/// Adds dir_name to insert_path, which is kept as a comma-separated list of names.
/// The first name is stored as is, every following one is preceded by ",".
void concatInsertPath(std::string & insert_path, const std::string & dir_name)
{
    if (!insert_path.empty())
    {
        /// The list already has entries: separate the new name with a comma.
        insert_path += "," + dir_name;
        return;
    }

    insert_path = dir_name;
}
}
/// Implementation of Cluster::Address class
@ -358,9 +366,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
/// In case of internal_replication we will be appending names to insert_paths
std::string dir_name_for_internal_replication;
std::string dir_name_for_internal_replication_with_local;
ShardInfoInsertPathForInternalReplication insert_paths;
for (const auto & replica_key : replica_keys)
{
@ -379,18 +385,20 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (internal_replication)
{
auto dir_name = replica_addresses.back().toFullString(settings.use_compact_format_in_distributed_parts_names);
if (!replica_addresses.back().is_local)
/// use_compact_format=0
{
if (dir_name_for_internal_replication.empty())
dir_name_for_internal_replication = dir_name;
else
dir_name_for_internal_replication += "," + dir_name;
auto dir_name = replica_addresses.back().toFullString(false /* use_compact_format */);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name);
}
/// use_compact_format=1
{
auto dir_name = replica_addresses.back().toFullString(true /* use_compact_format */);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name);
}
if (dir_name_for_internal_replication_with_local.empty())
dir_name_for_internal_replication_with_local = dir_name;
else
dir_name_for_internal_replication_with_local += "," + dir_name;
}
}
else
@ -425,8 +433,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
shards_info.push_back({
std::move(dir_name_for_internal_replication),
std::move(dir_name_for_internal_replication_with_local),
std::move(insert_paths),
current_shard_num,
weight,
std::move(shard_local_addresses),
@ -485,8 +492,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
shards_info.push_back({
{}, // dir_name_for_internal_replication
{}, // dir_name_for_internal_replication_with_local
{}, // insert_path_for_internal_replication
current_shard_num,
default_weight,
std::move(shard_local_addresses),
@ -609,22 +615,25 @@ Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector
initMisc();
}
const std::string & Cluster::ShardInfo::pathForInsert(bool prefer_localhost_replica) const
const std::string & Cluster::ShardInfo::insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const
{
if (!has_internal_replication)
throw Exception("internal_replication is not set", ErrorCodes::LOGICAL_ERROR);
if (prefer_localhost_replica)
const auto & paths = insert_path_for_internal_replication;
if (!use_compact_format)
{
if (dir_name_for_internal_replication.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
return dir_name_for_internal_replication;
if (prefer_localhost_replica)
return paths.prefer_localhost_replica;
else
return paths.no_prefer_localhost_replica;
}
else
{
if (dir_name_for_internal_replication_with_local.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
return dir_name_for_internal_replication_with_local;
if (prefer_localhost_replica)
return paths.prefer_localhost_replica_compact;
else
return paths.no_prefer_localhost_replica_compact;
}
}

View File

@ -133,6 +133,27 @@ public:
using Addresses = std::vector<Address>;
using AddressesWithFailover = std::vector<Addresses>;
/// Names of directories for asynchronous write to StorageDistributed if has_internal_replication
///
/// Contains a separate path for each combination of:
/// - prefer_localhost_replica
///   Note: the paths for prefer_localhost_replica == 0 also contain local nodes.
/// - use_compact_format_in_distributed_parts_names
///   See toFullString()
///
/// The paths are cached here to avoid looping over replicas in insertPathForInternalReplication().
struct ShardInfoInsertPathForInternalReplication
{
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names == 0
std::string prefer_localhost_replica;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names == 0
std::string no_prefer_localhost_replica;
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names == 1
std::string prefer_localhost_replica_compact;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names == 1
std::string no_prefer_localhost_replica_compact;
};
struct ShardInfo
{
public:
@ -141,13 +162,10 @@ public:
size_t getLocalNodeCount() const { return local_addresses.size(); }
bool hasInternalReplication() const { return has_internal_replication; }
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
const std::string & pathForInsert(bool prefer_localhost_replica) const;
const std::string & insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const;
public:
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && prefer_localhost_replica
std::string dir_name_for_internal_replication;
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication && !prefer_localhost_replica
std::string dir_name_for_internal_replication_with_local;
ShardInfoInsertPathForInternalReplication insert_path_for_internal_replication;
/// Number of the shard, the indexation begins with 1
UInt32 shard_num = 0;
UInt32 weight = 1;

View File

@ -524,7 +524,14 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const siz
/// Prefer insert into current instance directly
writeToLocal(block, shard_info.getLocalNodeCount());
else
writeToShard(block, {shard_info.pathForInsert(settings.prefer_localhost_replica)});
{
const auto & path = shard_info.insertPathForInternalReplication(
settings.prefer_localhost_replica,
settings.use_compact_format_in_distributed_parts_names);
if (path.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
writeToShard(block, {path});
}
}
else
{

View File

@ -0,0 +1,4 @@
masked
3,"default:*@127%2E0%2E0%2E1:9000,default:*@127%2E0%2E0%2E2:9000"
no masking
1,"default@localhost:9000"

View File

@ -0,0 +1,36 @@
-- Inspects data_path in system.distribution_queue for two Distributed tables
-- (sections 'masked' and 'no masking' below).

-- force data path with the user/pass in it
set use_compact_format_in_distributed_parts_names=0;
-- use async send even for localhost
set prefer_localhost_replica=0;
drop table if exists dist_01555;
drop table if exists data_01555;
-- underlying table that both Distributed tables below point to
create table data_01555 (key Int) Engine=Null();
--
-- masked
--
SELECT 'masked';
create table dist_01555 (key Int) Engine=Distributed(test_cluster_with_incorrect_pw, currentDatabase(), data_01555, key);
insert into dist_01555 values (1)(2);
-- since test_cluster_with_incorrect_pw contains incorrect password ignore error
system flush distributed dist_01555; -- { serverError 516; }
-- prints the number of '*'-separated pieces of data_path and the path component in front of its last '/'
select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, '^.*/([^/]*)/' , '\\1') from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV;
drop table dist_01555;
--
-- no masking
--
SELECT 'no masking';
create table dist_01555 (key Int) Engine=Distributed(test_shard_localhost, currentDatabase(), data_01555, key);
insert into dist_01555 values (1)(2);
-- no serverError hint here: the flush for test_shard_localhost is expected to succeed
system flush distributed dist_01555;
-- same columns as in the 'masked' section
select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, '^.*/([^/]*)/' , '\\1') from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV;
-- cleanup
drop table dist_01555;
drop table data_01555;