Merge pull request #72810 from ClickHouse/disable-metadata-deletion-in-s3-queue

Better deletion of metadata in s3queue
This commit is contained in:
Kseniia Sumarokova 2024-12-09 16:35:48 +00:00 committed by GitHub
commit 63603cc6d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 336 additions and 22 deletions

View File

@ -17,6 +17,7 @@
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/getRandomASCIIString.h> #include <Common/getRandomASCIIString.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <Common/DNSResolver.h>
#include <numeric> #include <numeric>
@ -438,6 +439,163 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
"of wrong zookeeper path or because of logical error"); "of wrong zookeeper path or because of logical error");
} }
namespace
{
struct Info
{
std::string hostname;
std::string table_id;
bool operator ==(const Info & other) const
{
return hostname == other.hostname && table_id == other.table_id;
}
static Info create(const StorageID & storage_id)
{
Info self;
self.hostname = DNSResolver::instance().getHostName();
self.table_id = storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.getFullTableName();
return self;
}
std::string serialize() const
{
WriteBufferFromOwnString buf;
size_t version = 0;
buf << version << "\n";
buf << hostname << "\n";
buf << table_id << "\n";
return buf.str();
}
static Info deserialize(const std::string & str)
{
ReadBufferFromString buf(str);
Info info;
size_t version;
buf >> version >> "\n";
buf >> info.hostname >> "\n";
buf >> info.table_id >> "\n";
return info;
}
};
}
void ObjectStorageQueueMetadata::registerIfNot(const StorageID & storage_id)
{
const auto registry_path = zookeeper_path / "registry";
const auto self = Info::create(storage_id);
Coordination::Error code;
for (size_t i = 0; i < 1000; ++i)
{
Coordination::Stat stat;
std::string registry_str;
auto zk_client = getZooKeeper();
if (zk_client->tryGet(registry_path, registry_str, &stat))
{
Strings registered;
splitInto<','>(registered, registry_str);
for (const auto & elem : registered)
{
if (elem.empty())
continue;
auto info = Info::deserialize(elem);
if (info == self)
{
LOG_TRACE(log, "Table {} is already registered", self.table_id);
return;
}
}
auto new_registry_str = registry_str + "," + self.serialize();
code = zk_client->trySet(registry_path, new_registry_str, stat.version);
}
else
code = zk_client->tryCreate(registry_path, self.serialize(), zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZOK)
{
LOG_TRACE(log, "Added {} to registry", self.table_id);
return;
}
if (code == Coordination::Error::ZBADVERSION
|| code == Coordination::Error::ZSESSIONEXPIRED)
continue;
throw zkutil::KeeperException(code);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot register in keeper. Last error: {}", code);
}
size_t ObjectStorageQueueMetadata::unregister(const StorageID & storage_id)
{
const auto registry_path = zookeeper_path / "registry";
const auto self = Info::create(storage_id);
Coordination::Error code = Coordination::Error::ZOK;
for (size_t i = 0; i < 1000; ++i)
{
Coordination::Stat stat;
std::string registry_str;
auto zk_client = getZooKeeper();
bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat);
if (!node_exists)
{
LOG_WARNING(log, "Cannot unregister: registry does not exist");
chassert(false);
return 0;
}
Strings registered;
splitInto<','>(registered, registry_str);
bool found = false;
std::string new_registry_str;
size_t count = 0;
for (const auto & elem : registered)
{
if (elem.empty())
continue;
auto info = Info::deserialize(elem);
if (info == self)
found = true;
else
{
if (!new_registry_str.empty())
new_registry_str += ",";
new_registry_str += elem;
count += 1;
}
}
if (!found)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister: not registered");
code = zk_client->trySet(registry_path, new_registry_str, stat.version);
if (code == Coordination::Error::ZOK)
return count;
if (Coordination::isHardwareError(code)
|| code == Coordination::Error::ZBADVERSION)
continue;
throw zkutil::KeeperException(code);
}
if (Coordination::isHardwareError(code))
throw zkutil::KeeperException(code);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister in keeper. Last error: {}", code);
}
void ObjectStorageQueueMetadata::cleanupThreadFunc() void ObjectStorageQueueMetadata::cleanupThreadFunc()
{ {
/// A background task is responsible for maintaining /// A background task is responsible for maintaining

View File

@ -71,6 +71,9 @@ public:
bool is_attach, bool is_attach,
LoggerPtr log); LoggerPtr log);
void registerIfNot(const StorageID & storage_id);
size_t unregister(const StorageID & storage_id);
void shutdown(); void shutdown();
FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {}); FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});

