diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 52f8a8dd42a..5ed0d6d6257 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -699,6 +699,17 @@ catch (...) tryLogCurrentException("AsynchronousInsertQueue", "Failed to add elements to AsynchronousInsertLog"); } +void convertBlockToHeader(Block & block, const Block & header) +{ + auto converting_dag = ActionsDAG::makeConvertingActions( + block.getColumnsWithTypeAndName(), + header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Name); + + auto converting_actions = std::make_shared(std::move(converting_dag)); + converting_actions->execute(block); +} + String serializeQuery(const IAST & query, size_t max_length) { return query.hasSecretParts() @@ -794,6 +805,48 @@ try if (async_insert_log) log_elements.reserve(data->entries.size()); + auto add_entry_to_asynchronous_insert_log = [&]( + const InsertData::EntryPtr & entry, + const NameToNameMap & query_by_format, + const String & parsing_exception, + size_t num_rows, + size_t num_bytes) + { + if (!async_insert_log) + return; + + AsynchronousInsertLogElement elem; + elem.event_time = timeInSeconds(entry->create_time); + elem.event_time_microseconds = timeInMicroseconds(entry->create_time); + elem.database = query_database; + elem.table = query_table; + elem.format = entry->format; + elem.query_id = entry->query_id; + elem.bytes = num_bytes; + elem.rows = num_rows; + elem.exception = parsing_exception; + elem.data_kind = entry->chunk.getDataKind(); + elem.timeout_milliseconds = data->timeout_ms.count(); + elem.flush_query_id = insert_query_id; + + auto it = query_by_format.find(entry->format); + elem.query_for_logging = it != query_by_format.end() ? it->second : key.query_str; + + /// If there was a parsing error, + /// the entry won't be flushed anyway, + /// so add the log element immediately. + if (!elem.exception.empty()) + { + elem.status = AsynchronousInsertLogElement::ParsingError; + async_insert_log->add(std::move(elem)); + } + else + { + elem.status = AsynchronousInsertLogElement::Ok; + log_elements.push_back(elem); + } + }; + try { interpreter = std::make_unique( @@ -822,49 +875,21 @@ try catch (...) { logExceptionBeforeStart(query_for_logging, insert_context, key.query, query_span, start_watch.elapsedMilliseconds()); + + if (async_insert_log) + { + auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context); + for (const auto & entry : data->entries) + add_entry_to_asynchronous_insert_log(entry, query_by_format, "", 0, entry->chunk.byteSize()); + + auto exception = getCurrentExceptionMessage(false); + auto flush_time = std::chrono::system_clock::now(); + appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, exception); + } throw; } - auto add_entry_to_asynchronous_insert_log = [&](const auto & entry, - const auto & entry_query_for_logging, - const auto & exception, - size_t num_rows, - size_t num_bytes, - Milliseconds timeout_ms) - { - if (!async_insert_log) - return; - - AsynchronousInsertLogElement elem; - elem.event_time = timeInSeconds(entry->create_time); - elem.event_time_microseconds = timeInMicroseconds(entry->create_time); - elem.query_for_logging = entry_query_for_logging; - elem.database = query_database; - elem.table = query_table; - elem.format = entry->format; - elem.query_id = entry->query_id; - elem.bytes = num_bytes; - elem.rows = num_rows; - elem.exception = exception; - elem.data_kind = entry->chunk.getDataKind(); - elem.timeout_milliseconds = timeout_ms.count(); - elem.flush_query_id = insert_query_id; - - /// If there was a parsing error, - /// the entry won't be flushed anyway, - /// so add the log element immediately. - if (!elem.exception.empty()) - { - elem.status = AsynchronousInsertLogElement::ParsingError; - async_insert_log->add(std::move(elem)); - } - else - { - log_elements.push_back(elem); - } - }; - - auto finish_entries = [&] + auto finish_entries = [&](size_t num_rows, size_t num_bytes) { for (const auto & entry : data->entries) { @@ -877,6 +902,13 @@ try auto flush_time = std::chrono::system_clock::now(); appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, ""); } + + LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str); + queue_shard_flush_time_history.updateWithCurrentTime(); + + bool pulling_pipeline = false; + logQueryFinish( + query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal); }; try @@ -891,20 +923,9 @@ try ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows()); - auto log_and_add_finish_to_query_log = [&](size_t num_rows, size_t num_bytes) - { - LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str); - queue_shard_flush_time_history.updateWithCurrentTime(); - - bool pulling_pipeline = false; - logQueryFinish( - query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal); - }; - if (chunk.getNumRows() == 0) { - finish_entries(); - log_and_add_finish_to_query_log(0, 0); + finish_entries(0, 0); return; } @@ -917,7 +938,7 @@ try CompletedPipelineExecutor completed_executor(pipeline); completed_executor.execute(); - log_and_add_finish_to_query_log(num_rows, num_bytes); + finish_entries(num_rows, num_bytes); } catch (...) { @@ -931,8 +952,6 @@ try } throw; } - - finish_entries(); } catch (const Exception & e) { @@ -1013,7 +1032,7 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing( chunk_info->offsets.push_back(total_rows); chunk_info->tokens.push_back(entry->async_dedup_token); - add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms); + add_to_async_insert_log(entry, {}, current_exception, num_rows, num_bytes); current_exception.clear(); entry->resetChunk(); @@ -1036,19 +1055,7 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries( auto chunk_info = std::make_shared(); auto result_columns = header.cloneEmptyColumns(); - std::unordered_map format_to_query; - - auto get_query_by_format = [&](const String & format) -> const String & - { - auto [it, inserted] = format_to_query.try_emplace(format); - if (!inserted) - return it->second; - - auto query = key.query->clone(); - assert_cast(*query).format = format; - it->second = serializeQuery(*query, insert_context->getSettingsRef().log_queries_cut_to_length); - return it->second; - }; + auto query_by_format = getQueriesByFormat(key.query, data->entries, insert_context); for (const auto & entry : data->entries) { @@ -1057,17 +1064,19 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries( throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected entry with data kind Preprocessed. Got: {}", entry->chunk.getDataKind()); - auto columns = block->getColumns(); + Block block_to_insert = *block; + if (!isCompatibleHeader(block_to_insert, header)) + convertBlockToHeader(block_to_insert, header); + + auto columns = block_to_insert.getColumns(); for (size_t i = 0, s = columns.size(); i < s; ++i) result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size()); - total_rows += block->rows(); + total_rows += block_to_insert.rows(); chunk_info->offsets.push_back(total_rows); chunk_info->tokens.push_back(entry->async_dedup_token); - const auto & query_for_logging = get_query_by_format(entry->format); - add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms); - + add_to_async_insert_log(entry, query_by_format, "", block_to_insert.rows(), block_to_insert.bytes()); entry->resetChunk(); } @@ -1076,6 +1085,27 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries( return chunk; } +NameToNameMap AsynchronousInsertQueue::getQueriesByFormat( + const ASTPtr & query, + const std::list & entries, + const ContextPtr & insert_context) +{ + std::unordered_map format_to_query; + auto query_copy = query->clone(); + + for (const auto & entry : entries) + { + auto [it, inserted] = format_to_query.try_emplace(entry->format); + if (!inserted) + continue; + + assert_cast(*query_copy).format = entry->format; + it->second = serializeQuery(*query_copy, insert_context->getSettingsRef().log_queries_cut_to_length); + } + + return format_to_query; +} + template void AsynchronousInsertQueue::finishWithException( const ASTPtr & query, const std::list & entries, const E & exception) diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index da14b43d276..9a84fe8bb12 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -293,6 +293,11 @@ private: const ContextPtr & insert_context, LogFunc && add_to_async_insert_log); + static NameToNameMap getQueriesByFormat( + const ASTPtr & query, + const std::list & entries, + const ContextPtr & insert_context); + template static void finishWithException(const ASTPtr & query, const std::list & entries, const E & exception); diff --git a/tests/queries/0_stateless/03229_async_insert_alter.reference b/tests/queries/0_stateless/03229_async_insert_alter.reference new file mode 100644 index 00000000000..f66021d0bfe --- /dev/null +++ b/tests/queries/0_stateless/03229_async_insert_alter.reference @@ -0,0 +1,8 @@ +42 24 0 +42 24 0 +43 34 55 +42 24 +43 34 +INSERT INTO default.t_async_insert_alter (id, v1) FORMAT Values Preprocessed Ok +INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Preprocessed Ok +INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Preprocessed FlushError diff --git a/tests/queries/0_stateless/03229_async_insert_alter.sql b/tests/queries/0_stateless/03229_async_insert_alter.sql new file mode 100644 index 00000000000..a95bbc6e55f --- /dev/null +++ b/tests/queries/0_stateless/03229_async_insert_alter.sql @@ -0,0 +1,46 @@ +-- Tags: no-parallel + +SET wait_for_async_insert = 0; +SET async_insert_busy_timeout_max_ms = 300000; +SET async_insert_busy_timeout_min_ms = 300000; +SET async_insert_use_adaptive_busy_timeout = 0; + +DROP TABLE IF EXISTS t_async_insert_alter; + +CREATE TABLE t_async_insert_alter (id Int64, v1 Int64) ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 1; + +-- ADD COLUMN + +INSERT INTO t_async_insert_alter VALUES (42, 24); + +ALTER TABLE t_async_insert_alter ADD COLUMN value2 Int64; + +SYSTEM FLUSH ASYNC INSERT QUEUE; +SYSTEM FLUSH LOGS; + +SELECT * FROM t_async_insert_alter ORDER BY id; + +-- MODIFY COLUMN + +INSERT INTO t_async_insert_alter VALUES (43, 34, 55); + +ALTER TABLE t_async_insert_alter MODIFY COLUMN value2 String; + +SYSTEM FLUSH ASYNC INSERT QUEUE; +SYSTEM FLUSH LOGS; + +SELECT * FROM t_async_insert_alter ORDER BY id; + +-- DROP COLUMN + +INSERT INTO t_async_insert_alter VALUES ('100', '200', '300'); + +ALTER TABLE t_async_insert_alter DROP COLUMN value2; + +SYSTEM FLUSH ASYNC INSERT QUEUE; +SYSTEM FLUSH LOGS; + +SELECT * FROM t_async_insert_alter ORDER BY id; +SELECT query, data_kind, status FROM system.asynchronous_insert_log WHERE database = currentDatabase() AND table = 't_async_insert_alter' ORDER BY event_time_microseconds; + +DROP TABLE t_async_insert_alter; diff --git a/tests/queries/0_stateless/03229_async_insert_alter_http.reference b/tests/queries/0_stateless/03229_async_insert_alter_http.reference new file mode 100644 index 00000000000..195701d2b82 --- /dev/null +++ b/tests/queries/0_stateless/03229_async_insert_alter_http.reference @@ -0,0 +1,8 @@ +42 24 0 +42 24 0 +43 34 55 +42 24 +43 34 +INSERT INTO default.t_async_insert_alter (id, v1) FORMAT Values Parsed Ok +INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Parsed Ok +INSERT INTO default.t_async_insert_alter (id, v1, value2) FORMAT Values Parsed FlushError diff --git a/tests/queries/0_stateless/03229_async_insert_alter_http.sh b/tests/queries/0_stateless/03229_async_insert_alter_http.sh new file mode 100755 index 00000000000..18e68f51285 --- /dev/null +++ b/tests/queries/0_stateless/03229_async_insert_alter_http.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Tags: no-parallel + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q " + DROP TABLE IF EXISTS t_async_insert_alter; + CREATE TABLE t_async_insert_alter (id Int64, v1 Int64) ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 1; +" + +url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=300000&async_insert_busy_timeout_min_ms=300000&wait_for_async_insert=0&async_insert_use_adaptive_busy_timeout=0" + +# ADD COLUMN + +${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_insert_alter VALUES (42, 24)" + +$CLICKHOUSE_CLIENT -q " + ALTER TABLE t_async_insert_alter ADD COLUMN value2 Int64; + + SYSTEM FLUSH ASYNC INSERT QUEUE; + SYSTEM FLUSH LOGS; + + SELECT * FROM t_async_insert_alter ORDER BY id; +" + +# MODIFY COLUMN + +${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_insert_alter VALUES (43, 34, 55)" + +$CLICKHOUSE_CLIENT -q " + ALTER TABLE t_async_insert_alter MODIFY COLUMN value2 String; + + SYSTEM FLUSH ASYNC INSERT QUEUE; + SYSTEM FLUSH LOGS; + + SELECT * FROM t_async_insert_alter ORDER BY id; +" + +## DROP COLUMN + +${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_insert_alter VALUES ('100', '200', '300')" + +$CLICKHOUSE_CLIENT -q " + ALTER TABLE t_async_insert_alter DROP COLUMN value2; + + SYSTEM FLUSH ASYNC INSERT QUEUE; + SYSTEM FLUSH LOGS; + + SELECT * FROM t_async_insert_alter ORDER BY id; + SELECT query, data_kind, status FROM system.asynchronous_insert_log WHERE database = currentDatabase() AND table = 't_async_insert_alter' ORDER BY event_time_microseconds; + + DROP TABLE t_async_insert_alter; +"