Allow FETCH PARTITION from other zookeepers

This commit is contained in:
Amos Bird 2020-08-27 22:19:18 +08:00
parent 6a963f58da
commit 0c1cf22c00
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
7 changed files with 111 additions and 8 deletions

View File

@ -306,6 +306,9 @@ struct ContextShared
mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_io_user;
@ -1446,6 +1449,19 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
/// Returns the ZooKeeper client for the auxiliary cluster configured under `auxiliary_zookeepers.<name>`.
/// The client is created on first request and cached in the shared context;
/// if the cached session has expired, it is replaced with a new session.
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
{
    std::lock_guard lock(shared->auxiliary_zookeepers_mutex);

    auto zookeeper = shared->auxiliary_zookeepers.find(name);
    if (zookeeper == shared->auxiliary_zookeepers.end())
    {
        /// Nothing cached for this name yet. The end() iterator must not be dereferenced,
        /// so insert a new entry and continue with the iterator returned by emplace.
        /// If the client constructor throws, nothing is inserted.
        zookeeper = shared->auxiliary_zookeepers.emplace(
            name,
            std::make_shared<zkutil::ZooKeeper>(getConfigRef(), "auxiliary_zookeepers." + name)).first;
    }
    else if (zookeeper->second->expired())
    {
        zookeeper->second = zookeeper->second->startNewSession();
    }

    return zookeeper->second;
}
void Context::resetZooKeeper() const
{
std::lock_guard lock(shared->zookeeper_mutex);

View File

@ -474,6 +474,8 @@ public:
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
/// If no ZooKeeper configured, throws an exception.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
/// Same as getZooKeeper(), but returns the client for the auxiliary ZooKeeper cluster `name`,
/// which is configured under the `auxiliary_zookeepers.<name>` configuration entry.
/// If the session for that cluster is expired at the time of the call, a new session is started with startNewSession().
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
/// Has ready or expired ZooKeeper
bool hasZooKeeper() const;
/// Reset current zookeeper session. Do not create a new one.

View File

@ -113,6 +113,7 @@ namespace ErrorCodes
extern const int ALL_REPLICAS_LOST;
extern const int REPLICA_STATUS_CHANGED;
extern const int CANNOT_ASSIGN_ALTER;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace ActionLocks
@ -3109,8 +3110,9 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path, bool to_detached, size_t quorum)
const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_)
{
auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
@ -3170,7 +3172,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
source_part_checksums.computeTotalChecksums(source_part->checksums);
MinimalisticDataPartChecksums desired_checksums;
auto zookeeper = getZooKeeper();
String part_path = source_replica_path + "/parts/" + part_name;
String part_znode = zookeeper->get(part_path);
if (!part_znode.empty())
@ -3200,7 +3201,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
}
else
{
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
ReplicatedMergeTreeAddress address(zookeeper->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
@ -3253,6 +3254,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
}
else
{
// The fetched part is valuable and should not be cleaned like a temp part.
part->is_temp = false;
part->renameTo("detached/" + part_name, true);
}
}
@ -4553,9 +4556,23 @@ void StorageReplicatedMergeTree::fetchPartition(
const String & from_,
const Context & query_context)
{
auto zookeeper = getZooKeeper();
String from = from_;
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (from[0] != '/')
{
auto delimiter = from.find(':');
if (delimiter == String::npos)
throw Exception("Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto auxiliary_zookeeper_name = from.substr(0, delimiter);
from = from.substr(delimiter + 1, String::npos);
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
zookeeper = global_context.getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
String from = from_;
if (from.back() == '/')
from.resize(from.size() - 1);
@ -4582,7 +4599,6 @@ void StorageReplicatedMergeTree::fetchPartition(
String best_replica;
{
auto zookeeper = getZooKeeper();
/// List of replicas of source shard.
replicas = zookeeper->getChildren(from + "/replicas");
@ -4651,7 +4667,7 @@ void StorageReplicatedMergeTree::fetchPartition(
if (try_no >= query_context.getSettings().max_fetch_partition_retries_count)
throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS);
Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
Strings parts = zookeeper->getChildren(best_replica_path + "/parts");
ActiveDataPartSet active_parts_set(format_version, parts);
Strings parts_to_fetch;
@ -4691,7 +4707,7 @@ void StorageReplicatedMergeTree::fetchPartition(
{
try
{
fetchPart(part, metadata_snapshot, best_replica_path, true, 0);
fetchPart(part, metadata_snapshot, best_replica_path, true, 0, zookeeper);
}
catch (const DB::Exception & e)
{

View File

@ -478,7 +478,13 @@ private:
* If quorum != 0, then the node for tracking the quorum is updated.
* Returns false if part is already fetching right now.
*/
bool fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot, const String & replica_path, bool to_detached, size_t quorum);
bool fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
bool to_detached,
size_t quorum,
/// Client used to read the source replica's nodes (part znode, host address).
/// When null, the table's own getZooKeeper() client is used.
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr);
/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;

View File

@ -0,0 +1,28 @@
<yandex>
<!-- Primary ZooKeeper ensemble (three nodes). -->
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
</zookeeper>
<!-- Auxiliary ZooKeeper clusters, keyed by element name. The name is what goes before the colon in
     ALTER TABLE ... FETCH PARTITION ... FROM '<name>:/path'. -->
<auxiliary_zookeepers>
<!-- `zookeeper2` lists two of the same hosts as the primary ensemble above. -->
<zookeeper2>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
</zookeeper2>
</auxiliary_zookeepers>
</yandex>

View File

@ -0,0 +1,35 @@
from __future__ import print_function
from helpers.cluster import ClickHouseCluster
import helpers
import pytest
def test_chroot_with_same_root():
    """Fetch a partition through the auxiliary ZooKeeper `zookeeper2` and attach it.

    Creates two replicated tables on one instance, inserts a row into the first,
    then has the second table FETCH the partition via the
    'zookeeper2:/clickhouse/tables/0/simple' path, ATTACH it, and checks that
    the row is readable from the second table.
    """
    zk_cluster = ClickHouseCluster(
        __file__, zookeeper_config_path="configs/zookeeper_config.xml"
    )
    instance = zk_cluster.add_instance(
        "node", config_dir="configs", with_zookeeper=True, zookeeper_use_tmpfs=False
    )

    # Executed in order once the cluster is up.
    statements = [
        "CREATE TABLE simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') PARTITION BY date;",
        "INSERT INTO simple VALUES ('2020-08-27', 1)",
        "CREATE TABLE simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') PARTITION BY date;",
        "ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper2:/clickhouse/tables/0/simple';",
        "ALTER TABLE simple2 ATTACH PARTITION '2020-08-27';",
    ]

    try:
        zk_cluster.start()
        for statement in statements:
            instance.query(statement)

        fetched_ids = instance.query("SELECT id FROM simple2").strip()
        assert fetched_ids == "1"
    finally:
        # Always tear the cluster down, even if a query or the assertion fails.
        zk_cluster.shutdown()