fix async insert with alter modify column

Anton Popov 2024-08-23 16:08:04 +00:00
parent 1aaf9a08c4
commit 7517ef4cc9
6 changed files with 226 additions and 74 deletions

src/Interpreters/AsynchronousInsertQueue.cpp

@@ -699,6 +699,17 @@ catch (...)
     tryLogCurrentException("AsynchronousInsertQueue", "Failed to add elements to AsynchronousInsertLog");
 }
 
+void convertBlockToHeader(Block & block, const Block & header)
+{
+    auto converting_dag = ActionsDAG::makeConvertingActions(
+        block.getColumnsWithTypeAndName(),
+        header.getColumnsWithTypeAndName(),
+        ActionsDAG::MatchColumnsMode::Name);
+
+    auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
+    converting_actions->execute(block);
+}
+
 String serializeQuery(const IAST & query, size_t max_length)
 {
     return query.hasSecretParts()
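
For context: entries buffered by the asynchronous insert queue carry blocks shaped for the table header that was current when they were enqueued; an ALTER that lands before the flush changes that header, and convertBlockToHeader reshapes each buffered block to match. A minimal standalone sketch of the idea, using hypothetical stand-in types rather than ClickHouse's Block/ActionsDAG:

// A minimal sketch of name-based header conversion. Column/Header are
// hypothetical stand-ins; the real code builds a converting ActionsDAG,
// which also inserts type casts (only matching and reordering shown here).
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

struct Column { std::string name; std::string type; };
using Header = std::vector<Column>;

Header convertToHeader(const Header & block, const Header & header)
{
    std::unordered_map<std::string, Column> by_name;
    for (const auto & col : block)
        by_name.emplace(col.name, col);

    Header result;
    for (const auto & target : header)
    {
        // Columns are matched by name, as MatchColumnsMode::Name does.
        auto it = by_name.find(target.name);
        if (it == by_name.end())
            throw std::runtime_error("Cannot find column " + target.name);

        Column col = it->second;
        col.type = target.type; // stand-in for a real CAST to the new type
        result.push_back(std::move(col));
    }
    return result;
}

The cast step is what makes the ALTER MODIFY COLUMN case in the tests below work: a block buffered with value2 Int64 can still be flushed after the column becomes String.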
@@ -794,6 +805,48 @@ try
     if (async_insert_log)
         log_elements.reserve(data->entries.size());
 
+    auto add_entry_to_asynchronous_insert_log = [&](
+        const InsertData::EntryPtr & entry,
+        const NameToNameMap & query_by_format,
+        const String & parsing_exception,
+        size_t num_rows,
+        size_t num_bytes)
+    {
+        if (!async_insert_log)
+            return;
+
+        AsynchronousInsertLogElement elem;
+        elem.event_time = timeInSeconds(entry->create_time);
+        elem.event_time_microseconds = timeInMicroseconds(entry->create_time);
+        elem.database = query_database;
+        elem.table = query_table;
+        elem.format = entry->format;
+        elem.query_id = entry->query_id;
+        elem.bytes = num_bytes;
+        elem.rows = num_rows;
+        elem.exception = parsing_exception;
+        elem.data_kind = entry->chunk.getDataKind();
+        elem.timeout_milliseconds = data->timeout_ms.count();
+        elem.flush_query_id = insert_query_id;
+
+        auto it = query_by_format.find(entry->format);
+        elem.query_for_logging = it != query_by_format.end() ? it->second : key.query_str;
+
+        /// If there was a parsing error,
+        /// the entry won't be flushed anyway,
+        /// so add the log element immediately.
+        if (!elem.exception.empty())
+        {
+            elem.status = AsynchronousInsertLogElement::ParsingError;
+            async_insert_log->add(std::move(elem));
+        }
+        else
+        {
+            elem.status = AsynchronousInsertLogElement::Ok;
+            log_elements.push_back(elem);
+        }
+    };
+
     try
     {
         interpreter = std::make_unique<InterpreterInsertQuery>(
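
The lambda above implements a two-track logging policy. A minimal sketch of it with hypothetical simplified types (LogElement and Status stand in for AsynchronousInsertLogElement and its status enum):

// Parse failures are logged immediately, because such entries never reach
// the flush; successful entries are buffered and written in one batch once
// the flush outcome is known.
#include <functional>
#include <string>
#include <utility>
#include <vector>

enum class Status { Ok, ParsingError };

struct LogElement
{
    std::string query_id;
    std::string exception;
    Status status = Status::Ok;
};

void addLogElement(
    std::vector<LogElement> & deferred,
    LogElement elem,
    const std::function<void(LogElement)> & write_now)
{
    if (!elem.exception.empty())
    {
        elem.status = Status::ParsingError;  // entry won't be flushed
        write_now(std::move(elem));          // so log it right away
    }
    else
    {
        elem.status = Status::Ok;            // may still end up as FlushError,
        deferred.push_back(std::move(elem)); // as the reference files below show
    }
}

Moving this lambda above the interpreter construction (it previously sat after the catch block, see the removed copy below) is what lets the catch path log every buffered entry when building the interpreter fails.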
@@ -822,49 +875,21 @@ try
     catch (...)
     {
         logExceptionBeforeStart(query_for_logging, insert_context, key.query, query_span, start_watch.elapsedMilliseconds());
+
+        if (async_insert_log)
+        {
+            auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context);
+            for (const auto & entry : data->entries)
+                add_entry_to_asynchronous_insert_log(entry, query_by_format, "", 0, entry->chunk.byteSize());
+
+            auto exception = getCurrentExceptionMessage(false);
+            auto flush_time = std::chrono::system_clock::now();
+            appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, exception);
+        }
+
         throw;
     }
 
-    auto add_entry_to_asynchronous_insert_log = [&](const auto & entry,
-        const auto & entry_query_for_logging,
-        const auto & exception,
-        size_t num_rows,
-        size_t num_bytes,
-        Milliseconds timeout_ms)
-    {
-        if (!async_insert_log)
-            return;
-
-        AsynchronousInsertLogElement elem;
-        elem.event_time = timeInSeconds(entry->create_time);
-        elem.event_time_microseconds = timeInMicroseconds(entry->create_time);
-        elem.query_for_logging = entry_query_for_logging;
-        elem.database = query_database;
-        elem.table = query_table;
-        elem.format = entry->format;
-        elem.query_id = entry->query_id;
-        elem.bytes = num_bytes;
-        elem.rows = num_rows;
-        elem.exception = exception;
-        elem.data_kind = entry->chunk.getDataKind();
-        elem.timeout_milliseconds = timeout_ms.count();
-        elem.flush_query_id = insert_query_id;
-
-        /// If there was a parsing error,
-        /// the entry won't be flushed anyway,
-        /// so add the log element immediately.
-        if (!elem.exception.empty())
-        {
-            elem.status = AsynchronousInsertLogElement::ParsingError;
-            async_insert_log->add(std::move(elem));
-        }
-        else
-        {
-            log_elements.push_back(elem);
-        }
-    };
-
-    auto finish_entries = [&]
+    auto finish_entries = [&](size_t num_rows, size_t num_bytes)
     {
         for (const auto & entry : data->entries)
        {
@@ -877,6 +902,13 @@ try
            auto flush_time = std::chrono::system_clock::now();
            appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, "");
        }
+
+        LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
+        queue_shard_flush_time_history.updateWithCurrentTime();
+
+        bool pulling_pipeline = false;
+        logQueryFinish(
+            query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
     };
 
     try
@@ -891,20 +923,9 @@ try
         ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());
 
-        auto log_and_add_finish_to_query_log = [&](size_t num_rows, size_t num_bytes)
-        {
-            LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
-            queue_shard_flush_time_history.updateWithCurrentTime();
-
-            bool pulling_pipeline = false;
-            logQueryFinish(
-                query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
-        };
-
         if (chunk.getNumRows() == 0)
         {
-            finish_entries();
-            log_and_add_finish_to_query_log(0, 0);
+            finish_entries(0, 0);
             return;
         }
@@ -917,7 +938,7 @@ try
         CompletedPipelineExecutor completed_executor(pipeline);
         completed_executor.execute();
 
-        log_and_add_finish_to_query_log(num_rows, num_bytes);
+        finish_entries(num_rows, num_bytes);
     }
     catch (...)
     {
@@ -931,8 +952,6 @@ try
        }
        throw;
    }
-
-    finish_entries();
}
catch (const Exception & e)
{
@@ -1013,7 +1032,7 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
        chunk_info->offsets.push_back(total_rows);
        chunk_info->tokens.push_back(entry->async_dedup_token);
-        add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms);
+        add_to_async_insert_log(entry, {}, current_exception, num_rows, num_bytes);
 
        current_exception.clear();
        entry->resetChunk();
@@ -1036,19 +1055,7 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
     auto chunk_info = std::make_shared<AsyncInsertInfo>();
     auto result_columns = header.cloneEmptyColumns();
 
-    std::unordered_map<String, String> format_to_query;
-
-    auto get_query_by_format = [&](const String & format) -> const String &
-    {
-        auto [it, inserted] = format_to_query.try_emplace(format);
-        if (!inserted)
-            return it->second;
-
-        auto query = key.query->clone();
-        assert_cast<ASTInsertQuery &>(*query).format = format;
-        it->second = serializeQuery(*query, insert_context->getSettingsRef().log_queries_cut_to_length);
-        return it->second;
-    };
+    auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context);
 
     for (const auto & entry : data->entries)
     {
@@ -1057,17 +1064,19 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Expected entry with data kind Preprocessed. Got: {}", entry->chunk.getDataKind());
 
-        auto columns = block->getColumns();
+        Block block_to_insert = *block;
+        if (!isCompatibleHeader(block_to_insert, header))
+            convertBlockToHeader(block_to_insert, header);
+
+        auto columns = block_to_insert.getColumns();
         for (size_t i = 0, s = columns.size(); i < s; ++i)
             result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
 
-        total_rows += block->rows();
+        total_rows += block_to_insert.rows();
         chunk_info->offsets.push_back(total_rows);
         chunk_info->tokens.push_back(entry->async_dedup_token);
 
-        const auto & query_for_logging = get_query_by_format(entry->format);
-        add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms);
+        add_to_async_insert_log(entry, query_by_format, "", block_to_insert.rows(), block_to_insert.bytes());
 
        entry->resetChunk();
    }
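
The chunk_info bookkeeping in this loop is what lets one flushed chunk be attributed back to individual entries. A minimal sketch with hypothetical types:

// offsets records the cumulative row count after each entry, so the single
// flushed chunk can later be split back into per-entry ranges (e.g. for
// deduplication tokens and per-entry log rows).
#include <cstddef>
#include <string>
#include <vector>

struct AsyncInsertInfoSketch
{
    std::vector<size_t> offsets;     // cumulative rows after each entry
    std::vector<std::string> tokens; // one dedup token per entry
};

void appendEntry(
    AsyncInsertInfoSketch & info,
    size_t & total_rows,
    size_t entry_rows,
    const std::string & dedup_token)
{
    total_rows += entry_rows;
    info.offsets.push_back(total_rows); // boundary of this entry in the chunk
    info.tokens.push_back(dedup_token);
}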
@@ -1076,6 +1085,27 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
     return chunk;
 }
 
+NameToNameMap AsynchronousInsertQueue::getQueriesByFormat(
+    const ASTPtr & query,
+    const std::list<InsertData::EntryPtr> & entries,
+    const ContextPtr & insert_context)
+{
+    std::unordered_map<String, String> format_to_query;
+    auto query_copy = query->clone();
+
+    for (const auto & entry : entries)
+    {
+        auto [it, inserted] = format_to_query.try_emplace(entry->format);
+        if (!inserted)
+            continue;
+
+        assert_cast<ASTInsertQuery &>(*query_copy).format = entry->format;
+        it->second = serializeQuery(*query_copy, insert_context->getSettingsRef().log_queries_cut_to_length);
+    }
+
+    return format_to_query;
+}
+
 template <typename E>
 void AsynchronousInsertQueue::finishWithException(
     const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception)
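
getQueriesByFormat serializes the INSERT query once per distinct format rather than once per entry, replacing the lambda removed above so the same map can also be reused in the catch path before the interpreter is built. A minimal sketch of the try_emplace caching pattern, with serialization faked by string concatenation (queriesByFormat and its parameters are hypothetical names):

#include <list>
#include <string>
#include <unordered_map>

std::unordered_map<std::string, std::string> queriesByFormat(
    const std::string & query_without_format,
    const std::list<std::string> & entry_formats)
{
    std::unordered_map<std::string, std::string> format_to_query;

    for (const auto & format : entry_formats)
    {
        // try_emplace reports inserted == false for formats already present,
        // so each distinct format is serialized exactly once.
        auto [it, inserted] = format_to_query.try_emplace(format);
        if (!inserted)
            continue;

        it->second = query_without_format + " FORMAT " + format;
    }
    return format_to_query;
}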

src/Interpreters/AsynchronousInsertQueue.h

@@ -293,6 +293,11 @@ private:
        const ContextPtr & insert_context,
        LogFunc && add_to_async_insert_log);
 
+    static NameToNameMap getQueriesByFormat(
+        const ASTPtr & query,
+        const std::list<InsertData::EntryPtr> & entries,
+        const ContextPtr & insert_context);
+
    template <typename E>
    static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);


@@ -0,0 +1,8 @@
+42 24 0
+42 24 0
+43 34 55
+42 24
+43 34
+INSERT INTO default.t_async_insert_alter (id, v1) FORMAT Values Preprocessed Ok
+INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Preprocessed Ok
+INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Preprocessed FlushError


@@ -0,0 +1,46 @@
+-- Tags: no-parallel
+
+SET wait_for_async_insert = 0;
+SET async_insert_busy_timeout_max_ms = 300000;
+SET async_insert_busy_timeout_min_ms = 300000;
+SET async_insert_use_adaptive_busy_timeout = 0;
+
+DROP TABLE IF EXISTS t_async_insert_alter;
+CREATE TABLE t_async_insert_alter (id Int64, v1 Int64) ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 1;
+
+-- ADD COLUMN
+INSERT INTO t_async_insert_alter VALUES (42, 24);
+ALTER TABLE t_async_insert_alter ADD COLUMN value2 Int64;
+
+SYSTEM FLUSH ASYNC INSERT QUEUE;
+SYSTEM FLUSH LOGS;
+
+SELECT * FROM t_async_insert_alter ORDER BY id;
+
+-- MODIFY COLUMN
+INSERT INTO t_async_insert_alter VALUES (43, 34, 55);
+ALTER TABLE t_async_insert_alter MODIFY COLUMN value2 String;
+
+SYSTEM FLUSH ASYNC INSERT QUEUE;
+SYSTEM FLUSH LOGS;
+
+SELECT * FROM t_async_insert_alter ORDER BY id;
+
+-- DROP COLUMN
+INSERT INTO t_async_insert_alter VALUES ('100', '200', '300');
+ALTER TABLE t_async_insert_alter DROP COLUMN value2;
+
+SYSTEM FLUSH ASYNC INSERT QUEUE;
+SYSTEM FLUSH LOGS;
+
+SELECT * FROM t_async_insert_alter ORDER BY id;
+SELECT query, data_kind, status FROM system.asynchronous_insert_log WHERE database = currentDatabase() AND table = 't_async_insert_alter' ORDER BY event_time_microseconds;
+
+DROP TABLE t_async_insert_alter;


@@ -0,0 +1,8 @@
+42 24 0
+42 24 0
+43 34 55
+42 24
+43 34
+INSERT INTO default.t_async_insert_alter (id, v1) FORMAT Values Parsed Ok
+INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Parsed Ok
+INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Parsed FlushError
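
This reference file differs from the SQL test's only in the data kind column, which is why the commit carries both a SQL test and an HTTP test. A hedged sketch of the distinction (the enum is a hypothetical simplification; the two function names appear in the diff above):

// Buffered entries carry one of two data kinds. Blocks already parsed by the
// native client (Preprocessed) go through processPreprocessedEntries, the
// path fixed by convertBlockToHeader; raw HTTP bodies (Parsed) go through
// processEntriesWithParsing, which parses against the current header anyway.
enum class DataKind { Parsed, Preprocessed };

const char * flushPath(DataKind kind)
{
    return kind == DataKind::Preprocessed
        ? "processPreprocessedEntries"  // converts buffered blocks to the header
        : "processEntriesWithParsing";  // parses raw data at flush time
}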


@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+# Tags: no-parallel
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT -q "
+    DROP TABLE IF EXISTS t_async_insert_alter;
+    CREATE TABLE t_async_insert_alter (id Int64, v1 Int64) ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 1;
+"
+
+url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=300000&async_insert_busy_timeout_min_ms=300000&wait_for_async_insert=0&async_insert_use_adaptive_busy_timeout=0"
+
+# ADD COLUMN
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_insert_alter VALUES (42, 24)"
+
+$CLICKHOUSE_CLIENT -q "
+    ALTER TABLE t_async_insert_alter ADD COLUMN value2 Int64;
+    SYSTEM FLUSH ASYNC INSERT QUEUE;
+    SYSTEM FLUSH LOGS;
+    SELECT * FROM t_async_insert_alter ORDER BY id;
+"
+
+# MODIFY COLUMN
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_insert_alter VALUES (43, 34, 55)"
+
+$CLICKHOUSE_CLIENT -q "
+    ALTER TABLE t_async_insert_alter MODIFY COLUMN value2 String;
+    SYSTEM FLUSH ASYNC INSERT QUEUE;
+    SYSTEM FLUSH LOGS;
+    SELECT * FROM t_async_insert_alter ORDER BY id;
+"
+
+# DROP COLUMN
+${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_insert_alter VALUES ('100', '200', '300')"
+
+$CLICKHOUSE_CLIENT -q "
+    ALTER TABLE t_async_insert_alter DROP COLUMN value2;
+    SYSTEM FLUSH ASYNC INSERT QUEUE;
+    SYSTEM FLUSH LOGS;
+
+    SELECT * FROM t_async_insert_alter ORDER BY id;
+    SELECT query, data_kind, status FROM system.asynchronous_insert_log WHERE database = currentDatabase() AND table = 't_async_insert_alter' ORDER BY event_time_microseconds;
+
+    DROP TABLE t_async_insert_alter;
+"