ClickHouse/tests/integration/test_storage_kafka/test.py

1617 lines
56 KiB
Python
Raw Normal View History

import os.path as p
import random
import threading
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager
import json
2018-07-26 04:36:28 +00:00
import subprocess
2019-04-18 15:52:18 +00:00
import kafka.errors
2020-05-28 13:39:55 +00:00
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
2020-05-28 13:39:55 +00:00
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
from google.protobuf.internal.encoder import _VarintBytes
"""
protoc --version
libprotoc 3.0.0
# to create kafka_pb2.py
protoc --python_out=. kafka.proto
"""
import kafka_pb2
2019-01-22 12:18:18 +00:00
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add a test checking that SELECT with LIMIT works.
# Test cluster: a single ClickHouse node started together with Kafka and ZooKeeper.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    'instance',
    config_dir='configs',
    main_configs=['configs/kafka.xml', 'configs/log_conf.xml'],
    with_kafka=True,
    with_zookeeper=True,
    clickhouse_path_dir='clickhouse_path',
)

# Docker id of the Kafka container; empty until the cluster has been started.
kafka_id = ''
2019-02-07 16:40:16 +00:00
# Helpers
2019-02-07 16:40:16 +00:00
def check_kafka_is_available():
    """Probe the Kafka broker once and report whether it answered.

    Runs `kafka-broker-api-versions` through `docker exec` inside the container
    identified by the module-level `kafka_id`.

    :return: True if the probe command exited with status 0, False otherwise.
    """
    probe_cmd = (
        'docker',
        'exec',
        '-i',
        kafka_id,
        '/usr/bin/kafka-broker-api-versions',
        '--bootstrap-server',
        'INSIDE://localhost:9092',
    )
    # stdout is captured and discarded; only the exit status is of interest.
    probe = subprocess.Popen(probe_cmd, stdout=subprocess.PIPE)
    probe.communicate()
    return probe.returncode == 0
2019-02-07 16:40:16 +00:00
def wait_kafka_is_available(max_retries=50):
    """Block until Kafka answers the availability probe, polling once per second.

    :param max_retries: number of failed probes tolerated before giving up.
    :raises RuntimeError: if the probe has failed more than `max_retries` times.
    """
    retries = 0
    while not check_kafka_is_available():
        retries += 1
        if retries > max_retries:
            # A bare string cannot be raised (that is itself a TypeError),
            # so report the timeout with a real exception type.
            raise RuntimeError("Kafka is not available")
        print("Waiting for Kafka to start up")
        time.sleep(1)
2019-08-06 14:18:37 +00:00
def kafka_produce(topic, messages, timestamp=None):
    """Send every item of `messages` to `topic` on the local Kafka broker.

    :param topic: name of the Kafka topic to publish to.
    :param messages: iterable of message payloads, sent one Kafka message each.
    :param timestamp: optional value passed as `timestamp_ms` for every message.
    """
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    try:
        for message in messages:
            producer.send(topic=topic, value=message, timestamp_ms=timestamp)
            producer.flush()
    finally:
        # Release the producer's connections once the batch is done; previously
        # every call left an open producer behind.
        producer.close()
    # print ("Produced {} messages for topic {}".format(len(messages), topic))
def kafka_consume(topic):
    """Yield the values of the messages returned by one poll of `topic`.

    A fresh consumer reading from the earliest offset is created, polled once
    with a 5000 ms timeout, and closed afterwards.

    :param topic: name of the Kafka topic to read.
    :return: generator over the `value` of each polled message of that topic.
    """
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092", auto_offset_reset="earliest")
    try:
        # `(topic)` is just a parenthesised string, not a tuple; pass a real list.
        consumer.subscribe(topics=[topic])
        for toppar, messages in consumer.poll(5000).items():
            if toppar.topic == topic:
                for message in messages:
                    yield message.value
        consumer.unsubscribe()
    finally:
        # Runs also when the generator is abandoned early or poll() raises.
        consumer.close()
def kafka_produce_protobuf_messages(topic, start_index, num_messages):
    """Publish `num_messages` KeyValuePair protobufs as one Kafka message.

    Each record has `key = i` and `value = str(i)` for `i` starting at
    `start_index`; records are concatenated, each prefixed with its
    varint-encoded length.

    :param topic: name of the Kafka topic to publish to.
    :param start_index: key of the first generated record.
    :param num_messages: number of consecutive records to generate.
    """
    chunks = []
    for i in range(start_index, start_index + num_messages):
        msg = kafka_pb2.KeyValuePair()
        msg.key = i
        msg.value = str(i)
        serialized_msg = msg.SerializeToString()
        chunks.append(_VarintBytes(len(serialized_msg)) + serialized_msg)
    # Serialized protobufs are bytes, so accumulate into bytes rather than text.
    data = b''.join(chunks)

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    try:
        producer.send(topic=topic, value=data)
        producer.flush()
    finally:
        producer.close()
    print("Produced {} messages for topic {}".format(num_messages, topic))
2018-07-26 04:36:28 +00:00
2019-02-11 11:54:30 +00:00
# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
2019-05-23 14:25:41 +00:00
def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'):
    """Compare `result` with a reference file located next to this test module.

    Receiving from Kafka is asynchronous, so callers typically poll with
    `check=False` until this returns True and then call once more with
    `check=True` to get a proper assertion.

    :param result: query output to compare, as text.
    :param check: if true, assert equality (returning None) instead of returning a bool.
    :param ref_file: file name of the reference, relative to this module's directory.
    :return: the comparison result when `check` is false.
    """
    reference_path = p.join(p.dirname(__file__), ref_file)
    with open(reference_path) as reference:
        if not check:
            return TSV(result) == TSV(reference)
        assert TSV(result) == TSV(reference)