View File

@ -16,7 +16,8 @@ ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance(
ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate( ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate(
const std::string & zookeeper_path, const std::string & zookeeper_path,
ObjectStorageQueueMetadataPtr metadata) ObjectStorageQueueMetadataPtr metadata,
const StorageID & storage_id)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path); auto it = metadata_by_path.find(zookeeper_path);
@ -30,13 +31,14 @@ ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFa
auto & metadata_from_keeper = it->second.metadata->getTableMetadata(); auto & metadata_from_keeper = it->second.metadata->getTableMetadata();
metadata_from_table.checkEquals(metadata_from_keeper); metadata_from_table.checkEquals(metadata_from_keeper);
it->second.ref_count += 1;
} }
it->second.metadata->registerIfNot(storage_id);
it->second.ref_count += 1;
return it->second.metadata; return it->second.metadata;
} }
void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path) void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path, const StorageID & storage_id)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path); auto it = metadata_by_path.find(zookeeper_path);
@ -44,28 +46,52 @@ void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_pat
if (it == metadata_by_path.end()) if (it == metadata_by_path.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path);
chassert(it->second.ref_count > 0); it->second.ref_count -= 1;
if (--it->second.ref_count == 0)
size_t registry_size;
try
{
registry_size = it->second.metadata->unregister(storage_id);
LOG_TRACE(log, "Remaining registry size: {}", registry_size);
}
catch (const zkutil::KeeperException & e)
{
if (!Coordination::isHardwareError(e.code))
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Any non-zero value would do.
registry_size = 1;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Any non-zero value would do.
registry_size = 1;
}
if (registry_size == 0)
{ {
try try
{ {
auto zk_client = Context::getGlobalContextInstance()->getZooKeeper(); auto zk_client = Context::getGlobalContextInstance()->getZooKeeper();
zk_client->tryRemove(it->first); zk_client->removeRecursive(it->first);
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(log);
} }
metadata_by_path.erase(it);
} }
if (!it->second.ref_count)
metadata_by_path.erase(it);
} }
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> ObjectStorageQueueMetadataFactory::getAll() std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> ObjectStorageQueueMetadataFactory::getAll()
{ {
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> result; std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> result;
for (const auto & [zk_path, metadata_and_ref_count] : metadata_by_path) for (const auto & [zk_path, metadata] : metadata_by_path)
result.emplace(zk_path, metadata_and_ref_count.metadata); result.emplace(zk_path, metadata.metadata);
return result; return result;
} }

View File

@ -14,25 +14,25 @@ public:
FilesMetadataPtr getOrCreate( FilesMetadataPtr getOrCreate(
const std::string & zookeeper_path, const std::string & zookeeper_path,
ObjectStorageQueueMetadataPtr metadata); ObjectStorageQueueMetadataPtr metadata,
const StorageID & storage_id);
void remove(const std::string & zookeeper_path); void remove(const std::string & zookeeper_path, const StorageID & storage_id);
std::unordered_map<std::string, FilesMetadataPtr> getAll(); std::unordered_map<std::string, FilesMetadataPtr> getAll();
private: private:
struct Metadata struct MetadataWithRefCount
{ {
explicit Metadata(std::shared_ptr<ObjectStorageQueueMetadata> metadata_) : metadata(metadata_), ref_count(1) {} explicit MetadataWithRefCount(std::shared_ptr<ObjectStorageQueueMetadata> metadata_) : metadata(metadata_) {}
std::shared_ptr<ObjectStorageQueueMetadata> metadata; std::shared_ptr<ObjectStorageQueueMetadata> metadata;
/// TODO: the ref count should be kept in keeper, because of the case with distributed processing.
size_t ref_count = 0; size_t ref_count = 0;
}; };
using MetadataByPath = std::unordered_map<std::string, Metadata>; using MetadataByPath = std::unordered_map<std::string, MetadataWithRefCount>;
MetadataByPath metadata_by_path; MetadataByPath metadata_by_path;
std::mutex mutex; std::mutex mutex;
LoggerPtr log = getLogger("QueueMetadataFactory");
}; };
} }

