Move creation and validation of table metadata before creating queue metadata & save to factory

This commit is contained in:
kssenii 2024-09-11 15:25:32 +02:00
parent 4e0b3f8415
commit a05610c38f
7 changed files with 133 additions and 175 deletions

View File

@ -33,7 +33,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int REPLICA_ALREADY_EXISTS; extern const int REPLICA_ALREADY_EXISTS;
extern const int INCOMPATIBLE_COLUMNS;
} }
namespace namespace
@ -108,8 +107,12 @@ private:
} }
}; };
ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_) ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(
const fs::path & zookeeper_path_,
const ObjectStorageQueueTableMetadata & table_metadata_,
const ObjectStorageQueueSettings & settings_)
: settings(settings_) : settings(settings_)
, table_metadata(table_metadata_)
, zookeeper_path(zookeeper_path_) , zookeeper_path(zookeeper_path_)
, buckets_num(getBucketsNum(settings_)) , buckets_num(getBucketsNum(settings_))
, log(getLogger("StorageObjectStorageQueue(" + zookeeper_path_.string() + ")")) , log(getLogger("StorageObjectStorageQueue(" + zookeeper_path_.string() + ")"))
@ -144,11 +147,6 @@ void ObjectStorageQueueMetadata::shutdown()
task->deactivate(); task->deactivate();
} }
void ObjectStorageQueueMetadata::checkSettings(const ObjectStorageQueueSettings & settings_) const
{
ObjectStorageQueueTableMetadata::checkEquals(settings, settings_);
}
ObjectStorageQueueMetadata::FileStatusPtr ObjectStorageQueueMetadata::getFileStatus(const std::string & path) ObjectStorageQueueMetadata::FileStatusPtr ObjectStorageQueueMetadata::getFileStatus(const std::string & path)
{ {
return local_file_statuses->get(path, /* create */false); return local_file_statuses->get(path, /* create */false);
@ -219,13 +217,14 @@ ObjectStorageQueueMetadata::tryAcquireBucket(const Bucket & bucket, const Proces
return ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log); return ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
} }
void ObjectStorageQueueMetadata::initialize( void ObjectStorageQueueMetadata::syncWithKeeper(
const ConfigurationPtr & configuration, const fs::path & zookeeper_path,
const StorageInMemoryMetadata & storage_metadata) const ObjectStorageQueueTableMetadata & table_metadata,
const ObjectStorageQueueSettings & settings,
LoggerPtr log)
{ {
const auto metadata_from_table = ObjectStorageQueueTableMetadata(*configuration, settings, storage_metadata);
const auto & columns_from_table = storage_metadata.getColumns();
const auto table_metadata_path = zookeeper_path / "metadata"; const auto table_metadata_path = zookeeper_path / "metadata";
const auto buckets_num = getBucketsNum(settings);
const auto metadata_paths = settings.mode == ObjectStorageQueueMode::ORDERED const auto metadata_paths = settings.mode == ObjectStorageQueueMode::ORDERED
? ObjectStorageQueueOrderedFileMetadata::getMetadataPaths(buckets_num) ? ObjectStorageQueueOrderedFileMetadata::getMetadataPaths(buckets_num)
: ObjectStorageQueueUnorderedFileMetadata::getMetadataPaths(); : ObjectStorageQueueUnorderedFileMetadata::getMetadataPaths();
@ -237,24 +236,17 @@ void ObjectStorageQueueMetadata::initialize(
{ {
if (zookeeper->exists(table_metadata_path)) if (zookeeper->exists(table_metadata_path))
{ {
const auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(zookeeper->get(fs::path(zookeeper_path) / "metadata")); const auto metadata_str = zookeeper->get(fs::path(zookeeper_path) / "metadata");
const auto columns_from_zk = ColumnsDescription::parse(metadata_from_zk.columns); const auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(metadata_str);
metadata_from_table.checkEquals(metadata_from_zk); table_metadata.checkEquals(metadata_from_zk);
if (columns_from_zk != columns_from_table)
{
throw Exception(
ErrorCodes::INCOMPATIBLE_COLUMNS,
"Table columns structure in ZooKeeper is different from local table structure. "
"Local columns:\n{}\nZookeeper columns:\n{}",
columns_from_table.toString(), columns_from_zk.toString());
}
return; return;
} }
Coordination::Requests requests; Coordination::Requests requests;
requests.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent)); requests.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(table_metadata_path, metadata_from_table.toString(), zkutil::CreateMode::Persistent)); requests.emplace_back(zkutil::makeCreateRequest(
table_metadata_path, table_metadata.toString(), zkutil::CreateMode::Persistent));
for (const auto & path : metadata_paths) for (const auto & path : metadata_paths)
{ {
@ -263,16 +255,27 @@ void ObjectStorageQueueMetadata::initialize(
} }
if (!settings.last_processed_path.value.empty()) if (!settings.last_processed_path.value.empty())
getFileMetadata(settings.last_processed_path)->setProcessedAtStartRequests(requests, zookeeper); {
ObjectStorageQueueOrderedFileMetadata(
zookeeper_path,
settings.last_processed_path,
std::make_shared<FileStatus>(),
/* bucket_info */nullptr,
buckets_num,
settings.loading_retries,
log).setProcessedAtStartRequests(requests, zookeeper);
}
Coordination::Responses responses; Coordination::Responses responses;
auto code = zookeeper->tryMulti(requests, responses); auto code = zookeeper->tryMulti(requests, responses);
if (code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)
{ {
auto exception = zkutil::KeeperMultiException(code, requests, responses); auto exception = zkutil::KeeperMultiException(code, requests, responses);
LOG_INFO(log, "Got code `{}` for path: {}. " LOG_INFO(log, "Got code `{}` for path: {}. "
"It looks like the table {} was created by another server at the same moment, " "It looks like the table {} was created by another server at the same moment, "
"will retry", code, exception.getPathForFirstFailedOp(), zookeeper_path.string()); "will retry",
code, exception.getPathForFirstFailedOp(), zookeeper_path.string());
continue; continue;
} }
else if (code != Coordination::Error::ZOK) else if (code != Coordination::Error::ZOK)

View File

@ -8,6 +8,7 @@
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h> #include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h> #include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h> #include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -52,11 +53,19 @@ public:
using Bucket = size_t; using Bucket = size_t;
using Processor = std::string; using Processor = std::string;
ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_); ObjectStorageQueueMetadata(
const fs::path & zookeeper_path_,
const ObjectStorageQueueTableMetadata & table_metadata_,
const ObjectStorageQueueSettings & settings_);
~ObjectStorageQueueMetadata(); ~ObjectStorageQueueMetadata();
void initialize(const ConfigurationPtr & configuration, const StorageInMemoryMetadata & storage_metadata); static void syncWithKeeper(
void checkSettings(const ObjectStorageQueueSettings & settings) const; const fs::path & zookeeper_path,
const ObjectStorageQueueTableMetadata & table_metadata,
const ObjectStorageQueueSettings & settings,
LoggerPtr log);
void shutdown(); void shutdown();
FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {}); FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
@ -72,11 +81,17 @@ public:
static size_t getBucketsNum(const ObjectStorageQueueSettings & settings); static size_t getBucketsNum(const ObjectStorageQueueSettings & settings);
static size_t getBucketsNum(const ObjectStorageQueueTableMetadata & settings); static size_t getBucketsNum(const ObjectStorageQueueTableMetadata & settings);
void checkTableMetadataEquals(const ObjectStorageQueueMetadata & other);
const ObjectStorageQueueTableMetadata & getTableMetadata() const { return table_metadata; }
ObjectStorageQueueTableMetadata & getTableMetadata() { return table_metadata; }
private: private:
void cleanupThreadFunc(); void cleanupThreadFunc();
void cleanupThreadFuncImpl(); void cleanupThreadFuncImpl();
const ObjectStorageQueueSettings settings; ObjectStorageQueueSettings settings;
ObjectStorageQueueTableMetadata table_metadata;
const fs::path zookeeper_path; const fs::path zookeeper_path;
const size_t buckets_num; const size_t buckets_num;
@ -89,4 +104,6 @@ private:
std::shared_ptr<LocalFileStatuses> local_file_statuses; std::shared_ptr<LocalFileStatuses> local_file_statuses;
}; };
using ObjectStorageQueueMetadataPtr = std::unique_ptr<ObjectStorageQueueMetadata>;
} }

