From 0f2ae721411afc2b3150bde3a4a12c5426118f6f Mon Sep 17 00:00:00 2001 From: AVMusorin Date: Thu, 2 Mar 2023 13:51:21 +0100 Subject: [PATCH 1/5] prohibit DEFAULT/EPHEMERAL/ALIAS in KafkaEngine --- .../table-engines/integrations/kafka.md | 4 ++ .../sql-reference/statements/create/table.md | 4 ++ src/Storages/ColumnsDescription.cpp | 9 ++++ src/Storages/ColumnsDescription.h | 3 ++ src/Storages/Kafka/StorageKafka.cpp | 5 ++ tests/integration/test_storage_kafka/test.py | 50 +++++++++++++++++++ 6 files changed, 75 insertions(+) diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md index ef422632d3e..e6134043b8e 100644 --- a/docs/en/engines/table-engines/integrations/kafka.md +++ b/docs/en/engines/table-engines/integrations/kafka.md @@ -125,6 +125,10 @@ Groups are flexible and synced on the cluster. For instance, if you have 10 topi 2. Create a table with the desired structure. 3. Create a materialized view that converts data from the engine and puts it into a previously created table. +:::info +Kafka Engine doesn't support columns with default value of type `DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS`. If you need columns with any default type, they can be added at `MATERIALIZED VIEW` level. +::: + When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background. This allows you to continually receive messages from Kafka and convert them to the required format using `SELECT`. One kafka table can have as many materialized views as you like, they do not read data from the kafka table directly, but receive new records (in blocks), this way you can write to several tables with different detail level (with grouping - aggregation and without). diff --git a/docs/en/sql-reference/statements/create/table.md b/docs/en/sql-reference/statements/create/table.md index 9e66afba613..ba495b0eed5 100644 --- a/docs/en/sql-reference/statements/create/table.md +++ b/docs/en/sql-reference/statements/create/table.md @@ -121,6 +121,10 @@ If the data type and default expression are defined explicitly, this expression Default expressions may be defined as an arbitrary expression from table constants and columns. When creating and changing the table structure, it checks that expressions do not contain loops. For INSERT, it checks that expressions are resolvable – that all columns they can be calculated from have been passed. +:::info +Kafka Engine doesn't support columns with default value of type `DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS`. If you need columns with any default type, they can be added at `MATERIALIZED VIEW` level, see [Kafka Engine](../../../engines/table-engines/integrations/kafka.md#description). +::: + ### DEFAULT `DEFAULT expr` diff --git a/src/Storages/ColumnsDescription.cpp b/src/Storages/ColumnsDescription.cpp index d401840eec7..fa39e304925 100644 --- a/src/Storages/ColumnsDescription.cpp +++ b/src/Storages/ColumnsDescription.cpp @@ -383,6 +383,15 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const return ret; } +NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const +{ + NamesAndTypesList ret; + for (const auto & col : columns) + if (col.default_desc.expression) + ret.emplace_back(col.name, col.type); + return ret; +} + NamesAndTypesList ColumnsDescription::getAll() const { NamesAndTypesList ret; diff --git a/src/Storages/ColumnsDescription.h b/src/Storages/ColumnsDescription.h index 4f874f4b850..36109392ab6 100644 --- a/src/Storages/ColumnsDescription.h +++ b/src/Storages/ColumnsDescription.h @@ -132,6 +132,9 @@ public: NamesAndTypesList getInsertable() const; /// ordinary + ephemeral NamesAndTypesList getAliases() const; NamesAndTypesList getEphemeral() const; + // Columns with preset default expression. + // For example from `CREATE TABLE` statement + NamesAndTypesList getWithDefaultExpression() const; NamesAndTypesList getAllPhysical() const; /// ordinary + materialized. NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral /// Returns .size0/.null/... diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 7b97273d8af..2afdc0dda8a 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -959,6 +959,11 @@ void registerStorageKafka(StorageFactory & factory) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1"); } + if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. " + "See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration"); + } return std::make_shared(args.table_id, args.getContext(), args.columns, std::move(kafka_settings), collection_name); }; diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 9f617369859..51952ac1eb7 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -285,6 +285,56 @@ def avro_confluent_message(schema_registry_client, value): # Tests +def test_kafka_prohibited_column_types(kafka_cluster): + def assert_returned_exception(e): + assert e.value.returncode == 36 + assert ( + "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns." + in str(e.value) + ) + + # check column with DEFAULT expression + with pytest.raises(QueryRuntimeException) as exception: + instance.query( + """ + CREATE TABLE test.kafka (a Int, b Int DEFAULT 0) + ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n') + """ + ) + assert_returned_exception(exception) + + # check EPHEMERAL + with pytest.raises(QueryRuntimeException) as exception: + instance.query( + """ + CREATE TABLE test.kafka (a Int, b Int EPHEMERAL) + ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n') + """ + ) + assert_returned_exception(exception) + + # check ALIAS + with pytest.raises(QueryRuntimeException) as exception: + instance.query( + """ + CREATE TABLE test.kafka (a Int, b String Alias toString(a)) + ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n') + """ + ) + assert_returned_exception(exception) + + # check MATERIALIZED + # check ALIAS + with pytest.raises(QueryRuntimeException) as exception: + instance.query( + """ + CREATE TABLE test.kafka (a Int, b String MATERIALIZED toString(a)) + ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n') + """ + ) + assert_returned_exception(exception) + + def test_kafka_settings_old_syntax(kafka_cluster): assert TSV( instance.query( From b5dffe7417527bb11ef5979133e092ec1ca222db Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 6 Mar 2023 19:57:09 +0100 Subject: [PATCH 2/5] Fix bug in zero copy replica which can lead to dataloss --- src/Storages/StorageReplicatedMergeTree.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 54ae8aa5a7b..9e79a715610 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8434,7 +8434,11 @@ std::pair StorageReplicatedMergeTree::unlockSharedDataByID( } else if (error_code == Coordination::Error::ZNONODE) { - LOG_TRACE(logger, "Node with parent zookeeper lock {} for part {} doesn't exist (part was unlocked before)", zookeeper_part_uniq_node, part_name); + /// We don't know what to do, because this part can be mutation part + /// with hardlinked columns. Since we don't have this information (about blobs not to remove) + /// we refuce to remove blobs. + LOG_WARNING(logger, "Node with parent zookeeper lock {} for part {} doesn't exist (part was unlocked before), refuse to remove blobs", zookeeper_part_uniq_node, part_name); + return std::make_pair{false, {}}; } else { From 1db6b9414e8b71f27bd1040f696d780f020ca1e0 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 7 Mar 2023 12:39:17 +0100 Subject: [PATCH 3/5] Update src/Storages/StorageReplicatedMergeTree.cpp Co-authored-by: Ilya Yatsishin <2159081+qoega@users.noreply.github.com> --- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 9e79a715610..61eedc18736 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8436,7 +8436,7 @@ std::pair StorageReplicatedMergeTree::unlockSharedDataByID( { /// We don't know what to do, because this part can be mutation part /// with hardlinked columns. Since we don't have this information (about blobs not to remove) - /// we refuce to remove blobs. + /// we refuse to remove blobs. LOG_WARNING(logger, "Node with parent zookeeper lock {} for part {} doesn't exist (part was unlocked before), refuse to remove blobs", zookeeper_part_uniq_node, part_name); return std::make_pair{false, {}}; } From c10cb436f41979e92679752d74dc5129637f932c Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 7 Mar 2023 12:42:42 +0100 Subject: [PATCH 4/5] Fix build --- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 61eedc18736..6c6ff30fd04 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8438,7 +8438,7 @@ std::pair StorageReplicatedMergeTree::unlockSharedDataByID( /// with hardlinked columns. Since we don't have this information (about blobs not to remove) /// we refuse to remove blobs. LOG_WARNING(logger, "Node with parent zookeeper lock {} for part {} doesn't exist (part was unlocked before), refuse to remove blobs", zookeeper_part_uniq_node, part_name); - return std::make_pair{false, {}}; + return {false, {}}; } else { From 7ce20f5cd2f702df04b2ccbc58117cee9cd974fb Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 7 Mar 2023 00:53:37 +0100 Subject: [PATCH 5/5] fix tests --- src/Interpreters/InterpreterKillQueryQuery.cpp | 4 ++++ .../Executors/PushingAsyncPipelineExecutor.cpp | 2 +- tests/integration/test_grpc_protocol/test.py | 2 -- .../02232_dist_insert_send_logs_level_hung.sh | 11 ++++++++++- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/Interpreters/InterpreterKillQueryQuery.cpp b/src/Interpreters/InterpreterKillQueryQuery.cpp index 40698386ccb..3330159aff5 100644 --- a/src/Interpreters/InterpreterKillQueryQuery.cpp +++ b/src/Interpreters/InterpreterKillQueryQuery.cpp @@ -161,6 +161,8 @@ public: if (curr_process.processed) continue; + LOG_DEBUG(&Poco::Logger::get("KillQuery"), "Will kill query {} (synchronously)", curr_process.query_id); + auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user, true); if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent) @@ -226,6 +228,8 @@ BlockIO InterpreterKillQueryQuery::execute() MutableColumns res_columns = header.cloneEmptyColumns(); for (const auto & query_desc : queries_to_stop) { + if (!query.test) + LOG_DEBUG(&Poco::Logger::get("KillQuery"), "Will kill query {} (asynchronously)", query_desc.query_id); auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true); insertResultRow(query_desc.source_num, code, processes_block, header, res_columns); } diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp index 70815bb8b3b..4478f1548a4 100644 --- a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp @@ -187,7 +187,7 @@ void PushingAsyncPipelineExecutor::push(Chunk chunk) if (!is_pushed) throw Exception(ErrorCodes::LOGICAL_ERROR, - "Pipeline for PushingPipelineExecutor was finished before all data was inserted"); + "Pipeline for PushingAsyncPipelineExecutor was finished before all data was inserted"); } void PushingAsyncPipelineExecutor::push(Block block) diff --git a/tests/integration/test_grpc_protocol/test.py b/tests/integration/test_grpc_protocol/test.py index a1bc0d42a46..137d585f7d1 100644 --- a/tests/integration/test_grpc_protocol/test.py +++ b/tests/integration/test_grpc_protocol/test.py @@ -594,8 +594,6 @@ def test_cancel_while_processing_input(): stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel) result = stub.ExecuteQueryWithStreamInput(send_query_info()) assert result.cancelled == True - assert result.progress.written_rows == 6 - assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n" def test_cancel_while_generating_output(): diff --git a/tests/queries/0_stateless/02232_dist_insert_send_logs_level_hung.sh b/tests/queries/0_stateless/02232_dist_insert_send_logs_level_hung.sh index 5ed94148bc1..734cef06214 100755 --- a/tests/queries/0_stateless/02232_dist_insert_send_logs_level_hung.sh +++ b/tests/queries/0_stateless/02232_dist_insert_send_logs_level_hung.sh @@ -49,7 +49,16 @@ insert_client_opts=( timeout 250s $CLICKHOUSE_CLIENT "${client_opts[@]}" "${insert_client_opts[@]}" -q "insert into function remote('127.2', currentDatabase(), in_02232) select * from numbers(1e6)" # Kill underlying query of remote() to make KILL faster -timeout 60s $CLICKHOUSE_CLIENT "${client_opts[@]}" -q "KILL QUERY WHERE Settings['log_comment'] = '$CLICKHOUSE_LOG_COMMENT' SYNC" --format Null +# This test is reproducing very interesting bahaviour. +# The block size is 1, so the secondary query creates InterpreterSelectQuery for each row due to pushing to the MV. +# It works extremely slow, and the initial query produces new blocks and writes them to the socket much faster +# then the secondary query can read and process them. Therefore, it fills network buffers in the kernel. +# Once a buffer in the kernel is full, send(...) blocks until the secondary query will finish processing data +# that it already has in ReadBufferFromPocoSocket and call recv. +# Or until the kernel will decide to resize the buffer (seems like it has non-trivial rules for that). +# Anyway, it may look like the initial query got stuck, but actually it did not. +# Moreover, the initial query cannot be killed at that point, so KILL QUERY ... SYNC will get "stuck" as well. +timeout 30s $CLICKHOUSE_CLIENT "${client_opts[@]}" -q "KILL QUERY WHERE query like '%INSERT INTO $CLICKHOUSE_DATABASE.in_02232%' SYNC" --format Null echo $? $CLICKHOUSE_CLIENT "${client_opts[@]}" -nm -q "