diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
index 150174aabcb..963a64e257e 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
@@ -474,6 +474,7 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
     if (holder->remove(&requests, &responses))
     {
         LOG_TEST(log, "Moved file `{}` to processed", path);
+        zk_client->tryRemove(node_name + ".retriable", -1);
         return;
     }
 
@@ -534,7 +535,11 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
         Coordination::Responses responses;
         if (holder->remove(&requests, &responses))
+        {
+            LOG_TEST(log, "Moved file `{}` to processed", path);
+            zk_client->tryRemove(node_name + ".retriable", -1);
             return;
+        }
 
         /// Failed to update max processed node, retry.
         if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
@@ -814,14 +819,12 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
     };
     auto node_cmp = [](const Node & a, const Node & b)
     {
-        if (a.metadata.last_processed_timestamp == b.metadata.last_processed_timestamp)
-            return a.metadata.file_path < b.metadata.file_path;
-        else
-            return a.metadata.last_processed_timestamp < b.metadata.last_processed_timestamp;
+        return std::tie(a.metadata.last_processed_timestamp, a.metadata.file_path)
+            < std::tie(b.metadata.last_processed_timestamp, b.metadata.file_path);
     };
 
     /// Ordered in ascending order of timestamps.
-    std::multiset<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
+    std::set<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
 
     LOG_TRACE(log, "Found {} nodes", nodes.size());
 
@@ -854,7 +857,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
     LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", max_set_size, max_set_age_sec, get_nodes_str());
 
     size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes.size() - max_set_size : 0;
-    for (const auto & node : sorted_nodes)
+    for (const auto & node : sorted_nodes)
     {
         if (nodes_to_remove)
         {
diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h
index a2fd1fb6699..df9db87a621 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.h
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h
@@ -149,7 +149,11 @@ class S3QueueFilesMetadata::ProcessingNodeHolder
 {
 friend class S3QueueFilesMetadata;
 public:
-    ProcessingNodeHolder(const std::string & processing_id_, const std::string & path_, const std::string & zk_node_path_, zkutil::ZooKeeperPtr zk_client_);
+    ProcessingNodeHolder(
+        const std::string & processing_id_,
+        const std::string & path_,
+        const std::string & zk_node_path_,
+        zkutil::ZooKeeperPtr zk_client_);
 
     ~ProcessingNodeHolder();
diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp
index 1a44fe9cff8..6ea222df71f 100644
--- a/src/Storages/S3Queue/S3QueueSource.cpp
+++ b/src/Storages/S3Queue/S3QueueSource.cpp
@@ -167,11 +167,12 @@ Chunk StorageS3QueueSource::generate()
                 return chunk;
             }
         }
-        catch (const Exception & e)
+        catch (...)
         {
-            LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", reader.getFile(), e.displayText());
+            const auto message = getCurrentExceptionMessage(true);
+            LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", reader.getFile(), message);
 
-            files_metadata->setFileFailed(key_with_info->processing_holder, e.message());
+            files_metadata->setFileFailed(key_with_info->processing_holder, message);
 
             appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false);
             throw;
diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h
index db3015f129e..7c8eb3eeb74 100644
--- a/src/Storages/S3Queue/S3QueueSource.h
+++ b/src/Storages/S3Queue/S3QueueSource.h
@@ -65,7 +65,7 @@ public:
         RemoveFileFunc remove_file_func_,
         const NamesAndTypesList & requested_virtual_columns_,
         ContextPtr context_,
-        const std::atomic<bool> & shutdown_called_,
+        const std::atomic<bool> & shutdown_called_,
         std::shared_ptr<S3QueueLog> s3_queue_log_,
         const StorageID & storage_id_);
diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp
index 37389eb1bd0..92f15aed62f 100644
--- a/src/Storages/S3Queue/StorageS3Queue.cpp
+++ b/src/Storages/S3Queue/StorageS3Queue.cpp
@@ -210,20 +210,21 @@ Pipe StorageS3Queue::read(
     Pipes pipes;
     const size_t adjusted_num_streams = std::min<size_t>(num_streams, s3queue_settings->s3queue_processing_threads_num);
+
+    auto file_iterator = createFileIterator(local_context, query_info.query);
     for (size_t i = 0; i < adjusted_num_streams; ++i)
-        pipes.emplace_back(createSource(column_names, storage_snapshot, query_info.query, max_block_size, local_context));
+        pipes.emplace_back(createSource(file_iterator, column_names, storage_snapshot, max_block_size, local_context));
     return Pipe::unitePipes(std::move(pipes));
 }
 
 std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
+    std::shared_ptr<StorageS3QueueSource::FileIterator> file_iterator,
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
-    ASTPtr query,
     size_t max_block_size,
     ContextPtr local_context)
 {
     auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
-    auto file_iterator = createFileIterator(local_context, query);
     auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
 
     auto internal_source = std::make_unique<StorageS3Source>(
@@ -350,12 +351,12 @@ bool StorageS3Queue::streamToViews()
     // Only insert into dependent views and expect that input blocks contain virtual columns
     InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
     auto block_io = interpreter.execute();
+    auto file_iterator = createFileIterator(s3queue_context, nullptr);
 
     Pipes pipes;
     for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
     {
-        auto source = createSource(
-            block_io.pipeline.getHeader().getNames(), storage_snapshot, nullptr, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
+        auto source = createSource(file_iterator, block_io.pipeline.getHeader().getNames(), storage_snapshot, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
         pipes.emplace_back(std::move(source));
     }
     auto pipe = Pipe::unitePipes(std::move(pipes));
diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h
index 2a62078fcca..000015951ea 100644
--- a/src/Storages/S3Queue/StorageS3Queue.h
+++ b/src/Storages/S3Queue/StorageS3Queue.h
@@ -83,9 +83,9 @@ private:
     std::shared_ptr<StorageS3QueueSource::FileIterator> createFileIterator(ContextPtr local_context, ASTPtr query);
     std::shared_ptr<StorageS3QueueSource> createSource(
+        std::shared_ptr<StorageS3QueueSource::FileIterator> file_iterator,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
-       ASTPtr query,
        size_t max_block_size,
        ContextPtr local_context);