View File

@ -14,19 +14,23 @@ ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance(
return ret; return ret;
} }
ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate(
ObjectStorageQueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings) const std::string & zookeeper_path,
ObjectStorageQueueMetadataPtr metadata)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path); auto it = metadata_by_path.find(zookeeper_path);
if (it == metadata_by_path.end()) if (it == metadata_by_path.end())
{ {
auto files_metadata = std::make_shared<ObjectStorageQueueMetadata>(zookeeper_path, settings); it = metadata_by_path.emplace(zookeeper_path, std::move(metadata)).first;
it = metadata_by_path.emplace(zookeeper_path, std::move(files_metadata)).first;
} }
else else
{ {
it->second.metadata->checkSettings(settings); auto & metadata_from_table = metadata->getTableMetadata();
auto & metadata_from_keeper = it->second.metadata->getTableMetadata();
metadata_from_table.checkEquals(metadata_from_keeper);
it->second.ref_count += 1; it->second.ref_count += 1;
} }
return it->second.metadata; return it->second.metadata;

View File

@ -13,7 +13,9 @@ public:
static ObjectStorageQueueMetadataFactory & instance(); static ObjectStorageQueueMetadataFactory & instance();
FilesMetadataPtr getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings); FilesMetadataPtr getOrCreate(
const std::string & zookeeper_path,
ObjectStorageQueueMetadataPtr metadata);
void remove(const std::string & zookeeper_path); void remove(const std::string & zookeeper_path);

