mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
fix async inserts with empty data
This commit is contained in:
parent
afdd1f696d
commit
09c66c3879
@ -342,10 +342,15 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
|
||||
}
|
||||
}
|
||||
|
||||
static void appendElementsToLogSafe(
|
||||
namespace
|
||||
{
|
||||
|
||||
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
|
||||
|
||||
void appendElementsToLogSafe(
|
||||
AsynchronousInsertLog & log,
|
||||
std::vector<AsynchronousInsertLogElement> elements,
|
||||
std::chrono::time_point<std::chrono::system_clock> flush_time,
|
||||
TimePoint flush_time,
|
||||
const String & flush_query_id,
|
||||
const String & flush_exception)
|
||||
try
|
||||
@ -367,6 +372,8 @@ catch (...)
|
||||
tryLogCurrentException("AsynchronousInsertQueue", "Failed to add elements to AsynchronousInsertLog");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// static
|
||||
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
|
||||
try
|
||||
@ -473,8 +480,27 @@ try
|
||||
format->addBuffer(std::move(last_buffer));
|
||||
auto insert_query_id = insert_context->getCurrentQueryId();
|
||||
|
||||
auto finish_entries = [&](bool has_data)
|
||||
{
|
||||
for (const auto & entry : data->entries)
|
||||
{
|
||||
if (!entry->isFinished())
|
||||
entry->finish();
|
||||
}
|
||||
|
||||
if (!log_elements.empty())
|
||||
{
|
||||
auto flush_time = has_data ? std::chrono::system_clock::now() : TimePoint{};
|
||||
auto query_id = has_data ? insert_query_id : "";
|
||||
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, query_id, "");
|
||||
}
|
||||
};
|
||||
|
||||
if (total_rows == 0)
|
||||
{
|
||||
finish_entries(false);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
@ -502,17 +528,7 @@ try
|
||||
throw;
|
||||
}
|
||||
|
||||
for (const auto & entry : data->entries)
|
||||
{
|
||||
if (!entry->isFinished())
|
||||
entry->finish();
|
||||
}
|
||||
|
||||
if (!log_elements.empty())
|
||||
{
|
||||
auto flush_time = std::chrono::system_clock::now();
|
||||
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, insert_query_id, "");
|
||||
}
|
||||
finish_entries(true);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
|
@ -0,0 +1,2 @@
|
||||
0
|
||||
Ok 0
|
18
tests/queries/0_stateless/02714_async_inserts_empty_data.sh
Executable file
18
tests/queries/0_stateless/02714_async_inserts_empty_data.sh
Executable file
@ -0,0 +1,18 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_async_insert_empty_data"
|
||||
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_async_insert_empty_data (id UInt32) ENGINE = Memory"
|
||||
|
||||
echo -n '' | ${CLICKHOUSE_CURL} -sS "$url&query=INSERT%20INTO%20t_async_insert_empty_data%20FORMAT%20JSONEachRow" --data-binary @-
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_async_insert_empty_data"
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT status, bytes FROM system.asynchronous_insert_log WHERE database = '$CLICKHOUSE_DATABASE' AND table = 't_async_insert_empty_data'"
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_async_insert_empty_data"
|
Loading…
Reference in New Issue
Block a user