2019-02-07 16:40:16 +00:00
2020-05-28 13:39:55 +00:00
# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(name):
    """Return the members of Kafka consumer group `name` with their assignments.

    Sends a DescribeGroups request straight to the broker at localhost:9092.

    :param name: consumer group id to describe.
    :return: list of dicts with keys 'member_id', 'client_id', 'client_host' and
             'assignment' (a list of {'topic': ..., 'partitions': ...} dicts).
    """
    client = BrokerConnection('localhost', 9092, socket.AF_INET)
    client.connect_blocking()
    try:
        list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
        future = client.send(list_members_in_groups)
        # Pump the connection until the response for our request has arrived.
        while not future.is_done:
            for resp, f in client.recv():
                f.success(resp)
        (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
    finally:
        # The connection was previously left open after every call.
        client.close()

    res = []
    for (member_id, client_id, client_host, member_metadata, member_assignment) in members:
        member_topics_assignment = [
            {'topic': topic, 'partitions': partitions}
            for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment
        ]
        res.append({
            'member_id': member_id,
            'client_id': client_id,
            'client_host': client_host,
            'assignment': member_topics_assignment,
        })
    return res
2019-02-07 16:40:16 +00:00
# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
    """Module-scoped fixture: start the cluster, create database `test`, yield the cluster.

    Also records the Kafka container id in the module-level `kafka_id`.
    The cluster is shut down when the module finishes or if start-up fails.
    """
    global kafka_id
    try:
        cluster.start()
        kafka_id = instance.cluster.kafka_docker_id
        print("kafka_id is {}".format(kafka_id))
        instance.query('CREATE DATABASE test')
        yield cluster
    finally:
        cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    """Autouse fixture: drop `test.kafka` before and after each test and wait for Kafka."""
    drop_kafka_table = 'DROP TABLE IF EXISTS test.kafka'

    instance.query(drop_kafka_table)
    wait_kafka_is_available()
    print("kafka is available - running test")

    yield  # run test

    instance.query(drop_kafka_table)
2019-02-07 16:40:16 +00:00
# Tests
@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster):
    """Kafka table declared with positional engine arguments reads 50 JSON rows.

    Also checks the client id reported for the first member of group 'old'.
    """
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka('kafka1:19092', 'old', 'old', 'JSONEachRow', '\\n');
        ''')

    # Don't insert malformed messages since old settings syntax
    # doesn't support skipping of broken messages.
    payloads = [json.dumps({'key': n, 'value': n}) for n in range(50)]
    kafka_produce('old', payloads)

    # Keep selecting and accumulating until the output matches the reference.
    collected = ''
    matched = False
    while not matched:
        collected += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        matched = kafka_check_result(collected)

    kafka_check_result(collected, True)

    members = describe_consumer_group('old')
    assert members[0]['client_id'] == u'ClickHouse-instance-test-kafka'
    # text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'),"kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose"))
@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster):
    """Kafka table declared via SETTINGS reads 50 JSON rows around two broken messages.

    Also checks that the configured `kafka_client_id` is reported as
    'instance test 1234' for the first member of group 'new'.
    """
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'new',
                     kafka_group_name = 'new',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n',
                     kafka_client_id = '{instance} test 1234',
                     kafka_skip_broken_messages = 1;
        ''')

    def json_rows(first, last):
        # JSON payloads {'key': n, 'value': n} for n in [first, last).
        return [json.dumps({'key': n, 'value': n}) for n in range(first, last)]

    kafka_produce('new', json_rows(0, 25))

    # Insert couple of malformed messages.
    kafka_produce('new', ['}{very_broken_message,'])
    kafka_produce('new', ['}another{very_broken_message,'])

    kafka_produce('new', json_rows(25, 50))

    # Keep selecting and accumulating until the output matches the reference.
    collected = ''
    matched = False
    while not matched:
        collected += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        matched = kafka_check_result(collected)

    kafka_check_result(collected, True)

    members = describe_consumer_group('new')
    assert members[0]['client_id'] == u'instance test 1234'
@pytest.mark.timeout(180)
def test_kafka_issue11308(kafka_cluster):
    """Check that a materialized view over a Kafka table respects the Kafka SETTINGS.

    Three nested-JSON messages are produced before the tables exist; after a
    fixed wait they must all be present in the MergeTree target table.
    """
    # Check that matview does respect Kafka SETTINGS
    kafka_produce('issue11308', ['{"t": 123, "e": {"x": "woof"} }', '{"t": 123, "e": {"x": "woof"} }', '{"t": 124, "e": {"x": "test"} }'])

    instance.query('''
        CREATE TABLE test.persistent_kafka (
            time UInt64,
            some_string String
        )
        ENGINE = MergeTree()
        ORDER BY time;

        CREATE TABLE test.kafka (t UInt64, `e.x` String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'issue11308',
                     kafka_group_name = 'issue11308',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n',
                     kafka_flush_interval_ms=1000,
                     input_format_import_nested_json = 1;

        CREATE MATERIALIZED VIEW test.persistent_kafka_mv TO test.persistent_kafka AS
        SELECT
            `t` AS `time`,
            `e.x` AS `some_string`
        FROM test.kafka;
        ''')

    time.sleep(9)

    result = instance.query('SELECT * FROM test.persistent_kafka ORDER BY time;')

    # Drop the view before its target table, as the other tests in this file do,
    # so nothing is still writing into the table while it is being removed.
    instance.query('''
        DROP TABLE test.persistent_kafka_mv;
        DROP TABLE test.persistent_kafka;
    ''')

    # Columns are tab-separated; spelled with explicit escapes so the separators
    # cannot be silently turned into spaces.
    expected = (
        '123\twoof\n'
        '123\twoof\n'
        '124\ttest\n'
    )
    assert TSV(result) == TSV(expected)
@pytest.mark.timeout(180)
def test_kafka_issue4116(kafka_cluster):
    """Check that `format_csv_delimiter` is honoured by a Kafka table using CSV.

    Four messages are produced (the last one carrying three rows) and a single
    SELECT must return all six rows.
    """
    # Check that format_csv_delimiter parameter works now - as part of all available format settings.
    kafka_produce('issue4116', ['1|foo', '2|bar', '42|answer', '100|multi\n101|row\n103|message'])

    instance.query('''
        CREATE TABLE test.kafka (a UInt64, b String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'issue4116',
                     kafka_group_name = 'issue4116',
                     kafka_format = 'CSV',
                     kafka_row_delimiter = '\\n',
                     format_csv_delimiter = '|';
        ''')

    result = instance.query('SELECT * FROM test.kafka ORDER BY a;')

    # Columns are tab-separated; spelled with explicit escapes so the separators
    # cannot be silently turned into spaces.
    expected = (
        '1\tfoo\n'
        '2\tbar\n'
        '42\tanswer\n'
        '100\tmulti\n'
        '101\trow\n'
        '103\tmessage\n'
    )
    assert TSV(result) == TSV(expected)
