proper placement for test

parent 2bcfff257c
commit c4470ff283
@@ -156,6 +156,146 @@ def avro_confluent_message(schema_registry_client, value):
    })
    return serializer.encode_record_with_schema('test_subject', schema, value)

# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'):
    fpath = p.join(p.dirname(__file__), ref_file)
    with open(fpath) as reference:
        if check:
            assert TSV(result) == TSV(reference)
        else:
            return TSV(result) == TSV(reference)

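# Editorial sketch, not part of the original commit: the tests below poll
# kafka_check_result() in a bare `while True` loop and rely on
# @pytest.mark.timeout(180) to abort on failure. A bounded variant, under the
# hypothetical name kafka_check_result_with_retries, could look like this:
def kafka_check_result_with_retries(get_result, max_retries=30, delay=1.0):
    # Accumulate output across attempts, exactly as the inline loops below do.
    result = ''
    for _ in range(max_retries):
        result += get_result()
        if kafka_check_result(result):
            return result
        time.sleep(delay)  # assumes `time` is imported at module level
    # Final check in assert mode so a failure produces a readable diff.
    kafka_check_result(result, check=True)
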
# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(name):
    client = BrokerConnection('localhost', 9092, socket.AF_INET)
    client.connect_blocking()

    list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
    future = client.send(list_members_in_groups)
    while not future.is_done:
        for resp, f in client.recv():
            f.success(resp)

    (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]

    res = []
    for member in members:
        (member_id, client_id, client_host, member_metadata, member_assignment) = member
        member_info = {}
        member_info['member_id'] = member_id
        member_info['client_id'] = client_id
        member_info['client_host'] = client_host
        member_topics_assignment = []
        for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
            member_topics_assignment.append({'topic': topic, 'partitions': partitions})
        member_info['assignment'] = member_topics_assignment
        res.append(member_info)
    return res

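# For orientation (editorial note): each element of the list returned above is
# a plain dict, so tests can assert on its fields directly. An illustrative
# shape, with hypothetical member_id, host, and partition values:
#
#     [{'member_id': 'ClickHouse-instance-test-kafka-0123',
#       'client_id': 'ClickHouse-instance-test-kafka',
#       'client_host': '/172.16.1.2',
#       'assignment': [{'topic': 'old', 'partitions': [0, 1]}]}]
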
# Fixtures

@pytest.fixture(scope="module")
def kafka_cluster():
    try:
        global kafka_id
        cluster.start()
        kafka_id = instance.cluster.kafka_docker_id
        print("kafka_id is {}".format(kafka_id))
        yield cluster

    finally:
        cluster.shutdown()

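# Editorial sketch: the `cluster` and `instance` globals used by this fixture
# are defined at module level, outside this hunk. Roughly (the exact configs
# are an assumption):
#
#     from helpers.cluster import ClickHouseCluster
#
#     cluster = ClickHouseCluster(__file__)
#     instance = cluster.add_instance('instance',
#                                     main_configs=['configs/kafka.xml'],
#                                     with_kafka=True)
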
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
    wait_kafka_is_available()
    # print("kafka is available - running test")
    yield  # run test
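
# Editorial sketch: wait_kafka_is_available() is defined elsewhere in this
# file; a minimal stand-in that polls the broker with kafka-python could be:
#
#     def wait_kafka_is_available(max_retries=50):
#         for _ in range(max_retries):
#             conn = BrokerConnection('localhost', 9092, socket.AF_INET)
#             if conn.connect_blocking():
#                 conn.close()
#                 return
#             time.sleep(1)
#         raise RuntimeError("Kafka is not available")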


# Tests

@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster):
    assert TSV(instance.query("SELECT * FROM system.macros WHERE macro like 'kafka%' ORDER BY macro",
                              ignore_error=True)) == TSV('''kafka_broker	kafka1
kafka_client_id	instance
kafka_format_json_each_row	JSONEachRow
kafka_group_name_new	new
kafka_group_name_old	old
kafka_topic_new	new
kafka_topic_old	old
''')

    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_old}', '{kafka_group_name_old}', '{kafka_format_json_each_row}', '\\n');
        ''')

    # Don't insert malformed messages, since the old settings syntax
    # doesn't support skipping broken messages.
    messages = []
    for i in range(50):
        messages.append(json.dumps({'key': i, 'value': i}))
    kafka_produce('old', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)

    members = describe_consumer_group('old')
    assert members[0]['client_id'] == 'ClickHouse-instance-test-kafka'
    # text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'), "kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose")

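# Editorial sketch: kafka_produce(), used above and below, is defined elsewhere
# in this file; a minimal version wrapping kafka-python's KafkaProducer could be:
#
#     from kafka import KafkaProducer
#
#     def kafka_produce(topic, messages):
#         producer = KafkaProducer(bootstrap_servers='localhost:9092')
#         for message in messages:
#             producer.send(topic=topic, value=message.encode('utf-8'))
#         producer.flush()
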
@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster):
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = '{kafka_broker}:19092',
                     kafka_topic_list = '{kafka_topic_new}',
                     kafka_group_name = '{kafka_group_name_new}',
                     kafka_format = '{kafka_format_json_each_row}',
                     kafka_row_delimiter = '\\n',
                     kafka_client_id = '{kafka_client_id} test 1234',
                     kafka_skip_broken_messages = 1;
        ''')

    messages = []
    for i in range(25):
        messages.append(json.dumps({'key': i, 'value': i}))
    kafka_produce('new', messages)

    # Insert a couple of malformed messages.
    kafka_produce('new', ['}{very_broken_message,'])
    kafka_produce('new', ['}another{very_broken_message,'])

    messages = []
    for i in range(25, 50):
        messages.append(json.dumps({'key': i, 'value': i}))
    kafka_produce('new', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)

    members = describe_consumer_group('new')
    assert members[0]['client_id'] == 'instance test 1234'


@pytest.mark.timeout(180)
def test_kafka_json_as_string(kafka_cluster):
@@ -574,148 +714,6 @@ def test_kafka_formats(kafka_cluster):
'''.format(topic_name=topic_name, offset_0=offsets[0], offset_1=offsets[1], offset_2=offsets[2])
    assert TSV(result) == TSV(expected), 'Proper result for format: {}'.format(format_name)


# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'):
    fpath = p.join(p.dirname(__file__), ref_file)
    with open(fpath) as reference:
        if check:
            assert TSV(result) == TSV(reference)
        else:
            return TSV(result) == TSV(reference)


# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(name):
    client = BrokerConnection('localhost', 9092, socket.AF_INET)
    client.connect_blocking()

    list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
    future = client.send(list_members_in_groups)
    while not future.is_done:
        for resp, f in client.recv():
            f.success(resp)

    (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]

    res = []
    for member in members:
        (member_id, client_id, client_host, member_metadata, member_assignment) = member
        member_info = {}
        member_info['member_id'] = member_id
        member_info['client_id'] = client_id
        member_info['client_host'] = client_host
        member_topics_assignment = []
        for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
            member_topics_assignment.append({'topic': topic, 'partitions': partitions})
        member_info['assignment'] = member_topics_assignment
        res.append(member_info)
    return res


# Fixtures

@pytest.fixture(scope="module")
def kafka_cluster():
    try:
        global kafka_id
        cluster.start()
        kafka_id = instance.cluster.kafka_docker_id
        print("kafka_id is {}".format(kafka_id))
        yield cluster

    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
    wait_kafka_is_available()
    # print("kafka is available - running test")
    yield  # run test


# Tests

@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster):
    assert TSV(instance.query("SELECT * FROM system.macros WHERE macro like 'kafka%' ORDER BY macro",
                              ignore_error=True)) == TSV('''kafka_broker	kafka1
kafka_client_id	instance
kafka_format_json_each_row	JSONEachRow
kafka_group_name_new	new
kafka_group_name_old	old
kafka_topic_new	new
kafka_topic_old	old
''')

    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_old}', '{kafka_group_name_old}', '{kafka_format_json_each_row}', '\\n');
        ''')

    # Don't insert malformed messages, since the old settings syntax
    # doesn't support skipping broken messages.
    messages = []
    for i in range(50):
        messages.append(json.dumps({'key': i, 'value': i}))
    kafka_produce('old', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)

    members = describe_consumer_group('old')
    assert members[0]['client_id'] == 'ClickHouse-instance-test-kafka'
    # text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'), "kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose")


@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster):
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = '{kafka_broker}:19092',
                     kafka_topic_list = '{kafka_topic_new}',
                     kafka_group_name = '{kafka_group_name_new}',
                     kafka_format = '{kafka_format_json_each_row}',
                     kafka_row_delimiter = '\\n',
                     kafka_client_id = '{kafka_client_id} test 1234',
                     kafka_skip_broken_messages = 1;
        ''')

    messages = []
    for i in range(25):
        messages.append(json.dumps({'key': i, 'value': i}))
    kafka_produce('new', messages)

    # Insert a couple of malformed messages.
    kafka_produce('new', ['}{very_broken_message,'])
    kafka_produce('new', ['}another{very_broken_message,'])

    messages = []
    for i in range(25, 50):
        messages.append(json.dumps({'key': i, 'value': i}))
    kafka_produce('new', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)

    members = describe_consumer_group('new')
    assert members[0]['client_id'] == 'instance test 1234'


@pytest.mark.timeout(180)
def test_kafka_issue11308(kafka_cluster):
    # Check that matview does respect Kafka SETTINGS