mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 10:22:10 +00:00
Better deletion of keeper metadata in s3queue
This commit is contained in:
parent
0f4990d2e7
commit
7fc60d84cc
@ -17,6 +17,7 @@
|
|||||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||||
#include <Common/getRandomASCIIString.h>
|
#include <Common/getRandomASCIIString.h>
|
||||||
#include <Common/randomSeed.h>
|
#include <Common/randomSeed.h>
|
||||||
|
#include <Common/DNSResolver.h>
|
||||||
#include <numeric>
|
#include <numeric>
|
||||||
|
|
||||||
|
|
||||||
@ -438,6 +439,164 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
|
|||||||
"of wrong zookeeper path or because of logical error");
|
"of wrong zookeeper path or because of logical error");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
struct Info
|
||||||
|
{
|
||||||
|
std::string hostname;
|
||||||
|
std::string table_id;
|
||||||
|
|
||||||
|
bool operator ==(const Info & other) const
|
||||||
|
{
|
||||||
|
return hostname == other.hostname && table_id == other.table_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
static Info create(const StorageID & storage_id)
|
||||||
|
{
|
||||||
|
Info self;
|
||||||
|
self.hostname = DNSResolver::instance().getHostName();
|
||||||
|
self.table_id = storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.getFullTableName();
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string serialize() const
|
||||||
|
{
|
||||||
|
WriteBufferFromOwnString buf;
|
||||||
|
size_t version = 0;
|
||||||
|
buf << version << "\n";
|
||||||
|
buf << hostname << "\n";
|
||||||
|
buf << table_id << "\n";
|
||||||
|
return buf.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
static Info deserialize(const std::string & str)
|
||||||
|
{
|
||||||
|
ReadBufferFromString buf(str);
|
||||||
|
Info info;
|
||||||
|
size_t version;
|
||||||
|
buf >> version >> "\n";
|
||||||
|
buf >> info.hostname >> "\n";
|
||||||
|
buf >> info.table_id >> "\n";
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObjectStorageQueueMetadata::registerIfNot(const StorageID & storage_id)
|
||||||
|
{
|
||||||
|
const auto registry_path = zookeeper_path / "registry";
|
||||||
|
const auto self = Info::create(storage_id);
|
||||||
|
|
||||||
|
Coordination::Error code;
|
||||||
|
for (size_t i = 0; i < 1000; ++i)
|
||||||
|
{
|
||||||
|
Coordination::Stat stat;
|
||||||
|
std::string registry_str;
|
||||||
|
auto zk_client = getZooKeeper();
|
||||||
|
|
||||||
|
bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat);
|
||||||
|
if (node_exists)
|
||||||
|
{
|
||||||
|
Strings registered;
|
||||||
|
splitInto<','>(registered, registry_str);
|
||||||
|
|
||||||
|
for (const auto & elem : registered)
|
||||||
|
{
|
||||||
|
if (elem.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto info = Info::deserialize(elem);
|
||||||
|
if (info == self)
|
||||||
|
{
|
||||||
|
LOG_TRACE(log, "Table {} is already registered", self.table_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG_TRACE(log, "Adding {} to registry", self.table_id);
|
||||||
|
|
||||||
|
if (node_exists)
|
||||||
|
{
|
||||||
|
auto new_registry_str = registry_str + "," + self.serialize();
|
||||||
|
code = zk_client->trySet(registry_path, new_registry_str, stat.version);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
code = zk_client->tryCreate(registry_path, self.serialize(), zkutil::CreateMode::Persistent);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == Coordination::Error::ZOK)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (code == Coordination::Error::ZBADVERSION
|
||||||
|
|| code == Coordination::Error::ZSESSIONEXPIRED)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
throw zkutil::KeeperException(code);
|
||||||
|
}
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot register in keeper. Last error: {}", code);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t ObjectStorageQueueMetadata::unregister(const StorageID & storage_id)
|
||||||
|
{
|
||||||
|
const auto registry_path = zookeeper_path / "registry";
|
||||||
|
const auto self = Info::create(storage_id);
|
||||||
|
|
||||||
|
Coordination::Error code;
|
||||||
|
for (size_t i = 0; i < 1000; ++i)
|
||||||
|
{
|
||||||
|
Coordination::Stat stat;
|
||||||
|
std::string registry_str;
|
||||||
|
auto zk_client = getZooKeeper();
|
||||||
|
|
||||||
|
bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat);
|
||||||
|
if (!node_exists)
|
||||||
|
{
|
||||||
|
LOG_WARNING(log, "Cannot unregister: registry does not exist");
|
||||||
|
chassert(false);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
Strings registered;
|
||||||
|
splitInto<','>(registered, registry_str);
|
||||||
|
|
||||||
|
bool found = false;
|
||||||
|
std::string new_registry_str;
|
||||||
|
size_t count = 0;
|
||||||
|
for (const auto & elem : registered)
|
||||||
|
{
|
||||||
|
if (elem.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto info = Info::deserialize(elem);
|
||||||
|
if (info == self)
|
||||||
|
found = true;
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!new_registry_str.empty())
|
||||||
|
new_registry_str += ",";
|
||||||
|
new_registry_str += elem;
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister: not registered");
|
||||||
|
|
||||||
|
code = zk_client->trySet(registry_path, new_registry_str, stat.version);
|
||||||
|
|
||||||
|
if (code == Coordination::Error::ZOK)
|
||||||
|
return count;
|
||||||
|
|
||||||
|
if (code == Coordination::Error::ZBADVERSION
|
||||||
|
|| code == Coordination::Error::ZSESSIONEXPIRED)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
throw zkutil::KeeperException(code);
|
||||||
|
}
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister in keeper. Last error: {}", code);
|
||||||
|
}
|
||||||
|
|
||||||
void ObjectStorageQueueMetadata::cleanupThreadFunc()
|
void ObjectStorageQueueMetadata::cleanupThreadFunc()
|
||||||
{
|
{
|
||||||
/// A background task is responsible for maintaining
|
/// A background task is responsible for maintaining
|
||||||
|
@ -71,6 +71,9 @@ public:
|
|||||||
bool is_attach,
|
bool is_attach,
|
||||||
LoggerPtr log);
|
LoggerPtr log);
|
||||||
|
|
||||||
|
void registerIfNot(const StorageID & storage_id);
|
||||||
|
size_t unregister(const StorageID & storage_id);
|
||||||
|
|
||||||
void shutdown();
|
void shutdown();
|
||||||
|
|
||||||
FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
|
FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
|
||||||
|
@ -16,7 +16,8 @@ ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance(
|
|||||||
|
|
||||||
ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate(
|
ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate(
|
||||||
const std::string & zookeeper_path,
|
const std::string & zookeeper_path,
|
||||||
ObjectStorageQueueMetadataPtr metadata)
|
ObjectStorageQueueMetadataPtr metadata,
|
||||||
|
const StorageID & storage_id)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
auto it = metadata_by_path.find(zookeeper_path);
|
auto it = metadata_by_path.find(zookeeper_path);
|
||||||
@ -27,16 +28,16 @@ ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFa
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto & metadata_from_table = metadata->getTableMetadata();
|
auto & metadata_from_table = metadata->getTableMetadata();
|
||||||
auto & metadata_from_keeper = it->second.metadata->getTableMetadata();
|
auto & metadata_from_keeper = it->second->getTableMetadata();
|
||||||
|
|
||||||
metadata_from_table.checkEquals(metadata_from_keeper);
|
metadata_from_table.checkEquals(metadata_from_keeper);
|
||||||
|
|
||||||
it->second.ref_count += 1;
|
|
||||||
}
|
}
|
||||||
return it->second.metadata;
|
|
||||||
|
it->second->registerIfNot(storage_id);
|
||||||
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path)
|
void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path, const StorageID & storage_id)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
auto it = metadata_by_path.find(zookeeper_path);
|
auto it = metadata_by_path.find(zookeeper_path);
|
||||||
@ -44,17 +45,19 @@ void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_pat
|
|||||||
if (it == metadata_by_path.end())
|
if (it == metadata_by_path.end())
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path);
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path);
|
||||||
|
|
||||||
chassert(it->second.ref_count > 0);
|
const size_t registry_size = it->second->unregister(storage_id);
|
||||||
if (--it->second.ref_count == 0)
|
LOG_TRACE(log, "Remaining registry size: {}", registry_size);
|
||||||
|
|
||||||
|
if (registry_size == 0)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto zk_client = Context::getGlobalContextInstance()->getZooKeeper();
|
auto zk_client = Context::getGlobalContextInstance()->getZooKeeper();
|
||||||
zk_client->tryRemove(it->first);
|
zk_client->removeRecursive(it->first);
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
tryLogCurrentException(log);
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata_by_path.erase(it);
|
metadata_by_path.erase(it);
|
||||||
@ -64,8 +67,8 @@ void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_pat
|
|||||||
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> ObjectStorageQueueMetadataFactory::getAll()
|
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> ObjectStorageQueueMetadataFactory::getAll()
|
||||||
{
|
{
|
||||||
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> result;
|
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> result;
|
||||||
for (const auto & [zk_path, metadata_and_ref_count] : metadata_by_path)
|
for (const auto & [zk_path, metadata] : metadata_by_path)
|
||||||
result.emplace(zk_path, metadata_and_ref_count.metadata);
|
result.emplace(zk_path, metadata);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,25 +14,19 @@ public:
|
|||||||
|
|
||||||
FilesMetadataPtr getOrCreate(
|
FilesMetadataPtr getOrCreate(
|
||||||
const std::string & zookeeper_path,
|
const std::string & zookeeper_path,
|
||||||
ObjectStorageQueueMetadataPtr metadata);
|
ObjectStorageQueueMetadataPtr metadata,
|
||||||
|
const StorageID & storage_id);
|
||||||
|
|
||||||
void remove(const std::string & zookeeper_path);
|
void remove(const std::string & zookeeper_path, const StorageID & storage_id);
|
||||||
|
|
||||||
std::unordered_map<std::string, FilesMetadataPtr> getAll();
|
std::unordered_map<std::string, FilesMetadataPtr> getAll();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct Metadata
|
using MetadataByPath = std::unordered_map<std::string, std::shared_ptr<ObjectStorageQueueMetadata>>;
|
||||||
{
|
|
||||||
explicit Metadata(std::shared_ptr<ObjectStorageQueueMetadata> metadata_) : metadata(metadata_), ref_count(1) {}
|
|
||||||
|
|
||||||
std::shared_ptr<ObjectStorageQueueMetadata> metadata;
|
|
||||||
/// TODO: the ref count should be kept in keeper, because of the case with distributed processing.
|
|
||||||
size_t ref_count = 0;
|
|
||||||
};
|
|
||||||
using MetadataByPath = std::unordered_map<std::string, Metadata>;
|
|
||||||
|
|
||||||
MetadataByPath metadata_by_path;
|
MetadataByPath metadata_by_path;
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
|
LoggerPtr log = getLogger("QueueMetadataFactory");
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -30,8 +30,8 @@ namespace ErrorCodes
|
|||||||
DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
|
DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
|
||||||
DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
|
DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
|
||||||
DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
|
DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
|
||||||
DECLARE(UInt64, polling_max_timeout_ms, 10 * 60 * 1000, "Maximum timeout before next polling", 0) \
|
DECLARE(UInt64, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
|
||||||
DECLARE(UInt64, polling_backoff_ms, 30 * 1000, "Polling backoff", 0) \
|
DECLARE(UInt64, polling_backoff_ms, 1000, "Polling backoff", 0) \
|
||||||
DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
|
DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
|
||||||
DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
|
DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
|
||||||
DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
|
DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
|
||||||
|
@ -214,9 +214,12 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
|
|||||||
zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log);
|
zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log);
|
||||||
|
|
||||||
auto queue_metadata = std::make_unique<ObjectStorageQueueMetadata>(
|
auto queue_metadata = std::make_unique<ObjectStorageQueueMetadata>(
|
||||||
zk_path, std::move(table_metadata), (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]);
|
zk_path,
|
||||||
|
std::move(table_metadata),
|
||||||
|
(*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms],
|
||||||
|
(*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]);
|
||||||
|
|
||||||
files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata));
|
files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata), table_id_);
|
||||||
|
|
||||||
task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
|
task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
|
||||||
}
|
}
|
||||||
@ -248,7 +251,7 @@ void StorageObjectStorageQueue::shutdown(bool is_drop)
|
|||||||
|
|
||||||
void StorageObjectStorageQueue::drop()
|
void StorageObjectStorageQueue::drop()
|
||||||
{
|
{
|
||||||
ObjectStorageQueueMetadataFactory::instance().remove(zk_path);
|
ObjectStorageQueueMetadataFactory::instance().remove(zk_path, getStorageID());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const
|
bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const
|
||||||
|
@ -2276,3 +2276,127 @@ def test_alter_settings(started_cluster):
|
|||||||
|
|
||||||
check_int_settings(node, int_settings)
|
check_int_settings(node, int_settings)
|
||||||
check_string_settings(node, string_settings)
|
check_string_settings(node, string_settings)
|
||||||
|
|
||||||
|
|
||||||
|
def test_registry(started_cluster):
|
||||||
|
node1 = started_cluster.instances["node1"]
|
||||||
|
node2 = started_cluster.instances["node2"]
|
||||||
|
|
||||||
|
table_name = f"test_registry_{uuid.uuid4().hex[:8]}"
|
||||||
|
db_name = f"db_{table_name}"
|
||||||
|
dst_table_name = f"{table_name}_dst"
|
||||||
|
keeper_path = f"/clickhouse/test_{table_name}"
|
||||||
|
files_path = f"{table_name}_data"
|
||||||
|
files_to_generate = 1000
|
||||||
|
|
||||||
|
node1.query(f"DROP DATABASE IF EXISTS {db_name}")
|
||||||
|
node2.query(f"DROP DATABASE IF EXISTS {db_name}")
|
||||||
|
|
||||||
|
node1.query(
|
||||||
|
f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node1')"
|
||||||
|
)
|
||||||
|
node2.query(
|
||||||
|
f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node2')"
|
||||||
|
)
|
||||||
|
|
||||||
|
create_table(
|
||||||
|
started_cluster,
|
||||||
|
node1,
|
||||||
|
table_name,
|
||||||
|
"ordered",
|
||||||
|
files_path,
|
||||||
|
additional_settings={"keeper_path": keeper_path, "buckets": 3},
|
||||||
|
database_name=db_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
zk = started_cluster.get_kazoo_client("zoo1")
|
||||||
|
registry, stat = zk.get(f"{keeper_path}/registry/")
|
||||||
|
|
||||||
|
uuid1 = node1.query(
|
||||||
|
f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name}'"
|
||||||
|
).strip()
|
||||||
|
assert uuid1 in str(registry)
|
||||||
|
|
||||||
|
expected = [f"0\\nnode1\\n{uuid1}\\n", f"0\\nnode2\\n{uuid1}\\n"]
|
||||||
|
|
||||||
|
for elem in expected:
|
||||||
|
assert elem in str(registry)
|
||||||
|
|
||||||
|
total_values = generate_random_files(
|
||||||
|
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
|
||||||
|
)
|
||||||
|
|
||||||
|
create_mv(node1, f"{db_name}.{table_name}", dst_table_name)
|
||||||
|
create_mv(node2, f"{db_name}.{table_name}", dst_table_name)
|
||||||
|
|
||||||
|
def get_count():
|
||||||
|
return int(
|
||||||
|
node1.query(
|
||||||
|
f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
expected_rows = files_to_generate
|
||||||
|
for _ in range(20):
|
||||||
|
if expected_rows == get_count():
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
assert expected_rows == get_count()
|
||||||
|
|
||||||
|
table_name_2 = f"test_registry_{uuid.uuid4().hex[:8]}_2"
|
||||||
|
create_table(
|
||||||
|
started_cluster,
|
||||||
|
node1,
|
||||||
|
table_name_2,
|
||||||
|
"ordered",
|
||||||
|
files_path,
|
||||||
|
additional_settings={"keeper_path": keeper_path, "buckets": 3},
|
||||||
|
database_name=db_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
registry, stat = zk.get(f"{keeper_path}/registry/")
|
||||||
|
|
||||||
|
uuid2 = node1.query(
|
||||||
|
f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name_2}'"
|
||||||
|
).strip()
|
||||||
|
|
||||||
|
assert uuid1 in str(registry)
|
||||||
|
assert uuid2 in str(registry)
|
||||||
|
|
||||||
|
expected = [
|
||||||
|
f"0\\nnode1\\n{uuid1}\\n",
|
||||||
|
f"0\\nnode2\\n{uuid1}\\n",
|
||||||
|
f"0\\nnode1\\n{uuid2}\\n",
|
||||||
|
f"0\\nnode2\\n{uuid2}\\n",
|
||||||
|
]
|
||||||
|
|
||||||
|
for elem in expected:
|
||||||
|
assert elem in str(registry)
|
||||||
|
|
||||||
|
node1.restart_clickhouse()
|
||||||
|
node2.restart_clickhouse()
|
||||||
|
|
||||||
|
registry, stat = zk.get(f"{keeper_path}/registry/")
|
||||||
|
|
||||||
|
assert uuid1 in str(registry)
|
||||||
|
assert uuid2 in str(registry)
|
||||||
|
|
||||||
|
node1.query(f"DROP TABLE {db_name}.{table_name_2} SYNC")
|
||||||
|
|
||||||
|
assert zk.exists(keeper_path) is not None
|
||||||
|
registry, stat = zk.get(f"{keeper_path}/registry/")
|
||||||
|
|
||||||
|
assert uuid1 in str(registry)
|
||||||
|
assert uuid2 not in str(registry)
|
||||||
|
|
||||||
|
expected = [
|
||||||
|
f"0\\nnode1\\n{uuid1}\\n",
|
||||||
|
f"0\\nnode2\\n{uuid1}\\n",
|
||||||
|
]
|
||||||
|
|
||||||
|
for elem in expected:
|
||||||
|
assert elem in str(registry)
|
||||||
|
|
||||||
|
node1.query(f"DROP TABLE {db_name}.{table_name} SYNC")
|
||||||
|
|
||||||
|
assert zk.exists(keeper_path) is None
|
||||||
|
Loading…
Reference in New Issue
Block a user