diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp
index 556eca808f6..ccd8caf1c5a 100644
--- a/programs/copier/ClusterCopier.cpp
+++ b/programs/copier/ClusterCopier.cpp
@@ -391,7 +391,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
     auto code = zookeeper->tryMulti(ops, responses);

     if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
-        return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
+        return zkutil::EphemeralNodeHolder::existing(current_worker_path, *zookeeper);

     if (code == Coordination::Error::ZBADVERSION)
     {
diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h
index 4b598147301..d61156b31a9 100644
--- a/src/Common/ZooKeeper/ZooKeeper.h
+++ b/src/Common/ZooKeeper/ZooKeeper.h
@@ -644,11 +644,18 @@ class EphemeralNodeHolder
 public:
     using Ptr = std::shared_ptr<EphemeralNodeHolder>;

-    EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
+    EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool try_create, bool sequential, const std::string & data)
         : path(path_), zookeeper(zookeeper_)
     {
         if (create)
+        {
             path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
+            need_remove = created = true;
+        }
+        else if (try_create)
+        {
+            need_remove = created = Coordination::Error::ZOK == zookeeper.tryCreate(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
+        }
     }

     std::string getPath() const
@@ -656,19 +663,32 @@ public:
         return path;
     }

+    bool isCreated() const
+    {
+        return created;
+    }
+
     static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
     {
-        return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, data);
+        return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, false, data);
+    }
+
+    static Ptr tryCreate(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
+    {
+        auto node = std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, true, false, data);
+        if (node->isCreated())
+            return node;
+        return nullptr;
     }

     static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
     {
-        return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, true, data);
+        return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, true, data);
     }

     static Ptr existing(const std::string & path, ZooKeeper & zookeeper)
     {
-        return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, "");
+        return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, false, "");
     }

     void setAlreadyRemoved()
@@ -702,6 +722,7 @@ private:
     ZooKeeper & zookeeper;
     CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
     bool need_remove = true;
+    bool created = false;
 };

 using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
index f61a200f8f0..dbcc085575b 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
@@ -4,10 +4,13 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
@@ -55,15 +58,6 @@ std::unique_lock<std::mutex> S3QueueFilesMetadata::LocalFileStatuses::lock() con
-S3QueueFilesMetadata::FileStatus::State S3QueueFilesMetadata::LocalFileStatuses::state(const std::string & filename) const
-{
-    auto lk = lock();
-    if (auto it = file_statuses.find(filename); it != file_statuses.end())
-        return it->second->state;
-    else
-        return FileStatus::State::None;
-}
-
 S3QueueFilesMetadata::FileStatuses S3QueueFilesMetadata::LocalFileStatuses::getAll() const
 {
     auto lk = lock();
@@ -106,6 +100,7 @@ std::string S3QueueFilesMetadata::NodeMetadata::toString() const
     json.set("last_processed_timestamp", getCurrentTime());
     json.set("last_exception", last_exception);
     json.set("retries", retries);
+    json.set("processing_id", processing_id);

     std::ostringstream oss;     // STYLE_CHECK_ALLOW_STD_STRING_STREAM
     oss.exceptions(std::ios::failbit);
@@ -123,6 +118,7 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::NodeMetadata::fromStrin
     metadata.last_processed_timestamp = json->getValue<UInt64>("last_processed_timestamp");
     metadata.last_exception = json->getValue<String>("last_exception");
     metadata.retries = json->getValue<UInt64>("retries");
+    metadata.processing_id = json->getValue<String>("processing_id");
     return metadata;
 }
@@ -189,28 +185,28 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
     return metadata;
 }

-bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
+S3QueueFilesMetadata::ProcessingHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
+    auto file_status = local_file_statuses.get(path, /* create */true);

     /// Check locally cached file status.
-    switch (local_file_statuses.state(path))
+    switch (file_status->state)
     {
         case FileStatus::State::Processing: [[fallthrough]];
         case FileStatus::State::Processed:
         {
             /// File is already processed or being processed by the current server.
-            return false;
+            return nullptr;
         }
         case FileStatus::State::Failed:
         {
-            if (!max_loading_retries)
-            {
-                /// File was processed by the current server and failed,
-                /// and retries are disabled.
-                return false;
-            }
-            /// TODO save information if file is still retriable.
+            /// max_loading_retries == 0 means the file is not retriable.
+            /// file_status->retries is a cached value, so if file_status->retries >= max_loading_retries
+            /// we can fully rely on it; otherwise the value might be outdated,
+            /// but that is ok: we will recheck with zookeeper.
+            if (!max_loading_retries || file_status->retries >= max_loading_retries)
+                return nullptr;
             break;
         }
         case FileStatus::State::None:
@@ -220,19 +216,25 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
             break;
         }
     }

-    /// TODO lock file token not to go to keeper simultaneously from this server.
+    std::unique_lock lock(file_status->processing_lock, std::defer_lock);
+    if (!lock.try_lock())
+    {
+        /// Another thread is already trying to set the file as processing.
+        return nullptr;
+    }

     SetFileProcessingResult result;
+    ProcessingHolderPtr processing_holder;
     switch (mode)
     {
         case S3QueueMode::ORDERED:
         {
-            result = trySetFileAsProcessingForOrderedMode(path);
+            std::tie(result, processing_holder) = trySetFileAsProcessingForOrderedMode(path);
             break;
         }
         case S3QueueMode::UNORDERED:
         {
-            result = trySetFileAsProcessingForUnorderedMode(path);
+            std::tie(result, processing_holder) = trySetFileAsProcessingForUnorderedMode(path);
             break;
         }
     }
@@ -240,7 +242,6 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
     {
         case SetFileProcessingResult::Success:
         {
-            auto file_status = local_file_statuses.get(path, /* create */true);
             file_status->state = FileStatus::State::Processing;

             file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get());
             timer.cancel();
@@ -251,14 +252,12 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
         case SetFileProcessingResult::AlreadyProcessed:
         {
             /// Cache the state.
-            auto file_status = local_file_statuses.get(path, /* create */true);
             file_status->state = FileStatus::State::Processed;
             break;
         }
         case SetFileProcessingResult::AlreadyFailed:
         {
             /// Cache the state.
-            auto file_status = local_file_statuses.get(path, /* create */true);
             file_status->state = FileStatus::State::Failed;
             break;
         }
@@ -268,54 +267,60 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
             break;
         }
     }
-    return result == SetFileProcessingResult::Success;
+
+    if (result != SetFileProcessingResult::Success)
+        return nullptr;
+
+    return processing_holder;
 }

-S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
+std::pair<S3QueueFilesMetadata::SetFileProcessingResult, S3QueueFilesMetadata::ProcessingHolderPtr>
+S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
 {
     /// Create an ephemeral node in /processing
     /// if the corresponding node does not exist in failed/, processed/ or processing/.
     /// Return false otherwise.
     const auto node_name = getNodeName(path);
-    const auto node_metadata = createNodeMetadata(path).toString();
     const auto zk_client = getZooKeeper();
+    auto node_metadata = createNodeMetadata(path);
+    node_metadata.processing_id = getRandomASCIIString(10);

     Coordination::Requests requests;
     zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_processed_path / node_name);
     zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_failed_path / node_name);
-    requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral));
+    requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));

     Coordination::Responses responses;
     auto code = zk_client->tryMulti(requests, responses);
     if (code == Coordination::Error::ZOK)
     {
-        return SetFileProcessingResult::Success;
+        auto holder = std::make_unique<ProcessingHolder>(node_metadata.processing_id, zookeeper_processing_path / node_name, zk_client);
+        return std::pair{SetFileProcessingResult::Success, std::move(holder)};
     }
-    else if (responses[0]->error == Coordination::Error::ZOK)
-    {
-        if (responses[1]->error == Coordination::Error::ZOK)
-        {
-            chassert(responses[2]->error != Coordination::Error::ZOK);
-            return SetFileProcessingResult::ProcessingByOtherNode;
-        }
-        else
-            return SetFileProcessingResult::AlreadyFailed;
-    }
-    else
-        return SetFileProcessingResult::AlreadyProcessed;
+
+    if (responses[0]->error != Coordination::Error::ZOK)
+        return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
+
+    if (responses[1]->error != Coordination::Error::ZOK)
+        return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
+
+    chassert(responses[2]->error != Coordination::Error::ZOK);
+    return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
 }

-S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
+std::pair<S3QueueFilesMetadata::SetFileProcessingResult, S3QueueFilesMetadata::ProcessingHolderPtr>
+S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
 {
     /// Create an ephemeral node in /processing
     /// if the corresponding node does not exist in failed/ or processing/ and the max processed file check is satisfied.
     /// Return false otherwise.
     const auto node_name = getNodeName(path);
-    const auto node_metadata = createNodeMetadata(path).toString();
     const auto zk_client = getZooKeeper();
+    auto node_metadata = createNodeMetadata(path);
+    node_metadata.processing_id = getRandomASCIIString(10);

     while (true)
     {
@@ -330,12 +335,12 @@ S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAs
             if (responses[0]->error == Coordination::Error::ZOK)
             {
                 LOG_TEST(log, "Skipping file `{}`: already processing", path);
-                return SetFileProcessingResult::ProcessingByOtherNode;
+                return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
             }
             else
             {
                 LOG_TEST(log, "Skipping file `{}`: failed", path);
-                return SetFileProcessingResult::AlreadyFailed;
+                return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
             }
         }
@@ -347,27 +352,30 @@ S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAs
         auto max_processed_file_path = processed_node_metadata.file_path;
         if (!max_processed_file_path.empty() && path <= max_processed_file_path)
-            return SetFileProcessingResult::AlreadyProcessed;
+            return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};

         requests.clear();
         responses.clear();

         zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_failed_path / node_name);
-        requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata, zkutil::CreateMode::Ephemeral));
+        requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
         requests.push_back(zkutil::makeCheckRequest(zookeeper_processed_path, processed_node_stat.version));

         code = zk_client->tryMulti(requests, responses);
         if (code == Coordination::Error::ZOK)
-            return SetFileProcessingResult::Success;
+        {
+            auto holder = std::make_unique<ProcessingHolder>(node_metadata.processing_id, zookeeper_processing_path / node_name, zk_client);
+            return std::pair{SetFileProcessingResult::Success, std::move(holder)};
+        }

         if (responses[0]->error != Coordination::Error::ZOK)
         {
             LOG_TEST(log, "Skipping file `{}`: failed", path);
-            return SetFileProcessingResult::AlreadyFailed;
+            return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
         }
         else if (responses[1]->error != Coordination::Error::ZOK)
         {
             LOG_TEST(log, "Skipping file `{}`: already processing", path);
-            return SetFileProcessingResult::ProcessingByOtherNode;
+            return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
         }
         else
         {
@@ -465,8 +473,8 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);

-    SCOPE_EXIT_SAFE({
-        auto file_status = local_file_statuses.get(path, /* create */false);
+    auto file_status = local_file_statuses.get(path, /* create */false);
+    SCOPE_EXIT({
         file_status->state = FileStatus::State::Failed;
         file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get());
         timer.cancel();
@@ -505,6 +513,7 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc
         {
             auto failed_node_metadata = NodeMetadata::fromString(res);
             node_metadata.retries = failed_node_metadata.retries + 1;
+            file_status->retries = node_metadata.retries;
         }

         LOG_TEST(log, "File `{}` failed to process, try {}/{} (Error: {})",
@@ -605,30 +614,12 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()

     /// Create a lock so that with distributed processing
     /// multiple nodes do not execute cleanup in parallel.
-    Coordination::Error code = zk_client->tryCreate(zookeeper_cleanup_lock_path,
-                                                    toString(getCurrentTime()),
-                                                    zkutil::CreateMode::Ephemeral);
-    if (code == Coordination::Error::ZNODEEXISTS)
+    auto ephemeral_node = zkutil::EphemeralNodeHolder::tryCreate(zookeeper_cleanup_lock_path, *zk_client, toString(getCurrentTime()));
+    if (!ephemeral_node)
     {
         LOG_TEST(log, "Cleanup is already being executed by another node");
         return;
     }
-    else if (code != Coordination::Error::ZOK)
-    {
-        throw Coordination::Exception::fromPath(code, zookeeper_cleanup_lock_path);
-    }
-
-    SCOPE_EXIT_SAFE({
-        try
-        {
-            zk_client->remove(zookeeper_cleanup_lock_path);
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-            chassert(false);
-        }
-    });

     struct Node
     {
@@ -687,7 +678,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()

         local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);

-        code = zk_client->tryRemove(path);
+        auto code = zk_client->tryRemove(path);
         if (code == Coordination::Error::ZOK)
             --nodes_to_remove;
         else
@@ -704,7 +695,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()

             local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);

-            code = zk_client->tryRemove(path);
+            auto code = zk_client->tryRemove(path);
             if (code != Coordination::Error::ZOK)
                 LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code);
         }
diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h
index b109f584f5e..7dec4c11383 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.h
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h
@@ -22,7 +22,17 @@ public:

     ~S3QueueFilesMetadata();

-    bool trySetFileAsProcessing(const std::string & path);
+    struct ProcessingHolder
+    {
+        ProcessingHolder(const std::string & processing_id_, const std::string & zk_node_path_, zkutil::ZooKeeperPtr zk_client_)
+            : zk_client(zk_client_), zk_node_path(zk_node_path_), processing_id(processing_id_) {}
+
+        zkutil::ZooKeeperPtr zk_client;
+        std::string zk_node_path;
+        std::string processing_id;
+    };
+    using ProcessingHolderPtr = std::unique_ptr<ProcessingHolder>;
+    ProcessingHolderPtr trySetFileAsProcessing(const std::string & path);

     void setFileProcessed(const std::string & path);
@@ -47,6 +57,10 @@ public:

         time_t processing_start_time = 0;
         time_t processing_end_time = 0;
+
+        size_t retries = 0;
+
+        std::mutex processing_lock;
     };
     using FileStatuses = std::unordered_map<std::string, std::shared_ptr<FileStatus>>;
@@ -88,8 +102,8 @@ private:
         AlreadyProcessed,
         AlreadyFailed,
     };
-    SetFileProcessingResult trySetFileAsProcessingForOrderedMode(const std::string & path);
-    SetFileProcessingResult trySetFileAsProcessingForUnorderedMode(const std::string & path);
+    std::pair<SetFileProcessingResult, ProcessingHolderPtr> trySetFileAsProcessingForOrderedMode(const std::string & path);
+    std::pair<SetFileProcessingResult, ProcessingHolderPtr> trySetFileAsProcessingForUnorderedMode(const std::string & path);

     struct NodeMetadata
     {
@@ -97,6 +111,7 @@ private:
         UInt64 last_processed_timestamp = 0;
         std::string last_exception;
         UInt64 retries = 0;
+        std::string processing_id; /// For ephemeral processing node.
         std::string toString() const;
         static NodeMetadata fromString(const std::string & metadata_str);
@@ -115,7 +130,6 @@ private:
         FileStatuses getAll() const;
         std::shared_ptr<FileStatus> get(const std::string & filename, bool create);
         bool remove(const std::string & filename, bool if_exists);
-        FileStatus::State state(const std::string & filename) const;
         std::unique_lock<std::mutex> lock() const;
     };
     LocalFileStatuses local_file_statuses;
diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp
index e9a57cbbfd4..ff1a8c86b64 100644
--- a/src/Storages/S3Queue/S3QueueSource.cpp
+++ b/src/Storages/S3Queue/S3QueueSource.cpp
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -29,24 +30,39 @@ namespace ErrorCodes
     extern const int NOT_IMPLEMENTED;
 }

+StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo(
+    const std::string & key_,
+    std::optional<S3::ObjectInfo> info_,
+    std::unique_ptr<Metadata::ProcessingHolder> processing_holder_,
+    std::shared_ptr<Metadata::FileStatus> file_status_)
+    : StorageS3Source::KeyWithInfo(key_, info_)
+    , processing_holder(std::move(processing_holder_))
+    , file_status(file_status_)
+{
+}
+
 StorageS3QueueSource::FileIterator::FileIterator(
     std::shared_ptr<S3QueueFilesMetadata> metadata_,
     std::unique_ptr<GlobIterator> glob_iterator_)
     : metadata(metadata_)
     , glob_iterator(std::move(glob_iterator_))
 {
 }

-StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::FileIterator::next()
+StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next()
 {
     /// List results in s3 are always returned in UTF-8 binary order.
     /// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
     while (true)
     {
-        KeyWithInfo val = glob_iterator->next();
-        if (val.key.empty())
+        KeyWithInfoPtr val = glob_iterator->next();
+
+        if (!val)
             return {};
-        if (metadata->trySetFileAsProcessing(val.key))
-            return val;
+
+        if (auto processing_holder = metadata->trySetFileAsProcessing(val->key); processing_holder)
+        {
+            return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, std::move(processing_holder), nullptr);
+        }
     }
 }
@@ -77,6 +93,7 @@ StorageS3QueueSource::StorageS3QueueSource(
     , shutdown_called(shutdown_called_)
     , s3_queue_log(s3_queue_log_)
     , storage_id(storage_id_)
+    , s3_queue_user_id(fmt::format("{}:{}", CurrentThread::getQueryId(), getRandomASCIIString(8)))
     , remove_file_func(remove_file_func_)
     , log(&Poco::Logger::get("StorageS3QueueSource"))
 {
diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h
index ce8a64022d0..634c0803465 100644
--- a/src/Storages/S3Queue/S3QueueSource.h
+++ b/src/Storages/S3Queue/S3QueueSource.h
@@ -19,19 +19,30 @@ class StorageS3QueueSource : public ISource, WithContext
 {
 public:
     using IIterator = StorageS3Source::IIterator;
+    using KeyWithInfoPtr = StorageS3Source::KeyWithInfoPtr;
     using GlobIterator = StorageS3Source::DisclosedGlobIterator;
-    using KeyWithInfo = StorageS3Source::KeyWithInfo;
     using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
     using RemoveFileFunc = std::function<void(std::string)>;
+    using Metadata = S3QueueFilesMetadata;
+
+    struct S3QueueKeyWithInfo : public StorageS3Source::KeyWithInfo
+    {
+        S3QueueKeyWithInfo(
+            const std::string & key_,
+            std::optional<S3::ObjectInfo> info_,
+            std::unique_ptr<Metadata::ProcessingHolder> processing_holder_,
+            std::shared_ptr<Metadata::FileStatus> file_status_);
+
+        std::unique_ptr<Metadata::ProcessingHolder> processing_holder;
+        std::shared_ptr<Metadata::FileStatus> file_status;
+    };

     class FileIterator : public IIterator
     {
     public:
-        FileIterator(
-            std::shared_ptr<S3QueueFilesMetadata> metadata_,
-            std::unique_ptr<GlobIterator> glob_iterator_);
+        FileIterator(std::shared_ptr<S3QueueFilesMetadata> metadata_, std::unique_ptr<GlobIterator> glob_iterator_);

-        KeyWithInfo next() override;
+        KeyWithInfoPtr next() override;

         size_t estimatedKeysCount() override;
@@ -71,6 +82,7 @@ private:
     const std::atomic<bool> & shutdown_called;
     const std::shared_ptr<S3QueueLog> s3_queue_log;
     const StorageID storage_id;
+    const std::string s3_queue_user_id;

     RemoveFileFunc remove_file_func;
     Poco::Logger * log;
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index 3deb22bd32d..3f8919d0f05 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -161,7 +161,7 @@ public:
         /// We don't have to list the bucket, because there are no asterisks.
         if (key_prefix.size() == globbed_uri.key.size())
         {
-            buffer.emplace_back(globbed_uri.key, std::nullopt);
+            buffer.emplace_back(std::make_shared<KeyWithInfo>(globbed_uri.key, std::nullopt));
             buffer_iter = buffer.begin();
             is_finished = true;
             return;
@@ -182,7 +182,7 @@ public:
         fillInternalBufferAssumeLocked();
     }

-    KeyWithInfo next()
+    KeyWithInfoPtr next()
     {
         std::lock_guard lock(mutex);
         return nextAssumeLocked();
@@ -201,7 +201,7 @@ public:
 private:
     using ListObjectsOutcome = Aws::S3::Model::ListObjectsV2Outcome;

-    KeyWithInfo nextAssumeLocked()
+    KeyWithInfoPtr nextAssumeLocked()
     {
         if (buffer_iter != buffer.end())
         {
@@ -277,7 +277,7 @@ private:
                     .last_modification_time = row.GetLastModified().Millis() / 1000,
                 };

-                temp_buffer.emplace_back(std::move(key), std::move(info));
+                temp_buffer.emplace_back(std::make_shared<KeyWithInfo>(std::move(key), std::move(info)));
             }
         }
@@ -289,7 +289,7 @@ private:

         if (!is_initialized)
         {
-            filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front().key, getContext());
+            filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext());
             is_initialized = true;
         }
@@ -298,7 +298,7 @@ private:
             std::vector<String> paths;
             paths.reserve(temp_buffer.size());
             for (const auto & key_with_info : temp_buffer)
-                paths.push_back(fs::path(globbed_uri.bucket) / key_with_info.key);
+                paths.push_back(fs::path(globbed_uri.bucket) / key_with_info->key);

             VirtualColumnUtils::filterByPathOrFile(temp_buffer, paths, query, virtual_columns, getContext(), filter_ast);
         }
@@ -307,8 +307,8 @@ private:

         if (file_progress_callback)
         {
-            for (const auto & [_, info] : buffer)
-                file_progress_callback(FileProgress(0, info->size));
+            for (const auto & key_with_info : buffer)
+                file_progress_callback(FileProgress(0, key_with_info->info->size));
         }

         /// Set iterator only after the whole batch is processed
@@ -371,7 +371,7 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
 {
 }

-StorageS3Source::KeyWithInfo StorageS3Source::DisclosedGlobIterator::next()
+StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next()
 {
     return pimpl->next();
 }
@@ -422,11 +422,11 @@ public:
         if (read_keys_)
         {
             for (const auto & key : keys)
-                read_keys_->push_back({key, {}});
+                read_keys_->push_back(std::make_shared<KeyWithInfo>(key));
         }
     }

-    KeyWithInfo next()
+    KeyWithInfoPtr next()
     {
         size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
         if (current_index >= keys.size())
@@ -439,7 +439,7 @@ public:
                 file_progress_callback(FileProgress(0, info->size));
         }

-        return {key, info};
+        return std::make_shared<KeyWithInfo>(key, info);
     }

     size_t objectsCount()
@@ -476,7 +476,7 @@ StorageS3Source::KeysIterator::KeysIterator(
 {
 }

-StorageS3Source::KeyWithInfo StorageS3Source::KeysIterator::next()
+StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next()
 {
     return pimpl->next();
 }
@@ -502,14 +502,14 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator(
         pool.wait();
         buffer.reserve(max_threads_count);
         for (auto & key_future : keys)
-            buffer.emplace_back(key_future.get(), std::nullopt);
+            buffer.emplace_back(std::make_shared<KeyWithInfo>(key_future.get(), std::nullopt));
 }

-StorageS3Source::KeyWithInfo StorageS3Source::ReadTaskIterator::next()
+StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next()
 {
     size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
     if (current_index >= buffer.size())
-        return {callback(), {}};
+        return std::make_shared<KeyWithInfo>(callback());

     return buffer[current_index];
 }
@@ -566,22 +566,22 @@ StorageS3Source::StorageS3Source(

 StorageS3Source::ReaderHolder StorageS3Source::createReader()
 {
-    KeyWithInfo key_with_info;
+    KeyWithInfoPtr key_with_info;
     do
     {
         key_with_info = (*file_iterator)();
-        if (key_with_info.key.empty())
+        if (!key_with_info)
             return {};

-        if (!key_with_info.info)
-            key_with_info.info = S3::getObjectInfo(*client, bucket, key_with_info.key, version_id, request_settings);
+        if (!key_with_info->info)
+            key_with_info->info = S3::getObjectInfo(*client, bucket, key_with_info->key, version_id, request_settings);
     }
-    while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info.info->size == 0);
+    while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0);

     QueryPipelineBuilder builder;
     std::shared_ptr<ISource> source;
     std::unique_ptr<ReadBuffer> read_buf;
-    std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(key_with_info) : std::nullopt;
+    std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(*key_with_info) : std::nullopt;
     if (num_rows_from_cache)
     {
         /// We should not return single chunk with all number of rows,
@@ -594,8 +594,8 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
     }
     else
     {
-        auto compression_method = chooseCompressionMethod(key_with_info.key, compression_hint);
-        read_buf = createS3ReadBuffer(key_with_info.key, key_with_info.info->size);
+        auto compression_method = chooseCompressionMethod(key_with_info->key, compression_hint);
+        read_buf = createS3ReadBuffer(key_with_info->key, key_with_info->info->size);

         auto input_format = FormatFactory::instance().getInput(
             format,
@@ -639,7 +639,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()

     ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);

-    return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
+    return ReaderHolder{*key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
 }

 std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
@@ -1494,7 +1494,7 @@ namespace
         {
             current_key_with_info = (*file_iterator)();

-            if (current_key_with_info.key.empty())
+            if (!current_key_with_info)
             {
                 if (first)
                     throw Exception(
@@ -1506,6 +1508,8 @@ namespace
                 return nullptr;
             }

+            chassert(!current_key_with_info->key.empty());
+
             /// S3 file iterator could get new keys after new iteration, check them in schema cache.
             if (getContext()->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size)
             {
@@ -1515,15 +1517,15 @@ namespace
                     return nullptr;
             }

-            if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info.info && current_key_with_info.info->size == 0)
+            if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0)
                 continue;

             int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
-            auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, current_key_with_info.key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings());
+            auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings());
             if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof())
             {
                 first = false;
-                return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info.key, configuration.compression_method), zstd_window_log_max);
+                return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max);
             }
         }
     }
@@ -1538,7 +1540,7 @@ namespace
         if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
             return;

-        String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info.key;
+        String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key;
         auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
         StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows);
     }
@@ -1549,7 +1551,7 @@ namespace
        const StorageS3::Configuration & configuration;
        const std::optional<FormatSettings> & format_settings;
        std::optional<ColumnsDescription> columns_from_cache;
-       StorageS3Source::KeyWithInfo current_key_with_info;
+       StorageS3Source::KeyWithInfoPtr current_key_with_info;
        size_t prev_read_keys_size;
        bool first = true;
    };
@@ -1689,9 +1691,9 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
         auto get_last_mod_time = [&]
         {
             time_t last_modification_time = 0;
-            if (it->info)
+            if ((*it)->info)
             {
-                last_modification_time = it->info->last_modification_time;
+                last_modification_time = (*it)->info->last_modification_time;
             }
             else
             {
@@ -1701,7 +1703,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
                 last_modification_time = S3::getObjectInfo(
                     *configuration.client,
                     configuration.url.bucket,
-                    it->key,
+                    (*it)->key,
                     configuration.url.version_id,
                     configuration.request_settings,
                     /*with_metadata=*/ false,
@@ -1712,7 +1714,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
             return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt;
         };

-        String path = fs::path(configuration.url.bucket) / it->key;
+        String path = fs::path(configuration.url.bucket) / (*it)->key;
         String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path;
         auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx);
         auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
@@ -1734,7 +1736,7 @@ void StorageS3::addColumnsToCache(
     auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket;
     Strings sources;
     sources.reserve(keys.size());
-    std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem.key; });
+    std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem->key; });
     auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
     auto & schema_cache = getSchemaCache(ctx);
     schema_cache.addManyColumns(cache_keys, columns);
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index 088f9000ce8..38cf3a5f65b 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -43,22 +43,21 @@ public:
     struct KeyWithInfo
     {
         KeyWithInfo() = default;
-        KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_)
-            : key(std::move(key_)), info(std::move(info_))
-        {
-        }
+
+        explicit KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_ = std::nullopt)
+            : key(std::move(key_)), info(std::move(info_)) {}

         String key;
         std::optional<S3::ObjectInfo> info;
     };
+    using KeyWithInfoPtr = std::shared_ptr<KeyWithInfo>;

-    using KeysWithInfo = std::vector<KeyWithInfo>;
+    using KeysWithInfo = std::vector<KeyWithInfoPtr>;

     class IIterator
     {
     public:
         virtual ~IIterator() = default;
-        virtual KeyWithInfo next() = 0;
+        virtual KeyWithInfoPtr next() = 0;

         /// Estimates how many streams we need to process all files.
         /// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
         /// fixme: May underestimate if the glob has a strong filter, so there are few matches among the first 1000 ListObjects results.
         virtual size_t estimatedKeysCount() = 0;

-        KeyWithInfo operator ()() { return next(); }
+        KeyWithInfoPtr operator ()() { return next(); }
     };

     class DisclosedGlobIterator : public IIterator
@@ -82,7 +81,7 @@ public:
             const S3Settings::RequestSettings & request_settings_ = {},
             std::function<void(FileProgress)> progress_callback_ = {});

-        KeyWithInfo next() override;
+        KeyWithInfoPtr next() override;
         size_t estimatedKeysCount() override;

     private:
@@ -106,7 +105,7 @@ public:
             KeysWithInfo * read_keys = nullptr,
             std::function<void(FileProgress)> progress_callback_ = {});

-        KeyWithInfo next() override;
+        KeyWithInfoPtr next() override;
         size_t estimatedKeysCount() override;

     private:
@@ -120,7 +119,7 @@ public:
     public:
         explicit ReadTaskIterator(const ReadTaskCallback & callback_, const size_t max_threads_count);

-        KeyWithInfo next() override;
+        KeyWithInfoPtr next() override;
         size_t estimatedKeysCount() override;

     private:
diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp
index c8715938c6f..1dd1a410595 100644
--- a/src/Storages/StorageS3Cluster.cpp
+++ b/src/Storages/StorageS3Cluster.cpp
@@ -82,7 +82,7 @@ RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr
 {
     auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
         *s3_configuration.client, s3_configuration.url, query, virtual_columns, context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
-    auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().key; });
+    auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next()->key; });
     return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
 }
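
For readers outside the ClickHouse tree, the contract introduced by EphemeralNodeHolder::tryCreate (and relied on by cleanupThreadFuncImpl above) can be illustrated with a minimal self-contained sketch. Everything below is a hypothetical stand-in, not the real zkutil API: FakeKeeper replaces the ZooKeeper session and error handling is reduced to a bool. The pattern is the one from the diff: create throws if the node already exists, tryCreate returns nullptr instead, and the destructor removes the node only when this holder actually created it, which is what lets the holder replace the hand-rolled SCOPE_EXIT_SAFE cleanup.

#include <iostream>
#include <memory>
#include <set>
#include <stdexcept>
#include <string>

/// Hypothetical in-memory stand-in for a ZooKeeper session (not the real zkutil API).
struct FakeKeeper
{
    std::set<std::string> nodes;

    bool tryCreate(const std::string & path) { return nodes.insert(path).second; }
    void remove(const std::string & path) { nodes.erase(path); }
};

/// Sketch of the holder pattern from the diff: the node is removed in the destructor
/// only if this holder owns it (need_remove), and `created` records whether the
/// optimistic tryCreate actually succeeded.
class EphemeralNodeHolder
{
public:
    using Ptr = std::shared_ptr<EphemeralNodeHolder>;

    EphemeralNodeHolder(std::string path_, FakeKeeper & keeper_, bool create, bool try_create)
        : path(std::move(path_)), keeper(keeper_)
    {
        if (create)
        {
            if (!keeper.tryCreate(path))
                throw std::runtime_error("node already exists: " + path);
            need_remove = created = true;
        }
        else if (try_create)
        {
            need_remove = created = keeper.tryCreate(path);
        }
    }

    /// Throws when the node already exists (mirrors the original `create` factory).
    static Ptr create(const std::string & path, FakeKeeper & keeper)
    {
        return std::make_shared<EphemeralNodeHolder>(path, keeper, true, false);
    }

    /// Returns nullptr instead of throwing when another node holds the lock.
    static Ptr tryCreate(const std::string & path, FakeKeeper & keeper)
    {
        auto node = std::make_shared<EphemeralNodeHolder>(path, keeper, false, true);
        return node->created ? node : nullptr;
    }

    ~EphemeralNodeHolder()
    {
        if (need_remove)
            keeper.remove(path);
    }

private:
    std::string path;
    FakeKeeper & keeper;
    bool need_remove = false;
    bool created = false;
};

int main()
{
    FakeKeeper keeper;
    auto first = EphemeralNodeHolder::tryCreate("/cleanup_lock", keeper);
    auto second = EphemeralNodeHolder::tryCreate("/cleanup_lock", keeper);
    std::cout << std::boolalpha << (first != nullptr) << ' ' << (second != nullptr) << '\n'; // true false
    first.reset(); // RAII: releases the lock by removing the node
    std::cout << (EphemeralNodeHolder::tryCreate("/cleanup_lock", keeper) != nullptr) << '\n'; // true
}

Used as a distributed-lock guard, the calling code collapses to the same shape as the new cleanupThreadFuncImpl: try to take the lock, return early if another node already holds it, and let RAII release it on every exit path instead of a manual scope guard.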