@pytest.mark.timeout(180)
def test_kafka_consumer_hang(kafka_cluster):
    """Dropping a Kafka table right after a short broker pause must not leave a hanging DROP."""
    setup_sql = '''
        DROP TABLE IF EXISTS test.kafka;
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang',
                     kafka_group_name = 'consumer_hang',
                     kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 8,
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory();
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS SELECT * FROM test.kafka;
        '''
    instance.query(setup_sql)

    time.sleep(10)
    instance.query('SELECT * FROM test.view')

    # This should trigger heartbeat fail,
    # which will trigger REBALANCE_IN_PROGRESS,
    # and which can lead to consumer hang.
    kafka_cluster.pause_container('kafka1')
    time.sleep(0.5)
    kafka_cluster.unpause_container('kafka1')

    # print("Attempt to drop")
    instance.query('DROP TABLE test.kafka')

    #kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # original problem appearance was a sequence of the following messages in librdkafka logs:
    # BROKERFAIL -> |ASSIGN| -> REBALANCE_IN_PROGRESS -> "waiting for rebalance_cb" (repeated forever)
    # so it was waiting forever while the application will execute queued rebalance callback

    # from a user perspective: we expect no hanging 'drop' queries
    # 'dr'||'op' to avoid self matching
    hanging_drops = instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")
    assert int(hanging_drops) == 0
@pytest.mark.timeout(180)
def test_kafka_consumer_hang2(kafka_cluster):
    """Two Kafka tables in the same consumer group can both be dropped without a hanging DROP."""
    setup_sql = '''
        DROP TABLE IF EXISTS test.kafka;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang2',
                     kafka_group_name = 'consumer_hang2',
                     kafka_format = 'JSONEachRow';

        CREATE TABLE test.kafka2 (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang2',
                     kafka_group_name = 'consumer_hang2',
                     kafka_format = 'JSONEachRow';
        '''
    instance.query(setup_sql)

    # first consumer subscribe the topic, try to poll some data, and go to rest
    instance.query('SELECT * FROM test.kafka')

    # second consumer do the same leading to rebalance in the first
    # consumer, try to poll some data
    instance.query('SELECT * FROM test.kafka2')

    #echo 'SELECT * FROM test.kafka; SELECT * FROM test.kafka2; DROP TABLE test.kafka;' | clickhouse client -mn &
    # kafka_cluster.open_bash_shell('instance')

    # first consumer has pending rebalance callback unprocessed (no poll after select)
    # one of those queries was failing because of
    # https://github.com/edenhill/librdkafka/issues/2077
    # https://github.com/edenhill/librdkafka/issues/2898
    for table in ('test.kafka', 'test.kafka2'):
        instance.query('DROP TABLE ' + table)

    # from a user perspective: we expect no hanging 'drop' queries
    # 'dr'||'op' to avoid self matching
    hanging_drops = instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")
    assert int(hanging_drops) == 0
@pytest.mark.timeout(180)
def test_kafka_csv_with_delimiter(kafka_cluster):
    """Kafka table in CSV format with a newline row delimiter reads 50 rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'csv',
                     kafka_group_name = 'csv',
                     kafka_format = 'CSV',
                     kafka_row_delimiter = '\\n';
        ''')

    csv_rows = ['{i}, {i}'.format(i=n) for n in range(50)]
    kafka_produce('csv', csv_rows)

    # Keep selecting and accumulating until the output matches the reference.
    collected = ''
    matched = False
    while not matched:
        collected += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        matched = kafka_check_result(collected)

    kafka_check_result(collected, True)
@pytest.mark.timeout(180)
def test_kafka_tsv_with_delimiter(kafka_cluster):
    """Kafka table in TSV format with a newline row delimiter reads 50 rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'tsv',
                     kafka_group_name = 'tsv',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        ''')

    tsv_rows = ['{i}\t{i}'.format(i=n) for n in range(50)]
    kafka_produce('tsv', tsv_rows)

    # Keep selecting and accumulating until the output matches the reference.
    collected = ''
    matched = False
    while not matched:
        collected += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        matched = kafka_check_result(collected)

    kafka_check_result(collected, True)
2019-09-11 12:29:26 +00:00
@pytest.mark.timeout(180)
def test_kafka_select_empty(kafka_cluster):
    """Counting rows of a Kafka table whose topic received nothing yields 0."""
    create_sql = '''
        CREATE TABLE test.kafka (key UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'empty',
                     kafka_group_name = 'empty',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        '''
    instance.query(create_sql)

    row_count = int(instance.query('SELECT count() FROM test.kafka'))
    assert row_count == 0
@pytest.mark.timeout(180)
def test_kafka_json_without_delimiter(kafka_cluster):
    """Two Kafka messages of 25 newline-terminated JSON rows each are read without a row delimiter setting."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'json',
                     kafka_group_name = 'json',
                     kafka_format = 'JSONEachRow';
        ''')

    # Each Kafka message carries a whole batch of rows, one JSON object per line.
    for first, last in ((0, 25), (25, 50)):
        batch = ''.join(json.dumps({'key': n, 'value': n}) + '\n' for n in range(first, last))
        kafka_produce('json', [batch])

    # Keep selecting and accumulating until the output matches the reference.
    collected = ''
    matched = False
    while not matched:
        collected += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        matched = kafka_check_result(collected)

    kafka_check_result(collected, True)
