diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp index 23ac92b667a..e424769b061 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp @@ -33,7 +33,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; extern const int REPLICA_ALREADY_EXISTS; - extern const int INCOMPATIBLE_COLUMNS; } namespace @@ -108,8 +107,12 @@ private: } }; -ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_) +ObjectStorageQueueMetadata::ObjectStorageQueueMetadata( + const fs::path & zookeeper_path_, + const ObjectStorageQueueTableMetadata & table_metadata_, + const ObjectStorageQueueSettings & settings_) : settings(settings_) + , table_metadata(table_metadata_) , zookeeper_path(zookeeper_path_) , buckets_num(getBucketsNum(settings_)) , log(getLogger("StorageObjectStorageQueue(" + zookeeper_path_.string() + ")")) @@ -144,11 +147,6 @@ void ObjectStorageQueueMetadata::shutdown() task->deactivate(); } -void ObjectStorageQueueMetadata::checkSettings(const ObjectStorageQueueSettings & settings_) const -{ - ObjectStorageQueueTableMetadata::checkEquals(settings, settings_); -} - ObjectStorageQueueMetadata::FileStatusPtr ObjectStorageQueueMetadata::getFileStatus(const std::string & path) { return local_file_statuses->get(path, /* create */false); @@ -219,13 +217,14 @@ ObjectStorageQueueMetadata::tryAcquireBucket(const Bucket & bucket, const Proces return ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log); } -void ObjectStorageQueueMetadata::initialize( - const ConfigurationPtr & configuration, - const StorageInMemoryMetadata & storage_metadata) +void ObjectStorageQueueMetadata::syncWithKeeper( + const fs::path & zookeeper_path, + const ObjectStorageQueueTableMetadata & table_metadata, + const ObjectStorageQueueSettings & settings, + LoggerPtr log) { - const auto metadata_from_table = ObjectStorageQueueTableMetadata(*configuration, settings, storage_metadata); - const auto & columns_from_table = storage_metadata.getColumns(); const auto table_metadata_path = zookeeper_path / "metadata"; + const auto buckets_num = getBucketsNum(settings); const auto metadata_paths = settings.mode == ObjectStorageQueueMode::ORDERED ? ObjectStorageQueueOrderedFileMetadata::getMetadataPaths(buckets_num) : ObjectStorageQueueUnorderedFileMetadata::getMetadataPaths(); @@ -237,24 +236,17 @@ void ObjectStorageQueueMetadata::initialize( { if (zookeeper->exists(table_metadata_path)) { - const auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(zookeeper->get(fs::path(zookeeper_path) / "metadata")); - const auto columns_from_zk = ColumnsDescription::parse(metadata_from_zk.columns); + const auto metadata_str = zookeeper->get(fs::path(zookeeper_path) / "metadata"); + const auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(metadata_str); - metadata_from_table.checkEquals(metadata_from_zk); - if (columns_from_zk != columns_from_table) - { - throw Exception( - ErrorCodes::INCOMPATIBLE_COLUMNS, - "Table columns structure in ZooKeeper is different from local table structure. " - "Local columns:\n{}\nZookeeper columns:\n{}", - columns_from_table.toString(), columns_from_zk.toString()); - } + table_metadata.checkEquals(metadata_from_zk); return; } Coordination::Requests requests; requests.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent)); - requests.emplace_back(zkutil::makeCreateRequest(table_metadata_path, metadata_from_table.toString(), zkutil::CreateMode::Persistent)); + requests.emplace_back(zkutil::makeCreateRequest( + table_metadata_path, table_metadata.toString(), zkutil::CreateMode::Persistent)); for (const auto & path : metadata_paths) { @@ -263,16 +255,27 @@ void ObjectStorageQueueMetadata::initialize( } if (!settings.last_processed_path.value.empty()) - getFileMetadata(settings.last_processed_path)->setProcessedAtStartRequests(requests, zookeeper); + { + ObjectStorageQueueOrderedFileMetadata( + zookeeper_path, + settings.last_processed_path, + std::make_shared(), + /* bucket_info */nullptr, + buckets_num, + settings.loading_retries, + log).setProcessedAtStartRequests(requests, zookeeper); + } Coordination::Responses responses; auto code = zookeeper->tryMulti(requests, responses); if (code == Coordination::Error::ZNODEEXISTS) { auto exception = zkutil::KeeperMultiException(code, requests, responses); + LOG_INFO(log, "Got code `{}` for path: {}. " "It looks like the table {} was created by another server at the same moment, " - "will retry", code, exception.getPathForFirstFailedOp(), zookeeper_path.string()); + "will retry", + code, exception.getPathForFirstFailedOp(), zookeeper_path.string()); continue; } else if (code != Coordination::Error::ZOK) diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h index e5fae047ac5..71d26ca7c47 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h @@ -8,6 +8,7 @@ #include #include #include +#include #include namespace fs = std::filesystem; @@ -52,11 +53,19 @@ public: using Bucket = size_t; using Processor = std::string; - ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_); + ObjectStorageQueueMetadata( + const fs::path & zookeeper_path_, + const ObjectStorageQueueTableMetadata & table_metadata_, + const ObjectStorageQueueSettings & settings_); + ~ObjectStorageQueueMetadata(); - void initialize(const ConfigurationPtr & configuration, const StorageInMemoryMetadata & storage_metadata); - void checkSettings(const ObjectStorageQueueSettings & settings) const; + static void syncWithKeeper( + const fs::path & zookeeper_path, + const ObjectStorageQueueTableMetadata & table_metadata, + const ObjectStorageQueueSettings & settings, + LoggerPtr log); + void shutdown(); FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {}); @@ -72,11 +81,17 @@ public: static size_t getBucketsNum(const ObjectStorageQueueSettings & settings); static size_t getBucketsNum(const ObjectStorageQueueTableMetadata & settings); + void checkTableMetadataEquals(const ObjectStorageQueueMetadata & other); + + const ObjectStorageQueueTableMetadata & getTableMetadata() const { return table_metadata; } + ObjectStorageQueueTableMetadata & getTableMetadata() { return table_metadata; } + private: void cleanupThreadFunc(); void cleanupThreadFuncImpl(); - const ObjectStorageQueueSettings settings; + ObjectStorageQueueSettings settings; + ObjectStorageQueueTableMetadata table_metadata; const fs::path zookeeper_path; const size_t buckets_num; @@ -89,4 +104,6 @@ private: std::shared_ptr local_file_statuses; }; +using ObjectStorageQueueMetadataPtr = std::unique_ptr; + } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp index ffae33d6f41..ba98711eff9 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp @@ -14,19 +14,23 @@ ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance( return ret; } -ObjectStorageQueueMetadataFactory::FilesMetadataPtr -ObjectStorageQueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings) +ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate( + const std::string & zookeeper_path, + ObjectStorageQueueMetadataPtr metadata) { std::lock_guard lock(mutex); auto it = metadata_by_path.find(zookeeper_path); if (it == metadata_by_path.end()) { - auto files_metadata = std::make_shared(zookeeper_path, settings); - it = metadata_by_path.emplace(zookeeper_path, std::move(files_metadata)).first; + it = metadata_by_path.emplace(zookeeper_path, std::move(metadata)).first; } else { - it->second.metadata->checkSettings(settings); + auto & metadata_from_table = metadata->getTableMetadata(); + auto & metadata_from_keeper = it->second.metadata->getTableMetadata(); + + metadata_from_table.checkEquals(metadata_from_keeper); + it->second.ref_count += 1; } return it->second.metadata; diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h index a93f5ee3d83..a9975c526ef 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h @@ -13,7 +13,9 @@ public: static ObjectStorageQueueMetadataFactory & instance(); - FilesMetadataPtr getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings); + FilesMetadataPtr getOrCreate( + const std::string & zookeeper_path, + ObjectStorageQueueMetadataPtr metadata); void remove(const std::string & zookeeper_path); diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp index cb9cdf8e186..c2f70e3e804 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp @@ -1,6 +1,5 @@ #include -#include #include #include #include @@ -32,18 +31,18 @@ namespace ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata( - const StorageObjectStorage::Configuration & configuration, const ObjectStorageQueueSettings & engine_settings, - const StorageInMemoryMetadata & storage_metadata) + const ColumnsDescription & columns_, + const std::string & format_) + : format_name(format_) + , columns(columns_.toString()) + , after_processing(engine_settings.after_processing.toString()) + , mode(engine_settings.mode.toString()) + , tracked_files_limit(engine_settings.tracked_files_limit) + , tracked_file_ttl_sec(engine_settings.tracked_file_ttl_sec) + , buckets(engine_settings.buckets) + , processing_threads_num(engine_settings.processing_threads_num) { - format_name = configuration.format; - after_processing = engine_settings.after_processing.toString(); - mode = engine_settings.mode.toString(); - tracked_files_limit = engine_settings.tracked_files_limit; - tracked_file_ttl_sec = engine_settings.tracked_file_ttl_sec; - buckets = engine_settings.buckets; - processing_threads_num = engine_settings.processing_threads_num; - columns = storage_metadata.getColumns().toString(); } String ObjectStorageQueueTableMetadata::toString() const @@ -65,48 +64,40 @@ String ObjectStorageQueueTableMetadata::toString() const return oss.str(); } -void ObjectStorageQueueTableMetadata::read(const String & metadata_str) +template +static auto getOrDefault( + const Poco::JSON::Object::Ptr & json, + const std::string & setting, + const std::string & compatibility_prefix, + const T & default_value) { - Poco::JSON::Parser parser; - auto json = parser.parse(metadata_str).extract(); + if (!compatibility_prefix.empty() && json->has(compatibility_prefix + setting)) + return json->getValue(compatibility_prefix + setting); - after_processing = json->getValue("after_processing"); - mode = json->getValue("mode"); + if (json->has(setting)) + return json->getValue(setting); - format_name = json->getValue("format_name"); - columns = json->getValue("columns"); + return default_value; +} - /// Check with "s3queue_" prefix for compatibility. - { - if (json->has("s3queue_tracked_files_limit")) - tracked_files_limit = json->getValue("s3queue_tracked_files_limit"); - if (json->has("s3queue_tracked_file_ttl_sec")) - tracked_file_ttl_sec = json->getValue("s3queue_tracked_file_ttl_sec"); - if (json->has("s3queue_processing_threads_num")) - processing_threads_num = json->getValue("s3queue_processing_threads_num"); - } - - if (json->has("tracked_files_limit")) - tracked_files_limit = json->getValue("tracked_files_limit"); - - if (json->has("tracked_file_ttl_sec")) - tracked_file_ttl_sec = json->getValue("tracked_file_ttl_sec"); - - if (json->has("last_processed_file")) - last_processed_path = json->getValue("last_processed_file"); - - if (json->has("processing_threads_num")) - processing_threads_num = json->getValue("processing_threads_num"); - - if (json->has("buckets")) - buckets = json->getValue("buckets"); +ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json) + : format_name(json->getValue("format_name")) + , columns(json->getValue("columns")) + , after_processing(json->getValue("after_processing")) + , mode(json->getValue("mode")) + , tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0)) + , tracked_file_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "s3queue_", 0)) + , buckets(getOrDefault(json, "buckets", "", 0)) + , processing_threads_num(getOrDefault(json, "processing_threads_num", "s3queue_", 0)) + , last_processed_path(getOrDefault(json, "last_processed_file", "s3queue_", "")) +{ } ObjectStorageQueueTableMetadata ObjectStorageQueueTableMetadata::parse(const String & metadata_str) { - ObjectStorageQueueTableMetadata metadata; - metadata.read(metadata_str); - return metadata; + Poco::JSON::Parser parser; + auto json = parser.parse(metadata_str).extract(); + return ObjectStorageQueueTableMetadata(json); } void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const @@ -181,72 +172,17 @@ void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectSto ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in processing buckets. " "Stored in ZooKeeper: {}, local: {}", - ObjectStorageQueueMetadata::getBucketsNum(*this), ObjectStorageQueueMetadata::getBucketsNum(from_zk)); + ObjectStorageQueueMetadata::getBucketsNum(from_zk), ObjectStorageQueueMetadata::getBucketsNum(*this)); } } + + if (columns != from_zk.columns) + throw Exception( + ErrorCodes::METADATA_MISMATCH, + "Existing table metadata in ZooKeeper differs in columns. " + "Stored in ZooKeeper: {}, local: {}", + from_zk.columns, + columns); } -void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected) -{ - if (current.after_processing != expected.after_processing) - throw Exception( - ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs " - "in action after processing. Stored in ZooKeeper: {}, local: {}", - expected.after_processing.toString(), - current.after_processing.toString()); - - if (current.mode != expected.mode) - throw Exception( - ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in engine mode. " - "Stored in ZooKeeper: {}, local: {}", - expected.mode.toString(), - current.mode.toString()); - - if (current.tracked_files_limit != expected.tracked_files_limit) - throw Exception( - ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in max set size. " - "Stored in ZooKeeper: {}, local: {}", - expected.tracked_files_limit, - current.tracked_files_limit); - - if (current.tracked_file_ttl_sec != expected.tracked_file_ttl_sec) - throw Exception( - ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in max set age. " - "Stored in ZooKeeper: {}, local: {}", - expected.tracked_file_ttl_sec, - current.tracked_file_ttl_sec); - - if (current.last_processed_path.value != expected.last_processed_path.value) - throw Exception( - ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in last_processed_path. " - "Stored in ZooKeeper: {}, local: {}", - expected.last_processed_path.value, - current.last_processed_path.value); - - if (current.mode == ObjectStorageQueueMode::ORDERED) - { - if (current.buckets != expected.buckets) - { - throw Exception( - ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in buckets setting. " - "Stored in ZooKeeper: {}, local: {}", - expected.buckets, current.buckets); - } - - if (ObjectStorageQueueMetadata::getBucketsNum(current) != ObjectStorageQueueMetadata::getBucketsNum(expected)) - { - throw Exception( - ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in processing buckets. " - "Stored in ZooKeeper: {}, local: {}", - ObjectStorageQueueMetadata::getBucketsNum(current), ObjectStorageQueueMetadata::getBucketsNum(expected)); - } - } -} } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h index bbae06b66c6..d70b859ae1d 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include namespace DB @@ -16,29 +18,28 @@ class ReadBuffer; */ struct ObjectStorageQueueTableMetadata { - String format_name; - String columns; - String after_processing; - String mode; - UInt64 tracked_files_limit = 0; - UInt64 tracked_file_ttl_sec = 0; - UInt64 buckets = 0; - UInt64 processing_threads_num = 1; - String last_processed_path; + const String format_name; + const String columns; + const String after_processing; + const String mode; + const UInt64 tracked_files_limit; + const UInt64 tracked_file_ttl_sec; + const UInt64 buckets; + const UInt64 processing_threads_num; + const String last_processed_path; - ObjectStorageQueueTableMetadata() = default; ObjectStorageQueueTableMetadata( - const StorageObjectStorage::Configuration & configuration, const ObjectStorageQueueSettings & engine_settings, - const StorageInMemoryMetadata & storage_metadata); + const ColumnsDescription & columns_, + const std::string & format_); + + explicit ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json); - void read(const String & metadata_str); static ObjectStorageQueueTableMetadata parse(const String & metadata_str); String toString() const; void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const; - static void checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected); private: void checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const; diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp index c1ef37e1a48..8f11836a11b 100644 --- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp +++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp @@ -85,7 +85,10 @@ namespace } } - std::shared_ptr getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings) + std::shared_ptr getQueueLog( + const ObjectStoragePtr & storage, + const ContextPtr & context, + const ObjectStorageQueueSettings & table_settings) { const auto & settings = context->getSettingsRef(); switch (storage->getType()) @@ -105,7 +108,6 @@ namespace default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: {}", storage->getType()); } - } } @@ -161,21 +163,14 @@ StorageObjectStorageQueue::StorageObjectStorageQueue( setInMemoryMetadata(storage_metadata); LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); - task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); }); - /// Get metadata manager from ObjectStorageQueueMetadataFactory, - /// it will increase the ref count for the metadata object. - /// The ref count is decreased when StorageObjectStorageQueue::drop() method is called. - files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, *queue_settings); - try - { - files_metadata->initialize(configuration_, storage_metadata); - } - catch (...) - { - ObjectStorageQueueMetadataFactory::instance().remove(zk_path); - throw; - } + ObjectStorageQueueTableMetadata table_metadata(*queue_settings, storage_metadata.getColumns(), configuration_->format); + ObjectStorageQueueMetadata::syncWithKeeper(zk_path, table_metadata, *queue_settings, log); + + auto queue_metadata = std::make_unique(zk_path, std::move(table_metadata), *queue_settings); + files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata)); + + task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); }); } void StorageObjectStorageQueue::startup()