diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp new file mode 100644 index 00000000000..514baeb7b07 --- /dev/null +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -0,0 +1,351 @@ +#include "IO/VarInt.h" +#include "config.h" + +#if USE_AWS_S3 +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int TIMEOUT_EXCEEDED; +} + +namespace +{ + UInt64 getCurrentTime() + { + return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + } +} + +void S3QueueFilesMetadata::S3QueueCollection::read(ReadBuffer & in) +{ + files = {}; + if (in.eof()) + return; + + size_t files_num; + in >> files_num >> "\n"; + while (files_num--) + { + TrackedCollectionItem item; + in >> item.file_path >> "\n"; + in >> item.timestamp >> "\n"; + in >> item.retries_count >> "\n"; + in >> item.last_exception >> "\n"; + files.push_back(item); + } +} + +void S3QueueFilesMetadata::S3QueueCollection::write(WriteBuffer & out) const +{ + out << files.size() << "\n"; + for (const auto & processed_file : files) + { + out << processed_file.file_path << "\n"; + out << processed_file.timestamp << "\n"; + out << processed_file.retries_count << "\n"; + out << processed_file.last_exception << "\n"; + } +} + +String S3QueueFilesMetadata::S3QueueCollection::toString() const +{ + WriteBufferFromOwnString out; + write(out); + return out.str(); +} + +S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueCollection::getFileNames() +{ + S3FilesCollection keys = {}; + for (const auto & pair : files) + keys.insert(pair.file_path); + return keys; +} + + +S3QueueFilesMetadata::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_) + : max_size(max_size_), max_age(max_age_) +{ +} + +void S3QueueFilesMetadata::S3QueueProcessedCollection::parse(const String & collection_str) +{ + ReadBufferFromString buf(collection_str); + read(buf); + if (max_age > 0) // Remove old items + { + std::erase_if( + files, + [timestamp = getCurrentTime(), this](const TrackedCollectionItem & processed_file) + { return (timestamp - processed_file.timestamp) > max_age; }); + } +} + + +void S3QueueFilesMetadata::S3QueueProcessedCollection::add(const String & file_name) +{ + TrackedCollectionItem processed_file = { .file_path=file_name, .timestamp = getCurrentTime() }; + files.push_back(processed_file); + + /// TODO: it is strange that in parse() we take into account only max_age, but here only max_size. + while (files.size() > max_size) + { + files.pop_front(); + } +} + + +S3QueueFilesMetadata::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_) + : max_retries_count(max_retries_count_) +{ +} + +void S3QueueFilesMetadata::S3QueueFailedCollection::parse(const String & collection_str) +{ + ReadBufferFromString buf(collection_str); + read(buf); +} + + +bool S3QueueFilesMetadata::S3QueueFailedCollection::add(const String & file_name, const String & exception_message) +{ + auto failed_it = std::find_if( + files.begin(), files.end(), + [&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; }); + + if (failed_it == files.end()) + { + files.emplace_back(file_name, 0, max_retries_count, exception_message); + } + else if (failed_it->retries_count == 0 || --failed_it->retries_count == 0) + { + return false; + } + return true; +} + +S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueFailedCollection::getFileNames() +{ + S3FilesCollection failed_keys; + for (const auto & pair : files) + { + if (pair.retries_count == 0) + failed_keys.insert(pair.file_path); + } + return failed_keys; +} + +void S3QueueFilesMetadata::S3QueueProcessingCollection::parse(const String & collection_str) +{ + ReadBufferFromString rb(collection_str); + Strings result; + readQuoted(result, rb); + files = S3FilesCollection(result.begin(), result.end()); +} + +void S3QueueFilesMetadata::S3QueueProcessingCollection::add(const Strings & file_names) +{ + files.insert(file_names.begin(), file_names.end()); +} + +void S3QueueFilesMetadata::S3QueueProcessingCollection::remove(const String & file_name) +{ + files.erase(file_name); +} + +String S3QueueFilesMetadata::S3QueueProcessingCollection::toString() const +{ + return DB::toString(Strings(files.begin(), files.end())); +} + + +S3QueueFilesMetadata::S3QueueFilesMetadata( + const StorageS3Queue * storage_, + const S3QueueSettings & settings_) + : storage(storage_) + , mode(settings_.mode) + , max_set_size(settings_.s3queue_tracked_files_limit.value) + , max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value) + , max_loading_retries(settings_.s3queue_loading_retries.value) + , zookeeper_processing_path(fs::path(storage->getZooKeeperPath()) / "processing") + , zookeeper_processed_path(fs::path(storage->getZooKeeperPath()) / "processed") + , zookeeper_failed_path(fs::path(storage->getZooKeeperPath()) / "failed") + , zookeeper_lock_path(fs::path(storage->getZooKeeperPath()) / "lock") + , log(&Poco::Logger::get("S3QueueFilesMetadata")) +{ +} + +void S3QueueFilesMetadata::setFileProcessed(const String & file_path) +{ + auto zookeeper = storage->getZooKeeper(); + auto lock = acquireLock(zookeeper); + + switch (mode) + { + case S3QueueMode::UNORDERED: + { + S3QueueProcessedCollection processed_files(max_set_size, max_set_age_sec); + processed_files.parse(zookeeper->get(zookeeper_processed_path)); + processed_files.add(file_path); + zookeeper->set(zookeeper_processed_path, processed_files.toString()); + break; + } + case S3QueueMode::ORDERED: + { + // Check that we set in ZooKeeper node only maximum processed file path. + // This check can be useful, when multiple table engines consume in ordered mode. + String max_file = getMaxProcessedFile(); + if (max_file.compare(file_path) <= 0) + zookeeper->set(zookeeper_processed_path, file_path); + break; + } + } + removeProcessingFile(file_path); +} + + +bool S3QueueFilesMetadata::setFileFailed(const String & file_path, const String & exception_message) +{ + auto zookeeper = storage->getZooKeeper(); + auto lock = acquireLock(zookeeper); + + S3QueueFailedCollection failed_collection(max_loading_retries); + failed_collection.parse(zookeeper->get(zookeeper_failed_path)); + const bool can_be_retried = failed_collection.add(file_path, exception_message); + zookeeper->set(zookeeper_failed_path, failed_collection.toString()); + removeProcessingFile(file_path); + return can_be_retried; +} + +S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getFailedFiles() +{ + auto zookeeper = storage->getZooKeeper(); + String failed_files = zookeeper->get(zookeeper_failed_path); + + S3QueueFailedCollection failed_collection(max_loading_retries); + failed_collection.parse(failed_files); + return failed_collection.getFileNames(); +} + +String S3QueueFilesMetadata::getMaxProcessedFile() +{ + auto zookeeper = storage->getZooKeeper(); + return zookeeper->get(zookeeper_processed_path); +} + +S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessingFiles() +{ + auto zookeeper = storage->getZooKeeper(); + String processing_files; + if (!zookeeper->tryGet(zookeeper_processing_path, processing_files)) + return {}; + + S3QueueProcessingCollection processing_collection; + if (!processing_files.empty()) + processing_collection.parse(processing_files); + return processing_collection.getFileNames(); +} + +void S3QueueFilesMetadata::setFilesProcessing(const Strings & file_paths) +{ + auto zookeeper = storage->getZooKeeper(); + String processing_files; + zookeeper->tryGet(zookeeper_processing_path, processing_files); + + S3QueueProcessingCollection processing_collection; + if (!processing_files.empty()) + processing_collection.parse(processing_files); + processing_collection.add(file_paths); + + if (zookeeper->exists(zookeeper_processing_path)) + zookeeper->set(zookeeper_processing_path, processing_collection.toString()); + else + zookeeper->create(zookeeper_processing_path, processing_collection.toString(), zkutil::CreateMode::Ephemeral); +} + +void S3QueueFilesMetadata::removeProcessingFile(const String & file_path) +{ + auto zookeeper = storage->getZooKeeper(); + String processing_files; + zookeeper->tryGet(zookeeper_processing_path, processing_files); + + S3QueueProcessingCollection processing_collection; + processing_collection.parse(processing_files); + processing_collection.remove(file_path); + zookeeper->set(zookeeper_processing_path, processing_collection.toString()); +} + +S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getUnorderedProcessedFiles() +{ + auto zookeeper = storage->getZooKeeper(); + S3QueueProcessedCollection processed_collection(max_set_size, max_set_age_sec); + processed_collection.parse(zookeeper->get(zookeeper_processed_path)); + return processed_collection.getFileNames(); +} + +S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessedFailedAndProcessingFiles() +{ + S3FilesCollection processed_and_failed_files = getFailedFiles(); + switch (mode) + { + case S3QueueMode::UNORDERED: + { + processed_and_failed_files.merge(getUnorderedProcessedFiles()); + break; + } + case S3QueueMode::ORDERED: + { + processed_and_failed_files.insert(getMaxProcessedFile()); + break; + } + } + processed_and_failed_files.merge(getProcessingFiles()); + return processed_and_failed_files; +} + +std::shared_ptr S3QueueFilesMetadata::acquireLock(zkutil::ZooKeeperPtr zookeeper) +{ + UInt32 retry_count = 200; + UInt32 sleep_ms = 100; + UInt32 retries = 0; + + while (true) + { + Coordination::Error code = zookeeper->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral); + if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS) + { + retries++; + if (retries > retry_count) + { + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock"); + } + sleepForMilliseconds(sleep_ms); + } + else if (code != Coordination::Error::ZOK) + { + throw Coordination::Exception(code, zookeeper_lock_path); + } + else + { + return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zookeeper); + } + } +} + +} + +#endif diff --git a/src/Storages/S3Queue/S3QueueHolder.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h similarity index 73% rename from src/Storages/S3Queue/S3QueueHolder.h rename to src/Storages/S3Queue/S3QueueFilesMetadata.h index de7f1f56f9a..c436de946ff 100644 --- a/src/Storages/S3Queue/S3QueueHolder.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -9,7 +9,10 @@ namespace DB { -class S3QueueHolder : public WithContext +class StorageS3Queue; +struct S3QueueSettings; + +class S3QueueFilesMetadata { public: struct TrackedCollectionItem @@ -23,27 +26,21 @@ public: using S3FilesCollection = std::unordered_set; using TrackedFiles = std::deque; - S3QueueHolder( - const String & zookeeper_path_, - const S3QueueMode & mode_, - ContextPtr context_, - UInt64 & max_set_size_, - UInt64 & max_set_age_sec_, - UInt64 & max_loading_retries_); + S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_); + void setFilesProcessing(const Strings & file_paths); void setFileProcessed(const String & file_path); bool setFileFailed(const String & file_path, const String & exception_message); - void setFilesProcessing(Strings & file_paths); - S3FilesCollection getProcessedAndFailedFiles(); - String getMaxProcessedFile(); - std::shared_ptr acquireLock(); + S3FilesCollection getProcessedFailedAndProcessingFiles(); + String getMaxProcessedFile(); + std::shared_ptr acquireLock(zkutil::ZooKeeperPtr zookeeper); struct S3QueueCollection { public: virtual ~S3QueueCollection() = default; - String toString() const; + virtual String toString() const; S3FilesCollection getFileNames(); virtual void parse(const String & collection_str) = 0; @@ -82,30 +79,42 @@ public: UInt64 max_retries_count; }; + struct S3QueueProcessingCollection + { + public: + S3QueueProcessingCollection() = default; + + void parse(const String & collection_str); + void add(const Strings & file_names); + void remove(const String & file_name); + + String toString() const; + const S3FilesCollection & getFileNames() const { return files; } + + private: + S3FilesCollection files; + }; private: + const StorageS3Queue * storage; + const S3QueueMode mode; const UInt64 max_set_size; const UInt64 max_set_age_sec; const UInt64 max_loading_retries; - zkutil::ZooKeeperPtr zk_client; - mutable std::mutex current_zookeeper_mutex; - mutable std::mutex mutex; - const String zookeeper_path; - const String zookeeper_failed_path; const String zookeeper_processing_path; const String zookeeper_processed_path; + const String zookeeper_failed_path; const String zookeeper_lock_path; - const S3QueueMode mode; - const UUID table_uuid; + + mutable std::mutex mutex; Poco::Logger * log; S3FilesCollection getFailedFiles(); S3FilesCollection getProcessingFiles(); S3FilesCollection getUnorderedProcessedFiles(); - void removeProcessingFile(const String & file_path); - S3FilesCollection parseCollection(const String & collection_str); + void removeProcessingFile(const String & file_path); }; diff --git a/src/Storages/S3Queue/S3QueueHolder.cpp b/src/Storages/S3Queue/S3QueueHolder.cpp deleted file mode 100644 index 860484da671..00000000000 --- a/src/Storages/S3Queue/S3QueueHolder.cpp +++ /dev/null @@ -1,341 +0,0 @@ -#include "IO/VarInt.h" -#include "config.h" - -#if USE_AWS_S3 -# include -# include -# include -# include -# include -# include -# include -# include -# include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int TIMEOUT_EXCEEDED; -} - -/// TODO: update zk session if expired - -void S3QueueHolder::S3QueueCollection::read(ReadBuffer & in) -{ - files = {}; - - if (in.eof()) - return; - - size_t files_num; - in >> files_num >> "\n"; - while (files_num--) - { - TrackedCollectionItem item; - in >> item.file_path >> "\n"; - in >> item.timestamp >> "\n"; - in >> item.retries_count >> "\n"; - in >> item.last_exception >> "\n"; - files.push_back(item); - } -} - -void S3QueueHolder::S3QueueCollection::write(WriteBuffer & out) const -{ - out << files.size() << "\n"; - for (const auto & processed_file : files) - { - out << processed_file.file_path << "\n"; - out << processed_file.timestamp << "\n"; - out << processed_file.retries_count << "\n"; - out << processed_file.last_exception << "\n"; - } -} - -String S3QueueHolder::S3QueueCollection::toString() const -{ - WriteBufferFromOwnString out; - write(out); - return out.str(); -} - -S3QueueHolder::S3FilesCollection S3QueueHolder::S3QueueCollection::getFileNames() -{ - S3FilesCollection keys = {}; - for (const auto & pair : files) - { - keys.insert(pair.file_path); - } - return keys; -} - - -S3QueueHolder::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_) - : max_size(max_size_), max_age(max_age_) -{ -} - -void S3QueueHolder::S3QueueProcessedCollection::parse(const String & collection_str) -{ - ReadBufferFromString buf(collection_str); - read(buf); - if (max_age > 0) // Remove old items - { - UInt64 timestamp = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - UInt64 max_seconds_diff = max_age; - std::erase_if( - files, - [×tamp, &max_seconds_diff](const TrackedCollectionItem & processed_file) - { return (timestamp - processed_file.timestamp) > max_seconds_diff; }); - } -} - - -void S3QueueHolder::S3QueueProcessedCollection::add(const String & file_name) -{ - UInt64 timestamp = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - - TrackedCollectionItem processed_file = {.file_path=file_name, .timestamp=timestamp}; - files.push_back(processed_file); - - while (files.size() > max_size) - { - files.pop_front(); - } -} - - -S3QueueHolder::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_) : max_retries_count(max_retries_count_) -{ -} - -void S3QueueHolder::S3QueueFailedCollection::parse(const String & collection_str) -{ - ReadBufferFromString buf(collection_str); - read(buf); -} - - -bool S3QueueHolder::S3QueueFailedCollection::add(const String & file_name, const String & exception_message) -{ - auto failed_it - = std::find_if(files.begin(), files.end(), [&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; }); - if (failed_it != files.end()) - { - if (failed_it->retries_count == 0 || --failed_it->retries_count == 0) - { - return false; - } - } - else - { - TrackedCollectionItem failed_file = { .file_path=file_name, .retries_count=max_retries_count, .last_exception = exception_message }; - files.push_back(failed_file); - } - return true; -} - -S3QueueHolder::S3FilesCollection S3QueueHolder::S3QueueFailedCollection::getFileNames() -{ - S3FilesCollection failed_keys; - for (const auto & pair : files) - { - if (pair.retries_count <= 0) - { - failed_keys.insert(pair.file_path); - } - } - return failed_keys; -} - -S3QueueHolder::S3QueueHolder( - const String & zookeeper_path_, - const S3QueueMode & mode_, - ContextPtr context_, - UInt64 & max_set_size_, - UInt64 & max_set_age_sec_, - UInt64 & max_loading_retries_) - : WithContext(context_) - , max_set_size(max_set_size_) - , max_set_age_sec(max_set_age_sec_) - , max_loading_retries(max_loading_retries_) - , zk_client(getContext()->getZooKeeper()) - , zookeeper_path(zookeeper_path_) - , zookeeper_failed_path(fs::path(zookeeper_path_) / "failed") - , zookeeper_processing_path(fs::path(zookeeper_path_) / "processing") - , zookeeper_processed_path(fs::path(zookeeper_path_) / "processed") - , zookeeper_lock_path(fs::path(zookeeper_path_) / "lock") - , mode(mode_) - , log(&Poco::Logger::get("S3QueueHolder")) -{ -} - - -void S3QueueHolder::setFileProcessed(const String & file_path) -{ - auto lock = acquireLock(); - - if (mode == S3QueueMode::UNORDERED) - { - String processed_files = zk_client->get(zookeeper_processed_path); - auto processed = S3QueueProcessedCollection(max_set_size, max_set_age_sec); - processed.parse(processed_files); - processed.add(file_path); - zk_client->set(zookeeper_processed_path, processed.toString()); - } - else if (mode == S3QueueMode::ORDERED) - { - String max_file = getMaxProcessedFile(); - // Check that we set in ZooKeeper node only maximum processed file path. - // This check can be useful, when multiple table engines consume in ordered mode. - if (max_file.compare(file_path) <= 0) - { - zk_client->set(zookeeper_processed_path, file_path); - } - } - removeProcessingFile(file_path); -} - - -bool S3QueueHolder::setFileFailed(const String & file_path, const String & exception_message) -{ - auto lock = acquireLock(); - - auto failed_collection = S3QueueFailedCollection(max_loading_retries); - failed_collection.parse(zk_client->get(zookeeper_failed_path)); - bool retry_later = failed_collection.add(file_path, exception_message); - - zk_client->set(zookeeper_failed_path, failed_collection.toString()); - removeProcessingFile(file_path); - - return retry_later; -} - -S3QueueHolder::S3FilesCollection S3QueueHolder::getFailedFiles() -{ - String failed_files = zk_client->get(zookeeper_failed_path); - - auto failed_collection = S3QueueFailedCollection(max_loading_retries); - failed_collection.parse(failed_files); - - return failed_collection.getFileNames(); -} - -String S3QueueHolder::getMaxProcessedFile() -{ - String processed = zk_client->get(zookeeper_processed_path); - return processed; -} - -S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessingFiles() -{ - String processing_files; - if (!zk_client->tryGet(zookeeper_processing_path, processing_files)) - return {}; - return parseCollection(processing_files); -} - -void S3QueueHolder::setFilesProcessing(Strings & file_paths) -{ - std::unordered_set processing_files(file_paths.begin(), file_paths.end()); - processing_files.merge(getProcessingFiles()); - String processing_files_str = toString(Strings(processing_files.begin(), processing_files.end())); - - if (zk_client->exists(zookeeper_processing_path)) - zk_client->set(fs::path(zookeeper_processing_path), processing_files_str); - else - zk_client->create(fs::path(zookeeper_processing_path), processing_files_str, zkutil::CreateMode::Ephemeral); -} - -S3QueueHolder::S3FilesCollection S3QueueHolder::getUnorderedProcessedFiles() -{ - String processed = zk_client->get(zookeeper_processed_path); - auto collection = S3QueueProcessedCollection(max_set_size, max_set_age_sec); - collection.parse(processed); - return collection.getFileNames(); -} - -S3QueueHolder::S3FilesCollection S3QueueHolder::getProcessedAndFailedFiles() -{ - S3FilesCollection processed_and_failed_files = getFailedFiles(); - - if (mode == S3QueueMode::UNORDERED) - { - processed_and_failed_files.merge(getUnorderedProcessedFiles()); - } - else - { - String processed = getMaxProcessedFile(); - processed_and_failed_files.insert(processed); - } - - S3FilesCollection processing_files = getProcessingFiles(); - processed_and_failed_files.merge(processing_files); - - return processed_and_failed_files; -} - -void S3QueueHolder::removeProcessingFile(const String & file_path) -{ - String node_data; - String processing = zk_client->get(zookeeper_processing_path); - S3FilesCollection processing_files = parseCollection(processing); - - processing_files.erase(file_path); - - Strings file_paths(processing_files.begin(), processing_files.end()); - zk_client->set(fs::path(zookeeper_processing_path), toString(file_paths)); -} - -std::shared_ptr S3QueueHolder::acquireLock() -{ - UInt32 retry_count = 200; - UInt32 sleep_ms = 100; - - UInt32 retries = 0; - while (true) - { - Coordination::Error code = zk_client->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral); - if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS) - { - retries++; - if (retries > retry_count) - { - throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock"); - } - sleepForMilliseconds(sleep_ms); - } - else if (code != Coordination::Error::ZOK) - { - throw Coordination::Exception(code, zookeeper_lock_path); - } - else - { - return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zk_client); - } - } -} - -S3QueueHolder::S3FilesCollection S3QueueHolder::parseCollection(const String & collection_str) -{ - ReadBufferFromString rb(collection_str); - Strings deserialized; - try - { - readQuoted(deserialized, rb); - } - catch (const Exception & e) - { - LOG_WARNING(log, "Can't parse collection from ZooKeeper node: {}", e.displayText()); - deserialized = {}; - } - - return std::unordered_set(deserialized.begin(), deserialized.end()); -} - -} - -#endif diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index 4785a131f60..57d2d6304b0 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -171,7 +171,7 @@ StorageS3QueueSource::StorageS3QueueSource( const String & bucket_, const String & version_id_, std::shared_ptr file_iterator_, - std::shared_ptr queue_holder_, + std::shared_ptr files_metadata_, const S3QueueAction & action_, const size_t download_thread_num_) : ISource(getHeader(sample_block_, requested_virtual_columns_)) @@ -183,7 +183,7 @@ StorageS3QueueSource::StorageS3QueueSource( , columns_desc(columns_) , request_settings(request_settings_) , client(client_) - , queue_holder(queue_holder_) + , files_metadata(files_metadata_) , requested_virtual_columns(requested_virtual_columns_) , file_iterator(file_iterator_) , action(action_) @@ -259,13 +259,13 @@ Chunk StorageS3QueueSource::generate() catch (const Exception & e) { LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText()); - queue_holder->setFileFailed(reader.getFile(), e.message()); + files_metadata->setFileFailed(reader.getFile(), e.message()); success_in_pulling = false; } if (success_in_pulling) { applyActionAfterProcessing(reader.getFile()); - queue_holder->setFileProcessed(reader.getFile()); + files_metadata->setFileProcessed(reader.getFile()); return chunk; } diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index e2e472b5007..a85fce46ad8 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -8,7 +8,7 @@ # include # include -# include +# include # include # include @@ -81,7 +81,7 @@ public: const String & bucket, const String & version_id, std::shared_ptr file_iterator_, - std::shared_ptr queue_holder_, + std::shared_ptr files_metadata_, const S3QueueAction & action_, size_t download_thread_num); @@ -101,7 +101,7 @@ private: S3Settings::RequestSettings request_settings; std::shared_ptr client; - std::shared_ptr queue_holder; + std::shared_ptr files_metadata; using ReaderHolder = StorageS3Source::ReaderHolder; ReaderHolder reader; diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index baacb3397c7..87bff398172 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -93,23 +93,21 @@ StorageS3Queue::StorageS3Queue( : IStorage(table_id_) , WithContext(context_) , s3queue_settings(std::move(s3queue_settings_)) - , s3_configuration{configuration_} - , keys({s3_configuration.url.key}) - , mode(s3queue_settings->mode) , after_processing(s3queue_settings->after_processing) - , milliseconds_to_wait(s3queue_settings->s3queue_polling_min_timeout_ms) - , format_name(configuration_.format) - , compression_method(configuration_.compression_method) - , name(s3_configuration.url.storage_name) + , configuration{configuration_} + , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms) , format_settings(format_settings_) , partition_by(partition_by_) , log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")")) { - if (!withGlobs()) - throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue engine can read only from url with globs"); + if (configuration.url.key.ends_with('/')) + configuration.url.key += '*'; - String setting_zookeeper_path = s3queue_settings->keeper_path; - if (setting_zookeeper_path.empty()) + if (!withGlobs()) + throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs"); + + String setting_zk_path = s3queue_settings->keeper_path; + if (setting_zk_path.empty()) { auto database = DatabaseCatalog::instance().getDatabase(table_id_.database_name); bool is_in_replicated_database = database->getEngineName() == "Replicated"; @@ -135,25 +133,25 @@ StorageS3Queue::StorageS3Queue( "s3queue_default_zookeeper_path_prefix not specified"); } - zookeeper_path = zkutil::extractZooKeeperPath( + zk_path = zkutil::extractZooKeeperPath( fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log); } else { /// We do not add table uuid here on purpose. - zookeeper_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log); + zk_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log); } - LOG_INFO(log, "Using zookeeper path: {}", zookeeper_path); + LOG_INFO(log, "Using zookeeper path: {}", zk_path); - FormatFactory::instance().checkFormatName(format_name); - context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.url.uri); + FormatFactory::instance().checkFormatName(configuration.format); + context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri); StorageInMemoryMetadata storage_metadata; - s3_configuration.update(context_); + configuration.update(context_); if (columns_.empty()) { - auto columns = StorageS3::getTableStructureFromDataImpl(s3_configuration, format_settings, context_); + auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_); storage_metadata.setColumns(columns); } else @@ -163,22 +161,15 @@ StorageS3Queue::StorageS3Queue( storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); - setZooKeeper(); auto metadata_snapshot = getInMemoryMetadataPtr(); const bool is_first_replica = createTableIfNotExists(metadata_snapshot); if (!is_first_replica) { - checkTableStructure(zookeeper_path, metadata_snapshot); + checkTableStructure(zk_path, metadata_snapshot); } - queue_holder = std::make_unique( - zookeeper_path, - mode, - getContext(), - s3queue_settings->s3queue_tracked_files_limit.value, - s3queue_settings->s3queue_tracked_file_ttl_sec.value, - s3queue_settings->s3queue_loading_retries.value); + files_metadata = std::make_shared(this, *s3queue_settings); auto default_virtuals = NamesAndTypesList{ {"_path", std::make_shared(std::make_shared())}, @@ -196,12 +187,12 @@ StorageS3Queue::StorageS3Queue( bool StorageS3Queue::supportsSubcolumns() const { - return FormatFactory::instance().checkIfFormatSupportsSubcolumns(format_name); + return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format); } bool StorageS3Queue::supportsSubsetOfColumns() const { - return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name); + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format); } Pipe StorageS3Queue::read( @@ -220,7 +211,7 @@ Pipe StorageS3Queue::read( if (mv_attached) throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views"); - auto query_s3_configuration = updateConfigurationAndGetCopy(local_context); + auto query_configuration = updateConfigurationAndGetCopy(local_context); Pipes pipes; @@ -262,24 +253,23 @@ Pipe StorageS3Queue::read( } const size_t max_download_threads = local_context->getSettingsRef().max_download_threads; - auto zookeeper = getZooKeeper(); return Pipe(std::make_shared( requested_virtual_columns, - format_name, + configuration.format, getName(), block_for_format, local_context, format_settings, columns_description, max_block_size, - query_s3_configuration.request_settings, - compression_method, - query_s3_configuration.client, - query_s3_configuration.url.bucket, - query_s3_configuration.url.version_id, + query_configuration.request_settings, + configuration.compression_method, + query_configuration.client, + query_configuration.url.bucket, + query_configuration.url.version_id, iterator_wrapper, - queue_holder, + files_metadata, after_processing, max_download_threads)); } @@ -387,7 +377,7 @@ void StorageS3Queue::threadFunc() break; } - milliseconds_to_wait = s3queue_settings->s3queue_polling_min_timeout_ms; + reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms; } } } @@ -402,9 +392,9 @@ void StorageS3Queue::threadFunc() { LOG_TRACE(log, "Reschedule S3 Queue thread func."); /// Reschedule with backoff. - if (milliseconds_to_wait < s3queue_settings->s3queue_polling_max_timeout_ms) - milliseconds_to_wait += s3queue_settings->s3queue_polling_backoff_ms; - task->holder->scheduleAfter(milliseconds_to_wait); + if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms) + reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms; + task->holder->scheduleAfter(reschedule_processing_interval_ms); } } @@ -426,7 +416,7 @@ void StorageS3Queue::streamToViews() auto s3queue_context = Context::createCopy(getContext()); s3queue_context->makeQueryContext(); - auto query_s3_configuration = updateConfigurationAndGetCopy(s3queue_context); + auto query_configuration = updateConfigurationAndGetCopy(s3queue_context); // Create a stream for each consumer and join them in a union stream // Only insert into dependent views and expect that input blocks contain virtual columns @@ -473,23 +463,22 @@ void StorageS3Queue::streamToViews() Pipes pipes; - auto zookeeper = getZooKeeper(); auto pipe = Pipe(std::make_shared( requested_virtual_columns, - format_name, + configuration.format, getName(), block_for_format, s3queue_context, format_settings, columns_description, block_size, - query_s3_configuration.request_settings, - compression_method, - query_s3_configuration.client, - query_s3_configuration.url.bucket, - query_s3_configuration.url.version_id, + query_configuration.request_settings, + configuration.compression_method, + query_configuration.client, + query_configuration.url.bucket, + query_configuration.url.version_id, iterator_wrapper, - queue_holder, + files_metadata, after_processing, max_download_threads)); @@ -505,65 +494,56 @@ void StorageS3Queue::streamToViews() StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context) { - s3_configuration.update(local_context); - return s3_configuration; -} - -void StorageS3Queue::setZooKeeper() -{ - std::lock_guard lock(current_zookeeper_mutex); - current_zookeeper = getContext()->getZooKeeper(); -} - -zkutil::ZooKeeperPtr StorageS3Queue::tryGetZooKeeper() const -{ - std::lock_guard lock(current_zookeeper_mutex); - return current_zookeeper; + configuration.update(local_context); + return configuration; } zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const { - auto res = tryGetZooKeeper(); - if (!res) - throw Exception(ErrorCodes::NO_ZOOKEEPER, "Cannot get ZooKeeper"); - return res; + std::lock_guard lock{zk_mutex}; + if (!zk_client || zk_client->expired()) + { + zk_client = getContext()->getZooKeeper(); + zk_client->sync(zk_path); + } + return zk_client; } bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot) { auto zookeeper = getZooKeeper(); - zookeeper->createAncestors(zookeeper_path); + zookeeper->createAncestors(zk_path); for (size_t i = 0; i < zk_create_table_retries; ++i) { Coordination::Requests ops; bool is_first_replica = true; - if (zookeeper->exists(zookeeper_path + "/metadata")) + if (zookeeper->exists(zk_path + "/metadata")) { - if (!zookeeper->exists(zookeeper_path + "/processing")) - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral)); - LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zookeeper_path); + if (!zookeeper->exists(zk_path + "/processing")) + ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral)); + LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zk_path); is_first_replica = false; } else { - String metadata_str = S3QueueTableMetadata(s3_configuration, *s3queue_settings).toString(); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processed", "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/failed", "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral)); + String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString(); + ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral)); ops.emplace_back(zkutil::makeCreateRequest( - zookeeper_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent)); + zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent)); } Coordination::Responses responses; auto code = zookeeper->tryMulti(ops, responses); if (code == Coordination::Error::ZNODEEXISTS) { - LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zookeeper_path); + LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path); continue; } else if (code != Coordination::Error::ZOK) @@ -577,7 +557,7 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_ throw Exception( ErrorCodes::REPLICA_ALREADY_EXISTS, "Cannot create table, because it is created concurrently every time or because " - "of wrong zookeeper_path or because of logical error"); + "of wrong zk_path or because of logical error"); } @@ -588,7 +568,7 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const { auto zookeeper = getZooKeeper(); - S3QueueTableMetadata old_metadata(s3_configuration, *s3queue_settings); + S3QueueTableMetadata old_metadata(configuration, *s3queue_settings); Coordination::Stat metadata_stat; String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat); @@ -615,39 +595,40 @@ std::shared_ptr StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query) { auto it = std::make_shared( - *s3_configuration.client, - s3_configuration.url, + *configuration.client, + configuration.url, query, virtual_block, local_context, s3queue_settings->s3queue_polling_size.value, - s3_configuration.request_settings); + configuration.request_settings); - auto lock = queue_holder->acquireLock(); - S3QueueHolder::S3FilesCollection files_to_skip = queue_holder->getProcessedAndFailedFiles(); + auto zookeeper = getZooKeeper(); + auto lock = files_metadata->acquireLock(zookeeper); + S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles(); Strings files_to_process; - if (mode == S3QueueMode::UNORDERED) + if (s3queue_settings->mode == S3QueueMode::UNORDERED) { - files_to_process = it->filterProcessingFiles(mode, files_to_skip); + files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip); } else { - String max_processed_file = queue_holder->getMaxProcessedFile(); - files_to_process = it->filterProcessingFiles(mode, files_to_skip, max_processed_file); + String max_processed_file = files_metadata->getMaxProcessedFile(); + files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file); } LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", ")); - queue_holder->setFilesProcessing(files_to_process); + files_metadata->setFilesProcessing(files_to_process); return it; } void StorageS3Queue::drop() { - auto zk_client = getZooKeeper(); - if (zk_client->exists(zookeeper_path)) - zk_client->removeRecursive(zookeeper_path); + auto zookeeper = getZooKeeper(); + if (zookeeper->exists(zk_path)) + zookeeper->removeRecursive(zk_path); } void registerStorageS3QueueImpl(const String & name, StorageFactory & factory) diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 6af38058dd8..9737d5fcefa 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -11,7 +11,7 @@ # include # include -# include +# include # include # include # include @@ -41,6 +41,7 @@ class StorageS3Queue : public IStorage, WithContext { public: using Configuration = typename StorageS3::Configuration; + StorageS3Queue( std::unique_ptr s3queue_settings_, const Configuration & configuration_, @@ -79,35 +80,39 @@ public: bool supportsPartitionBy() const override; - const auto & getFormatName() const { return format_name; } + const auto & getFormatName() const { return configuration.format; } + + const String & getZooKeeperPath() const { return zk_path; } + + zkutil::ZooKeeperPtr getZooKeeper() const; private: - std::unique_ptr s3queue_settings; - std::shared_ptr queue_holder; - Configuration s3_configuration; - std::vector keys; + const std::unique_ptr s3queue_settings; + const S3QueueAction after_processing; + + std::shared_ptr files_metadata; + Configuration configuration; NamesAndTypesList virtual_columns; Block virtual_block; - S3QueueMode mode; - S3QueueAction after_processing; - uint64_t milliseconds_to_wait = 10000; - - String format_name; - String compression_method; - String name; + UInt64 reschedule_processing_interval_ms; std::optional format_settings; ASTPtr partition_by; + String zk_path; + mutable zkutil::ZooKeeperPtr zk_client; + mutable std::mutex zk_mutex; + + std::atomic mv_attached = false; + std::atomic shutdown_called{false}; + Poco::Logger * log; + bool supportsSubcolumns() const override; - bool withGlobs() const { return s3_configuration.url.key.find_first_of("*?{") != std::string::npos; } + bool withGlobs() const { return configuration.url.key.find_first_of("*?{") != std::string::npos; } void threadFunc(); size_t getTableDependentCount() const; - std::atomic mv_attached = false; bool hasDependencies(const StorageID & table_id); - std::atomic shutdown_called{false}; - Poco::Logger * log; void startup() override; void shutdown() override; @@ -122,19 +127,10 @@ private: std::shared_ptr task; bool supportsSubsetOfColumns() const override; - String zookeeper_path; - - zkutil::ZooKeeperPtr current_zookeeper; - mutable std::mutex current_zookeeper_mutex; - - void setZooKeeper(); - zkutil::ZooKeeperPtr tryGetZooKeeper() const; - zkutil::ZooKeeperPtr getZooKeeper() const; const UInt32 zk_create_table_retries = 1000; bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot); void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot); - const String & getZooKeeperPath() const { return zookeeper_path; } using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;