@pytest.mark.timeout(180)
def test_kafka_protobuf(kafka_cluster):
    """Kafka table in Protobuf format reads 50 records produced in three batches."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'pb',
                     kafka_group_name = 'pb',
                     kafka_format = 'Protobuf',
                     kafka_schema = 'kafka.proto:KeyValuePair';
        ''')

    # (start_index, num_messages) per produced Kafka message: 20 + 1 + 29 records.
    for start_index, num_messages in ((0, 20), (20, 1), (21, 29)):
        kafka_produce_protobuf_messages('pb', start_index, num_messages)

    # Keep selecting and accumulating until the output matches the reference.
    collected = ''
    matched = False
    while not matched:
        collected += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        matched = kafka_check_result(collected)

    kafka_check_result(collected, True)
@pytest.mark.timeout(180)
def test_kafka_materialized_view(kafka_cluster):
    """Rows produced to Kafka reach a MergeTree table through a materialized view."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mv',
                     kafka_group_name = 'mv',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    payloads = [json.dumps({'key': n, 'value': n}) for n in range(50)]
    kafka_produce('mv', payloads)

    # Poll the target table until its content matches the reference.
    matched = False
    while not matched:
        result = instance.query('SELECT * FROM test.view')
        matched = kafka_check_result(result)

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_materialized_view_with_subquery(kafka_cluster):
    """Same as the plain materialized-view test, but the view selects from a subquery over the Kafka table."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mvsq',
                     kafka_group_name = 'mvsq',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM (SELECT * FROM test.kafka);
    ''')

    payloads = [json.dumps({'key': n, 'value': n}) for n in range(50)]
    kafka_produce('mvsq', payloads)

    # Poll the target table until its content matches the reference.
    matched = False
    while not matched:
        result = instance.query('SELECT * FROM test.view')
        matched = kafka_check_result(result)

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_many_materialized_views(kafka_cluster):
    """Two materialized views attached to one Kafka table must each receive all 50 rows."""
    instance.query('''
        DROP TABLE IF EXISTS test.view1;
        DROP TABLE IF EXISTS test.view2;
        DROP TABLE IF EXISTS test.consumer1;
        DROP TABLE IF EXISTS test.consumer2;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mmv',
                     kafka_group_name = 'mmv',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view1 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE TABLE test.view2 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
            SELECT * FROM test.kafka;
        CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
            SELECT * FROM test.kafka;
    ''')

    payloads = [json.dumps({'key': n, 'value': n}) for n in range(50)]
    kafka_produce('mmv', payloads)

    # Poll both target tables until each one matches the reference.
    both_matched = False
    while not both_matched:
        result1 = instance.query('SELECT * FROM test.view1')
        result2 = instance.query('SELECT * FROM test.view2')
        both_matched = kafka_check_result(result1) and kafka_check_result(result2)

    instance.query('''
        DROP TABLE test.consumer1;
        DROP TABLE test.consumer2;
        DROP TABLE test.view1;
        DROP TABLE test.view2;
    ''')

    kafka_check_result(result1, True)
    kafka_check_result(result2, True)
@pytest.mark.timeout(300)
def test_kafka_flush_on_big_message(kafka_cluster):
    """Large multi-row Kafka messages are fully stored despite a small `kafka_max_block_size`.

    Produces `kafka_messages` messages of `batch_messages` rows each, waits until
    the committed offset of group 'flush' reaches `kafka_messages`, then waits for
    the row count in the target table to reach the product of the two.
    """
    # Create batches of messages of size ~100Kb
    kafka_messages = 1000
    batch_messages = 1000
    expected_rows = kafka_messages * batch_messages
    messages = [json.dumps({'key': i, 'value': 'x' * 100}) * batch_messages for i in range(kafka_messages)]
    kafka_produce('flush', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'flush',
                     kafka_group_name = 'flush',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 10;
        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    received = False
    while not received:
        try:
            offsets = client.list_consumer_group_offsets('flush')
            received = any(
                partition.topic == 'flush' and committed.offset == kafka_messages
                for partition, committed in offsets.items()
            )
        except kafka.errors.GroupCoordinatorNotAvailableError:
            # The group coordinator may not be ready yet; just ask again.
            continue

    while True:
        result = instance.query('SELECT count() FROM test.view')
        if int(result) == expected_rows:
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert int(result) == expected_rows, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(180)
def test_kafka_virtual_columns(kafka_cluster):
    """Virtual columns (_key, _topic, _offset, _partition, _timestamp) are selectable from a Kafka table."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt1',
                     kafka_group_name = 'virt1',
                     kafka_format = 'JSONEachRow';
        ''')

    # Two Kafka messages of 25 newline-terminated JSON rows each, produced with timestamp 0.
    for first, last in ((0, 25), (25, 50)):
        batch = ''.join(json.dumps({'key': n, 'value': n}) + '\n' for n in range(first, last))
        kafka_produce('virt1', [batch], 0)

    # Keep selecting and accumulating until the output matches the reference.
    collected = ''
    matched = False
    while not matched:
        collected += instance.query('SELECT _key, key, _topic, value, _offset, _partition, _timestamp FROM test.kafka', ignore_error=True)
        matched = kafka_check_result(collected, False, 'test_kafka_virtual1.reference')

    kafka_check_result(collected, True, 'test_kafka_virtual1.reference')
@pytest.mark.timeout(180)
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
    """Virtual columns of a Kafka table can be stored into a MergeTree table by a materialized view."""
    reference = 'test_kafka_virtual2.reference'

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt2',
                     kafka_group_name = 'virt2',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64, timestamp Nullable(DateTime))
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition, _timestamp as timestamp FROM test.kafka;
    ''')

    payloads = [json.dumps({'key': n, 'value': n}) for n in range(50)]
    kafka_produce('virt2', payloads, 0)

    # Poll the target table until its content matches the reference.
    matched = False
    while not matched:
        result = instance.query('SELECT kafka_key, key, topic, value, offset, partition, timestamp FROM test.view')
        matched = kafka_check_result(result, False, reference)

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True, reference)
2019-05-23 14:25:41 +00:00
@pytest.mark.timeout(180)
def test_kafka_insert(kafka_cluster):
    """Rows INSERTed into a Kafka table show up as 50 messages on the topic."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert1',
                     kafka_group_name = 'insert1',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
    ''')

    values = ','.join("({i}, {i})".format(i=n) for n in range(50))

    # Retry the insert for as long as it fails with a local Kafka timeout.
    while True:
        try:
            instance.query("INSERT INTO test.kafka VALUES {}".format(values))
        except QueryRuntimeException as e:
            if 'Local: Timed out.' not in str(e):
                raise
        else:
            break

    # Consume repeatedly until exactly 50 messages have been collected.
    messages = []
    while len(messages) != 50:
        messages.extend(kafka_consume('insert1'))

    result = '\n'.join(messages)
    kafka_check_result(result, True)
2019-10-28 14:41:24 +00:00
@pytest.mark.timeout(240)
def test_kafka_produce_consume(kafka_cluster):
    """16 threads each INSERT 10000 rows into a Kafka table; all rows must reach the view's target table."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert2',
                     kafka_group_name = 'insert2',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    messages_num = 10000
    threads_num = 16
    expected_rows = messages_num * threads_num

    def insert():
        # One INSERT of `messages_num` rows, retried while it fails with a local Kafka timeout.
        values = ','.join("({i}, {i})".format(i=n) for n in range(messages_num))
        while True:
            try:
                instance.query("INSERT INTO test.kafka VALUES {}".format(values))
            except QueryRuntimeException as e:
                if 'Local: Timed out.' not in str(e):
                    raise
            else:
                break

    threads = [threading.Thread(target=insert) for _ in range(threads_num)]
    for worker in threads:
        # Stagger the starts by a random delay of up to one second.
        time.sleep(random.uniform(0, 1))
        worker.start()

    while True:
        result = instance.query('SELECT count() FROM test.view')
        time.sleep(1)
        if int(result) == expected_rows:
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    for worker in threads:
        worker.join()

    assert int(result) == expected_rows, 'ClickHouse lost some messages: {}'.format(result)
