librdkafka usually polls both messages on the first SELECT, but sometimes only one. If it polls only one message first, it reads two messages before rebalancing at the second SELECT from the table. So it usually reads a single message twice (num_messages_read = 4 is the usual value: 1 discarded message + 3 actually consumed messages). But when only one message is read in the first SELECT, 2 messages are discarded, so num_messages_read is 5: 2 discarded messages + 3 actually consumed messages.
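The arithmetic above can be written out as a tiny sketch. The function name and its parameter are hypothetical and only mirror the two cases described in this message; this is not code from the test suite.

```python
def num_messages_read(polled_in_first_select: int, consumed: int = 3) -> int:
    """Total messages read = discarded messages + actually consumed messages."""
    # Mapping taken from the description above:
    #   both messages polled on the first SELECT -> 1 message is discarded
    #   only one message polled on the first SELECT -> 2 messages are discarded
    discarded_by_first_poll = {2: 1, 1: 2}
    return discarded_by_first_poll[polled_in_first_select] + consumed


usual = num_messages_read(2)  # usual case: 1 discarded + 3 consumed
rare = num_messages_read(1)   # rare case: 2 discarded + 3 consumed
```

A test that accepts either value would therefore tolerate both polling behaviours.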
Simplify and refactor the Kafka client settings.
Allow setting up separate
settings for the consumer and the producer, like:
```
<consumer>
...
</consumer>
<producer>
<kafka_topic>
<name>topic_name</name>
...
</kafka_topic>
</producer>
```
Moreover, this fixes warnings from kafka client like:
`Configuration property session.timeout.ms is a consumer property and
will be ignored by this producer instance`
It might take a few minutes to receive all the messages for the last
materialized view. By waiting for the expected number of results, the
happy path of execution takes minimal time while becoming more stable,
while the erroneous path might take a bit longer.
```
create table kafka
(
a UInt32,
a_str String Alias toString(a)
) engine = Kafka;
create table data
(
a UInt32,
a_str String
) engine = MergeTree
order by tuple();
create materialized view data_mv to data
(
a UInt32,
a_str String
) as
select a, a_str from kafka;
```
ALIAS columns work as expected, in comparison with MATERIALIZED/EPHEMERAL
columns or columns with a default expression.
Ref: https://github.com/ClickHouse/ClickHouse/pull/47138
Co-authored-by: Azat Khuzhin <a3at.mail@gmail.com>
The Kafka table engine allows global configuration and per-Kafka-topic
configuration. The latter uses syntax <kafka_TOPIC>, e.g. for topic
"football":
<kafka_football>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_football>
Some users had to find out the hard way that such configuration doesn't
take effect if the topic name contains a period, e.g. "sports.football".
The reason is that the ClickHouse configuration framework already uses
periods as level separators to descend the configuration hierarchy.
(Besides that, per-topic configuration at the same level as global
configuration could be considered ugly.)
Note that Kafka topics may contain characters "a-zA-Z0-9._-" (*) and
a tree-like topic organization using periods is quite common in
practice.
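The effect can be illustrated with a toy model of a period-separated lookup. This is not the ClickHouse or Poco code; the nested dict and the lookup helper are assumptions made only to show why a period in the topic name breaks the path.

```python
from typing import Any, Optional


def lookup(tree: dict, path: str) -> Optional[Any]:
    """Descend a nested dict, treating '.' as the level separator."""
    node: Any = tree
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# Toy configuration: one section per topic, named kafka_<topic>.
config = {
    "kafka_football": {"retry_backoff_ms": "250"},
    "kafka_sports.football": {"retry_backoff_ms": "250"},
}

# Topic "football": the path has exactly two levels and resolves.
found = lookup(config, "kafka_football.retry_backoff_ms")

# Topic "sports.football": the path is split into three levels
# ("kafka_sports", "football", "retry_backoff_ms"), so the section
# "kafka_sports.football" is never reached.
missing = lookup(config, "kafka_sports.football.retry_backoff_ms")
```

In this model the second lookup silently returns nothing, which matches the "doesn't take effect" symptom described above.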
This PR deprecates the existing per-topic configuration syntax (but
continues to support it for backward compatibility) and introduces a new
per-topic configuration syntax below the global Kafka configuration of
the form:
<kafka>
<topic name="football">
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</topic>
</kafka>
The period restriction doesn't apply to XML attributes, so <topic
name="sports.football"> will work. Also, everything Kafka-related is
below <kafka>.
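As a rough illustration of why the attribute form avoids the problem, the sketch below selects a topic section by comparing the name attribute instead of building a period-separated path. It uses Python's xml.etree.ElementTree rather than the real ClickHouse code; the global retry_backoff_ms value and the rule that per-topic settings override global ones are assumptions made for the example.

```python
import xml.etree.ElementTree as ET

# Example configuration; the global value 100 is made up for illustration.
CONFIG = """
<kafka>
    <retry_backoff_ms>100</retry_backoff_ms>
    <topic name="sports.football">
        <retry_backoff_ms>250</retry_backoff_ms>
        <fetch_min_bytes>100000</fetch_min_bytes>
    </topic>
</kafka>
"""


def settings_for_topic(xml_text: str, topic: str) -> dict:
    """Return global <kafka> settings merged with the matching <topic> section."""
    root = ET.fromstring(xml_text)
    # Global settings: direct children of <kafka> that are not <topic> sections.
    settings = {child.tag: child.text for child in root if child.tag != "topic"}
    for topic_node in root.findall("topic"):
        # The topic name is compared as an opaque string, so periods are harmless.
        if topic_node.get("name") == topic:
            # Assumption: per-topic values override the global ones.
            settings.update({child.tag: child.text for child in topic_node})
    return settings


football = settings_for_topic(CONFIG, "sports.football")
other = settings_for_topic(CONFIG, "some.other.topic")
```

Here football contains the two per-topic values, while other falls back to the global setting only.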
Considered but rejected alternatives:
- Extending Poco ConfigurationView with custom separators (e.g. "/"
instead of "."). Won't work easily because ConfigurationView only
builds a path but defers descending the configuration tree to the
normal configuration classes.
- Reloading the configuration file in StorageKafka (instead of reading
the loaded file) but with a custom separator. This mode is supported
by XML configuration. Too ugly and error-prone since the true
configuration is composed from multiple configuration files.
(*) https://stackoverflow.com/a/37067544