Drop replicas from dirname for internal_replication=true

Under use_compact_format_in_distributed_parts_names=1 and
internal_replication=true the server encodes all replicas for the
directory name for async INSERT into Distributed, and the directory name
looks like:

    shard1_replica1,shard1_replica2,shard3_replica3

This is required for creating connections (to specific replicas only),
but in case of internal_replication=true, this can be avoided, since
this path will always includes all replicas.

This patch replaces all replicas with "_all_replicas" marker.

Note, that initial problem was that this path may overflow the NAME_MAX
if you will have more then 15 replicas, and the server will fail to
create the directory.

Also note, that changed directory name should not be a problem, since:
- empty directories will be removed since #16729
- and replicas encoded in the directory name is also supported anyway.
This commit is contained in:
Azat Khuzhin 2021-06-20 16:50:01 +03:00
parent dbb2532e88
commit e148ef739d
5 changed files with 87 additions and 51 deletions

View File

@ -179,7 +179,7 @@ String Cluster::Address::toFullString(bool use_compact_format) const
// shard_num/replica_num like in system.clusters table // shard_num/replica_num like in system.clusters table
throw Exception("shard_num/replica_num cannot be zero", ErrorCodes::LOGICAL_ERROR); throw Exception("shard_num/replica_num cannot be zero", ErrorCodes::LOGICAL_ERROR);
return "shard" + std::to_string(shard_index) + "_replica" + std::to_string(replica_index); return fmt::format("shard{}_replica{}", shard_index, replica_index);
} }
else else
{ {
@ -199,7 +199,7 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string)
const char * user_pw_end = strchr(full_string.data(), '@'); const char * user_pw_end = strchr(full_string.data(), '@');
/// parsing with the new [shard{shard_index}[_replica{replica_index}]] format /// parsing with the new shard{shard_index}[_replica{replica_index}] format
if (!user_pw_end && startsWith(full_string, "shard")) if (!user_pw_end && startsWith(full_string, "shard"))
{ {
const char * underscore = strchr(full_string.data(), '_'); const char * underscore = strchr(full_string.data(), '_');
@ -401,6 +401,9 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false); bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
ShardInfoInsertPathForInternalReplication insert_paths; ShardInfoInsertPathForInternalReplication insert_paths;
/// "_all_replicas" is a marker that will be replaced with all replicas
/// (for creating connections in the Distributed engine)
insert_paths.compact = fmt::format("shard{}_all_replicas", current_shard_num);
for (const auto & replica_key : replica_keys) for (const auto & replica_key : replica_keys)
{ {
@ -419,21 +422,11 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (internal_replication) if (internal_replication)
{ {
/// use_compact_format=0 auto dir_name = replica_addresses.back().toFullString(/* use_compact_format= */ false);
{
auto dir_name = replica_addresses.back().toFullString(false /* use_compact_format */);
if (!replica_addresses.back().is_local) if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica, dir_name); concatInsertPath(insert_paths.prefer_localhost_replica, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name); concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name);
} }
/// use_compact_format=1
{
auto dir_name = replica_addresses.back().toFullString(true /* use_compact_format */);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name);
}
}
} }
else else
throw Exception("Unknown element in config: " + replica_key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); throw Exception("Unknown element in config: " + replica_key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
@ -660,17 +653,17 @@ const std::string & Cluster::ShardInfo::insertPathForInternalReplication(bool pr
const auto & paths = insert_path_for_internal_replication; const auto & paths = insert_path_for_internal_replication;
if (!use_compact_format) if (!use_compact_format)
{ {
if (prefer_localhost_replica) const auto & path = prefer_localhost_replica ? paths.prefer_localhost_replica : paths.no_prefer_localhost_replica;
return paths.prefer_localhost_replica; if (path.size() > NAME_MAX)
else {
return paths.no_prefer_localhost_replica; throw Exception(ErrorCodes::LOGICAL_ERROR,
"Path '{}' for async distributed INSERT is too long (exceed {} limit)", path, NAME_MAX);
}
return path;
} }
else else
{ {
if (prefer_localhost_replica) return paths.compact;
return paths.prefer_localhost_replica_compact;
else
return paths.no_prefer_localhost_replica_compact;
} }
} }

View File

@ -166,10 +166,8 @@ public:
std::string prefer_localhost_replica; std::string prefer_localhost_replica;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0 /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0
std::string no_prefer_localhost_replica; std::string no_prefer_localhost_replica;
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=1 /// use_compact_format_in_distributed_parts_names=1
std::string prefer_localhost_replica_compact; std::string compact;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=1
std::string no_prefer_localhost_replica_compact;
}; };
struct ShardInfo struct ShardInfo

View File

