This commit is contained in:
Mikhail Filimonov 2021-06-03 20:38:12 +02:00
parent cc2ef7daea
commit e11f7db28b
2 changed files with 69 additions and 1 deletions

@@ -1858,7 +1858,7 @@ class ClickHouseInstance:
         wait_duration = time.time() - start_time
-        logging.debug('{} log line matching "{}" appeared in a {} seconds'.format(repetitions, regexp, wait_duration))
+        logging.debug('{} log line(s) matching "{}" appeared in a {:.3f} seconds'.format(repetitions, regexp, wait_duration))
         return wait_duration
 
     def file_exists(self, path):
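The change to wait_for_log_line() is purely cosmetic: the message now says "log line(s)" (the call can match several repetitions) and the wait duration is printed with three decimal places instead of full float precision. A minimal standalone sketch of the difference (the values below are illustrative, not taken from a test run):

import logging

logging.basicConfig(level=logging.DEBUG)
repetitions, regexp, wait_duration = 6, 'Committed offset', 2.3456789012
# old format string: full float precision, singular wording
logging.debug('{} log line matching "{}" appeared in a {} seconds'.format(repetitions, regexp, wait_duration))
# new format string: duration limited to three decimals, wording that also fits multiple matches
logging.debug('{} log line(s) matching "{}" appeared in a {:.3f} seconds'.format(repetitions, regexp, wait_duration))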

@@ -1279,6 +1279,74 @@ def test_kafka_materialized_view(kafka_cluster):
    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_recreate_kafka_table(kafka_cluster):
    '''
        Checks that the materialized view continues to work properly after dropping and recreating the Kafka table.
    '''
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topic_list = []
    topic_list.append(NewTopic(name="recreate_kafka_table", num_partitions=6, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'recreate_kafka_table',
                     kafka_group_name = 'recreate_kafka_table_group',
                     kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 6,
                     kafka_flush_interval_ms = 1000,
                     kafka_skip_broken_messages = 1048577;

        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    # Produce the first batch of 120 messages and wait until offsets are committed for all 6 partitions.
    messages = []
    for i in range(120):
        messages.append(json.dumps({'key': i, 'value': i}))
    kafka_produce('recreate_kafka_table', messages)

    instance.wait_for_log_line('kafka.*Committed offset [0-9]+.*recreate_kafka_table', repetitions=6, look_behind_lines=100)

    # Drop only the Kafka table; the materialized view and the target table stay in place.
    instance.query('''
        DROP TABLE test.kafka;
    ''')

    # Produce the second batch of 120 messages while the Kafka table does not exist.
    kafka_produce('recreate_kafka_table', messages)

    # Recreate the Kafka table with the same settings; consumption through the existing MV should resume.
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'recreate_kafka_table',
                     kafka_group_name = 'recreate_kafka_table_group',
                     kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 6,
                     kafka_flush_interval_ms = 1000,
                     kafka_skip_broken_messages = 1048577;
    ''')

    instance.wait_for_log_line('kafka.*Committed offset [0-9]+.*recreate_kafka_table', repetitions=6, look_behind_lines=100)

    # data was not flushed yet (it will be flushed 7.5 sec after creating MV)
    assert int(instance.query("SELECT count() FROM test.view")) == 240

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')
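The test relies on two helpers from the integration-test harness: instance.wait_for_log_line(), touched in the first file of this commit, and kafka_produce(), which pushes the pre-serialized JSON strings into the topic. A minimal sketch of what such a producer helper could look like, assuming the kafka-python client and the broker being exposed on localhost:9092 (the real harness defines its own version, so names and parameters here are illustrative):

from kafka import KafkaProducer

def kafka_produce(topic, messages):
    # Illustrative helper, not the harness implementation: send each
    # pre-serialized JSON string to the topic and wait for delivery.
    producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=str.encode)
    for message in messages:
        producer.send(topic, value=message)
    producer.flush()

Both kafka_produce() calls in the test push the same 120 messages, so once the recreated Kafka table has committed offsets for all 6 partitions the target table should contain 240 rows, which is exactly what the final assertion checks.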
@pytest.mark.timeout(180)
def test_librdkafka_compression(kafka_cluster):
"""