From 4499f20ca8af46f88da72f6eaeb2880ded348efa Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 29 Aug 2024 14:38:59 +0000 Subject: [PATCH] better code in AsynchronousInsertQueue --- src/Interpreters/AsynchronousInsertQueue.cpp | 70 ++++++++------------ src/Interpreters/AsynchronousInsertQueue.h | 7 -- 2 files changed, 27 insertions(+), 50 deletions(-) diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 5ed0d6d6257..c619ea80c7c 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -805,12 +805,10 @@ try if (async_insert_log) log_elements.reserve(data->entries.size()); - auto add_entry_to_asynchronous_insert_log = [&]( + auto add_entry_to_asynchronous_insert_log = [&, query_by_format = NameToNameMap{}]( const InsertData::EntryPtr & entry, - const NameToNameMap & query_by_format, const String & parsing_exception, - size_t num_rows, - size_t num_bytes) + size_t num_rows) mutable { if (!async_insert_log) return; @@ -822,15 +820,29 @@ try elem.table = query_table; elem.format = entry->format; elem.query_id = entry->query_id; - elem.bytes = num_bytes; + elem.bytes = entry->chunk.byteSize(); elem.rows = num_rows; elem.exception = parsing_exception; elem.data_kind = entry->chunk.getDataKind(); elem.timeout_milliseconds = data->timeout_ms.count(); elem.flush_query_id = insert_query_id; - auto it = query_by_format.find(entry->format); - elem.query_for_logging = it != query_by_format.end() ? it->second : key.query_str; + auto get_query_by_format = [&](const String & format) -> const String & + { + auto [it, inserted] = query_by_format.try_emplace(format); + if (!inserted) + return it->second; + + auto query = key.query->clone(); + assert_cast(*query).format = format; + it->second = serializeQuery(*query, insert_context->getSettingsRef().log_queries_cut_to_length); + return it->second; + }; + + if (entry->chunk.getDataKind() == DataKind::Parsed) + elem.query_for_logging = key.query_str; + else + elem.query_for_logging = get_query_by_format(entry->format); /// If there was a parsing error, /// the entry won't be flushed anyway, @@ -843,7 +855,7 @@ try else { elem.status = AsynchronousInsertLogElement::Ok; - log_elements.push_back(elem); + log_elements.push_back(std::move(elem)); } }; @@ -878,9 +890,8 @@ try if (async_insert_log) { - auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context); for (const auto & entry : data->entries) - add_entry_to_asynchronous_insert_log(entry, query_by_format, "", 0, entry->chunk.byteSize()); + add_entry_to_asynchronous_insert_log(entry, /*parsing_exception=*/ "", /*num_rows=*/ 0); auto exception = getCurrentExceptionMessage(false); auto flush_time = std::chrono::system_clock::now(); @@ -919,13 +930,13 @@ try if (key.data_kind == DataKind::Parsed) chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log); else - chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_asynchronous_insert_log); + chunk = processPreprocessedEntries(data, header, add_entry_to_asynchronous_insert_log); ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows()); if (chunk.getNumRows() == 0) { - finish_entries(0, 0); + finish_entries(/*num_rows=*/ 0, /*num_bytes=*/ 0); return; } @@ -1012,7 +1023,6 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing( StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform)); auto chunk_info = std::make_shared(); - auto query_for_logging = serializeQuery(*key.query, insert_context->getSettingsRef().log_queries_cut_to_length); for (const auto & entry : data->entries) { @@ -1025,15 +1035,13 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing( auto buffer = std::make_unique(*bytes); - size_t num_bytes = bytes->size(); size_t num_rows = executor.execute(*buffer); - total_rows += num_rows; + chunk_info->offsets.push_back(total_rows); chunk_info->tokens.push_back(entry->async_dedup_token); - add_to_async_insert_log(entry, {}, current_exception, num_rows, num_bytes); - + add_to_async_insert_log(entry, current_exception, num_rows); current_exception.clear(); entry->resetChunk(); } @@ -1045,18 +1053,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing( template Chunk AsynchronousInsertQueue::processPreprocessedEntries( - const InsertQuery & key, const InsertDataPtr & data, const Block & header, - const ContextPtr & insert_context, LogFunc && add_to_async_insert_log) { size_t total_rows = 0; auto chunk_info = std::make_shared(); auto result_columns = header.cloneEmptyColumns(); - auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context); - for (const auto & entry : data->entries) { const auto * block = entry->chunk.asBlock(); @@ -1073,10 +1077,11 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries( result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size()); total_rows += block_to_insert.rows(); + chunk_info->offsets.push_back(total_rows); chunk_info->tokens.push_back(entry->async_dedup_token); - add_to_async_insert_log(entry, query_by_format, "", block_to_insert.rows(), block_to_insert.bytes()); + add_to_async_insert_log(entry, /*parsing_exception=*/ "", block_to_insert.rows()); entry->resetChunk(); } @@ -1085,27 +1090,6 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries( return chunk; } -NameToNameMap AsynchronousInsertQueue::getQueriesByFormat( - const ASTPtr & query, - const std::list & entries, - const ContextPtr & insert_context) -{ - std::unordered_map format_to_query; - auto query_copy = query->clone(); - - for (const auto & entry : entries) - { - auto [it, inserted] = format_to_query.try_emplace(entry->format); - if (!inserted) - continue; - - assert_cast(*query_copy).format = entry->format; - it->second = serializeQuery(*query_copy, insert_context->getSettingsRef().log_queries_cut_to_length); - } - - return format_to_query; -} - template void AsynchronousInsertQueue::finishWithException( const ASTPtr & query, const std::list & entries, const E & exception) diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index 9a84fe8bb12..cbe998a2850 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -287,17 +287,10 @@ private: template static Chunk processPreprocessedEntries( - const InsertQuery & key, const InsertDataPtr & data, const Block & header, - const ContextPtr & insert_context, LogFunc && add_to_async_insert_log); - static NameToNameMap getQueriesByFormat( - const ASTPtr & query, - const std::list & entries, - const ContextPtr & insert_context); - template static void finishWithException(const ASTPtr & query, const std::list & entries, const E & exception);