prohibit DEFAULT/EPHEMERAL/ALIAS in KafkaEngine

This commit is contained in:
AVMusorin 2023-03-02 13:51:21 +01:00
parent 866c318c12
commit 0f2ae72141
No known key found for this signature in database
GPG Key ID: 6D8C8B3C90094A13
6 changed files with 75 additions and 0 deletions

View File

@ -125,6 +125,10 @@ Groups are flexible and synced on the cluster. For instance, if you have 10 topi
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.
:::info
The Kafka engine doesn't support columns declared with a `DEFAULT`, `MATERIALIZED`, `EPHEMERAL`, or `ALIAS` expression. If you need such columns, add them at the `MATERIALIZED VIEW` level.
:::
When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background. This allows you to continually receive messages from Kafka and convert them to the required format using `SELECT`.
One Kafka table can have as many materialized views as you like. They do not read data from the Kafka table directly, but receive new records (in blocks); this way you can write to several tables with different levels of detail (with grouping and aggregation, and without).

View File

@ -121,6 +121,10 @@ If the data type and default expression are defined explicitly, this expression
Default expressions may be defined as an arbitrary expression from table constants and columns. When creating and changing the table structure, it checks that expressions do not contain loops. For INSERT, it checks that expressions are resolvable – that all columns they can be calculated from have been passed.
:::info
The Kafka engine doesn't support columns declared with a `DEFAULT`, `MATERIALIZED`, `EPHEMERAL`, or `ALIAS` expression. If you need such columns, add them at the `MATERIALIZED VIEW` level; see [Kafka Engine](../../../engines/table-engines/integrations/kafka.md#description).
:::
### DEFAULT
`DEFAULT expr`

View File

@ -383,6 +383,15 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
return ret;
}
/// Collects the (name, type) pair of every column whose default description
/// carries an expression, preserving the iteration order of `columns`.
NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
{
    NamesAndTypesList result;

    for (const auto & column : columns)
    {
        /// Columns without a default expression are not part of the result.
        if (!column.default_desc.expression)
            continue;

        result.emplace_back(column.name, column.type);
    }

    return result;
}
NamesAndTypesList ColumnsDescription::getAll() const
{
NamesAndTypesList ret;

View File

@ -132,6 +132,9 @@ public:
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
NamesAndTypesList getAliases() const;
NamesAndTypesList getEphemeral() const;
/// Columns whose default description has an expression set,
/// for example one given in a `CREATE TABLE` statement.
/// Returns the name and type of each such column.
NamesAndTypesList getWithDefaultExpression() const;
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
/// Returns .size0/.null/...

View File

@ -959,6 +959,11 @@ void registerStorageKafka(StorageFactory & factory)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
}
if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
}
return std::make_shared<StorageKafka>(args.table_id, args.getContext(), args.columns, std::move(kafka_settings), collection_name);
};

View File

@ -285,6 +285,56 @@ def avro_confluent_message(schema_registry_client, value):
# Tests
def test_kafka_prohibited_column_types(kafka_cluster):
    """Check that a Kafka table cannot be created with DEFAULT, EPHEMERAL,
    ALIAS or MATERIALIZED columns.

    Every CREATE TABLE below must fail with return code 36 and the
    "KafkaEngine doesn't support ..." message.
    """

    def assert_returned_exception(e):
        # `e` is the pytest ExceptionInfo captured by `pytest.raises`.
        assert e.value.returncode == 36
        assert (
            "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
            in str(e.value)
        )

    # One offending definition of column `b` per prohibited kind.
    prohibited_columns = [
        "b Int DEFAULT 0",  # DEFAULT
        "b Int EPHEMERAL",  # EPHEMERAL
        "b String Alias toString(a)",  # ALIAS
        "b String MATERIALIZED toString(a)",  # MATERIALIZED
    ]

    for column in prohibited_columns:
        with pytest.raises(QueryRuntimeException) as exception:
            # NOTE(review): the `{kafka_...}` placeholders are sent to the server
            # literally (no .format() is applied), exactly as before; the test only
            # relies on the statement being rejected.
            instance.query(
                "\n"
                "CREATE TABLE test.kafka (a Int, " + column + ")\n"
                "ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')\n"
            )
        assert_returned_exception(exception)
def test_kafka_settings_old_syntax(kafka_cluster):
assert TSV(
instance.query(