Review fixes

This commit is contained in:
kssenii 2023-10-16 18:15:11 +02:00
parent 0ddee6ad6c
commit 0a6a4b3894
6 changed files with 26 additions and 17 deletions

View File

@@ -474,6 +474,7 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
     if (holder->remove(&requests, &responses))
     {
         LOG_TEST(log, "Moved file `{}` to processed", path);
+        zk_client->tryRemove(node_name + ".retriable", -1);
         return;
     }
@@ -534,7 +535,11 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
     Coordination::Responses responses;
     if (holder->remove(&requests, &responses))
+    {
+        LOG_TEST(log, "Moved file `{}` to processed", path);
+        zk_client->tryRemove(node_name + ".retriable", -1);
         return;
+    }

     /// Failed to update max processed node, retry.
     if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
@@ -814,14 +819,12 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
     };
     auto node_cmp = [](const Node & a, const Node & b)
     {
-        if (a.metadata.last_processed_timestamp == b.metadata.last_processed_timestamp)
-            return a.metadata.file_path < b.metadata.file_path;
-        else
-            return a.metadata.last_processed_timestamp < b.metadata.last_processed_timestamp;
+        return std::tie(a.metadata.last_processed_timestamp, a.metadata.file_path)
+            < std::tie(b.metadata.last_processed_timestamp, b.metadata.file_path);
     };

     /// Ordered in ascending order of timestamps.
-    std::multiset<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
+    std::set<Node, decltype(node_cmp)> sorted_nodes(node_cmp);

     LOG_TRACE(log, "Found {} nodes", nodes.size());
@@ -854,7 +857,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
     LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", max_set_size, max_set_age_sec, get_nodes_str());

     size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes.size() - max_set_size : 0;
     for (const auto & node : sorted_nodes)
     {
         if (nodes_to_remove)
         {

View File

@@ -149,7 +149,11 @@ class S3QueueFilesMetadata::ProcessingNodeHolder
 {
     friend class S3QueueFilesMetadata;
 public:
-    ProcessingNodeHolder(const std::string & processing_id_, const std::string & path_, const std::string & zk_node_path_, zkutil::ZooKeeperPtr zk_client_);
+    ProcessingNodeHolder(
+        const std::string & processing_id_,
+        const std::string & path_,
+        const std::string & zk_node_path_,
+        zkutil::ZooKeeperPtr zk_client_);

     ~ProcessingNodeHolder();

View File

@@ -167,11 +167,12 @@ Chunk StorageS3QueueSource::generate()
                 return chunk;
             }
         }
-        catch (const Exception & e)
+        catch (...)
         {
-            LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", reader.getFile(), e.displayText());
-            files_metadata->setFileFailed(key_with_info->processing_holder, e.message());
+            const auto message = getCurrentExceptionMessage(true);
+            LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", reader.getFile(), message);
+            files_metadata->setFileFailed(key_with_info->processing_holder, message);
             appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false);
             throw;

View File

@@ -65,7 +65,7 @@ public:
         RemoveFileFunc remove_file_func_,
         const NamesAndTypesList & requested_virtual_columns_,
         ContextPtr context_,
         const std::atomic<bool> & shutdown_called_,
         std::shared_ptr<S3QueueLog> s3_queue_log_,
         const StorageID & storage_id_);

View File

@@ -210,20 +210,21 @@ Pipe StorageS3Queue::read(
     Pipes pipes;
     const size_t adjusted_num_streams = std::min<size_t>(num_streams, s3queue_settings->s3queue_processing_threads_num);

+    auto file_iterator = createFileIterator(local_context, query_info.query);
     for (size_t i = 0; i < adjusted_num_streams; ++i)
-        pipes.emplace_back(createSource(column_names, storage_snapshot, query_info.query, max_block_size, local_context));
+        pipes.emplace_back(createSource(file_iterator, column_names, storage_snapshot, max_block_size, local_context));
     return Pipe::unitePipes(std::move(pipes));
 }

 std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
+    std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
-    ASTPtr query,
     size_t max_block_size,
     ContextPtr local_context)
 {
     auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
-    auto file_iterator = createFileIterator(local_context, query);

     auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
     auto internal_source = std::make_unique<StorageS3Source>(
@@ -350,12 +351,12 @@ bool StorageS3Queue::streamToViews()
         // Only insert into dependent views and expect that input blocks contain virtual columns
         InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
         auto block_io = interpreter.execute();
+        auto file_iterator = createFileIterator(s3queue_context, nullptr);

         Pipes pipes;
         for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
         {
-            auto source = createSource(
-                block_io.pipeline.getHeader().getNames(), storage_snapshot, nullptr, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
+            auto source = createSource(file_iterator, block_io.pipeline.getHeader().getNames(), storage_snapshot, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
             pipes.emplace_back(std::move(source));
         }

         auto pipe = Pipe::unitePipes(std::move(pipes));

View File

@@ -83,9 +83,9 @@ private:
     std::shared_ptr<FileIterator> createFileIterator(ContextPtr local_context, ASTPtr query);

     std::shared_ptr<StorageS3QueueSource> createSource(
+        std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
         const Names & column_names,
         const StorageSnapshotPtr & storage_snapshot,
-        ASTPtr query,
         size_t max_block_size,
         ContextPtr local_context);