View File

@ -214,9 +214,12 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log); zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log);
auto queue_metadata = std::make_unique<ObjectStorageQueueMetadata>( auto queue_metadata = std::make_unique<ObjectStorageQueueMetadata>(
zk_path, std::move(table_metadata), (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]); zk_path,
std::move(table_metadata),
(*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms],
(*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]);
files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata)); files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata), table_id_);
task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); }); task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
} }
@ -248,7 +251,7 @@ void StorageObjectStorageQueue::shutdown(bool is_drop)
void StorageObjectStorageQueue::drop() void StorageObjectStorageQueue::drop()
{ {
ObjectStorageQueueMetadataFactory::instance().remove(zk_path); ObjectStorageQueueMetadataFactory::instance().remove(zk_path, getStorageID());
} }
bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const

View File

@ -2403,3 +2403,127 @@ def test_list_and_delete_race(started_cluster):
assert node.contains_in_log( assert node.contains_in_log(
"because of the race with list & delete" "because of the race with list & delete"
) or node_2.contains_in_log("because of the race with list & delete") ) or node_2.contains_in_log("because of the race with list & delete")
def test_registry(started_cluster):
node1 = started_cluster.instances["node1"]
node2 = started_cluster.instances["node2"]
table_name = f"test_registry_{uuid.uuid4().hex[:8]}"
db_name = f"db_{table_name}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 1000
node1.query(f"DROP DATABASE IF EXISTS {db_name}")
node2.query(f"DROP DATABASE IF EXISTS {db_name}")
node1.query(
f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb2', 'shard1', 'node1')"
)
node2.query(
f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb2', 'shard1', 'node2')"
)
create_table(
started_cluster,
node1,
table_name,
"ordered",
files_path,
additional_settings={"keeper_path": keeper_path, "buckets": 3},
database_name=db_name,
)
zk = started_cluster.get_kazoo_client("zoo1")
registry, stat = zk.get(f"{keeper_path}/registry/")
uuid1 = node1.query(
f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name}'"
).strip()
assert uuid1 in str(registry)
expected = [f"0\\nnode1\\n{uuid1}\\n", f"0\\nnode2\\n{uuid1}\\n"]
for elem in expected:
assert elem in str(registry)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
create_mv(node1, f"{db_name}.{table_name}", dst_table_name)
create_mv(node2, f"{db_name}.{table_name}", dst_table_name)
def get_count():
return int(
node1.query(
f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})"
)
)
expected_rows = files_to_generate
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
table_name_2 = f"test_registry_{uuid.uuid4().hex[:8]}_2"
create_table(
started_cluster,
node1,
table_name_2,
"ordered",
files_path,
additional_settings={"keeper_path": keeper_path, "buckets": 3},
database_name=db_name,
)
registry, stat = zk.get(f"{keeper_path}/registry/")
uuid2 = node1.query(
f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name_2}'"
).strip()
assert uuid1 in str(registry)
assert uuid2 in str(registry)
expected = [
f"0\\nnode1\\n{uuid1}\\n",
f"0\\nnode2\\n{uuid1}\\n",
f"0\\nnode1\\n{uuid2}\\n",
f"0\\nnode2\\n{uuid2}\\n",
]
for elem in expected:
assert elem in str(registry)
node1.restart_clickhouse()
node2.restart_clickhouse()
registry, stat = zk.get(f"{keeper_path}/registry/")
assert uuid1 in str(registry)
assert uuid2 in str(registry)
node1.query(f"DROP TABLE {db_name}.{table_name_2} SYNC")
assert zk.exists(keeper_path) is not None
registry, stat = zk.get(f"{keeper_path}/registry/")
assert uuid1 in str(registry)
assert uuid2 not in str(registry)
expected = [
f"0\\nnode1\\n{uuid1}\\n",
f"0\\nnode2\\n{uuid1}\\n",
]
for elem in expected:
assert elem in str(registry)
node1.query(f"DROP TABLE {db_name}.{table_name} SYNC")
assert zk.exists(keeper_path) is None