mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Allow using Alias column type for KafkaEngine
``` create table kafka ( a UInt32, a_str String Alias toString(a) ) engine = Kafka; create table data ( a UInt32; a_str String ) engine = MergeTree order by tuple(); create materialized view data_mv to data ( a UInt32, a_str String ) as select a, a_str from kafka; ``` Alias type works as expected in comparison with MATERIALIZED/EPHEMERAL or column with default expression. Ref: https://github.com/ClickHouse/ClickHouse/pull/47138 Co-authored-by: Azat Khuzhin <a3at.mail@gmail.com>
This commit is contained in:
parent
25912a2673
commit
418a61a68c
@ -19,8 +19,8 @@ Kafka lets you:
|
|||||||
``` sql
|
``` sql
|
||||||
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||||
(
|
(
|
||||||
name1 [type1],
|
name1 [type1] [ALIAS expr1],
|
||||||
name2 [type2],
|
name2 [type2] [ALIAS expr2],
|
||||||
...
|
...
|
||||||
) ENGINE = Kafka()
|
) ENGINE = Kafka()
|
||||||
SETTINGS
|
SETTINGS
|
||||||
|
@ -383,15 +383,6 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
|
|
||||||
{
|
|
||||||
NamesAndTypesList ret;
|
|
||||||
for (const auto & col : columns)
|
|
||||||
if (col.default_desc.expression)
|
|
||||||
ret.emplace_back(col.name, col.type);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
NamesAndTypesList ColumnsDescription::getAll() const
|
NamesAndTypesList ColumnsDescription::getAll() const
|
||||||
{
|
{
|
||||||
NamesAndTypesList ret;
|
NamesAndTypesList ret;
|
||||||
|
@ -132,7 +132,6 @@ public:
|
|||||||
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
|
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
|
||||||
NamesAndTypesList getAliases() const;
|
NamesAndTypesList getAliases() const;
|
||||||
NamesAndTypesList getEphemeral() const;
|
NamesAndTypesList getEphemeral() const;
|
||||||
NamesAndTypesList getWithDefaultExpression() const; // columns with default expression, for example set by `CREATE TABLE` statement
|
|
||||||
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
|
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
|
||||||
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
|
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
|
||||||
/// Returns .size0/.null/...
|
/// Returns .size0/.null/...
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
#include <Common/setThreadName.h>
|
#include <Common/setThreadName.h>
|
||||||
#include <Formats/FormatFactory.h>
|
#include <Formats/FormatFactory.h>
|
||||||
|
|
||||||
|
#include "Storages/ColumnDefault.h"
|
||||||
#include "config_version.h"
|
#include "config_version.h"
|
||||||
|
|
||||||
#include <Common/CurrentMetrics.h>
|
#include <Common/CurrentMetrics.h>
|
||||||
@ -966,9 +967,18 @@ void registerStorageKafka(StorageFactory & factory)
|
|||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
|
||||||
}
|
}
|
||||||
if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
|
NamesAndTypesList supported_columns;
|
||||||
|
for (const auto & column : args.columns)
|
||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
|
if (column.default_desc.kind == ColumnDefaultKind::Alias)
|
||||||
|
supported_columns.emplace_back(column.name, column.type);
|
||||||
|
if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
|
||||||
|
supported_columns.emplace_back(column.name, column.type);
|
||||||
|
}
|
||||||
|
// Kafka engine allows only ordinary columns without default expression or alias columns.
|
||||||
|
if (args.columns.getAll() != supported_columns)
|
||||||
|
{
|
||||||
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
|
||||||
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
|
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,11 +285,11 @@ def avro_confluent_message(schema_registry_client, value):
|
|||||||
# Tests
|
# Tests
|
||||||
|
|
||||||
|
|
||||||
def test_kafka_prohibited_column_types(kafka_cluster):
|
def test_kafka_column_types(kafka_cluster):
|
||||||
def assert_returned_exception(e):
|
def assert_returned_exception(e):
|
||||||
assert e.value.returncode == 36
|
assert e.value.returncode == 36
|
||||||
assert (
|
assert (
|
||||||
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
|
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns."
|
||||||
in str(e.value)
|
in str(e.value)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -314,17 +314,39 @@ def test_kafka_prohibited_column_types(kafka_cluster):
|
|||||||
assert_returned_exception(exception)
|
assert_returned_exception(exception)
|
||||||
|
|
||||||
# check ALIAS
|
# check ALIAS
|
||||||
with pytest.raises(QueryRuntimeException) as exception:
|
|
||||||
instance.query(
|
instance.query(
|
||||||
"""
|
"""
|
||||||
CREATE TABLE test.kafka (a Int, b String Alias toString(a))
|
CREATE TABLE test.kafka (a Int, b String Alias toString(a))
|
||||||
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
|
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
|
||||||
|
SETTINGS kafka_commit_on_select = 1;
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
assert_returned_exception(exception)
|
messages = []
|
||||||
|
for i in range(5):
|
||||||
|
messages.append(json.dumps({"a": i}))
|
||||||
|
kafka_produce(kafka_cluster, "new", messages)
|
||||||
|
result = ""
|
||||||
|
expected = TSV(
|
||||||
|
"""
|
||||||
|
0\t0
|
||||||
|
1\t1
|
||||||
|
2\t2
|
||||||
|
3\t3
|
||||||
|
4\t4
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
retries = 50
|
||||||
|
while retries > 0:
|
||||||
|
result += instance.query("SELECT a, b FROM test.kafka", ignore_error=True)
|
||||||
|
if TSV(result) == expected:
|
||||||
|
break
|
||||||
|
retries -= 1
|
||||||
|
|
||||||
|
assert TSV(result) == expected
|
||||||
|
|
||||||
|
instance.query("DROP TABLE test.kafka SYNC")
|
||||||
|
|
||||||
# check MATERIALIZED
|
# check MATERIALIZED
|
||||||
# check ALIAS
|
|
||||||
with pytest.raises(QueryRuntimeException) as exception:
|
with pytest.raises(QueryRuntimeException) as exception:
|
||||||
instance.query(
|
instance.query(
|
||||||
"""
|
"""
|
||||||
|
Loading…
Reference in New Issue
Block a user