The Kafka table engine supports both global configuration and per-topic
configuration. The latter uses the syntax <kafka_TOPIC>, e.g. for topic
"football":
<kafka_football>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_football>
Some users had to find out the hard way that such configuration doesn't
take effect if the topic name contains a period, e.g. "sports.football".
The reason is that the ClickHouse configuration framework already uses
periods as level separators to descend the configuration hierarchy.
(Besides that, per-topic configuration at the same level as global
configuration could be considered ugly.)
Note that Kafka topics may contain characters "a-zA-Z0-9._-" (*) and
a tree-like topic organization using periods is quite common in
practice.
This PR deprecates the existing per-topic configuration syntax (but
continues to support it for backward compatibility) and introduces a new
per-topic configuration syntax, nested below the global Kafka
configuration, of the form:
<kafka>
<topic name="football">
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</topic>
</kafka>
The period restriction doesn't apply to XML attributes, so <topic
name="sports.football"> will work. Also, everything Kafka-related is
below <kafka>.
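To illustrate the difference, here is a minimal Python sketch (not
ClickHouse or Poco code). It assumes a <clickhouse> root element, and
get_by_path and get_topic_setting are hypothetical helpers: the first
mimics a lookup that treats every period as a level separator, the second
matches the topic by its name attribute as in the new syntax.

```python
import xml.etree.ElementTree as ET

# Old syntax: the topic name is part of the element name.
OLD = """<clickhouse>
  <kafka_sports.football>
    <retry_backoff_ms>250</retry_backoff_ms>
  </kafka_sports.football>
</clickhouse>"""

# New syntax: the topic name is an attribute value.
NEW = """<clickhouse>
  <kafka>
    <topic name="sports.football">
      <retry_backoff_ms>250</retry_backoff_ms>
    </topic>
  </kafka>
</clickhouse>"""


def get_by_path(root, path):
    """Hypothetical dotted-path lookup: every '.' descends one level."""
    node = root
    for key in path.split("."):
        node = node.find(key)
        if node is None:
            return None
    return node.text


def get_topic_setting(root, topic, setting):
    """Hypothetical lookup that compares the 'name' attribute verbatim."""
    for element in root.findall("./kafka/topic"):
        if element.get("name") == topic:
            child = element.find(setting)
            return None if child is None else child.text
    return None


old_root = ET.fromstring(OLD)
new_root = ET.fromstring(NEW)

# The path is split into kafka_sports / football / retry_backoff_ms,
# so the element <kafka_sports.football> is never found.
old_value = get_by_path(old_root, "kafka_sports.football.retry_backoff_ms")

# The attribute value is never split, so the period is harmless.
new_value = get_topic_setting(new_root, "sports.football", "retry_backoff_ms")
```

In this sketch the old-style lookup yields None, while the
attribute-based lookup returns the configured value.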
Considered but rejected alternatives:
- Extending Poco ConfigurationView with custom separators (e.g. "/"
instead of "."). Won't work easily because ConfigurationView only
builds a path but defers descending the configuration tree to the
normal configuration classes.
- Reloading the configuration file in StorageKafka (instead of reading
the loaded file) but with a custom separator. This mode is supported
by XML configuration. Too ugly and error-prone since the true
configuration is composed from multiple configuration files.
(*) https://stackoverflow.com/a/37067544
This test uses predefined timestamps, and the default retention.ms
(604800000 ms, i.e. 7 days) is too small for them, so Kafka deletes the
segments:
kafka1_1 | [2021-12-28 21:40:21,842] INFO Created log for partition virt2_0-0 in /var/lib/kafka/data with properties {compression.type -> producer, message.format.version -> 2.2-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
kafka1_1 | [2021-12-28 21:40:24,540] INFO [Log partition=virt2_0-0, dir=/var/lib/kafka/data] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log)
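The arithmetic behind the log lines above can be sketched in Python. This
is a simplification, not Kafka's actual retention code: is_expired is a
hypothetical helper, the broker clock is assumed to be UTC, and a
predefined timestamp of 0 ms is used as the example.

```python
from datetime import datetime, timezone

# retention.ms as reported in the broker log above.
RETENTION_MS = 604_800_000
MS_PER_DAY = 24 * 60 * 60 * 1000

# 604800000 ms is exactly 7 days.
retention_days = RETENTION_MS / MS_PER_DAY


def is_expired(message_ts_ms, now_ms, retention_ms=RETENTION_MS):
    """Simplified check: a message older than retention_ms is deletable."""
    return now_ms - message_ts_ms > retention_ms


# Broker time of the "Found deletable segments" line (assumed to be UTC).
now = datetime(2021, 12, 28, 21, 40, 24, tzinfo=timezone.utc)
now_ms = int(now.timestamp() * 1000)

# A message with a predefined timestamp of 0 ms (1970-01-01) is decades
# older than the 7-day retention window.
epoch_message_expired = is_expired(0, now_ms)

# A message stamped one hour before "now" is well inside the window.
recent_message_expired = is_expired(now_ms - 60 * 60 * 1000, now_ms)
```

Under these assumptions any fixed timestamp more than 7 days in the past
makes the segment eligible for deletion shortly after it is created.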
v2: fix tests with 0 timestamp_ms
and these should be all the tests that were left:
$ fgrep 'Found deletable segments with base offsets' test_storage_kafka/_instances_0/docker.log
kafka1_1 | [2021-12-29 09:46:15,610] INFO [Log partition=avro1-0, dir=/var/lib/kafka/data] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log)
kafka1_1 | [2021-12-29 09:51:15,609] INFO [Log partition=virt1-0, dir=/var/lib/kafka/data] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log)