2019-09-01 13:03:38 +00:00
@pytest.mark.timeout(300)
def test_kafka_commit_on_block_write(kafka_cluster):
    """Re-creating the Kafka table mid-stream must not duplicate rows in the target table.

    A background thread produces batches of 101 uniquely keyed rows. Once some
    rows have arrived the Kafka table is dropped and created again; after all
    produced keys are present, `count()` must equal `uniqExact(key)`.
    """
    kafka_table_sql = '''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'block',
                     kafka_group_name = 'block',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';
    '''

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
    ''' + kafka_table_sql + '''
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    cancel = threading.Event()

    # One-element list so the nested function can advance the shared counter.
    next_key = [0]

    def produce():
        while not cancel.is_set():
            batch = []
            for _ in range(101):
                batch.append(json.dumps({'key': next_key[0], 'value': next_key[0]}))
                next_key[0] += 1
            kafka_produce('block', batch)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    # Wait for the first rows to land in the target table, then stop producing.
    while int(instance.query('SELECT count() FROM test.view')) == 0:
        time.sleep(1)

    cancel.set()

    instance.query('''
        DROP TABLE test.kafka;
    ''')

    # Wait until the table has really disappeared before creating it again.
    while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='kafka'")) == 1:
        time.sleep(1)

    instance.query(kafka_table_sql)

    while int(instance.query('SELECT uniqExact(key) FROM test.view')) < next_key[0]:
        time.sleep(1)

    result = int(instance.query('SELECT count() == uniqExact(key) FROM test.view'))

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_thread.join()

    assert result == 1, 'Messages from kafka get duplicated!'
@pytest.mark.timeout(180)
def test_kafka_virtual_columns2(kafka_cluster):
    """Virtual columns (key, topic, partition, offset, timestamps, headers) across two topics.

    Two 2-partition topics are created and eight messages are sent to explicit
    partitions with explicit keys, timestamps and (for some) headers. The
    materialized view must expose exactly those values, and the two consumers of
    the table must register with client ids suffixed -0 and -1.
    """
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topic_list = [
        NewTopic(name="virt2_0", num_partitions=2, replication_factor=1),
        NewTopic(name="virt2_1", num_partitions=2, replication_factor=1),
    ]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    instance.query('''
        CREATE TABLE test.kafka (value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt2_0,virt2_1',
                     kafka_group_name = 'virt2',
                     kafka_num_consumers = 2,
                     kafka_format = 'JSONEachRow';

        CREATE MATERIALIZED VIEW test.view Engine=Log AS
            SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp), toUnixTimestamp64Milli(_timestamp_ms), _headers.name, _headers.value FROM test.kafka;
        ''')

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    producer.send(topic='virt2_0', value=json.dumps({'value': 1}), partition=0, key='k1', timestamp_ms=1577836801001, headers=[('content-encoding', b'base64')])
    producer.send(topic='virt2_0', value=json.dumps({'value': 2}), partition=0, key='k2', timestamp_ms=1577836802002, headers=[('empty_value', ''),('', 'empty name'), ('',''), ('repetition', '1'), ('repetition', '2')])
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_0', value=json.dumps({'value': 3}), partition=1, key='k3', timestamp_ms=1577836803003, headers=[('b', 'b'),('a', 'a')])
    producer.send(topic='virt2_0', value=json.dumps({'value': 4}), partition=1, key='k4', timestamp_ms=1577836804004, headers=[('a', 'a'),('b', 'b')])
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_1', value=json.dumps({'value': 5}), partition=0, key='k5', timestamp_ms=1577836805005)
    producer.send(topic='virt2_1', value=json.dumps({'value': 6}), partition=0, key='k6', timestamp_ms=1577836806006)
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_1', value=json.dumps({'value': 7}), partition=1, key='k7', timestamp_ms=1577836807007)
    producer.send(topic='virt2_1', value=json.dumps({'value': 8}), partition=1, key='k8', timestamp_ms=1577836808008)
    producer.flush()

    time.sleep(10)

    members = describe_consumer_group('virt2')
    #pprint.pprint(members)
    # These used to be plain assignments, which verified nothing. Compare the
    # sorted ids so the check does not depend on the order members are listed in.
    client_ids = sorted(member['client_id'] for member in members)
    assert client_ids == [u'ClickHouse-instance-test-kafka-0', u'ClickHouse-instance-test-kafka-1']

    result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)

    # Columns are tab-separated; spelled with explicit escapes so the separators
    # cannot be confused with the real space inside 'empty name'.
    expected = (
        "1\tk1\tvirt2_0\t0\t0\t1577836801\t1577836801001\t['content-encoding']\t['base64']\n"
        "2\tk2\tvirt2_0\t0\t1\t1577836802\t1577836802002\t['empty_value','','','repetition','repetition']\t['','empty name','','1','2']\n"
        "3\tk3\tvirt2_0\t1\t0\t1577836803\t1577836803003\t['b','a']\t['b','a']\n"
        "4\tk4\tvirt2_0\t1\t1\t1577836804\t1577836804004\t['a','b']\t['a','b']\n"
        "5\tk5\tvirt2_1\t0\t0\t1577836805\t1577836805005\t[]\t[]\n"
        "6\tk6\tvirt2_1\t0\t1\t1577836806\t1577836806006\t[]\t[]\n"
        "7\tk7\tvirt2_1\t1\t0\t1577836807\t1577836807007\t[]\t[]\n"
        "8\tk8\tvirt2_1\t1\t1\t1577836808\t1577836808008\t[]\t[]\n"
    )

    assert TSV(result) == TSV(expected)