View File

@ -1,6 +1,5 @@
#include <config.h> #include <config.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h> #include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h> #include <Poco/JSON/Parser.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h> #include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
@ -32,18 +31,18 @@ namespace
ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata( ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
const StorageObjectStorage::Configuration & configuration,
const ObjectStorageQueueSettings & engine_settings, const ObjectStorageQueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata) const ColumnsDescription & columns_,
const std::string & format_)
: format_name(format_)
, columns(columns_.toString())
, after_processing(engine_settings.after_processing.toString())
, mode(engine_settings.mode.toString())
, tracked_files_limit(engine_settings.tracked_files_limit)
, tracked_file_ttl_sec(engine_settings.tracked_file_ttl_sec)
, buckets(engine_settings.buckets)
, processing_threads_num(engine_settings.processing_threads_num)
{ {
format_name = configuration.format;
after_processing = engine_settings.after_processing.toString();
mode = engine_settings.mode.toString();
tracked_files_limit = engine_settings.tracked_files_limit;
tracked_file_ttl_sec = engine_settings.tracked_file_ttl_sec;
buckets = engine_settings.buckets;
processing_threads_num = engine_settings.processing_threads_num;
columns = storage_metadata.getColumns().toString();
} }
String ObjectStorageQueueTableMetadata::toString() const String ObjectStorageQueueTableMetadata::toString() const
@ -65,48 +64,40 @@ String ObjectStorageQueueTableMetadata::toString() const
return oss.str(); return oss.str();
} }
void ObjectStorageQueueTableMetadata::read(const String & metadata_str) template <typename T>
static auto getOrDefault(
const Poco::JSON::Object::Ptr & json,
const std::string & setting,
const std::string & compatibility_prefix,
const T & default_value)
{ {
Poco::JSON::Parser parser; if (!compatibility_prefix.empty() && json->has(compatibility_prefix + setting))
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>(); return json->getValue<T>(compatibility_prefix + setting);
after_processing = json->getValue<String>("after_processing"); if (json->has(setting))
mode = json->getValue<String>("mode"); return json->getValue<T>(setting);
format_name = json->getValue<String>("format_name"); return default_value;
columns = json->getValue<String>("columns"); }
/// Check with "s3queue_" prefix for compatibility. ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json)
{ : format_name(json->getValue<String>("format_name"))
if (json->has("s3queue_tracked_files_limit")) , columns(json->getValue<String>("columns"))
tracked_files_limit = json->getValue<UInt64>("s3queue_tracked_files_limit"); , after_processing(json->getValue<String>("after_processing"))
if (json->has("s3queue_tracked_file_ttl_sec")) , mode(json->getValue<String>("mode"))
tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec"); , tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0))
if (json->has("s3queue_processing_threads_num")) , tracked_file_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "s3queue_", 0))
processing_threads_num = json->getValue<UInt64>("s3queue_processing_threads_num"); , buckets(getOrDefault(json, "buckets", "", 0))
} , processing_threads_num(getOrDefault(json, "processing_threads_num", "s3queue_", 0))
, last_processed_path(getOrDefault<String>(json, "last_processed_file", "s3queue_", ""))
if (json->has("tracked_files_limit")) {
tracked_files_limit = json->getValue<UInt64>("tracked_files_limit");
if (json->has("tracked_file_ttl_sec"))
tracked_file_ttl_sec = json->getValue<UInt64>("tracked_file_ttl_sec");
if (json->has("last_processed_file"))
last_processed_path = json->getValue<String>("last_processed_file");
if (json->has("processing_threads_num"))
processing_threads_num = json->getValue<UInt64>("processing_threads_num");
if (json->has("buckets"))
buckets = json->getValue<UInt64>("buckets");
} }
ObjectStorageQueueTableMetadata ObjectStorageQueueTableMetadata::parse(const String & metadata_str) ObjectStorageQueueTableMetadata ObjectStorageQueueTableMetadata::parse(const String & metadata_str)
{ {
ObjectStorageQueueTableMetadata metadata; Poco::JSON::Parser parser;
metadata.read(metadata_str); auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
return metadata; return ObjectStorageQueueTableMetadata(json);
} }
void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const
@ -181,72 +172,17 @@ void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectSto
ErrorCodes::METADATA_MISMATCH, ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in processing buckets. " "Existing table metadata in ZooKeeper differs in processing buckets. "
"Stored in ZooKeeper: {}, local: {}", "Stored in ZooKeeper: {}, local: {}",
ObjectStorageQueueMetadata::getBucketsNum(*this), ObjectStorageQueueMetadata::getBucketsNum(from_zk)); ObjectStorageQueueMetadata::getBucketsNum(from_zk), ObjectStorageQueueMetadata::getBucketsNum(*this));
} }
} }
if (columns != from_zk.columns)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in columns. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.columns,
columns);
} }
void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected)
{
if (current.after_processing != expected.after_processing)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs "
"in action after processing. Stored in ZooKeeper: {}, local: {}",
expected.after_processing.toString(),
current.after_processing.toString());
if (current.mode != expected.mode)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in engine mode. "
"Stored in ZooKeeper: {}, local: {}",
expected.mode.toString(),
current.mode.toString());
if (current.tracked_files_limit != expected.tracked_files_limit)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set size. "
"Stored in ZooKeeper: {}, local: {}",
expected.tracked_files_limit,
current.tracked_files_limit);
if (current.tracked_file_ttl_sec != expected.tracked_file_ttl_sec)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set age. "
"Stored in ZooKeeper: {}, local: {}",
expected.tracked_file_ttl_sec,
current.tracked_file_ttl_sec);
if (current.last_processed_path.value != expected.last_processed_path.value)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in last_processed_path. "
"Stored in ZooKeeper: {}, local: {}",
expected.last_processed_path.value,
current.last_processed_path.value);
if (current.mode == ObjectStorageQueueMode::ORDERED)
{
if (current.buckets != expected.buckets)
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in buckets setting. "
"Stored in ZooKeeper: {}, local: {}",
expected.buckets, current.buckets);
}
if (ObjectStorageQueueMetadata::getBucketsNum(current) != ObjectStorageQueueMetadata::getBucketsNum(expected))
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in processing buckets. "
"Stored in ZooKeeper: {}, local: {}",
ObjectStorageQueueMetadata::getBucketsNum(current), ObjectStorageQueueMetadata::getBucketsNum(expected));
}
}
}
} }

