Merge pull request #25513 from azat/use_compact_format_in_distributed_parts_names-2

Drop replicas from dirname for internal_replication=true
This commit is contained in:
alexey-milovidov 2021-06-23 22:51:07 +03:00 committed by GitHub
commit 99f60a7c1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 87 additions and 51 deletions

View File

@ -179,7 +179,7 @@ String Cluster::Address::toFullString(bool use_compact_format) const
// shard_num/replica_num like in system.clusters table
throw Exception("shard_num/replica_num cannot be zero", ErrorCodes::LOGICAL_ERROR);
return "shard" + std::to_string(shard_index) + "_replica" + std::to_string(replica_index);
return fmt::format("shard{}_replica{}", shard_index, replica_index);
}
else
{
@ -199,7 +199,7 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string)
const char * user_pw_end = strchr(full_string.data(), '@');
/// parsing with the new [shard{shard_index}[_replica{replica_index}]] format
/// parsing with the new shard{shard_index}[_replica{replica_index}] format
if (!user_pw_end && startsWith(full_string, "shard"))
{
const char * underscore = strchr(full_string.data(), '_');
@ -401,6 +401,9 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
ShardInfoInsertPathForInternalReplication insert_paths;
/// "_all_replicas" is a marker that will be replaced with all replicas
/// (for creating connections in the Distributed engine)
insert_paths.compact = fmt::format("shard{}_all_replicas", current_shard_num);
for (const auto & replica_key : replica_keys)
{
@ -419,20 +422,10 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (internal_replication)
{
/// use_compact_format=0
{
auto dir_name = replica_addresses.back().toFullString(false /* use_compact_format */);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name);
}
/// use_compact_format=1
{
auto dir_name = replica_addresses.back().toFullString(true /* use_compact_format */);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica_compact, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica_compact, dir_name);
}
auto dir_name = replica_addresses.back().toFullString(/* use_compact_format= */ false);
if (!replica_addresses.back().is_local)
concatInsertPath(insert_paths.prefer_localhost_replica, dir_name);
concatInsertPath(insert_paths.no_prefer_localhost_replica, dir_name);
}
}
else
@ -660,17 +653,17 @@ const std::string & Cluster::ShardInfo::insertPathForInternalReplication(bool pr
const auto & paths = insert_path_for_internal_replication;
if (!use_compact_format)
{
if (prefer_localhost_replica)
return paths.prefer_localhost_replica;
else
return paths.no_prefer_localhost_replica;
const auto & path = prefer_localhost_replica ? paths.prefer_localhost_replica : paths.no_prefer_localhost_replica;
if (path.size() > NAME_MAX)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Path '{}' for async distributed INSERT is too long (exceed {} limit)", path, NAME_MAX);
}
return path;
}
else
{
if (prefer_localhost_replica)
return paths.prefer_localhost_replica_compact;
else
return paths.no_prefer_localhost_replica_compact;
return paths.compact;
}
}

View File

@ -166,10 +166,8 @@ public:
std::string prefer_localhost_replica;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0
std::string no_prefer_localhost_replica;
/// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=1
std::string prefer_localhost_replica_compact;
/// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=1
std::string no_prefer_localhost_replica_compact;
/// use_compact_format_in_distributed_parts_names=1
std::string compact;
};
struct ShardInfo

View File

@ -60,13 +60,12 @@ namespace
constexpr const std::chrono::minutes decrease_error_count_period{5};
template <typename PoolFactory>
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, Poco::Logger * log)
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log)
{
ConnectionPoolPtrs pools;
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
auto make_connection = [&](const Cluster::Address & address)
{
Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range<std::string>(*it));
try
{
pools.emplace_back(factory(address));
@ -76,10 +75,35 @@ namespace
if (e.code() == ErrorCodes::INCORRECT_FILE_NAME)
{
tryLogCurrentException(log);
continue;
return;
}
throw;
}
};
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);
if (address.shard_index && dirname.ends_with("_all_replicas"))
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}
const auto & shard_info = shards_info[address.shard_index - 1];
size_t replicas = shard_info.per_replica_pools.size();
for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
{
address.replica_index = replica_index;
make_connection(address);
}
}
else
make_connection(address);
}
return pools;
@ -420,13 +444,13 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();
/// check new format shard{shard_index}_number{replica_index}
/// check new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index starts from 1)
if (address.shard_index != 0)
{
if (!address.replica_index)
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"Wrong replica_index ({})", address.replica_index, name);
"Wrong replica_index={} ({})", address.replica_index, name);
if (address.shard_index > shards_info.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
@ -475,7 +499,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
address.secure);
};
auto pools = createPoolsForAddresses(name, pool_factory, storage.log);
auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log);
const auto settings = storage.getContext()->getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,

View File

@ -1,19 +1,20 @@
<yandex>
<remote_servers>
<test_cluster>
<test_cluster_internal_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>not_existing</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</test_cluster_internal_replication>
<test_cluster_2>
<test_cluster_no_internal_replication>
<node>
<host>not_existing</host>
<port>9000</port>
</node>
</test_cluster_2>
</test_cluster_no_internal_replication>
</remote_servers>
</yandex>

View File

@ -1,16 +1,27 @@
import pytest
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=line-too-long
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/remote_servers.xml'])
cluster_param = pytest.mark.parametrize("cluster", [
('test_cluster'),
('test_cluster_2'),
('test_cluster_internal_replication'),
('test_cluster_no_internal_replication'),
])
def get_dist_path(cluster, table, dist_format):
if dist_format == 0:
return f'/var/lib/clickhouse/data/test/{table}/default@not_existing:9000'
if cluster == 'test_cluster_internal_replication':
return f'/var/lib/clickhouse/data/test/{table}/shard1_all_replicas'
return f'/var/lib/clickhouse/data/test/{table}/shard1_replica1'
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -29,13 +40,16 @@ def test_single_file(started_cluster, cluster):
node.query("insert into test.distr_1 values (1, 'a'), (2, 'bb'), (3, 'ccc')",
settings={"use_compact_format_in_distributed_parts_names": "1"})
query = "select * from file('/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin', 'Distributed')"
path = get_dist_path(cluster, 'distr_1', 1)
query = f"select * from file('{path}/1.bin', 'Distributed')"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n'
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin');" \
"select * from t"
query = f"""
create table t (x UInt64, s String) engine = File('Distributed', '{path}/1.bin');
select * from t;
"""
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n'
@ -54,13 +68,16 @@ def test_two_files(started_cluster, cluster):
"use_compact_format_in_distributed_parts_names": "1",
})
query = "select * from file('/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin', 'Distributed') order by x"
path = get_dist_path(cluster, 'distr_2', 1)
query = f"select * from file('{path}/{{1,2,3,4}}.bin', 'Distributed') order by x"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin');" \
"select * from t order by x"
query = f"""
create table t (x UInt64, s String) engine = File('Distributed', '{path}/{{1,2,3,4}}.bin');
select * from t order by x;
"""
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'
@ -76,13 +93,16 @@ def test_single_file_old(started_cluster, cluster):
"use_compact_format_in_distributed_parts_names": "0",
})
query = "select * from file('/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin', 'Distributed')"
path = get_dist_path(cluster, 'distr_3', 0)
query = f"select * from file('{path}/1.bin', 'Distributed')"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n'
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin');" \
"select * from t"
query = f"""
create table t (x UInt64, s String) engine = File('Distributed', '{path}/1.bin');
select * from t;
"""
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
assert out == '1\ta\n2\tbb\n3\tccc\n'