@pytest.mark.timeout(240)
def test_kafka_produce_key_timestamp(kafka_cluster):
    """`_key` and `_timestamp` columns written through a Kafka table come back as message key and timestamp.

    Rows are inserted through `test.kafka_writer` and read back through
    `test.kafka` on the same topic; the view must show the inserted key and
    timestamp both as regular columns and as the `_key` / `_timestamp` virtual
    columns.
    """
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka_writer (key UInt64, value UInt64, _key String, _timestamp DateTime)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert3',
                     kafka_group_name = 'insert3',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';

        CREATE TABLE test.kafka (key UInt64, value UInt64, inserted_key String, inserted_timestamp DateTime)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert3',
                     kafka_group_name = 'insert3',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';

        CREATE MATERIALIZED VIEW test.view Engine=Log AS
            SELECT key, value, inserted_key, toUnixTimestamp(inserted_timestamp), _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
    ''')

    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(1,1,'k1',1577836801))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(2,2,'k2',1577836802))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({})),({},{},'{}',toDateTime({}))".format(3,3,'k3',1577836803,4,4,'k4',1577836804))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(5,5,'k5',1577836805))

    time.sleep(10)

    result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)

    # print(result)

    # Columns are tab-separated; spelled with explicit escapes so the separators
    # cannot be silently turned into spaces.
    expected = (
        '1\t1\tk1\t1577836801\tk1\tinsert3\t0\t0\t1577836801\n'
        '2\t2\tk2\t1577836802\tk2\tinsert3\t0\t1\t1577836802\n'
        '3\t3\tk3\t1577836803\tk3\tinsert3\t0\t2\t1577836803\n'
        '4\t4\tk4\t1577836804\tk4\tinsert3\t0\t3\t1577836804\n'
        '5\t5\tk5\t1577836805\tk5\tinsert3\t0\t4\t1577836805\n'
    )

    assert TSV(result) == TSV(expected)
@pytest.mark.timeout(600)
def test_kafka_flush_by_time(kafka_cluster):
    """Check that slowly arriving messages are flushed to the target table.

    A background thread publishes one single-row message per iteration,
    pausing one second between iterations, so a block of
    kafka_max_block_size (100) rows is never filled during the test.  After
    18 seconds the row count of test.view must already exceed 12.
    """
    instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'flush_by_time',
kafka_group_name = 'flush_by_time',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 100,
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;
''')

    stop_producing = threading.Event()

    def produce():
        # One message per second until the main thread asks us to stop.
        while not stop_producing.is_set():
            kafka_produce('flush_by_time', [json.dumps({'key': 0, 'value': 0})])
            time.sleep(1)

    producer_thread = threading.Thread(target=produce)
    producer_thread.start()

    time.sleep(18)

    # Take the measurement while the producer is still running.
    result = instance.query('SELECT count() FROM test.view')
    print(result)

    stop_producing.set()
    producer_thread.join()

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')

    # NOTE(review): the threshold of 12 presumably corresponds to the rows
    # expected after the time-based flushes within the 18 s window - confirm
    # against the configured stream_flush_interval_ms.
    assert int(result) > 12, 'Messages from kafka should be flushed at least every stream_flush_interval_ms!'
@pytest.mark.timeout(600)
def test_kafka_flush_by_block_size(kafka_cluster):
    """Check that a full block is flushed without waiting for the flush interval.

    101 messages are produced at once into a topic read with
    kafka_max_block_size = 100.  One second later the first MergeTree part
    ('all_1_1_0') of test.view must contain exactly 100 rows.
    """
    instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'flush_by_block_size',
kafka_group_name = 'flush_by_block_size',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 100,
kafka_row_delimiter = '\\n';
SELECT * FROM test.kafka;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;
''')

    # One message more than fits into a single block.
    messages = [json.dumps({'key': 0, 'value': 0}) for _ in range(101)]
    kafka_produce('flush_by_block_size', messages)

    time.sleep(1)

    # TODO: due to https://github.com/ClickHouse/ClickHouse/issues/11216
    # second flush happens earlier than expected, so we have 2 parts here instead of one
    # flush by block size works correctly, so the feature checked by the test is working correctly
    result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
    # print(result)

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')

    # 100 = first poll should return 100 messages (and rows)
    # not waiting for stream_flush_interval_ms
    assert int(result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!'
@pytest.mark.timeout(600)
def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
    """Consume multi-row messages from a 10-partition topic without loss or duplicates.

    1000 messages are produced, each holding between 3 and 9 JSON rows with
    consecutive keys.  After 30 seconds test.view must contain every key
    exactly once: count(), uniqExact(key) and max(key) all equal the number
    of rows produced.
    """
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    new_topics = [NewTopic(name="topic_with_multiple_partitions2", num_partitions=10, replication_factor=1)]
    admin_client.create_topics(new_topics=new_topics, validate_only=False)

    instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'topic_with_multiple_partitions2',
kafka_group_name = 'topic_with_multiple_partitions2',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 211;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;
''')

    messages = []
    count = 0  # total rows generated so far == last key used
    for _ in range(1000):
        rows_in_message = random.randrange(3, 10)
        first_key = count + 1
        count += rows_in_message
        rows = [json.dumps({'key': key, 'value': key}) for key in range(first_key, count + 1)]
        messages.append("\n".join(rows))
    kafka_produce('topic_with_multiple_partitions2', messages)

    time.sleep(30)

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)
    assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(count))

    instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
