diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md
index ab69e4e90ce..ccfca4c1f1f 100644
--- a/docs/en/engines/table-engines/integrations/kafka.md
+++ b/docs/en/engines/table-engines/integrations/kafka.md
@@ -19,8 +19,8 @@ Kafka lets you:
 ``` sql
 CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
 (
-    name1 [type1],
-    name2 [type2],
+    name1 [type1] [ALIAS expr1],
+    name2 [type2] [ALIAS expr2],
     ...
 ) ENGINE = Kafka()
 SETTINGS
diff --git a/src/Storages/ColumnsDescription.cpp b/src/Storages/ColumnsDescription.cpp
index 21b140bd73a..8eabae7929c 100644
--- a/src/Storages/ColumnsDescription.cpp
+++ b/src/Storages/ColumnsDescription.cpp
@@ -383,15 +383,6 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
     return ret;
 }
 
-NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
-{
-    NamesAndTypesList ret;
-    for (const auto & col : columns)
-        if (col.default_desc.expression)
-            ret.emplace_back(col.name, col.type);
-    return ret;
-}
-
 NamesAndTypesList ColumnsDescription::getAll() const
 {
     NamesAndTypesList ret;
diff --git a/src/Storages/ColumnsDescription.h b/src/Storages/ColumnsDescription.h
index e5ec867cd64..365a999673e 100644
--- a/src/Storages/ColumnsDescription.h
+++ b/src/Storages/ColumnsDescription.h
@@ -132,7 +132,6 @@ public:
     NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
     NamesAndTypesList getAliases() const;
     NamesAndTypesList getEphemeral() const;
-    NamesAndTypesList getWithDefaultExpression() const; // columns with default expression, for example set by `CREATE TABLE` statement
     NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
     NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
     /// Returns .size0/.null/...
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 3381561eb1b..7d504833a0a 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -41,6 +41,7 @@
 #include
 #include
 
+#include "Storages/ColumnDefault.h"
 #include "config_version.h"
 
 #include
@@ -966,9 +967,18 @@ void registerStorageKafka(StorageFactory & factory)
         {
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
         }
-        if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
+        NamesAndTypesList supported_columns;
+        for (const auto & column : args.columns)
         {
-            throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
+            if (column.default_desc.kind == ColumnDefaultKind::Alias)
+                supported_columns.emplace_back(column.name, column.type);
+            if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
+                supported_columns.emplace_back(column.name, column.type);
+        }
+        // Kafka engine allows only ordinary columns without default expression or alias columns.
+        if (args.columns.getAll() != supported_columns)
+        {
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
                 "See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
         }
 
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index 3a4fa6c6bfe..9a6d3e0513c 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -285,11 +285,11 @@ def avro_confluent_message(schema_registry_client, value):
 
 
 # Tests
-def test_kafka_prohibited_column_types(kafka_cluster):
+def test_kafka_column_types(kafka_cluster):
     def assert_returned_exception(e):
         assert e.value.returncode == 36
         assert (
-            "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
+            "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns."
             in str(e.value)
         )
 
@@ -314,17 +314,39 @@ def test_kafka_prohibited_column_types(kafka_cluster):
         assert_returned_exception(exception)
 
     # check ALIAS
-    with pytest.raises(QueryRuntimeException) as exception:
-        instance.query(
-            """
+    instance.query(
+        """
                 CREATE TABLE test.kafka (a Int, b String Alias toString(a))
                 ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
+                SETTINGS kafka_commit_on_select = 1;
             """
-        )
-    assert_returned_exception(exception)
+    )
+    messages = []
+    for i in range(5):
+        messages.append(json.dumps({"a": i}))
+    kafka_produce(kafka_cluster, "new", messages)
+    result = ""
+    expected = TSV(
+        """
+0\t0
+1\t1
+2\t2
+3\t3
+4\t4
+    """
+    )
+    retries = 50
+    while retries > 0:
+        result += instance.query("SELECT a, b FROM test.kafka", ignore_error=True)
+        if TSV(result) == expected:
+            break
+        retries -= 1
+
+    assert TSV(result) == expected
+
+    instance.query("DROP TABLE test.kafka SYNC")
 
     # check MATERIALIZED
-    # check ALIAS
     with pytest.raises(QueryRuntimeException) as exception:
         instance.query(
             """