Add zookeeper name in endpoint id (#49780)

* Add zookeeper name in endpoint id

When we migrate a replicated table from one ZooKeeper cluster to
another (we migrate because the ZooKeeper cluster's load is too
high), we create a new table with the same zpath, but the creation
fails and the old table gets into trouble.

Here is some information:
1. Old table:
  CREATE TABLE a1 (`id` UInt64)
  ENGINE = ReplicatedMergeTree('/clickhouse/tables/default/a1/{shard}', '{replica}')
  ORDER BY (id);
2. New table:
  CREATE TABLE a2 (`id` UInt64)
  ENGINE = ReplicatedMergeTree('aux1:/clickhouse/tables/default/a1/{shard}', '{replica}')
  ORDER BY (id);
3. Error info:
  <Error> executeQuery: Code: 220. DB::Exception: Duplicate interserver IO endpoint:
          DataPartsExchange:/clickhouse/tables/default/a1/01/replicas/02.
          (DUPLICATE_INTERSERVER_IO_ENDPOINT)
  <Error> InterserverIOHTTPHandler: Code: 221. DB::Exception: No interserver IO endpoint
          named DataPartsExchange:/clickhouse/tables/default/a1/01/replicas/02.
          (NO_SUCH_INTERSERVER_IO_ENDPOINT)
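
Roughly, the intended usage looks like the sketch below (the config.d
override is illustrative; the setting is the one added by this change):
enable 'enable_the_endpoint_id_with_zookeeper_name_prefix' in the
server-side merge_tree config on every node, after which the new table's
interserver endpoint id gets the 'aux1:' zookeeper-name prefix and no
longer collides with the old table's endpoint:

  <clickhouse>
      <merge_tree>
          <enable_the_endpoint_id_with_zookeeper_name_prefix>1</enable_the_endpoint_id_with_zookeeper_name_prefix>
      </merge_tree>
  </clickhouse>

  -- with the setting enabled on all nodes, this no longer registers a duplicate endpoint
  CREATE TABLE a2 (`id` UInt64)
  ENGINE = ReplicatedMergeTree('aux1:/clickhouse/tables/default/a1/{shard}', '{replica}')
  ORDER BY (id);

Note that the old and new endpoint id formats are incompatible, so the
setting must have the same value on all replicas.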

* Revert "Add zookeeper name in endpoint id"

This reverts commit 9deb75b249619b7abdd38e3949ca8b3a76c9df8e.

* Add zookeeper name in endpoint id

When we migrate a replicated table from one ZooKeeper cluster to
another (we migrate because the ZooKeeper cluster's load is too
high), we create a new table with the same zpath, but the creation
fails and the old table gets into trouble.

* Fix incompatibility with a new setting

* add a test, fix other issues

* Update 02442_auxiliary_zookeeper_endpoint_id.sql

* Update 02735_system_zookeeper_connection.reference

* Update 02735_system_zookeeper_connection.sql

* Update run.sh

* Remove the 'no-fasttest' tag

* Update 02442_auxiliary_zookeeper_endpoint_id.sql

---------

Co-authored-by: Alexander Tokmakov <tavplubix@clickhouse.com>
Co-authored-by: Alexander Tokmakov <tavplubix@gmail.com>
何李夫 2023-05-25 17:50:14 +08:00 committed by GitHub
parent 1b32348452
commit e4c8c4cecf
22 changed files with 147 additions and 48 deletions

View File

@@ -132,6 +132,9 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--report-logs-stats')
+clickhouse-test "00001_select_1" > /dev/null ||:
+clickhouse-client -q "insert into system.zookeeper (name, path, value) values ('auxiliary_zookeeper2', '/test/chroot/', '')" ||:
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \

View File

@@ -65,6 +65,9 @@ sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
+# it contains some new settings, but we can safely remove it
+rm /etc/clickhouse-server/config.d/merge_tree.xml
start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
@@ -94,6 +97,9 @@ sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
+# it contains some new settings, but we can safely remove it
+rm /etc/clickhouse-server/config.d/merge_tree.xml
start
clickhouse-client --query="SELECT 'Server version: ', version()"

View File

@@ -3,6 +3,7 @@
#include <base/types.h>
#include <Common/Exception.h>
#include <Coordination/KeeperConstants.h>
+#include <Poco/Net/SocketAddress.h>
#include <vector>
#include <memory>
@@ -466,7 +467,7 @@ public:
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;
-virtual String getConnectedAddress() const = 0;
+virtual Poco::Net::SocketAddress getConnectedAddress() const = 0;
/// If the method will throw an exception, callbacks won't be called.
///

View File

@@ -39,7 +39,7 @@ public:
bool isExpired() const override { return expired; }
int64_t getSessionID() const override { return 0; }
-String getConnectedAddress() const override { return connected_zk_address; }
+Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
void create(
@@ -127,7 +127,7 @@ private:
zkutil::ZooKeeperArgs args;
-String connected_zk_address;
+Poco::Net::SocketAddress connected_zk_address;
std::mutex push_request_mutex;
std::atomic<bool> expired{false};

View File

@@ -112,11 +112,10 @@ void ZooKeeper::init(ZooKeeperArgs args_)
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
-String address = impl->getConnectedAddress();
-size_t colon_pos = address.find(':');
-connected_zk_host = address.substr(0, colon_pos);
-connected_zk_port = address.substr(colon_pos + 1);
+Poco::Net::SocketAddress address = impl->getConnectedAddress();
+connected_zk_host = address.host().toString();
+connected_zk_port = address.port();
connected_zk_index = 0;
@@ -124,7 +123,7 @@ void ZooKeeper::init(ZooKeeperArgs args_)
{
for (size_t i = 0; i < args.hosts.size(); i++)
{
-if (args.hosts[i] == address)
+if (args.hosts[i] == address.toString())
{
connected_zk_index = i;
break;

View File

@@ -524,7 +524,7 @@ public:
void setServerCompletelyStarted();
String getConnectedZooKeeperHost() const { return connected_zk_host; }
-String getConnectedZooKeeperPort() const { return connected_zk_port; }
+UInt16 getConnectedZooKeeperPort() const { return connected_zk_port; }
size_t getConnectedZooKeeperIndex() const { return connected_zk_index; }
private:
@@ -591,7 +591,7 @@ private:
ZooKeeperArgs args;
String connected_zk_host;
-String connected_zk_port;
+UInt16 connected_zk_port;
size_t connected_zk_index;
std::mutex mutex;

View File

@@ -433,7 +433,7 @@ void ZooKeeper::connect(
}
connected = true;
-connected_zk_address = node.address.toString();
+connected_zk_address = node.address;
break;
}
@@ -450,7 +450,7 @@ void ZooKeeper::connect(
if (!connected)
{
WriteBufferFromOwnString message;
-connected_zk_address = "";
+connected_zk_address = Poco::Net::SocketAddress();
message << "All connection tries failed while connecting to ZooKeeper. nodes: ";
bool first = true;

View File

@@ -125,7 +125,7 @@ public:
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
-String getConnectedAddress() const override { return connected_zk_address; }
+Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
void executeGenericRequest(
const ZooKeeperRequestPtr & request,
@@ -203,7 +203,7 @@ public:
private:
ACLs default_acls;
-String connected_zk_address;
+Poco::Net::SocketAddress connected_zk_address;
zkutil::ZooKeeperArgs args;

View File

@@ -2796,11 +2796,7 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);
-if (!shared->auxiliary_zookeepers.empty())
-return shared->auxiliary_zookeepers;
-else
-return std::map<String, zkutil::ZooKeeperPtr>();
+return shared->auxiliary_zookeepers;
}
#if USE_ROCKSDB

View File

@@ -369,6 +369,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
+const String & zookeeper_name,
const String & replica_path,
const String & host,
int port,
@@ -401,13 +402,18 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
/// Validation of the input that may come from malicious replica.
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
+String endpoint_id = getEndpointId(
+data_settings->enable_the_endpoint_id_with_zookeeper_name_prefix ?
+zookeeper_name + ":" + replica_path :
+replica_path);
Poco::URI uri;
uri.setScheme(interserver_scheme);
uri.setHost(host);
uri.setPort(port);
uri.setQueryParameters(
{
-{"endpoint", getEndpointId(replica_path)},
+{"endpoint", endpoint_id},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION)},
{"compress", "false"}
@@ -630,7 +636,15 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
temporary_directory_lock = {};
/// Try again but without zero-copy
-return fetchSelectedPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
+return fetchSelectedPart(
+metadata_snapshot,
+context,
+part_name,
+zookeeper_name,
+replica_path,
+host,
+port,
+timeouts,
user, password, interserver_scheme, throttler, to_detached, tmp_prefix, nullptr, false, disk);
}
}

View File

@@ -70,6 +70,7 @@ public:
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
+const String & zookeeper_name,
const String & replica_path,
const String & host,
int port,

View File

@@ -159,6 +159,7 @@ struct Settings;
M(UInt64, min_bytes_to_rebalance_partition_over_jbod, 0, "Minimal amount of bytes to enable part rebalance over JBOD array (0 - disabled).", 0) \
M(Bool, check_sample_column_is_correct, true, "Check columns or columns by hash for sampling are unsigned integer.", 0) \
M(Bool, allow_vertical_merges_from_compact_to_wide_parts, false, "Allows vertical merges from compact to wide parts. This settings must have the same value on all replicas", 0) \
+M(Bool, enable_the_endpoint_id_with_zookeeper_name_prefix, false, "Enable the endpoint id with zookeeper name prefix for the replicated merge tree table", 0) \
\
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \

View File

@@ -240,6 +240,15 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonl
return res;
}
+String StorageReplicatedMergeTree::getEndpointName() const
+{
+const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
+if (settings.enable_the_endpoint_id_with_zookeeper_name_prefix)
+return zookeeper_name + ":" + replica_path;
+return replica_path;
+}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
@@ -1841,6 +1850,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;
if (!fetchPart(part_name,
metadata_snapshot,
+zookeeper_name,
source_replica_path,
/* to_detached= */ false,
entry.quorum,
@@ -2341,7 +2351,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
interserver_scheme, address.scheme, address.host);
part_desc->res_part = fetcher.fetchSelectedPart(
-metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
+metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_name, source_replica_path,
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
@@ -2458,7 +2468,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
interserver_scheme, address.scheme, address.host);
return fetcher.fetchSelectedPart(
-metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
+metadata_snapshot, getContext(), entry.new_part_name, zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
replicated_fetches_throttler, true);
@@ -4042,6 +4052,7 @@ bool StorageReplicatedMergeTree::partIsLastQuorumPart(const MergeTreePartInfo &
bool StorageReplicatedMergeTree::fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
+const String & source_zookeeper_name,
const String & source_replica_path,
bool to_detached,
size_t quorum,
@@ -4077,7 +4088,7 @@ bool StorageReplicatedMergeTree::fetchPart(
currently_fetching_parts.erase(part_name);
});
-LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
+LOG_DEBUG(log, "Fetching part {} from {}:{}", part_name, source_zookeeper_name, source_replica_path);
auto settings_ptr = getSettings();
TableLockHolder table_lock_holder;
@@ -4134,7 +4145,8 @@ bool StorageReplicatedMergeTree::fetchPart(
}
else
{
-LOG_INFO(log, "Not checking checksums of part {} with replica {} because part was removed from ZooKeeper", part_name, source_replica_path);
+LOG_INFO(log, "Not checking checksums of part {} with replica {}:{} because part was removed from ZooKeeper",
+part_name, source_zookeeper_name, source_replica_path);
}
}
@@ -4187,6 +4199,7 @@ bool StorageReplicatedMergeTree::fetchPart(
metadata_snapshot,
getContext(),
part_name,
+source_zookeeper_name,
source_replica_path,
address.host,
address.replication_port,
@@ -4279,7 +4292,7 @@ bool StorageReplicatedMergeTree::fetchPart(
if (part_to_clone)
LOG_DEBUG(log, "Cloned part {} from {}{}", part_name, part_to_clone->name, to_detached ? " (to 'detached' directory)" : "");
else
-LOG_DEBUG(log, "Fetched part {} from {}{}", part_name, source_replica_path, to_detached ? " (to 'detached' directory)" : "");
+LOG_DEBUG(log, "Fetched part {} from {}:{}{}", part_name, source_zookeeper_name, source_replica_path, to_detached ? " (to 'detached' directory)" : "");
return true;
}
@@ -4318,7 +4331,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
currently_fetching_parts.erase(part_name);
});
-LOG_DEBUG(log, "Fetching already known part {} from {}", part_name, source_replica_path);
+LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@@ -4350,7 +4363,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
"'{}' != '{}', can't fetch part from {}", interserver_scheme, address.scheme, address.host);
return fetcher.fetchSelectedPart(
-metadata_snapshot, getContext(), part_name, source_replica_path,
+metadata_snapshot, getContext(), part_name, zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
@@ -4387,7 +4400,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
-LOG_DEBUG(log, "Fetched part {} from {}", part_name, source_replica_path);
+LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
return part->getDataPartStoragePtr();
}
@@ -4430,7 +4443,16 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared<DataPartsExchange::Service>(*this);
[[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);
assert(prev_ptr == nullptr);
-getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
+/// The endpoint id:
+/// old format: DataPartsExchange:/clickhouse/tables/default/t1/{shard}/{replica}
+/// new format: DataPartsExchange:{zookeeper_name}:/clickhouse/tables/default/t1/{shard}/{replica}
+/// Notice:
+/// They are incompatible and the default is the old format.
+/// If you want to use the new format, please ensure that 'enable_the_endpoint_id_with_zookeeper_name_prefix' of all nodes is true.
+///
+getContext()->getInterserverIOHandler().addEndpoint(
+data_parts_exchange_ptr->getId(getEndpointName()), data_parts_exchange_ptr);
startBeingLeader();
@@ -4555,7 +4577,7 @@ void StorageReplicatedMergeTree::shutdown()
auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
if (data_parts_exchange_ptr)
{
-getContext()->getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(replica_path));
+getContext()->getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(getEndpointName()));
/// Ask all parts exchange handlers to finish asap. New ones will fail to start
data_parts_exchange_ptr->blocker.cancelForever();
/// Wait for all of them
@@ -6237,14 +6259,14 @@ void StorageReplicatedMergeTree::fetchPartition(
info.table_id = getStorageID();
info.table_id.uuid = UUIDHelpers::Nil;
auto expand_from = query_context->getMacros()->expand(from_, info);
-String auxiliary_zookeeper_name = zkutil::extractZooKeeperName(expand_from);
+String from_zookeeper_name = zkutil::extractZooKeeperName(expand_from);
String from = zkutil::extractZooKeeperPath(expand_from, /* check_starts_with_slash */ true);
if (from.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "ZooKeeper path should not be empty");
zkutil::ZooKeeperPtr zookeeper;
-if (auxiliary_zookeeper_name != default_zookeeper_name)
-zookeeper = getContext()->getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
+if (from_zookeeper_name != default_zookeeper_name)
+zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
else
zookeeper = getZooKeeper();
@@ -6263,12 +6285,12 @@ void StorageReplicatedMergeTree::fetchPartition(
*/
if (checkIfDetachedPartExists(part_name))
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Detached part {} already exists.", part_name);
-LOG_INFO(log, "Will fetch part {} from shard {} (zookeeper '{}')", part_name, from_, auxiliary_zookeeper_name);
+LOG_INFO(log, "Will fetch part {} from shard {}", part_name, from_);
try
{
/// part name , metadata, part_path , true, 0, zookeeper
-if (!fetchPart(part_name, metadata_snapshot, part_path, true, 0, zookeeper, /* try_fetch_shared = */ false))
+if (!fetchPart(part_name, metadata_snapshot, from_zookeeper_name, part_path, true, 0, zookeeper, /* try_fetch_shared = */ false))
throw Exception(ErrorCodes::UNFINISHED, "Failed to fetch part {} from {}", part_name, from_);
}
catch (const DB::Exception & e)
@@ -6283,7 +6305,7 @@ void StorageReplicatedMergeTree::fetchPartition(
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
-LOG_INFO(log, "Will fetch partition {} from shard {} (zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
+LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);
/** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a partition may appear a little later.
@@ -6307,7 +6329,7 @@ void StorageReplicatedMergeTree::fetchPartition(
active_replicas.push_back(replica);
if (active_replicas.empty())
-throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No active replicas for shard {}", from);
+throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No active replicas for shard {}", from_);
/** You must select the best (most relevant) replica.
* This is a replica with the maximum `log_pointer`, then with the minimum `queue` size.
@@ -6361,7 +6383,8 @@ void StorageReplicatedMergeTree::fetchPartition(
LOG_INFO(log, "Some of parts ({}) are missing. Will try to fetch covering parts.", missing_parts.size());
if (try_no >= query_context->getSettings().max_fetch_partition_retries_count)
-throw Exception(ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS, "Too many retries to fetch parts from {}", best_replica_path);
+throw Exception(ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS,
+"Too many retries to fetch parts from {}:{}", from_zookeeper_name, best_replica_path);
Strings parts = zookeeper->getChildren(fs::path(best_replica_path) / "parts");
ActiveDataPartSet active_parts_set(format_version, parts);
@@ -6382,7 +6405,8 @@ void StorageReplicatedMergeTree::fetchPartition(
parts_to_fetch = std::move(parts_to_fetch_partition);
if (parts_to_fetch.empty())
-throw Exception(ErrorCodes::PARTITION_DOESNT_EXIST, "Partition {} on {} doesn't exist", partition_id, best_replica_path);
+throw Exception(ErrorCodes::PARTITION_DOESNT_EXIST,
+"Partition {} on {}:{} doesn't exist", partition_id, from_zookeeper_name, best_replica_path);
}
else
{
@@ -6392,7 +6416,7 @@ void StorageReplicatedMergeTree::fetchPartition(
if (!containing_part.empty())
parts_to_fetch.push_back(containing_part);
else
-LOG_WARNING(log, "Part {} on replica {} has been vanished.", missing_part, best_replica_path);
+LOG_WARNING(log, "Part {} on replica {}:{} has been vanished.", missing_part, from_zookeeper_name, best_replica_path);
}
}
@@ -6405,7 +6429,7 @@ void StorageReplicatedMergeTree::fetchPartition(
try
{
-fetched = fetchPart(part, metadata_snapshot, best_replica_path, true, 0, zookeeper, /* try_fetch_shared = */ false);
+fetched = fetchPart(part, metadata_snapshot, from_zookeeper_name, best_replica_path, true, 0, zookeeper, /* try_fetch_shared = */ false);
}
catch (const DB::Exception & e)
{

View File

@@ -382,6 +382,7 @@ private:
zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
void setZooKeeper();
+String getEndpointName() const;
/// If true, the table is offline and can not be written to it.
/// This flag is managed by RestartingThread.
@@ -699,6 +700,7 @@ private:
bool fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
+const String & source_zookeeper_name,
const String & source_replica_path,
bool to_detached,
size_t quorum,

View File

@@ -13,7 +13,7 @@ NamesAndTypesList StorageSystemZooKeeperConnection::getNamesAndTypes()
return {
{"name", std::make_shared<DataTypeString>()},
{"host", std::make_shared<DataTypeString>()},
-{"port", std::make_shared<DataTypeString>()},
+{"port", std::make_shared<DataTypeUInt16>()},
{"index", std::make_shared<DataTypeUInt8>()},
{"connected_time", std::make_shared<DataTypeDateTime>()},
{"is_expired", std::make_shared<DataTypeUInt8>()},
@@ -25,7 +25,7 @@ NamesAndTypesList StorageSystemZooKeeperConnection::getNamesAndTypes()
void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, ContextPtr context,
const SelectQueryInfo &) const
{
-res_columns[0]->insert("default_zookeeper");
+res_columns[0]->insert("default");
res_columns[1]->insert(context->getZooKeeper()->getConnectedZooKeeperHost());
res_columns[2]->insert(context->getZooKeeper()->getConnectedZooKeeperPort());
res_columns[3]->insert(context->getZooKeeper()->getConnectedZooKeeperIndex());
@@ -38,7 +38,6 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
{
res_columns[0]->insert(elem.first);
res_columns[1]->insert(elem.second->getConnectedZooKeeperHost());
-res_columns[1]->insert(elem.second->getConnectedZooKeeperHost());
res_columns[2]->insert(elem.second->getConnectedZooKeeperPort());
res_columns[3]->insert(elem.second->getConnectedZooKeeperIndex());
res_columns[4]->insert(elem.second->getSessionUptime());

View File

@@ -14,6 +14,24 @@
</node>
</zookeeper>
+<auxiliary_zookeepers>
+<zookeeper2>
+<node index="1">
+<host>localhost</host>
+<port>9181</port>
+</node>
+<node index="2">
+<host>localhost</host>
+<port>19181</port>
+</node>
+<node index="3">
+<host>localhost</host>
+<port>29181</port>
+</node>
+<root>/test/chroot/auxiliary_zookeeper2</root>
+</zookeeper2>
+</auxiliary_zookeepers>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>

View File

@@ -1,5 +1,6 @@
<clickhouse>
<merge_tree>
+<enable_the_endpoint_id_with_zookeeper_name_prefix>1</enable_the_endpoint_id_with_zookeeper_name_prefix>
<number_of_free_entries_in_pool_to_execute_mutation>8</number_of_free_entries_in_pool_to_execute_mutation>
</merge_tree>
</clickhouse>

View File

@@ -7,4 +7,13 @@
<port>9181</port>
</node>
</zookeeper>
+<auxiliary_zookeepers>
+<zookeeper2>
+<node index="1">
+<host>localhost</host>
+<port>9181</port>
+</node>
+<root>/test/chroot/auxiliary_zookeeper2</root>
+</zookeeper2>
+</auxiliary_zookeepers>
</clickhouse>

View File

@@ -0,0 +1,21 @@
+-- Tags: no-fasttest
+drop table if exists t1_r1 sync;
+drop table if exists t1_r2 sync;
+drop table if exists t2 sync;
+create table t1_r1 (x Int32) engine=ReplicatedMergeTree('/test/02442/{database}/t', 'r1') order by x;
+create table t1_r2 (x Int32) engine=ReplicatedMergeTree('/test/02442/{database}/t', 'r2') order by x;
+-- create table with same replica_path as t1_r1
+create table t2 (x Int32) engine=ReplicatedMergeTree('zookeeper2:/test/02442/{database}/t', 'r1') order by x;
+drop table t2 sync;
+-- insert data into one replica
+insert into t1_r1 select * from generateRandom('x Int32') LIMIT 10013;
+system sync replica t1_r2;
+select count() from t1_r2;
+drop table t1_r1 sync;
+drop table t1_r2 sync;

View File

@@ -1 +1,2 @@
-[ :1]:9181 0
+default ::1 9181 0 0 3
+zookeeper2 ::1 9181 0 0 0

View File

@@ -5,9 +5,11 @@ DROP TABLE IF EXISTS test_zk_connection_table;
CREATE TABLE test_zk_connection_table (
key UInt64
)
-ENGINE ReplicatedMergeTree('/clickhouse/{database}/02731_zk_connection/{shard}', '{replica}')
+ENGINE ReplicatedMergeTree('zookeeper2:/clickhouse/{database}/02731_zk_connection/{shard}', '{replica}')
ORDER BY tuple();
-select host, port, is_expired from system.zookeeper_connection where name='default_zookeeper';
+-- keeper_api_version will by 0 for auxiliary_zookeeper2, because we fail to get /api_version due to chroot
+-- I'm not sure if it's a bug or a useful trick to fallback to basic api
+select name, host, port, index, is_expired, keeper_api_version from system.zookeeper_connection order by name;
DROP TABLE IF EXISTS test_zk_connection_table;