Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-22 07:31:57 +00:00)
Backport #69080 to 24.3: fix logical error for empty async inserts
This commit is contained in:
Parent: e7f370a7ea
Commit: 71a20b49e1
@@ -977,8 +977,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
         size_t num_rows = executor.execute(*buffer);

         total_rows += num_rows;
-        chunk_info->offsets.push_back(total_rows);
-        chunk_info->tokens.push_back(entry->async_dedup_token);
+        /// for some reason, client can pass zero rows and bytes to server.
+        /// We don't update offsets in this case, because we assume every insert has some rows during dedup
+        /// but we have nothing to deduplicate for this insert.
+        if (num_rows > 0)
+        {
+            chunk_info->offsets.push_back(total_rows);
+            chunk_info->tokens.push_back(entry->async_dedup_token);
+        }

         add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms);
@@ -1029,8 +1035,14 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
             result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());

         total_rows += block->rows();
-        chunk_info->offsets.push_back(total_rows);
-        chunk_info->tokens.push_back(entry->async_dedup_token);
+        /// for some reason, client can pass zero rows and bytes to server.
+        /// We don't update offsets in this case, because we assume every insert has some rows during dedup,
+        /// but we have nothing to deduplicate for this insert.
+        if (block->rows())
+        {
+            chunk_info->offsets.push_back(total_rows);
+            chunk_info->tokens.push_back(entry->async_dedup_token);
+        }

         const auto & query_for_logging = get_query_by_format(entry->format);
         add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms);
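Both hunks apply the same guard: when an entry carries zero rows, total_rows does not advance, so an unconditional push_back would record the same cumulative offset twice in chunk_info->offsets, while the dedup path assumes every recorded offset belongs to a non-empty insert. The following is a minimal standalone Python sketch of that offset bookkeeping; collect_offsets and the sample row counts are illustrative only, not ClickHouse code.

def collect_offsets(row_counts, skip_empty):
    # Each entry appends its cumulative row count; empty entries are
    # optionally skipped, mirroring the `if (num_rows > 0)` guard above.
    offsets, total_rows = [], 0
    for num_rows in row_counts:
        total_rows += num_rows
        if num_rows > 0 or not skip_empty:
            offsets.append(total_rows)
    return offsets

print(collect_offsets([3, 0, 2], skip_empty=False))  # [3, 3, 5] -> offset 3 recorded twice
print(collect_offsets([3, 0, 2], skip_empty=True))   # [3, 5]    -> empty entry leaves no offset

With the guard in place, an empty insert still gets logged via add_to_async_insert_log but leaves no entry in the dedup offsets and tokens.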
@@ -48,9 +48,11 @@ def generate_data(q, total_number, use_token):
     partitions = ["2022-11-11 10:10:10", "2022-12-12 10:10:10"]
     last_number = 0
     while True:
-        dup_simulate = random.randint(0, 3)
+        # 0 to simulate duplication
+        # 1 to simulate empty
+        simulate_flag = random.randint(0, 4)
         # insert old data randomly. 25% of them are dup.
-        if dup_simulate == 0:
+        if simulate_flag == 0:
             last_idx = len(old_data) - 1
             if last_idx < 0:
                 continue
@@ -58,6 +60,11 @@ def generate_data(q, total_number, use_token):
             if idx < 0:
                 idx = 0
             q.put(old_data[idx])
+        if simulate_flag == 1:
+            empty_insert_stmt = (
+                "insert into t_async_insert_dedup values format JSONEachRow"
+            )
+            q.put((empty_insert_stmt, ""))
         else:
             # insert new data.
             chunk_size = random.randint(1, max_chunk_size)
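The empty statement above ("insert into t_async_insert_dedup values format JSONEachRow" with an empty payload) is what drives the zero-row path on the server side. Because random.randint is inclusive on both ends, simulate_flag takes five equally likely values: flag 0 replays old data and flag 1 issues the empty insert, each in roughly 20% of iterations. A quick illustrative check of that distribution:

import random

counts = {flag: 0 for flag in range(5)}
for _ in range(100_000):
    counts[random.randint(0, 4)] += 1  # inclusive bounds: values 0..4

# Each flag lands near 0.2; flag 0 replays old data, flag 1 sends the empty insert.
print({flag: round(n / 100_000, 3) for flag, n in counts.items()})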
@@ -1,5 +1,5 @@
 #!/usr/bin/env bash
-# Tags: long, zookeeper, no-parallel, no-fasttest
+# Tags: long, zookeeper, no-fasttest

 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
@@ -1,5 +1,5 @@
 #!/usr/bin/env bash
-# Tags: long, zookeeper, no-parallel, no-fasttest
+# Tags: long, zookeeper, no-fasttest

 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh