mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Fix test_storage_kafka failures by adjusting retention.ms
This test uses predefined timestamps, and default retention.ms is too small: kafka1_1 | [2021-12-28 21:40:21,842] INFO Created log for partition virt2_0-0 in /var/lib/kafka/data with properties {compression.type -> producer, message.format.version -> 2.2-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager) kafka1_1 | [2021-12-28 21:40:24,540] INFO [Log partition=virt2_0-0, dir=/var/lib/kafka/data] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log) v2: fix tests with 0 timestamp_ms and this should be all tests that was left: $ fgrep 'Found deletable segments with base offsets' test_storage_kafka/_instances_0/docker.log kafka1_1 | [2021-12-29 09:46:15,610] INFO [Log partition=avro1-0, dir=/var/lib/kafka/data] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log) kafka1_1 | [2021-12-29 09:51:15,609] INFO [Log partition=virt1-0, dir=/var/lib/kafka/data] Found deletable segments with base offsets [0] due to retention time 604800000ms breach (kafka.log.Log)
This commit is contained in:
parent
c5fe3a8f8a
commit
52ad7a4538
@ -1498,6 +1498,13 @@ def test_kafka_flush_on_big_message(kafka_cluster):
|
||||
|
||||
|
||||
def test_kafka_virtual_columns(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_config = {
|
||||
# default retention, since predefined timestamp_ms is used.
|
||||
'retention.ms': '-1',
|
||||
}
|
||||
kafka_create_topic(admin_client, "virt1", config=topic_config)
|
||||
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
ENGINE = Kafka
|
||||
@ -1530,6 +1537,13 @@ def test_kafka_virtual_columns(kafka_cluster):
|
||||
|
||||
|
||||
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_config = {
|
||||
# default retention, since predefined timestamp_ms is used.
|
||||
'retention.ms': '-1',
|
||||
}
|
||||
kafka_create_topic(admin_client, "virt2", config=topic_config)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
DROP TABLE IF EXISTS test.consumer;
|
||||
@ -1738,8 +1752,12 @@ def test_kafka_commit_on_block_write(kafka_cluster):
|
||||
def test_kafka_virtual_columns2(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
kafka_create_topic(admin_client, "virt2_0", num_partitions=2)
|
||||
kafka_create_topic(admin_client, "virt2_1", num_partitions=2)
|
||||
topic_config = {
|
||||
# default retention, since predefined timestamp_ms is used.
|
||||
'retention.ms': '-1',
|
||||
}
|
||||
kafka_create_topic(admin_client, "virt2_0", num_partitions=2, config=topic_config)
|
||||
kafka_create_topic(admin_client, "virt2_1", num_partitions=2, config=topic_config)
|
||||
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (value UInt64)
|
||||
@ -1867,6 +1885,13 @@ def test_kafka_produce_key_timestamp(kafka_cluster):
|
||||
|
||||
|
||||
def test_kafka_insert_avro(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_config = {
|
||||
# default retention, since predefined timestamp_ms is used.
|
||||
'retention.ms': '-1',
|
||||
}
|
||||
kafka_create_topic(admin_client, "avro1", config=topic_config)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.kafka;
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64, _timestamp DateTime('UTC'))
|
||||
|
Loading…
Reference in New Issue
Block a user