better code in AsynchronousInsertQueue

This commit is contained in:
Anton Popov 2024-08-29 14:38:59 +00:00
parent b39c5cdb40
commit 4499f20ca8
2 changed files with 27 additions and 50 deletions

View File

@ -805,12 +805,10 @@ try
if (async_insert_log)
log_elements.reserve(data->entries.size());
auto add_entry_to_asynchronous_insert_log = [&](
auto add_entry_to_asynchronous_insert_log = [&, query_by_format = NameToNameMap{}](
const InsertData::EntryPtr & entry,
const NameToNameMap & query_by_format,
const String & parsing_exception,
size_t num_rows,
size_t num_bytes)
size_t num_rows) mutable
{
if (!async_insert_log)
return;
@ -822,15 +820,29 @@ try
elem.table = query_table;
elem.format = entry->format;
elem.query_id = entry->query_id;
elem.bytes = num_bytes;
elem.bytes = entry->chunk.byteSize();
elem.rows = num_rows;
elem.exception = parsing_exception;
elem.data_kind = entry->chunk.getDataKind();
elem.timeout_milliseconds = data->timeout_ms.count();
elem.flush_query_id = insert_query_id;
auto it = query_by_format.find(entry->format);
elem.query_for_logging = it != query_by_format.end() ? it->second : key.query_str;
auto get_query_by_format = [&](const String & format) -> const String &
{
auto [it, inserted] = query_by_format.try_emplace(format);
if (!inserted)
return it->second;
auto query = key.query->clone();
assert_cast<ASTInsertQuery &>(*query).format = format;
it->second = serializeQuery(*query, insert_context->getSettingsRef().log_queries_cut_to_length);
return it->second;
};
if (entry->chunk.getDataKind() == DataKind::Parsed)
elem.query_for_logging = key.query_str;
else
elem.query_for_logging = get_query_by_format(entry->format);
/// If there was a parsing error,
/// the entry won't be flushed anyway,
@ -843,7 +855,7 @@ try
else
{
elem.status = AsynchronousInsertLogElement::Ok;
log_elements.push_back(elem);
log_elements.push_back(std::move(elem));
}
};
@ -878,9 +890,8 @@ try
if (async_insert_log)
{
auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context);
for (const auto & entry : data->entries)
add_entry_to_asynchronous_insert_log(entry, query_by_format, "", 0, entry->chunk.byteSize());
add_entry_to_asynchronous_insert_log(entry, /*parsing_exception=*/ "", /*num_rows=*/ 0);
auto exception = getCurrentExceptionMessage(false);
auto flush_time = std::chrono::system_clock::now();
@ -919,13 +930,13 @@ try
if (key.data_kind == DataKind::Parsed)
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log);
else
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_asynchronous_insert_log);
chunk = processPreprocessedEntries(data, header, add_entry_to_asynchronous_insert_log);
ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());
if (chunk.getNumRows() == 0)
{
finish_entries(0, 0);
finish_entries(/*num_rows=*/ 0, /*num_bytes=*/ 0);
return;
}
@ -1012,7 +1023,6 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
auto chunk_info = std::make_shared<AsyncInsertInfo>();
auto query_for_logging = serializeQuery(*key.query, insert_context->getSettingsRef().log_queries_cut_to_length);
for (const auto & entry : data->entries)
{
@ -1025,15 +1035,13 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
auto buffer = std::make_unique<ReadBufferFromString>(*bytes);
size_t num_bytes = bytes->size();
size_t num_rows = executor.execute(*buffer);
total_rows += num_rows;
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
add_to_async_insert_log(entry, {}, current_exception, num_rows, num_bytes);
add_to_async_insert_log(entry, current_exception, num_rows);
current_exception.clear();
entry->resetChunk();
}
@ -1045,18 +1053,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
template <typename LogFunc>
Chunk AsynchronousInsertQueue::processPreprocessedEntries(
const InsertQuery & key,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
LogFunc && add_to_async_insert_log)
{
size_t total_rows = 0;
auto chunk_info = std::make_shared<AsyncInsertInfo>();
auto result_columns = header.cloneEmptyColumns();
auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context);
for (const auto & entry : data->entries)
{
const auto * block = entry->chunk.asBlock();
@ -1073,10 +1077,11 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
total_rows += block_to_insert.rows();
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
add_to_async_insert_log(entry, query_by_format, "", block_to_insert.rows(), block_to_insert.bytes());
add_to_async_insert_log(entry, /*parsing_exception=*/ "", block_to_insert.rows());
entry->resetChunk();
}
@ -1085,27 +1090,6 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
return chunk;
}
NameToNameMap AsynchronousInsertQueue::getQueriesByFormat(
const ASTPtr & query,
const std::list<InsertData::EntryPtr> & entries,
const ContextPtr & insert_context)
{
std::unordered_map<String, String> format_to_query;
auto query_copy = query->clone();
for (const auto & entry : entries)
{
auto [it, inserted] = format_to_query.try_emplace(entry->format);
if (!inserted)
continue;
assert_cast<ASTInsertQuery &>(*query_copy).format = entry->format;
it->second = serializeQuery(*query_copy, insert_context->getSettingsRef().log_queries_cut_to_length);
}
return format_to_query;
}
template <typename E>
void AsynchronousInsertQueue::finishWithException(
const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception)

View File

@ -287,17 +287,10 @@ private:
template <typename LogFunc>
static Chunk processPreprocessedEntries(
const InsertQuery & key,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
LogFunc && add_to_async_insert_log);
static NameToNameMap getQueriesByFormat(
const ASTPtr & query,
const std::list<InsertData::EntryPtr> & entries,
const ContextPtr & insert_context);
template <typename E>
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);