View File

@ -3,6 +3,8 @@
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h> #include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/StorageInMemoryMetadata.h> #include <Storages/StorageInMemoryMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h> #include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <base/types.h> #include <base/types.h>
namespace DB namespace DB
@ -16,29 +18,28 @@ class ReadBuffer;
*/ */
struct ObjectStorageQueueTableMetadata struct ObjectStorageQueueTableMetadata
{ {
String format_name; const String format_name;
String columns; const String columns;
String after_processing; const String after_processing;
String mode; const String mode;
UInt64 tracked_files_limit = 0; const UInt64 tracked_files_limit;
UInt64 tracked_file_ttl_sec = 0; const UInt64 tracked_file_ttl_sec;
UInt64 buckets = 0; const UInt64 buckets;
UInt64 processing_threads_num = 1; const UInt64 processing_threads_num;
String last_processed_path; const String last_processed_path;
ObjectStorageQueueTableMetadata() = default;
ObjectStorageQueueTableMetadata( ObjectStorageQueueTableMetadata(
const StorageObjectStorage::Configuration & configuration,
const ObjectStorageQueueSettings & engine_settings, const ObjectStorageQueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata); const ColumnsDescription & columns_,
const std::string & format_);
explicit ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json);
void read(const String & metadata_str);
static ObjectStorageQueueTableMetadata parse(const String & metadata_str); static ObjectStorageQueueTableMetadata parse(const String & metadata_str);
String toString() const; String toString() const;
void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const; void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
static void checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected);
private: private:
void checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const; void checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const;

