Merge pull request #49824 from AVMusorin/allow-alias-column-kafka

KafkaEngine: Allow usage of Alias column type
This commit is contained in:
Robert Schulze 2023-05-15 23:40:03 +02:00 committed by GitHub
commit 59bc3e25be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 22 deletions

View File

@ -19,8 +19,8 @@ Kafka lets you:
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS

View File

@ -383,15 +383,6 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
return ret;
}
NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
if (col.default_desc.expression)
ret.emplace_back(col.name, col.type);
return ret;
}
NamesAndTypesList ColumnsDescription::getAll() const
{
NamesAndTypesList ret;

View File

@ -132,7 +132,6 @@ public:
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
NamesAndTypesList getAliases() const;
NamesAndTypesList getEphemeral() const;
NamesAndTypesList getWithDefaultExpression() const; // columns with default expression, for example set by `CREATE TABLE` statement
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
/// Returns .size0/.null/...

View File

@ -41,6 +41,7 @@
#include <Common/setThreadName.h>
#include <Formats/FormatFactory.h>
#include "Storages/ColumnDefault.h"
#include "config_version.h"
#include <Common/CurrentMetrics.h>
@ -966,9 +967,18 @@ void registerStorageKafka(StorageFactory & factory)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
}
if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
NamesAndTypesList supported_columns;
for (const auto & column : args.columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
if (column.default_desc.kind == ColumnDefaultKind::Alias)
supported_columns.emplace_back(column.name, column.type);
if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
supported_columns.emplace_back(column.name, column.type);
}
// Kafka engine allows only ordinary columns without default expression or alias columns.
if (args.columns.getAll() != supported_columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
}

View File

@ -285,11 +285,11 @@ def avro_confluent_message(schema_registry_client, value):
# Tests
def test_kafka_prohibited_column_types(kafka_cluster):
def test_kafka_column_types(kafka_cluster):
def assert_returned_exception(e):
assert e.value.returncode == 36
assert (
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns."
in str(e.value)
)
@ -314,17 +314,39 @@ def test_kafka_prohibited_column_types(kafka_cluster):
assert_returned_exception(exception)
# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""
instance.query(
"""
CREATE TABLE test.kafka (a Int, b String Alias toString(a))
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
SETTINGS kafka_commit_on_select = 1;
"""
)
assert_returned_exception(exception)
)
messages = []
for i in range(5):
messages.append(json.dumps({"a": i}))
kafka_produce(kafka_cluster, "new", messages)
result = ""
expected = TSV(
"""
0\t0
1\t1
2\t2
3\t3
4\t4
"""
)
retries = 50
while retries > 0:
result += instance.query("SELECT a, b FROM test.kafka", ignore_error=True)
if TSV(result) == expected:
break
retries -= 1
assert TSV(result) == expected
instance.query("DROP TABLE test.kafka SYNC")
# check MATERIALIZED
# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""