@pytest.mark.timeout(1200)
def test_kafka_rebalance(kafka_cluster):
    """Check that adding and dropping consumers of one group does not duplicate rows.

    While a background thread keeps producing to an 11-partition topic,
    eleven Kafka tables sharing the group 'rebalance_test_group' are created
    one after another; each must deliver at least one row to
    test.destination before the next one is created.  Production is then
    stopped, all consumers but the last are dropped one by one, and the test
    waits until every produced key has arrived.  It passes when
    count() == uniqExact(key) in test.destination.
    """
    consumers_total = 11

    instance.query('''
DROP TABLE IF EXISTS test.destination;
CREATE TABLE test.destination (
key UInt64,
value UInt64,
_topic String,
_key String,
_offset UInt64,
_partition UInt64,
_timestamp Nullable(DateTime),
_consumed_by LowCardinality(String)
)
ENGINE = MergeTree()
ORDER BY key;
''')

    # kafka_cluster.open_bash_shell('instance')

    #time.sleep(2)

    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    new_topics = [NewTopic(name="topic_with_multiple_partitions", num_partitions=11, replication_factor=1)]
    admin_client.create_topics(new_topics=new_topics, validate_only=False)

    stop_producing = threading.Event()

    # Boxed in a list so the nested producer function can update it in place.
    next_key = [0]

    def produce():
        # Publish batches of 59 consecutive keys back to back until stopped.
        while not stop_producing.is_set():
            batch = []
            for _ in range(59):
                key = next_key[0]
                batch.append(json.dumps({'key': key, 'value': key}))
                next_key[0] = key + 1
            kafka_produce('topic_with_multiple_partitions', batch)

    producer_thread = threading.Thread(target=produce)
    producer_thread.start()

    consumer_setup_sql = '''
DROP TABLE IF EXISTS test.{0};
DROP TABLE IF EXISTS test.{0}_mv;
CREATE TABLE test.{0} (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'topic_with_multiple_partitions',
kafka_group_name = 'rebalance_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 33;
CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
SELECT
key,
value,
_topic,
_key,
_offset,
_partition,
_timestamp,
'{0}' as _consumed_by
FROM test.{0};
'''

    for consumer_index in range(consumers_total):
        table_name = 'kafka_consumer{}'.format(consumer_index)
        print("Setting up {}".format(table_name))

        instance.query(consumer_setup_sql.format(table_name))
        # kafka_cluster.open_bash_shell('instance')

        # Do not add the next consumer until this one has delivered something.
        consumed_by_sql = "SELECT count() FROM test.destination WHERE _consumed_by='{}'".format(table_name)
        while int(instance.query(consumed_by_sql)) == 0:
            print("Waiting for test.kafka_consumer{} to start consume".format(consumer_index))
            time.sleep(1)

    stop_producing.set()

    # I leave last one working by intent (to finish consuming after all rebalances)
    for consumer_index in range(consumers_total - 1):
        print("Dropping test.kafka_consumer{}".format(consumer_index))
        instance.query('DROP TABLE IF EXISTS test.kafka_consumer{}'.format(consumer_index))

        # Poll system.tables until the table is really gone.
        still_listed_sql = "SELECT count() FROM system.tables WHERE database='test' AND name='kafka_consumer{}'".format(consumer_index)
        while int(instance.query(still_listed_sql)) == 1:
            time.sleep(1)

    # print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))
    # kafka_cluster.open_bash_shell('instance')

    # Wait until the remaining consumer has caught up with everything produced.
    while True:
        messages_consumed = int(instance.query('SELECT uniqExact(key) FROM test.destination'))
        if messages_consumed >= next_key[0]:
            break
        time.sleep(1)
        print("Waiting for finishing consuming (have {}, should be {})".format(messages_consumed,next_key[0]))

    print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))

    # Some queries to debug...
    # SELECT * FROM test.destination where key in (SELECT key FROM test.destination group by key having count() <> 1)
    # select number + 1 as key from numbers(4141) left join test.destination using (key) where test.destination.key = 0;
    # SELECT * FROM test.destination WHERE key between 2360 and 2370 order by key;
    # select _partition from test.destination group by _partition having count() <> max(_offset) + 1;
    # select toUInt64(0) as _partition, number + 1 as _offset from numbers(400) left join test.destination using (_partition,_offset) where test.destination.key = 0 order by _offset;
    # SELECT * FROM test.destination WHERE _partition = 0 and _offset between 220 and 240 order by _offset;

    # CREATE TABLE test.reference (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092',
    #             kafka_topic_list = 'topic_with_multiple_partitions',
    #             kafka_group_name = 'rebalance_test_group_reference',
    #             kafka_format = 'JSONEachRow',
    #             kafka_max_block_size = 100000;
    #
    # CREATE MATERIALIZED VIEW test.reference_mv Engine=Log AS
    #     SELECT  key, value, _topic,_key,_offset, _partition, _timestamp, 'reference' as _consumed_by
    # FROM test.reference;
    #
    # select * from test.reference_mv left join test.destination using (key,_topic,_offset,_partition) where test.destination._consumed_by = '';

    # 1 when every row in the destination has a distinct key, 0 otherwise.
    result = int(instance.query('SELECT count() == uniqExact(key) FROM test.destination'))

    consumer_cleanup_sql = '''
DROP TABLE IF EXISTS test.{0};
DROP TABLE IF EXISTS test.{0}_mv;
'''
    for consumer_index in range(consumers_total):
        table_name = 'kafka_consumer{}'.format(consumer_index)
        print(table_name)
        instance.query(consumer_cleanup_sql.format(table_name))

    instance.query('''
DROP TABLE IF EXISTS test.destination;
''')

    producer_thread.join()

    assert result == 1, 'Messages from kafka get duplicated!'
@pytest.mark.timeout(1200)
def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
    """Check that no rows are lost when the write to a replicated view fails.

    22 messages are produced up front and consumed in blocks of 20 into a
    ReplicatedMergeTree table through a materialized view slowed down by
    sleepEachRow(1).  While a block is in flight the instance's ZooKeeper
    connections are dropped and later restored.  The server log must show
    the expected ZooKeeper failure, and test.view must finally contain all
    22 keys exactly once.
    """
    messages = [json.dumps({'key': j+1, 'value': 'x' * 300}) for j in range(22)]
    kafka_produce('no_holes_when_write_suffix_failed', messages)

    instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'no_holes_when_write_suffix_failed',
kafka_group_name = 'no_holes_when_write_suffix_failed',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 20;
CREATE TABLE test.view (key UInt64, value String)
ENGINE = ReplicatedMergeTree('/clickhouse/kafkatest/tables/no_holes_when_write_suffix_failed', 'node1')
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka
WHERE NOT sleepEachRow(1);
''')

    # the tricky part here is that disconnect should happen after write prefix, but before write suffix
    # so i use sleepEachRow
    with PartitionManager() as pm:
        time.sleep(12)
        pm.drop_instance_zk_connections(instance)
        time.sleep(20)
        # Actually call heal_all: the original bare `pm.heal_all` only
        # referenced the method and restored nothing at this point.
        pm.heal_all()

    # connection restored and it will take a while until next block will be flushed
    # it takes years on CI :\
    time.sleep(90)

    # as it's a bit tricky to hit the proper moment - let's check in logs if we did it correctly
    assert instance.contains_in_log("ZooKeeper session has been expired.: while write prefix to view")

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')

    assert TSV(result) == TSV('22\t22\t22')
@pytest.mark.timeout(120)
def test_exception_from_destructor(kafka_cluster):
    """Check that the server survives dropping a Kafka table with an empty group name.

    The table is created and dropped twice: once after a SELECT that is
    expected to return an error, and once without being used at all.  The
    server must still answer a trivial query afterwards.
    """
    create_kafka_table = '''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'xyz',
kafka_group_name = '',
kafka_format = 'JSONEachRow';
'''
    drop_kafka_table = '''
DROP TABLE test.kafka;
'''

    # Round 1: create, attempt a read (an error is expected), drop.
    instance.query(create_kafka_table)
    instance.query_and_get_error('''
