return back num_bytes

This commit is contained in:
Anton Popov 2024-08-29 16:03:41 +00:00
parent 4499f20ca8
commit a4ddca773b

View File

@ -808,7 +808,8 @@ try
auto add_entry_to_asynchronous_insert_log = [&, query_by_format = NameToNameMap{}](
const InsertData::EntryPtr & entry,
const String & parsing_exception,
size_t num_rows) mutable
size_t num_rows,
size_t num_bytes) mutable
{
if (!async_insert_log)
return;
@ -820,7 +821,7 @@ try
elem.table = query_table;
elem.format = entry->format;
elem.query_id = entry->query_id;
elem.bytes = entry->chunk.byteSize();
elem.bytes = num_bytes;
elem.rows = num_rows;
elem.exception = parsing_exception;
elem.data_kind = entry->chunk.getDataKind();
@ -891,7 +892,7 @@ try
if (async_insert_log)
{
for (const auto & entry : data->entries)
add_entry_to_asynchronous_insert_log(entry, /*parsing_exception=*/ "", /*num_rows=*/ 0);
add_entry_to_asynchronous_insert_log(entry, /*parsing_exception=*/ "", /*num_rows=*/ 0, entry->chunk.byteSize());
auto exception = getCurrentExceptionMessage(false);
auto flush_time = std::chrono::system_clock::now();
@ -1035,13 +1036,15 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
auto buffer = std::make_unique<ReadBufferFromString>(*bytes);
size_t num_bytes = bytes->size();
size_t num_rows = executor.execute(*buffer);
total_rows += num_rows;
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
add_to_async_insert_log(entry, current_exception, num_rows);
add_to_async_insert_log(entry, current_exception, num_rows, num_bytes);
current_exception.clear();
entry->resetChunk();
}
@ -1081,7 +1084,7 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
add_to_async_insert_log(entry, /*parsing_exception=*/ "", block_to_insert.rows());
add_to_async_insert_log(entry, /*parsing_exception=*/ "", block_to_insert.rows(), block_to_insert.bytes());
entry->resetChunk();
}