diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index 4624566a517..d0e4d0f88cc 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -1,18 +1,18 @@ -#include "IO/VarInt.h" #include "config.h" #if USE_AWS_S3 -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include - +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { @@ -30,151 +30,6 @@ namespace } } -void S3QueueFilesMetadata::S3QueueCollection::read(ReadBuffer & in) -{ - files = {}; - if (in.eof()) - return; - - size_t files_num; - in >> files_num >> "\n"; - while (files_num--) - { - TrackedCollectionItem item; - in >> item.file_path >> "\n"; - in >> item.timestamp >> "\n"; - in >> item.retries_count >> "\n"; - in >> item.last_exception >> "\n"; - files.push_back(item); - } -} - -void S3QueueFilesMetadata::S3QueueCollection::write(WriteBuffer & out) const -{ - out << files.size() << "\n"; - for (const auto & processed_file : files) - { - out << processed_file.file_path << "\n"; - out << processed_file.timestamp << "\n"; - out << processed_file.retries_count << "\n"; - out << processed_file.last_exception << "\n"; - } -} - -String S3QueueFilesMetadata::S3QueueCollection::toString() const -{ - WriteBufferFromOwnString out; - write(out); - return out.str(); -} - -S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueCollection::getFileNames() -{ - S3FilesCollection keys = {}; - for (const auto & pair : files) - keys.insert(pair.file_path); - return keys; -} - - -S3QueueFilesMetadata::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_) - : max_size(max_size_), max_age(max_age_) -{ -} - -void S3QueueFilesMetadata::S3QueueProcessedCollection::parse(const String & collection_str) -{ - ReadBufferFromString buf(collection_str); - read(buf); - if (max_age > 0) // Remove old items - { - std::erase_if( - files, - [timestamp = getCurrentTime(), this](const TrackedCollectionItem & processed_file) - { return (timestamp - processed_file.timestamp) > max_age; }); - } -} - - -void S3QueueFilesMetadata::S3QueueProcessedCollection::add(const String & file_name) -{ - TrackedCollectionItem processed_file; - processed_file.file_path = file_name; - processed_file.timestamp = getCurrentTime(); - files.push_back(processed_file); - - /// TODO: it is strange that in parse() we take into account only max_age, but here only max_size. 
- while (files.size() > max_size) - { - files.pop_front(); - } -} - - -S3QueueFilesMetadata::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_) - : max_retries_count(max_retries_count_) -{ -} - -void S3QueueFilesMetadata::S3QueueFailedCollection::parse(const String & collection_str) -{ - ReadBufferFromString buf(collection_str); - read(buf); -} - - -bool S3QueueFilesMetadata::S3QueueFailedCollection::add(const String & file_name, const String & exception_message) -{ - auto failed_it = std::find_if( - files.begin(), files.end(), - [&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; }); - - if (failed_it == files.end()) - { - files.emplace_back(file_name, 0, max_retries_count, exception_message); - } - else if (failed_it->retries_count == 0 || --failed_it->retries_count == 0) - { - return false; - } - return true; -} - -S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueFailedCollection::getFileNames() -{ - S3FilesCollection failed_keys; - for (const auto & pair : files) - { - if (pair.retries_count == 0) - failed_keys.insert(pair.file_path); - } - return failed_keys; -} - -void S3QueueFilesMetadata::S3QueueProcessingCollection::parse(const String & collection_str) -{ - ReadBufferFromString rb(collection_str); - Strings result; - readQuoted(result, rb); - files = S3FilesCollection(result.begin(), result.end()); -} - -void S3QueueFilesMetadata::S3QueueProcessingCollection::add(const Strings & file_names) -{ - files.insert(file_names.begin(), file_names.end()); -} - -void S3QueueFilesMetadata::S3QueueProcessingCollection::remove(const String & file_name) -{ - files.erase(file_name); -} - -String S3QueueFilesMetadata::S3QueueProcessingCollection::toString() const -{ - return DB::toString(Strings(files.begin(), files.end())); -} - - S3QueueFilesMetadata::S3QueueFilesMetadata( const StorageS3Queue * storage_, const S3QueueSettings & settings_) @@ -183,171 +38,273 @@ S3QueueFilesMetadata::S3QueueFilesMetadata( , max_set_size(settings_.s3queue_tracked_files_limit.value) , max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value) , max_loading_retries(settings_.s3queue_loading_retries.value) - , zookeeper_processing_path(fs::path(storage->getZooKeeperPath()) / "processing") - , zookeeper_processed_path(fs::path(storage->getZooKeeperPath()) / "processed") - , zookeeper_failed_path(fs::path(storage->getZooKeeperPath()) / "failed") - , zookeeper_lock_path(fs::path(storage->getZooKeeperPath()) / "lock") + , zookeeper_processing_path(storage->getZooKeeperPath() / "processing") + , zookeeper_processed_path(storage->getZooKeeperPath() / "processed") + , zookeeper_failed_path(storage->getZooKeeperPath() / "failed") , log(&Poco::Logger::get("S3QueueFilesMetadata")) { } -void S3QueueFilesMetadata::setFileProcessed(const String & file_path) +std::string S3QueueFilesMetadata::NodeMetadata::toString() const { - auto zookeeper = storage->getZooKeeper(); - auto lock = acquireLock(zookeeper); + Poco::JSON::Object json; + json.set("file_path", file_path); + json.set("last_processed_timestamp", getCurrentTime()); + json.set("last_exception", last_exception); + json.set("retries", retries); + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); +} + +S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::NodeMetadata::fromString(const std::string & metadata_str) +{ + Poco::JSON::Parser parser; + auto json = 
parser.parse(metadata_str).extract(); + + NodeMetadata metadata; + metadata.file_path = json->getValue("file_path"); + metadata.last_processed_timestamp = json->getValue("last_processed_timestamp"); + metadata.last_exception = json->getValue("last_exception"); + metadata.retries = json->getValue("retries"); + return metadata; +} + +std::string S3QueueFilesMetadata::getNodeName(const std::string & path) +{ + SipHash path_hash; + path_hash.update(path); + return toString(path_hash.get64()); +} + +S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata( + const std::string & path, + const std::string & exception, + size_t retries) +{ + NodeMetadata metadata; + metadata.file_path = path; + metadata.last_processed_timestamp = getCurrentTime(); + metadata.last_exception = exception; + metadata.retries = retries; + return metadata; +} + +bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path) +{ switch (mode) { - case S3QueueMode::UNORDERED: - { - S3QueueProcessedCollection processed_files(max_set_size, max_set_age_sec); - processed_files.parse(zookeeper->get(zookeeper_processed_path)); - processed_files.add(file_path); - zookeeper->set(zookeeper_processed_path, processed_files.toString()); - break; - } case S3QueueMode::ORDERED: { - // Check that we set in ZooKeeper node only maximum processed file path. - // This check can be useful, when multiple table engines consume in ordered mode. - String max_file = getMaxProcessedFile(); - if (max_file.compare(file_path) <= 0) - zookeeper->set(zookeeper_processed_path, file_path); - break; + return trySetFileAsProcessingForOrderedMode(path); } - } - removeProcessingFile(file_path); -} - - -bool S3QueueFilesMetadata::setFileFailed(const String & file_path, const String & exception_message) -{ - auto zookeeper = storage->getZooKeeper(); - auto lock = acquireLock(zookeeper); - - S3QueueFailedCollection failed_collection(max_loading_retries); - failed_collection.parse(zookeeper->get(zookeeper_failed_path)); - const bool can_be_retried = failed_collection.add(file_path, exception_message); - zookeeper->set(zookeeper_failed_path, failed_collection.toString()); - removeProcessingFile(file_path); - return can_be_retried; -} - -S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getFailedFiles() -{ - auto zookeeper = storage->getZooKeeper(); - String failed_files = zookeeper->get(zookeeper_failed_path); - - S3QueueFailedCollection failed_collection(max_loading_retries); - failed_collection.parse(failed_files); - return failed_collection.getFileNames(); -} - -String S3QueueFilesMetadata::getMaxProcessedFile() -{ - auto zookeeper = storage->getZooKeeper(); - return zookeeper->get(zookeeper_processed_path); -} - -S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessingFiles() -{ - auto zookeeper = storage->getZooKeeper(); - String processing_files; - if (!zookeeper->tryGet(zookeeper_processing_path, processing_files)) - return {}; - - S3QueueProcessingCollection processing_collection; - if (!processing_files.empty()) - processing_collection.parse(processing_files); - return processing_collection.getFileNames(); -} - -void S3QueueFilesMetadata::setFilesProcessing(const Strings & file_paths) -{ - auto zookeeper = storage->getZooKeeper(); - String processing_files; - zookeeper->tryGet(zookeeper_processing_path, processing_files); - - S3QueueProcessingCollection processing_collection; - if (!processing_files.empty()) - processing_collection.parse(processing_files); - 
processing_collection.add(file_paths); - - if (zookeeper->exists(zookeeper_processing_path)) - zookeeper->set(zookeeper_processing_path, processing_collection.toString()); - else - zookeeper->create(zookeeper_processing_path, processing_collection.toString(), zkutil::CreateMode::Ephemeral); -} - -void S3QueueFilesMetadata::removeProcessingFile(const String & file_path) -{ - auto zookeeper = storage->getZooKeeper(); - String processing_files; - zookeeper->tryGet(zookeeper_processing_path, processing_files); - - S3QueueProcessingCollection processing_collection; - processing_collection.parse(processing_files); - processing_collection.remove(file_path); - zookeeper->set(zookeeper_processing_path, processing_collection.toString()); -} - -S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getUnorderedProcessedFiles() -{ - auto zookeeper = storage->getZooKeeper(); - S3QueueProcessedCollection processed_collection(max_set_size, max_set_age_sec); - processed_collection.parse(zookeeper->get(zookeeper_processed_path)); - return processed_collection.getFileNames(); -} - -S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessedFailedAndProcessingFiles() -{ - S3FilesCollection processed_and_failed_files = getFailedFiles(); - switch (mode) - { case S3QueueMode::UNORDERED: { - processed_and_failed_files.merge(getUnorderedProcessedFiles()); - break; - } - case S3QueueMode::ORDERED: - { - processed_and_failed_files.insert(getMaxProcessedFile()); - break; + return trySetFileAsProcessingForUnorderedMode(path); } } - processed_and_failed_files.merge(getProcessingFiles()); - return processed_and_failed_files; } -std::shared_ptr S3QueueFilesMetadata::acquireLock(zkutil::ZooKeeperPtr zookeeper) +bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path) { - UInt32 retry_count = 200; - UInt32 sleep_ms = 100; - UInt32 retries = 0; + const auto node_name = getNodeName(path); + const auto node_metadata = createNodeMetadata(path).toString(); + const auto zk_client = storage->getZooKeeper(); + + /// Together, the following requests implement: + /// If !exists(processed_node) && !exists(failed_node) && !exists(processing_node) => create(processing_node) + Coordination::Requests requests; + /// Check that the 'processed' node does not exist. + requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, "", zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeRemoveRequest(zookeeper_processed_path / node_name, -1)); + /// Check that the 'failed' node does not exist. + requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1)); + /// Create the 'processing' node if it does not exist yet.
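+ /// Note: a create() immediately followed by a remove() of the same node serves as an atomic "does not exist" check here: if the node already exists, the create fails and the whole multi-request is rejected, so the 'processing' node below is never created.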
+ requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral)); + + Coordination::Responses responses; + auto code = zk_client->tryMulti(requests, responses); + return code == Coordination::Error::ZOK; +} + +bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path) +{ + const auto node_name = getNodeName(path); + const auto node_metadata = createNodeMetadata(path).toString(); + const auto zk_client = storage->getZooKeeper(); while (true) { - Coordination::Error code = zookeeper->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral); - if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS) + Coordination::Requests requests; + zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_failed_path / node_name); + zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_processing_path / node_name); + requests.push_back(zkutil::makeGetRequest(zookeeper_processed_path)); + + Coordination::Responses responses; + auto code = zk_client->tryMulti(requests, responses); + + if (code != Coordination::Error::ZOK) { - retries++; - if (retries > retry_count) + if (responses[0]->error != Coordination::Error::ZOK + || responses[1]->error != Coordination::Error::ZOK) { - throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock"); + /// Path is already in Failed or Processing. + return false; } - sleepForMilliseconds(sleep_ms); + /// The GetRequest for zookeeper_processed_path should never fail, + /// because it is a persistent node created when the S3Queue storage is created. + throw zkutil::KeeperException::fromPath(code, requests.back()->getPath()); } - else if (code != Coordination::Error::ZOK) + + Coordination::Stat processed_node_stat; + NodeMetadata processed_node_metadata; + if (const auto * get_response = dynamic_cast(responses.back().get())) { - throw Coordination::Exception::fromPath(code, zookeeper_lock_path); + processed_node_stat = get_response->stat; + if (!get_response->data.empty()) + processed_node_metadata = NodeMetadata::fromString(get_response->data); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected response type with error: {}", responses.back()->error); + + auto max_processed_file_path = processed_node_metadata.file_path; + if (!max_processed_file_path.empty() && path <= max_processed_file_path) + return false; + + requests.clear(); + zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_failed_path / node_name); + requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral)); + requests.push_back(zkutil::makeCheckRequest(zookeeper_processed_path, processed_node_stat.version)); + + code = zk_client->tryMulti(requests, responses); + if (code == Coordination::Error::ZOK) + return true; + + if (responses[0]->error != Coordination::Error::ZOK + || responses[1]->error != Coordination::Error::ZOK) + { + /// Path is already in Failed or Processing. + return false; + } + /// Max processed path changed. Retry.
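+ /// I.e. the version check against processed_node_stat failed: another server advanced the max processed path between our read and this multi-request, so re-read it and try again (optimistic concurrency control).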
+ } +} + +void S3QueueFilesMetadata::setFileProcessed(const String & path) +{ + switch (mode) + { + case S3QueueMode::ORDERED: + { + return setFileProcessedForOrderedMode(path); + } + case S3QueueMode::UNORDERED: + { + return setFileProcessedForUnorderedMode(path); + } + } +} + +void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(const String & path) +{ + const auto node_name = getNodeName(path); + const auto node_metadata = createNodeMetadata(path).toString(); + const auto zk_client = storage->getZooKeeper(); + + Coordination::Requests requests; + requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1)); + requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, node_metadata, zkutil::CreateMode::Persistent)); + + Coordination::Responses responses; + auto code = zk_client->tryMulti(requests, responses); + if (code == Coordination::Error::ZOK) + return; + + /// TODO: this could be because of an expired session. + if (responses[0]->error != Coordination::Error::ZOK) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to set file as processed but it is not processing"); + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to set file as processed but it is already processed"); +} + +void S3QueueFilesMetadata::setFileProcessedForOrderedMode(const String & path) +{ + /// List results in s3 are always returned in UTF-8 binary order + /// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html), + /// so a plain string comparison of keys is enough to track the maximum processed file. + const auto node_name = getNodeName(path); + const auto node_metadata = createNodeMetadata(path).toString(); + const auto zk_client = storage->getZooKeeper(); + + while (true) + { + std::string res; + Coordination::Stat stat; + bool exists = zk_client->tryGet(zookeeper_processed_path, res, &stat); + Coordination::Requests requests; + if (exists) + { + if (!res.empty()) + { + auto metadata = NodeMetadata::fromString(res); + if (metadata.file_path >= path) + return; + } + requests.push_back(zkutil::makeSetRequest(zookeeper_processed_path, node_metadata, stat.version)); } else { - return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zookeeper); + requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path, node_metadata, zkutil::CreateMode::Persistent)); } + + Coordination::Responses responses; + auto code = zk_client->tryMulti(requests, responses); + if (code == Coordination::Error::ZOK) + return; } } +void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exception_message) +{ + const auto node_name = getNodeName(path); + auto node_metadata = createNodeMetadata(path, exception_message); + const auto zk_client = storage->getZooKeeper(); + + Coordination::Requests requests; + requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1)); + requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, node_metadata.toString(), zkutil::CreateMode::Persistent)); + + Coordination::Responses responses; + auto code = zk_client->tryMulti(requests, responses); + if (code == Coordination::Error::ZOK) + return; + + if (responses[0]->error != Coordination::Error::ZOK) + { + /// TODO: this could be because of an expired session.
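+ /// If the Keeper session expired, the ephemeral 'processing' node was already removed by the server, so the remove request above fails with ZNONODE even though the file really was being processed.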
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Attemp to set file as filed but it is not processing"); + } + + Coordination::Stat stat; + auto failed_node_metadata = NodeMetadata::fromString(zk_client->get(zookeeper_failed_path / node_name, &stat)); + node_metadata.retries = failed_node_metadata.retries + 1; + + /// Failed node already exists, update it. + requests.clear(); + requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1)); + requests.push_back(zkutil::makeSetRequest(zookeeper_failed_path / node_name, node_metadata.toString(), stat.version)); + + responses.clear(); + code = zk_client->tryMulti(requests, responses); + if (code == Coordination::Error::ZOK) + return; + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set file as failed"); +} + } #endif diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index 577c71b2227..302feab6028 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -1,102 +1,29 @@ #pragma once +#include "config.h" #if USE_AWS_S3 +#include +#include +#include -# include -# include -# include -# include +namespace fs = std::filesystem; +namespace Poco { class Logger; } namespace DB { -class StorageS3Queue; struct S3QueueSettings; +class StorageS3Queue; class S3QueueFilesMetadata { public: - struct TrackedCollectionItem - { - TrackedCollectionItem() = default; - TrackedCollectionItem(const String & file_path_, UInt64 timestamp_, UInt64 retries_count_, const String & last_exception_) - : file_path(file_path_), timestamp(timestamp_), retries_count(retries_count_), last_exception(last_exception_) {} - String file_path; - UInt64 timestamp = 0; - UInt64 retries_count = 0; - String last_exception; - }; - - using S3FilesCollection = std::unordered_set; - using TrackedFiles = std::deque; - S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_); - void setFilesProcessing(const Strings & file_paths); - void setFileProcessed(const String & file_path); - bool setFileFailed(const String & file_path, const String & exception_message); + bool trySetFileAsProcessing(const std::string & path); - S3FilesCollection getProcessedFailedAndProcessingFiles(); - String getMaxProcessedFile(); - std::shared_ptr acquireLock(zkutil::ZooKeeperPtr zookeeper); + void setFileProcessed(const std::string & path); - struct S3QueueCollection - { - public: - virtual ~S3QueueCollection() = default; - virtual String toString() const; - S3FilesCollection getFileNames(); - - virtual void parse(const String & collection_str) = 0; - - protected: - TrackedFiles files; - - void read(ReadBuffer & in); - void write(WriteBuffer & out) const; - }; - - struct S3QueueProcessedCollection : public S3QueueCollection - { - public: - S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_); - - void parse(const String & collection_str) override; - void add(const String & file_name); - - private: - const UInt64 max_size; - const UInt64 max_age; - }; - - struct S3QueueFailedCollection : S3QueueCollection - { - public: - S3QueueFailedCollection(const UInt64 & max_retries_count_); - - void parse(const String & collection_str) override; - bool add(const String & file_name, const String & exception_message); - - S3FilesCollection getFileNames(); - - private: - UInt64 max_retries_count; - }; - - struct S3QueueProcessingCollection - { - public: - S3QueueProcessingCollection() = default; - - void parse(const String & collection_str); 
- void add(const Strings & file_names); - void remove(const String & file_name); - - String toString() const; - const S3FilesCollection & getFileNames() const { return files; } - - private: - S3FilesCollection files; - }; + void setFileFailed(const std::string & path, const std::string & exception_message); private: const StorageS3Queue * storage; @@ -105,23 +32,35 @@ private: const UInt64 max_set_age_sec; const UInt64 max_loading_retries; - const String zookeeper_processing_path; - const String zookeeper_processed_path; - const String zookeeper_failed_path; - const String zookeeper_lock_path; + const fs::path zookeeper_processing_path; + const fs::path zookeeper_processed_path; + const fs::path zookeeper_failed_path; mutable std::mutex mutex; Poco::Logger * log; - S3FilesCollection getFailedFiles(); - S3FilesCollection getProcessingFiles(); - S3FilesCollection getUnorderedProcessedFiles(); + bool trySetFileAsProcessingForOrderedMode(const std::string & path); + bool trySetFileAsProcessingForUnorderedMode(const std::string & path); - void removeProcessingFile(const String & file_path); + void setFileProcessedForOrderedMode(const std::string & path); + void setFileProcessedForUnorderedMode(const std::string & path); + + std::string getNodeName(const std::string & path); + + struct NodeMetadata + { + std::string file_path; + UInt64 last_processed_timestamp = 0; + std::string last_exception; + UInt64 retries = 0; + + std::string toString() const; + static NodeMetadata fromString(const std::string & metadata_str); + }; + + NodeMetadata createNodeMetadata(const std::string & path, const std::string & exception = "", size_t retries = 0); }; - } - #endif diff --git a/src/Storages/S3Queue/S3QueueSettings.cpp b/src/Storages/S3Queue/S3QueueSettings.cpp index b74cf8d39bb..cb312adc5d9 100644 --- a/src/Storages/S3Queue/S3QueueSettings.cpp +++ b/src/Storages/S3Queue/S3QueueSettings.cpp @@ -1,8 +1,8 @@ +#include +#include #include #include #include -#include -#include namespace DB diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index 54a863aeb2c..6704345ea59 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -1,47 +1,12 @@ -#include -#include -#include -#include "IO/ParallelReadBuffer.h" -#include "Parsers/ASTCreateQuery.h" #include "config.h" #if USE_AWS_S3 - -# include - -# include - -# include -# include - -# include - -# include -# include - -# include -# include -# include -# include -# include -# include - -# include - -# include -# include -# include - -# include - -# include - -# include -# include -# include - -# include -# include +#include +#include +#include +#include +#include +#include namespace CurrentMetrics @@ -64,138 +29,43 @@ namespace ErrorCodes extern const int S3_ERROR; } - -StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator( - const S3::Client & client_, - const S3::URI & globbed_uri_, - ASTPtr query, - const NamesAndTypesList & virtual_columns, - ContextPtr context, - UInt64 & max_poll_size_, - const S3Settings::RequestSettings & request_settings_) - : max_poll_size(max_poll_size_) - , glob_iterator(std::make_unique( - client_, globbed_uri_, query, virtual_columns, context, nullptr, request_settings_)) +StorageS3QueueSource::FileIterator::FileIterator( + std::shared_ptr metadata_, std::unique_ptr glob_iterator_) + : metadata(metadata_) , glob_iterator(std::move(glob_iterator_)) +{ +} + +StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::FileIterator::next() { - /// 
todo(kssenii): remove this loop, it should not be here while (true) { KeyWithInfo val = glob_iterator->next(); if (val.key.empty()) - break; - keys_buf.push_back(val); + return {}; + if (metadata->trySetFileAsProcessing(val.key)) + return val; } } -Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles( - const S3QueueMode & engine_mode, std::unordered_set & exclude_keys, const String & max_file) -{ - for (const KeyWithInfo & val : keys_buf) - { - auto full_path = val.key; - if (exclude_keys.find(full_path) != exclude_keys.end()) - { - LOG_TEST(log, "File {} will be skipped, because it was found in exclude files list " - "(either already processed or failed to be processed)", val.key); - continue; - } - - if ((engine_mode == S3QueueMode::ORDERED) && (full_path.compare(max_file) <= 0)) - continue; - - if ((processing_keys.size() < max_poll_size) || (engine_mode == S3QueueMode::ORDERED)) - { - processing_keys.push_back(val); - } - else - { - break; - } - } - - if (engine_mode == S3QueueMode::ORDERED) - { - std::sort( - processing_keys.begin(), - processing_keys.end(), - [](const KeyWithInfo & lhs, const KeyWithInfo & rhs) { return lhs.key.compare(rhs.key) < 0; }); - - if (processing_keys.size() > max_poll_size) - { - processing_keys.erase(processing_keys.begin() + max_poll_size, processing_keys.end()); - } - } - - Strings keys; - for (const auto & key_info : processing_keys) - keys.push_back(key_info.key); - - processing_keys.push_back(KeyWithInfo()); - processing_iterator = processing_keys.begin(); - return keys; -} - - -StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next() -{ - std::lock_guard lock(mutex); - if (processing_iterator != processing_keys.end()) - { - return *processing_iterator++; - } - - return KeyWithInfo(); -} - StorageS3QueueSource::StorageS3QueueSource( - const ReadFromFormatInfo & info, - const String & format_, String name_, - ContextPtr context_, - std::optional format_settings_, - UInt64 max_block_size_, - const S3Settings::RequestSettings & request_settings_, - String compression_hint_, - const std::shared_ptr & client_, - const String & bucket_, - const String & version_id_, - const String & url_host_and_port, - std::shared_ptr file_iterator_, + const Block & header_, + std::unique_ptr internal_source_, std::shared_ptr files_metadata_, const S3QueueAction & action_, - const size_t download_thread_num_) - : ISource(info.source_header) + RemoveFileFunc remove_file_func_, + const NamesAndTypesList & requested_virtual_columns_, + ContextPtr context_) + : ISource(header_) , WithContext(context_) , name(std::move(name_)) - , bucket(bucket_) - , version_id(version_id_) - , format(format_) - , columns_desc(info.columns_description) - , request_settings(request_settings_) - , client(client_) - , files_metadata(files_metadata_) - , requested_virtual_columns(info.requested_virtual_columns) - , requested_columns(info.requested_columns) - , file_iterator(file_iterator_) , action(action_) + , files_metadata(files_metadata_) + , internal_source(std::move(internal_source_)) + , requested_virtual_columns(requested_virtual_columns_) + , remove_file_func(remove_file_func_) + , log(&Poco::Logger::get("StorageS3QueueSource")) { - internal_source = std::make_shared( - info, - format_, - name_, - context_, - format_settings_, - max_block_size_, - request_settings_, - compression_hint_, - client_, - bucket_, - version_id_, - url_host_and_port, - file_iterator, - download_thread_num_, - false, - /* query_info */ std::nullopt); reader = 
std::move(internal_source->reader); if (reader) reader_future = std::move(internal_source->reader_future); @@ -213,7 +83,6 @@ String StorageS3QueueSource::getName() const Chunk StorageS3QueueSource::generate() { - auto file_progress = getContext()->getFileProgressCallback(); while (true) { if (isCancelled() || !reader) @@ -223,46 +92,27 @@ Chunk StorageS3QueueSource::generate() break; } - Chunk chunk; - bool success_in_pulling = false; try { + Chunk chunk; if (reader->pull(chunk)) { - UInt64 num_rows = chunk.getNumRows(); - auto file_path = reader.getPath(); - - for (const auto & virtual_column : requested_virtual_columns) - { - if (virtual_column.name == "_path") - { - chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst()); - } - else if (virtual_column.name == "_file") - { - size_t last_slash_pos = file_path.find_last_of('/'); - auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1)); - chunk.addColumn(column->convertToFullColumnIfConst()); - } - } - success_in_pulling = true; + LOG_TEST(log, "Read {} rows from file: {}", chunk.getNumRows(), reader.getPath()); + VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath()); + return chunk; } } catch (const Exception & e) { LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText()); files_metadata->setFileFailed(reader.getFile(), e.message()); - success_in_pulling = false; - } - if (success_in_pulling) - { - applyActionAfterProcessing(reader.getFile()); - files_metadata->setFileProcessed(reader.getFile()); - return chunk; + throw; } + files_metadata->setFileProcessed(reader.getFile()); + applyActionAfterProcessing(reader.getFile()); - assert(reader_future.valid()); + chassert(reader_future.valid()); reader = reader_future.get(); if (!reader) @@ -277,37 +127,21 @@ Chunk StorageS3QueueSource::generate() return {}; } - -void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path) +void StorageS3QueueSource::applyActionAfterProcessing(const String & path) { switch (action) { case S3QueueAction::DELETE: - deleteProcessedObject(file_path); + { + assert(remove_file_func); + remove_file_func(path); break; + } case S3QueueAction::KEEP: break; } } -void StorageS3QueueSource::deleteProcessedObject(const String & file_path) -{ - LOG_INFO(log, "Delete processed file {} from bucket {}", file_path, bucket); - - S3::DeleteObjectRequest request; - request.WithKey(file_path).WithBucket(bucket); - auto outcome = client->DeleteObject(request); - if (!outcome.IsSuccess()) - { - const auto & err = outcome.GetError(); - LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast(err.GetErrorType())); - } - else - { - LOG_TRACE(log, "Object with path {} was removed from S3", file_path); - } -} - } #endif diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index f89384fb096..1ec762d6477 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -2,29 +2,13 @@ #include "config.h" #if USE_AWS_S3 +#include +#include +#include +#include -# include - -# include - -# include -# include -# include -# include -# include - -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include +namespace Poco { class Logger; } namespace DB { @@ -34,56 +18,37 @@ class StorageS3QueueSource : public ISource, WithContext { public: using IIterator = StorageS3Source::IIterator; - using 
DisclosedGlobIterator = StorageS3Source::DisclosedGlobIterator; - using KeysWithInfo = StorageS3Source::KeysWithInfo; + using GlobIterator = StorageS3Source::DisclosedGlobIterator; using KeyWithInfo = StorageS3Source::KeyWithInfo; - class QueueGlobIterator : public IIterator + using ZooKeeperGetter = std::function; + using RemoveFileFunc = std::function; + + class FileIterator : public IIterator { public: - QueueGlobIterator( - const S3::Client & client_, - const S3::URI & globbed_uri_, - ASTPtr query, - const NamesAndTypesList & virtual_columns, - ContextPtr context, - UInt64 & max_poll_size_, - const S3Settings::RequestSettings & request_settings_ = {}); + FileIterator( + std::shared_ptr metadata_, + std::unique_ptr glob_iterator_); KeyWithInfo next() override; - Strings - filterProcessingFiles(const S3QueueMode & engine_mode, std::unordered_set & exclude_keys, const String & max_file = ""); - private: - UInt64 max_poll_size; - KeysWithInfo keys_buf; - KeysWithInfo processing_keys; - mutable std::mutex mutex; - std::unique_ptr glob_iterator; - std::vector::iterator processing_iterator; - - Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSourceIterator"); + const std::shared_ptr metadata; + const std::unique_ptr glob_iterator; + std::mutex mutex; }; static Block getHeader(Block sample_block, const std::vector & requested_virtual_columns); StorageS3QueueSource( - const ReadFromFormatInfo & info, - const String & format, String name_, - ContextPtr context_, - std::optional format_settings_, - UInt64 max_block_size_, - const S3Settings::RequestSettings & request_settings_, - String compression_hint_, - const std::shared_ptr & client_, - const String & bucket, - const String & version_id, - const String & url_host_and_port, - std::shared_ptr file_iterator_, + const Block & header_, + std::unique_ptr internal_source_, std::shared_ptr files_metadata_, const S3QueueAction & action_, - size_t download_thread_num); + RemoveFileFunc remove_file_func_, + const NamesAndTypesList & requested_virtual_columns_, + ContextPtr context_); ~StorageS3QueueSource() override; @@ -91,34 +56,21 @@ public: Chunk generate() override; - private: - String name; - String bucket; - String version_id; - String format; - ColumnsDescription columns_desc; - S3Settings::RequestSettings request_settings; - std::shared_ptr client; + const String name; + const S3QueueAction action; + const std::shared_ptr files_metadata; + const std::shared_ptr internal_source; + const NamesAndTypesList requested_virtual_columns; + + RemoveFileFunc remove_file_func; + Poco::Logger * log; - std::shared_ptr files_metadata; using ReaderHolder = StorageS3Source::ReaderHolder; ReaderHolder reader; - - NamesAndTypesList requested_virtual_columns; - NamesAndTypesList requested_columns; - std::shared_ptr file_iterator; - const S3QueueAction action; - - Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSource"); - std::future reader_future; - mutable std::mutex mutex; - - std::shared_ptr internal_source; - void deleteProcessedObject(const String & file_path); - void applyActionAfterProcessing(const String & file_path); + void applyActionAfterProcessing(const String & path); }; } diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.cpp b/src/Storages/S3Queue/S3QueueTableMetadata.cpp index 23eebb6ded9..f9c89f4d87d 100644 --- a/src/Storages/S3Queue/S3QueueTableMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueTableMetadata.cpp @@ -2,12 +2,12 @@ #if USE_AWS_S3 -# include -# include -# include -# include -# include -# include +#include +#include 
+#include +#include +#include +#include namespace DB @@ -18,13 +18,17 @@ namespace ErrorCodes extern const int METADATA_MISMATCH; } -S3QueueTableMetadata::S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings) +S3QueueTableMetadata::S3QueueTableMetadata( + const StorageS3::Configuration & configuration, + const S3QueueSettings & engine_settings, + const StorageInMemoryMetadata & storage_metadata) { format_name = configuration.format; after_processing = engine_settings.after_processing.toString(); mode = engine_settings.mode.toString(); s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit; s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec; + columns = storage_metadata.getColumns().toString(); } @@ -36,6 +40,7 @@ String S3QueueTableMetadata::toString() const json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit); json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec); json.set("format_name", format_name); + json.set("columns", columns); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); @@ -52,6 +57,7 @@ void S3QueueTableMetadata::read(const String & metadata_str) s3queue_tracked_files_limit = json->getValue("s3queue_tracked_files_limit"); s3queue_tracked_file_ttl_sec = json->getValue("s3queue_tracked_file_ttl_sec"); format_name = json->getValue("format_name"); + columns = json->getValue("columns"); } S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str) diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.h b/src/Storages/S3Queue/S3QueueTableMetadata.h index 4b6fbc54825..f15665692c4 100644 --- a/src/Storages/S3Queue/S3QueueTableMetadata.h +++ b/src/Storages/S3Queue/S3QueueTableMetadata.h @@ -2,9 +2,9 @@ #if USE_AWS_S3 -# include -# include -# include +#include +#include +#include namespace DB { @@ -18,13 +18,14 @@ class ReadBuffer; struct S3QueueTableMetadata { String format_name; + String columns; String after_processing; String mode; UInt64 s3queue_tracked_files_limit; UInt64 s3queue_tracked_file_ttl_sec; S3QueueTableMetadata() = default; - S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings); + S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings, const StorageInMemoryMetadata & storage_metadata); void read(const String & metadata_str); static S3QueueTableMetadata parse(const String & metadata_str); diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index 08cbff96cd0..be71af24601 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -2,69 +2,38 @@ #if USE_AWS_S3 +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include -# include -# include -# include -# include -# include -# include -# include -# include -# include "IO/ParallelReadBuffer.h" - -# include - -# include - -# include - -# include -# include - -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include - - -# include - -# include -# include -# include - -# include - -# include - -# include - -# include -# include -# include -# include namespace fs = std::filesystem; namespace ProfileEvents { -extern const Event 
S3DeleteObjects; -extern const Event S3ListObjects; + extern const Event S3DeleteObjects; + extern const Event S3ListObjects; } namespace DB { -static const String PARTITION_ID_WILDCARD = "{_partition_id}"; static const auto MAX_THREAD_WORK_DURATION_MS = 60000; namespace ErrorCodes @@ -78,6 +47,33 @@ namespace ErrorCodes extern const int INCOMPATIBLE_COLUMNS; } +namespace +{ + bool containsGlobs(const S3::URI & url) + { + return url.key.find_first_of("*?{") != std::string::npos; + } + + std::string chooseZooKeeperPath(const StorageID & table_id, const Settings & settings, const S3QueueSettings & s3queue_settings) + { + std::string zk_path_prefix = settings.s3queue_default_zookeeper_path.value; + if (zk_path_prefix.empty()) + zk_path_prefix = "/"; + + std::string result_zk_path; + if (s3queue_settings.keeper_path.changed) + { + /// We do not add table uuid here on purpose. + result_zk_path = fs::path(zk_path_prefix) / s3queue_settings.keeper_path.value; + } + else + { + auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getUUID(); + result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id.uuid); + } + return zkutil::extractZooKeeperPath(result_zk_path, true); + } +} StorageS3Queue::StorageS3Queue( std::unique_ptr s3queue_settings_, @@ -87,79 +83,64 @@ StorageS3Queue::StorageS3Queue( const ConstraintsDescription & constraints_, const String & comment, ContextPtr context_, - std::optional format_settings_, - ASTPtr partition_by_) + std::optional format_settings_) : IStorage(table_id_) , WithContext(context_) , s3queue_settings(std::move(s3queue_settings_)) + , zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings)) , after_processing(s3queue_settings->after_processing) + , files_metadata(std::make_shared(this, *s3queue_settings)) , configuration{configuration_} - , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms) , format_settings(format_settings_) - , partition_by(partition_by_) + , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms) , log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")")) { if (configuration.url.key.ends_with('/')) + { configuration.url.key += '*'; - - if (!withGlobs()) + } + else if (!containsGlobs(configuration.url)) + { throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs"); - - std::string zk_path_prefix = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value; - if (zk_path_prefix.empty()) - zk_path_prefix = "/"; - - std::string result_zk_path; - if (s3queue_settings->keeper_path.changed) - { - /// We do not add table uuid here on purpose. 
- result_zk_path = fs::path(zk_path_prefix) / s3queue_settings->keeper_path.value; - } - else - { - auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id_.database_name)->getUUID(); - result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id_.uuid); } - zk_path = zkutil::extractZooKeeperPath(result_zk_path, true/* check_starts_with_slash */, log); - LOG_INFO(log, "Using zookeeper path: {}", zk_path); - - FormatFactory::instance().checkFormatName(configuration.format); - context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri); - StorageInMemoryMetadata storage_metadata; configuration.update(context_); + FormatFactory::instance().checkFormatName(configuration.format); + context_->getRemoteHostFilter().checkURL(configuration.url.uri); + StorageInMemoryMetadata storage_metadata; if (columns_.empty()) { auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_); storage_metadata.setColumns(columns); } else + { storage_metadata.setColumns(columns_); - + } storage_metadata.setConstraints(constraints_); storage_metadata.setComment(comment); + + createOrCheckMetadata(storage_metadata); setInMemoryMetadata(storage_metadata); - auto metadata_snapshot = getInMemoryMetadataPtr(); - const bool is_first_replica = createTableIfNotExists(metadata_snapshot); - - if (!is_first_replica) - { - checkTableStructure(zk_path, metadata_snapshot); - } - - files_metadata = std::make_shared(this, *s3queue_settings); virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); }); - auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); }); - task = std::make_shared(std::move(poll_thread)); + LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); } - -bool StorageS3Queue::supportsSubcolumns() const +void StorageS3Queue::startup() { - return true; + if (task) + task->activateAndSchedule(); +} + +void StorageS3Queue::shutdown() +{ + shutdown_called = true; + if (task) + task->deactivate(); } bool StorageS3Queue::supportsSubsetOfColumns() const @@ -177,80 +158,62 @@ Pipe StorageS3Queue::read( size_t /* num_streams */) { if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select) - throw Exception( - ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`"); + { + throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. 
" + "To enable use setting `stream_like_engine_allow_direct_select`"); + } if (mv_attached) - throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views"); + { + throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, + "Cannot read from {} with attached materialized views", getName()); + } - auto query_configuration = updateConfigurationAndGetCopy(local_context); - - std::shared_ptr iterator_wrapper = createFileIterator(local_context, query_info.query); + Pipes pipes; + pipes.emplace_back(createSource(column_names, storage_snapshot, query_info.query, max_block_size, local_context)); + return Pipe::unitePipes(std::move(pipes)); +} +std::shared_ptr StorageS3Queue::createSource( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + ASTPtr query, + size_t max_block_size, + ContextPtr local_context) +{ + auto configuration_snapshot = updateConfigurationAndGetCopy(local_context); + auto file_iterator = createFileIterator(local_context, query); auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals()); - const size_t max_download_threads = local_context->getSettingsRef().max_download_threads; - - return Pipe(std::make_shared( - read_from_format_info, - configuration.format, - getName(), - local_context, - format_settings, + auto internal_source = std::make_unique( + read_from_format_info, configuration.format, getName(), local_context, format_settings, max_block_size, - query_configuration.request_settings, - configuration.compression_method, - query_configuration.client, - query_configuration.url.bucket, - query_configuration.url.version_id, - query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()), - iterator_wrapper, - files_metadata, - after_processing, - max_download_threads)); -} + configuration_snapshot.request_settings, + configuration_snapshot.compression_method, + configuration_snapshot.client, + configuration_snapshot.url.bucket, + configuration_snapshot.url.version_id, + configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()), + file_iterator, local_context->getSettingsRef().max_download_threads, false, /* query_info */ std::nullopt); -SinkToStoragePtr StorageS3Queue::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, bool) -{ - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName()); -} - -void StorageS3Queue::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) -{ - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName()); -} - -NamesAndTypesList StorageS3Queue::getVirtuals() const -{ - return virtual_columns; -} - -bool StorageS3Queue::supportsPartitionBy() const -{ - return true; -} - -void StorageS3Queue::startup() -{ - if (task) - task->holder->activateAndSchedule(); -} - -void StorageS3Queue::shutdown() -{ - shutdown_called = true; - if (task) + auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client](const std::string & path) { - task->stream_cancelled = true; - task->holder->deactivate(); - } -} - -size_t StorageS3Queue::getTableDependentCount() const -{ - auto table_id = getStorageID(); - // Check if at least one direct dependency is attached - return DatabaseCatalog::instance().getDependentViews(table_id).size(); + S3::DeleteObjectRequest request; + 
request.WithKey(path).WithBucket(bucket); + auto outcome = client->DeleteObject(request); + if (!outcome.IsSuccess()) + { + const auto & err = outcome.GetError(); + LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast(err.GetErrorType())); + } + else + { + LOG_TRACE(log, "Object with path {} was removed from S3", path); + } + }; + return std::make_shared( + getName(), read_from_format_info.source_header, std::move(internal_source), + files_metadata, after_processing, file_deleter, read_from_format_info.requested_virtual_columns, local_context); } bool StorageS3Queue::hasDependencies(const StorageID & table_id) @@ -280,40 +243,33 @@ bool StorageS3Queue::hasDependencies(const StorageID & table_id) void StorageS3Queue::threadFunc() { - bool reschedule = true; + SCOPE_EXIT({ mv_attached.store(false); }); try { auto table_id = getStorageID(); - - auto dependencies_count = getTableDependentCount(); + size_t dependencies_count = DatabaseCatalog::instance().getDependentViews(table_id).size(); if (dependencies_count) { auto start_time = std::chrono::steady_clock::now(); - + /// Reset reschedule interval. + reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms; + /// Disallow parallel selects while streaming to mv. mv_attached.store(true); - // Keep streaming as long as there are attached views and streaming is not cancelled - while (!task->stream_cancelled) - { - if (!hasDependencies(table_id)) - { - /// For this case, we can not wait for watch thread to wake up - reschedule = true; - break; - } + /// Keep streaming as long as there are attached views and streaming is not cancelled + while (!shutdown_called && hasDependencies(table_id)) + { LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count); + streamToViews(); - auto ts = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(ts - start_time); + auto now = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(now - start_time); if (duration.count() > MAX_THREAD_WORK_DURATION_MS) { LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule."); - reschedule = true; break; } - - reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms; } } } @@ -322,19 +278,16 @@ void StorageS3Queue::threadFunc() tryLogCurrentException(__PRETTY_FUNCTION__); } - mv_attached.store(false); - - if (reschedule && !shutdown_called) + if (!shutdown_called) { - LOG_TRACE(log, "Reschedule S3 Queue thread func."); - /// Reschedule with backoff. 
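/// Linear backoff: the interval grows by s3queue_polling_backoff_ms per idle round, capped at s3queue_polling_max_timeout_ms; it is reset to the minimum at the top of threadFunc() once attached views are found again.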
if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms) reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms; - task->holder->scheduleAfter(reschedule_processing_interval_ms); + + LOG_TRACE(log, "Reschedule S3 Queue processing thread in {} ms", reschedule_processing_interval_ms); + task->scheduleAfter(reschedule_processing_interval_ms); } } - void StorageS3Queue::streamToViews() { auto table_id = getStorageID(); @@ -348,8 +301,6 @@ void StorageS3Queue::streamToViews() auto insert = std::make_shared(); insert->table_id = table_id; - size_t block_size = 100; - auto s3queue_context = Context::createCopy(getContext()); s3queue_context->makeQueryContext(); auto query_configuration = updateConfigurationAndGetCopy(s3queue_context); @@ -358,40 +309,14 @@ void StorageS3Queue::streamToViews() // Only insert into dependent views and expect that input blocks contain virtual columns InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true); auto block_io = interpreter.execute(); - auto column_names = block_io.pipeline.getHeader().getNames(); - - // Create a stream for each consumer and join them in a union stream - - std::shared_ptr iterator_wrapper = createFileIterator(s3queue_context, nullptr); - auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals()); - const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads; - - auto pipe = Pipe(std::make_shared( - read_from_format_info, - configuration.format, - getName(), - s3queue_context, - format_settings, - block_size, - query_configuration.request_settings, - configuration.compression_method, - query_configuration.client, - query_configuration.url.bucket, - query_configuration.url.version_id, - query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()), - iterator_wrapper, - files_metadata, - after_processing, - max_download_threads)); + auto pipe = Pipe(createSource(block_io.pipeline.getHeader().getNames(), storage_snapshot, nullptr, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context)); std::atomic_size_t rows = 0; - { - block_io.pipeline.complete(std::move(pipe)); - block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); }); - CompletedPipelineExecutor executor(block_io.pipeline); - executor.execute(); - } + block_io.pipeline.complete(std::move(pipe)); + block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); }); + CompletedPipelineExecutor executor(block_io.pipeline); + executor.execute(); } StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context) @@ -411,49 +336,40 @@ zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const return zk_client; } - -bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot) +void StorageS3Queue::createOrCheckMetadata(const StorageInMemoryMetadata & storage_metadata) { auto zookeeper = getZooKeeper(); zookeeper->createAncestors(zk_path); - for (size_t i = 0; i < zk_create_table_retries; ++i) + for (size_t i = 0; i < 1000; ++i) { - Coordination::Requests ops; - bool is_first_replica = true; - if (zookeeper->exists(zk_path + "/metadata")) + Coordination::Requests requests; + if (zookeeper->exists(zk_path / "metadata")) { - if (!zookeeper->exists(zk_path + "/processing")) - ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", 
"", zkutil::CreateMode::Ephemeral)); - LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zk_path); - is_first_replica = false; + checkTableStructure(zk_path, storage_metadata); } else { - String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString(); - ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral)); - ops.emplace_back(zkutil::makeCreateRequest( - zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent)); - - ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent)); + std::string metadata = S3QueueTableMetadata(configuration, *s3queue_settings, storage_metadata).toString(); + requests.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent)); + requests.emplace_back(zkutil::makeCreateRequest(zk_path / "processed", "", zkutil::CreateMode::Persistent)); + requests.emplace_back(zkutil::makeCreateRequest(zk_path / "failed", "", zkutil::CreateMode::Persistent)); + requests.emplace_back(zkutil::makeCreateRequest(zk_path / "processing", "", zkutil::CreateMode::Persistent)); + requests.emplace_back(zkutil::makeCreateRequest(zk_path / "metadata", metadata, zkutil::CreateMode::Persistent)); } Coordination::Responses responses; - auto code = zookeeper->tryMulti(ops, responses); + auto code = zookeeper->tryMulti(requests, responses); if (code == Coordination::Error::ZNODEEXISTS) { - LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path); + LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path.string()); continue; } else if (code != Coordination::Error::ZOK) { - zkutil::KeeperMultiException::check(code, ops, responses); + zkutil::KeeperMultiException::check(code, requests, responses); } - - return is_first_replica; + return; } throw Exception( @@ -463,24 +379,20 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_ } -/** Verify that list of columns and table settings match those specified in ZK (/metadata). - * If not, throw an exception. - */ -void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot) +void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageInMemoryMetadata & storage_metadata) { + // Verify that list of columns and table settings match those specified in ZK (/metadata). + // If not, throw an exception. 
@@ -463,24 +379,20 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
 }
 
-/** Verify that list of columns and table settings match those specified in ZK (/metadata).
- * If not, throw an exception.
- */
-void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot)
+void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageInMemoryMetadata & storage_metadata)
 {
+    // Verify that list of columns and table settings match those specified in ZK (/metadata).
+    // If not, throw an exception.
+
     auto zookeeper = getZooKeeper();
-
-    S3QueueTableMetadata old_metadata(configuration, *s3queue_settings);
-
-    Coordination::Stat metadata_stat;
-    String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat);
+    String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata");
     auto metadata_from_zk = S3QueueTableMetadata::parse(metadata_str);
+
+    S3QueueTableMetadata old_metadata(configuration, *s3queue_settings, storage_metadata);
     old_metadata.checkEquals(metadata_from_zk);
 
-    Coordination::Stat columns_stat;
-    auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat));
-
-    const ColumnsDescription & old_columns = metadata_snapshot->getColumns();
+    auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns"));
+    const ColumnsDescription & old_columns = storage_metadata.getColumns();
     if (columns_from_zk != old_columns)
     {
         throw Exception(
@@ -492,38 +404,12 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
     }
 }
 
-
-std::shared_ptr<StorageS3QueueSource::QueueGlobIterator>
-StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
+std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
 {
-    auto it = std::make_shared<StorageS3QueueSource::QueueGlobIterator>(
-        *configuration.client,
-        configuration.url,
-        query,
-        virtual_columns,
-        local_context,
-        s3queue_settings->s3queue_polling_size.value,
-        configuration.request_settings);
-
-    auto zookeeper = getZooKeeper();
-    auto lock = files_metadata->acquireLock(zookeeper);
-    S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles();
-
-    Strings files_to_process;
-    if (s3queue_settings->mode == S3QueueMode::UNORDERED)
-    {
-        files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip);
-    }
-    else
-    {
-        String max_processed_file = files_metadata->getMaxProcessedFile();
-        files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file);
-    }
-
-    LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
-
-    files_metadata->setFilesProcessing(files_to_process);
-    return it;
+    auto glob_iterator = std::make_unique<StorageS3QueueSource::GlobIterator>(
+        *configuration.client, configuration.url, query, virtual_columns, local_context,
+        /* read_keys */nullptr, configuration.request_settings);
+    return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator));
 }
 
 void StorageS3Queue::drop()
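Note: `createFileIterator` no longer pre-filters keys against ZooKeeper state up front; it wraps a plain glob iterator in a `FileIterator` that consults `files_metadata` lazily as keys are pulled. `FileIterator` itself is not shown in this diff; a hypothetical sketch of such a decorator follows, where `trySetFileAsProcessing` is an assumed interface standing in for whatever acquisition check the metadata class actually exposes:

```cpp
#include <memory>
#include <string>

// Hypothetical decorator over a key iterator: skip keys that the shared
// metadata store reports as processed, failed, or taken by another server.
template <typename MetadataPtr, typename InnerIterator>
class SkippingFileIterator
{
public:
    SkippingFileIterator(MetadataPtr metadata_, std::unique_ptr<InnerIterator> inner_)
        : metadata(metadata_), inner(std::move(inner_)) {}

    std::string next()
    {
        while (true)
        {
            std::string key = inner->next();
            if (key.empty())
                return {}; // inner iterator exhausted
            // Assumed interface: returns true only if this node won the right
            // to process the key (e.g. by creating a "processing" znode).
            if (metadata->trySetFileAsProcessing(key))
                return key;
        }
    }

private:
    MetadataPtr metadata;
    std::unique_ptr<InnerIterator> inner;
};
```

The design choice is the same either way: the arbitration moves from one bulk snapshot per poll to a per-key check at pull time, which is what lets several replicas share one glob without a global lock.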
" + "You can enable it with the `allow_experimental_s3queue` setting."); + } auto & engine_args = args.engine_args; if (engine_args.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); + auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext()); // Use format settings from global server context + settings from @@ -582,10 +472,6 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory) format_settings = getFormatSettings(args.getContext()); } - ASTPtr partition_by; - if (args.storage_def->partition_by) - partition_by = args.storage_def->partition_by->clone(); - return std::make_shared( std::move(s3queue_settings), std::move(configuration), @@ -594,12 +480,10 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory) args.constraints, args.comment, args.getContext(), - format_settings, - partition_by); + format_settings); }, { .supports_settings = true, - .supports_sort_order = true, // for partition by .supports_schema_inference = true, .source_access_type = AccessType::S3, }); diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 712fe9e530b..9151473a9bc 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -4,29 +4,14 @@ #if USE_AWS_S3 -# include - -# include -# include - -# include -# include -# include -# include -# include -# include - -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include +#include +#include +#include +#include +#include +#include +#include +#include namespace Aws::S3 { @@ -35,7 +20,7 @@ class Client; namespace DB { - +class S3QueueFilesMetadata; class StorageS3Queue : public IStorage, WithContext { @@ -50,8 +35,7 @@ public: const ConstraintsDescription & constraints_, const String & comment, ContextPtr context_, - std::optional format_settings_, - ASTPtr partition_by_ = nullptr); + std::optional format_settings_); String getName() const override { return "S3Queue"; } @@ -64,79 +48,58 @@ public: size_t max_block_size, size_t num_streams) override; - SinkToStoragePtr write( - const ASTPtr & query, - const StorageMetadataPtr & metadata_snapshot, - ContextPtr context, - bool async_insert) override; - - void truncate( - const ASTPtr & /*query*/, - const StorageMetadataPtr & /*metadata_snapshot*/, - ContextPtr /*local_context*/, - TableExclusiveLockHolder &) override; - - NamesAndTypesList getVirtuals() const override; - - bool supportsPartitionBy() const override; + NamesAndTypesList getVirtuals() const override { return virtual_columns; } const auto & getFormatName() const { return configuration.format; } - const String & getZooKeeperPath() const { return zk_path; } + const fs::path & getZooKeeperPath() const { return zk_path; } zkutil::ZooKeeperPtr getZooKeeper() const; private: + using FileIterator = StorageS3QueueSource::FileIterator; + const std::unique_ptr s3queue_settings; + const fs::path zk_path; const S3QueueAction after_processing; std::shared_ptr files_metadata; Configuration configuration; + + const std::optional format_settings; NamesAndTypesList virtual_columns; - UInt64 reschedule_processing_interval_ms; - std::optional format_settings; - ASTPtr partition_by; - - String zk_path; mutable zkutil::ZooKeeperPtr zk_client; mutable std::mutex zk_mutex; + BackgroundSchedulePool::TaskHolder task; + std::atomic stream_cancelled{false}; + UInt64 reschedule_processing_interval_ms; + std::atomic 
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index c11bbd43dc6..65c31acb5d8 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -111,6 +111,7 @@ def generate_random_files(
     to_generate = [
         (f"{prefix}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
     ]
+    print(f"Generating files: {to_generate}")
     to_generate.sort(key=lambda x: x[0])
 
     for filename, i in to_generate:
@@ -179,29 +180,58 @@ def run_query(instance, query, stdin=None, settings=None):
 
 @pytest.mark.parametrize("mode", AVAILABLE_MODES)
 def test_delete_after_processing(started_cluster, mode):
-    prefix = "delete"
     bucket = started_cluster.minio_bucket
-    instance = started_cluster.instances["instance"]
-    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+    node = started_cluster.instances["instance"]
 
-    total_values = generate_random_files(5, prefix, started_cluster, bucket)
-    instance.query(
+    table_name = "test.delete_after_processing"
+    dst_table_name = "test.delete_after_processing_dst"
+    mv_name = "test.delete_after_processing_mv"
+    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+    files_num = 5
+    row_num = 10
+
+    prefix = "delete"
+    total_values = generate_random_files(
+        files_num, prefix, started_cluster, bucket, row_num=row_num
+    )
+    node.query(
         f"""
-        DROP TABLE IF EXISTS test.s3_queue;
-        CREATE TABLE test.s3_queue ({table_format})
+        DROP TABLE IF EXISTS {table_name};
+        DROP TABLE IF EXISTS {dst_table_name};
+        DROP TABLE IF EXISTS {mv_name};
+        CREATE TABLE {table_name} ({table_format})
         ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
         SETTINGS
            mode = '{mode}',
            keeper_path = '/clickhouse/test_delete_{mode}',
            s3queue_loading_retries = 3,
            after_processing='delete';
+
+        CREATE TABLE {dst_table_name} ({table_format}, _path String)
+        ENGINE = MergeTree()
+        ORDER BY column1;
+
+        CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {table_name};
        """
    )
-    get_query = f"SELECT * FROM test.s3_queue ORDER BY column1, column2, column3"
+    expected_count = files_num * row_num
+    for _ in range(100):
+        count = int(node.query(f"SELECT count() FROM {dst_table_name}"))
+        print(f"{count}/{expected_count}")
+        if count == expected_count:
+            break
+        time.sleep(1)
+
+    assert int(node.query(f"SELECT count() FROM {dst_table_name}")) == expected_count
+    assert int(node.query(f"SELECT uniq(_path) FROM {dst_table_name}")) == files_num
     assert [
-        list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+        list(map(int, l.split()))
+        for l in node.query(
+            f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3"
+        ).splitlines()
     ] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
+
     minio = started_cluster.minio_client
     objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
     assert len(objects) == 0