SELECT * FROM test.kafka;
''')
    instance.query(drop_kafka_table)

    # Round 2: create and drop immediately, without any read.
    instance.query(create_kafka_table)
    instance.query(drop_kafka_table)

    #kafka_cluster.open_bash_shell('instance')
    assert TSV(instance.query('SELECT 1')) == TSV('1')
@pytest.mark.timeout(120)
def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
    """Check that dropping and re-creating a Kafka table neither loses nor duplicates rows.

    A first message is produced and consumed to make sure the pipeline
    works.  A background thread then produces batches of 113 consecutive
    keys once per second; after 12 seconds test.kafka is dropped and
    re-created with the same topic and consumer group.  In the end
    test.destination must hold every produced key exactly once.
    """
    kafka_produce('commits_of_unprocessed_messages_on_drop', [json.dumps({'key': 1, 'value': 1})])

    instance.query('''
DROP TABLE IF EXISTS test.destination;
CREATE TABLE test.destination (
key UInt64,
value UInt64,
_topic String,
_key String,
_offset UInt64,
_partition UInt64,
_timestamp Nullable(DateTime),
_consumed_by LowCardinality(String)
)
ENGINE = MergeTree()
ORDER BY key;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
CREATE MATERIALIZED VIEW test.kafka_consumer TO test.destination AS
SELECT
key,
value,
_topic,
_key,
_offset,
_partition,
_timestamp
FROM test.kafka;
''')

    # Block until the first message has made it into the destination table.
    while int(instance.query("SELECT count() FROM test.destination")) == 0:
        print("Waiting for test.kafka_consumer to start consume")
        time.sleep(1)

    stop_producing = threading.Event()

    # Next key to produce (key 1 is already in the topic); boxed in a list
    # so the nested producer function can update it in place.
    next_key = [2]

    def produce():
        while not stop_producing.is_set():
            batch = []
            for _ in range(113):
                key = next_key[0]
                batch.append(json.dumps({'key': key, 'value': key}))
                next_key[0] = key + 1
            kafka_produce('commits_of_unprocessed_messages_on_drop', batch)
            time.sleep(1)

    producer_thread = threading.Thread(target=produce)
    producer_thread.start()
    time.sleep(12)

    # Drop the Kafka table while messages are still being produced ...
    instance.query('''
DROP TABLE test.kafka;
''')

    # ... and bring it back with the same topic and group.
    instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10000;
''')

    stop_producing.set()
    time.sleep(15)

    #kafka_cluster.open_bash_shell('instance')
    # SELECT key, _timestamp, _offset FROM test.destination where runningDifference(key) <> 1 ORDER BY key;

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.destination')
    print(result)

    instance.query('''
DROP TABLE test.kafka_consumer;
DROP TABLE test.destination;
''')

    producer_thread.join()

    last_key = next_key[0] - 1
    assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(last_key)), 'Missing data!'
@pytest.mark.timeout(120)
def test_bad_reschedule(kafka_cluster):
    """Check that a backlog of 20000 messages is consumed without long pauses.

    Every consumed row is stamped with now() as consume_ts; once all rows
    have arrived, the spread between the earliest and the latest stamp must
    be below 8 (as returned by the DateTime subtraction in the query).
    """
    total_messages = 20000
    messages = [json.dumps({'key': n, 'value': n}) for n in range(1, total_messages + 1)]
    kafka_produce('test_bad_reschedule', messages)

    instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
CREATE MATERIALIZED VIEW test.destination Engine=Log AS
SELECT
key,
now() as consume_ts,
value,
_topic,
_key,
_offset,
_partition,
_timestamp
FROM test.kafka;
''')

    # Poll until the whole backlog has been consumed.
    while True:
        consumed = int(instance.query("SELECT count() FROM test.destination"))
        if consumed >= total_messages:
            break
        print("Waiting for consume")
        time.sleep(1)

    consume_spread = int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination"))
    assert consume_spread < 8
@pytest.mark.timeout(1200)
def test_kafka_duplicates_when_commit_failed(kafka_cluster):
    """Check the (documented) duplication that follows a failed offset commit.

    22 messages are consumed in blocks of 20 through a materialized view
    slowed down by sleepEachRow(0.5).  The 'kafka1' container is paused
    around the time the commit should happen and unpaused 40 seconds later.
    The server log must report the failed commit, and test.view must end up
    with 42 rows covering the 22 distinct keys.
    """
    messages = [json.dumps({'key': j+1, 'value': 'x' * 300}) for j in range(22)]
    kafka_produce('duplicates_when_commit_failed', messages)

    instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'duplicates_when_commit_failed',
kafka_group_name = 'duplicates_when_commit_failed',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 20;
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka
WHERE NOT sleepEachRow(0.5);
''')

    #print time.strftime("%m/%d/%Y %H:%M:%S")
    time.sleep(12)  # 5-6 sec to connect to kafka, do subscription, and fetch 20 rows, another 10 sec for MV, after that commit should happen

    #print time.strftime("%m/%d/%Y %H:%M:%S")
    kafka_cluster.pause_container('kafka1')

    # that timeout is VERY important, and was picked after a lot of experiments
    # when too low (<30sec) librdkafka will not report any timeout (alternative is to decrease the default session timeouts for librdkafka)
    # when too high (>50sec) broker will decide to remove us from the consumer group, and will start answering "Broker: Unknown member"
    time.sleep(40)

    #print time.strftime("%m/%d/%Y %H:%M:%S")
    kafka_cluster.unpause_container('kafka1')

    #kafka_cluster.open_bash_shell('instance')

    # connection restored and it will take a while until next block will be flushed
    # it takes years on CI :\
    time.sleep(30)

    # as it's a bit tricky to hit the proper moment - let's check in logs if we did it correctly
    assert instance.contains_in_log("Local: Waiting for coordinator")
    assert instance.contains_in_log("All commit attempts failed")

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')

    # After https://github.com/edenhill/librdkafka/issues/2631
    # timeout triggers rebalance, making further commits to the topic after getting back online
    # impossible. So we have a duplicate in that scenario, but we report that situation properly.
    assert TSV(result) == TSV('42\t22\t22')
if __name__ == '__main__':
    # Manual mode: start the cluster, keep it up until the operator presses
    # a key, then shut it down.
    cluster.start()
    try:
        raw_input("Cluster created, press any key to destroy...")
    finally:
        # Shut down even if the prompt is interrupted (Ctrl-C / EOF), so the
        # cluster is not left running.
        cluster.shutdown()