@ -60,13 +60,12 @@ namespace
constexpr const std::chrono::minutes decrease_error_count_period{5}; constexpr const std::chrono::minutes decrease_error_count_period{5};
template <typename PoolFactory> template <typename PoolFactory>
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, Poco::Logger * log) ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log)
{ {
ConnectionPoolPtrs pools; ConnectionPoolPtrs pools;
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it) auto make_connection = [&](const Cluster::Address & address)
{ {
Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range<std::string>(*it));
try try
{ {
pools.emplace_back(factory(address)); pools.emplace_back(factory(address));
@ -76,10 +75,35 @@ namespace
if (e.code() == ErrorCodes::INCORRECT_FILE_NAME) if (e.code() == ErrorCodes::INCORRECT_FILE_NAME)
{ {
tryLogCurrentException(log); tryLogCurrentException(log);
continue; return;
} }
throw; throw;
} }
};
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);
if (address.shard_index && dirname.ends_with("_all_replicas"))
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}
const auto & shard_info = shards_info[address.shard_index - 1];
size_t replicas = shard_info.per_replica_pools.size();
for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
{
address.replica_index = replica_index;
make_connection(address);
}
}
else
make_connection(address);
} }
return pools; return pools;
@ -420,13 +444,13 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
const auto & shards_info = cluster->getShardsInfo(); const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses(); const auto & shards_addresses = cluster->getShardsAddresses();
/// check new format shard{shard_index}_number{replica_index} /// check new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index starts from 1) /// (shard_index and replica_index starts from 1)
if (address.shard_index != 0) if (address.shard_index != 0)
{ {
if (!address.replica_index) if (!address.replica_index)
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"Wrong replica_index ({})", address.replica_index, name); "Wrong replica_index={} ({})", address.replica_index, name);
if (address.shard_index > shards_info.size()) if (address.shard_index > shards_info.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
@ -475,7 +499,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
address.secure); address.secure);
}; };
auto pools = createPoolsForAddresses(name, pool_factory, storage.log); auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log);
const auto settings = storage.getContext()->getSettings(); const auto settings = storage.getContext()->getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools, return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,

View File

@ -1,19 +1,20 @@
<yandex> <yandex>
<remote_servers> <remote_servers>
<test_cluster> <test_cluster_internal_replication>
<shard> <shard>
<internal_replication>true</internal_replication>
<replica> <replica>
<host>not_existing</host> <host>not_existing</host>
<port>9000</port> <port>9000</port>
</replica> </replica>
</shard> </shard>
</test_cluster> </test_cluster_internal_replication>
<test_cluster_2> <test_cluster_no_internal_replication>
<node> <node>
<host>not_existing</host> <host>not_existing</host>
<port>9000</port> <port>9000</port>
</node> </node>
</test_cluster_2> </test_cluster_no_internal_replication>
</remote_servers> </remote_servers>
</yandex> </yandex>

View File

@ -1,16 +1,27 @@
import pytest # pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=line-too-long
import pytest
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/remote_servers.xml']) node = cluster.add_instance('node', main_configs=['configs/remote_servers.xml'])
cluster_param = pytest.mark.parametrize("cluster", [ cluster_param = pytest.mark.parametrize("cluster", [
('test_cluster'), ('test_cluster_internal_replication'),
('test_cluster_2'), ('test_cluster_no_internal_replication'),
]) ])
def get_dist_path(cluster, table, dist_format):
if dist_format == 0:
return f'/var/lib/clickhouse/data/test/{table}/default@not_existing:9000'
if cluster == 'test_cluster_internal_replication':
return f'/var/lib/clickhouse/data/test/{table}/shard1_all_replicas'
return f'/var/lib/clickhouse/data/test/{table}/shard1_replica1'
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
try: try:
@ -29,13 +40,16 @@ def test_single_file(started_cluster, cluster):
node.query("insert into test.distr_1 values (1, 'a'), (2, 'bb'), (3, 'ccc')", node.query("insert into test.distr_1 values (1, 'a'), (2, 'bb'), (3, 'ccc')",
settings={"use_compact_format_in_distributed_parts_names": "1"}) settings={"use_compact_format_in_distributed_parts_names": "1"})
query = "select * from file('/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin', 'Distributed')" path = get_dist_path(cluster, 'distr_1', 1)
query = f"select * from file('{path}/1.bin', 'Distributed')"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n' assert out == '1\ta\n2\tbb\n3\tccc\n'
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin');" \ query = f"""
"select * from t" create table t (x UInt64, s String) engine = File('Distributed', '{path}/1.bin');
select * from t;
"""
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n' assert out == '1\ta\n2\tbb\n3\tccc\n'
@ -54,13 +68,16 @@ def test_two_files(started_cluster, cluster):
"use_compact_format_in_distributed_parts_names": "1", "use_compact_format_in_distributed_parts_names": "1",
}) })
query = "select * from file('/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin', 'Distributed') order by x" path = get_dist_path(cluster, 'distr_2', 1)
query = f"select * from file('{path}/{{1,2,3,4}}.bin', 'Distributed') order by x"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n' assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin');" \ query = f"""
"select * from t order by x" create table t (x UInt64, s String) engine = File('Distributed', '{path}/{{1,2,3,4}}.bin');
select * from t order by x;
"""
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n' assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'
@ -76,13 +93,16 @@ def test_single_file_old(started_cluster, cluster):
"use_compact_format_in_distributed_parts_names": "0", "use_compact_format_in_distributed_parts_names": "0",
}) })
query = "select * from file('/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin', 'Distributed')" path = get_dist_path(cluster, 'distr_3', 0)
query = f"select * from file('{path}/1.bin', 'Distributed')"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n' assert out == '1\ta\n2\tbb\n3\tccc\n'
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin');" \ query = f"""
"select * from t" create table t (x UInt64, s String) engine = File('Distributed', '{path}/1.bin');
select * from t;
"""
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query]) out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n' assert out == '1\ta\n2\tbb\n3\tccc\n'