View File

@ -85,7 +85,10 @@ namespace
} }
} }
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings) std::shared_ptr<ObjectStorageQueueLog> getQueueLog(
const ObjectStoragePtr & storage,
const ContextPtr & context,
const ObjectStorageQueueSettings & table_settings)
{ {
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
switch (storage->getType()) switch (storage->getType())
@ -105,7 +108,6 @@ namespace
default: default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: {}", storage->getType()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: {}", storage->getType());
} }
} }
} }
@ -161,21 +163,14 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
/// Get metadata manager from ObjectStorageQueueMetadataFactory, ObjectStorageQueueTableMetadata table_metadata(*queue_settings, storage_metadata.getColumns(), configuration_->format);
/// it will increase the ref count for the metadata object. ObjectStorageQueueMetadata::syncWithKeeper(zk_path, table_metadata, *queue_settings, log);
/// The ref count is decreased when StorageObjectStorageQueue::drop() method is called.
files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, *queue_settings); auto queue_metadata = std::make_unique<ObjectStorageQueueMetadata>(zk_path, std::move(table_metadata), *queue_settings);
try files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata));
{
files_metadata->initialize(configuration_, storage_metadata); task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
}
catch (...)
{
ObjectStorageQueueMetadataFactory::instance().remove(zk_path);
throw;
}
} }
void StorageObjectStorageQueue::startup() void StorageObjectStorageQueue::startup()