Add test on virtual columns and materialized view

This commit is contained in:
Ivan Lezhankin 2019-06-21 20:43:39 +03:00
parent b989d45818
commit 892a82e5ff

View File

@ -389,11 +389,48 @@ def test_kafka_virtual_columns(kafka_cluster):
result = ''
for i in range(50):
result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka')
if kafka_check_result(result):
if kafka_check_result(result, False, 'test_kafka_virtual.reference'):
break
kafka_check_result(result, True, 'test_kafka_virtual.reference')
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json3',
kafka_group_name = 'json3',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _topic, _offset FROM test.kafka;
''')
messages = []
for i in range(50):
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('json3', messages)
for i in range(20):
time.sleep(1)
result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view')
if kafka_check_result(result, False, 'test_kafka_virtual.reference'):
break
kafka_check_result(result, True, 'test_kafka_virtual.reference')
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")