Improve kafka topic creation and deletion. Improve mongodb tests

Yatsishin Ilya 2021-08-30 15:58:21 +03:00
parent 121df40a97
commit da4e688293
2 changed files with 113 additions and 52 deletions
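
The diff below replaces ad-hoc NewTopic/create_topics calls with two retrying
helpers, kafka_create_topic and kafka_delete_topic, and gives each test an
explicit topic cleanup. A minimal sketch of the intended call pattern, assuming
the kafka-python KafkaAdminClient the test file already uses (port and topic
name are illustrative):

    from kafka.admin import KafkaAdminClient

    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    kafka_create_topic(admin_client, "some_topic", num_partitions=8)  # retries until the broker accepts
    try:
        pass  # exercise the topic from the test body
    finally:
        kafka_delete_topic(admin_client, "some_topic")  # polls list_topics() until the topic is gone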

View File

@@ -13,7 +13,6 @@ import avro.io
import avro.datafile
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka import admin
import kafka.errors
import pytest
@@ -71,6 +70,43 @@ def get_kafka_producer(port, serializer, retries):

def producer_serializer(x):
    return x.encode() if isinstance(x, str) else x

def kafka_create_topic(admin_client, topic_name, num_partitions=1, replication_factor=1, max_retries=50, config=None):
    logging.debug(f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}")
    topics_list = [NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor, topic_configs=config)]
    retries = 0
    while True:
        try:
            admin_client.create_topics(new_topics=topics_list, validate_only=False)
            logging.debug("Admin client succeeded")
            return
        except Exception as e:
            retries += 1
            time.sleep(0.5)
            if retries < max_retries:
                logging.warning(f"Failed to create topic: {e}")
            else:
                raise

def kafka_delete_topic(admin_client, topic, max_retries=50):
    result = admin_client.delete_topics([topic])
    for (topic, e) in result.topic_error_codes:
        if e == 0:
            logging.debug(f"Topic {topic} deleted")
        else:
            logging.error(f"Failed to delete topic {topic}: {e}")

    retries = 0
    while True:
        topics_listed = admin_client.list_topics()
        logging.debug(f"TOPICS LISTED: {topics_listed}")
        if topic not in topics_listed:
            return
        else:
            retries += 1
            time.sleep(0.5)
            if retries > max_retries:
                raise Exception(f"Failed to delete topic {topic}, {result}")

def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
    logging.debug("kafka_produce server:{}:{} topic:{}".format("localhost", kafka_cluster.kafka_port, topic))
    producer = get_kafka_producer(kafka_cluster.kafka_port, producer_serializer, retries)

@@ -289,6 +325,7 @@ def test_kafka_json_as_string(kafka_cluster):

def test_kafka_formats(kafka_cluster):
    schema_registry_client = CachedSchemaRegistryClient('http://localhost:{}'.format(kafka_cluster.schema_registry_port))
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    # data was dumped from clickhouse itself in the following manner
    # clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g'

@@ -606,7 +643,7 @@ def test_kafka_formats(kafka_cluster):
    for format_name, format_opts in list(all_formats.items()):
        logging.debug(('Checking {}'.format(format_name)))
        topic_name = 'format_tests_{}'.format(format_name)
        topic_name = f'format_tests_{format_name}'
        # shift offsets by 1 if format supports empty value
        offsets = [1, 2, 3] if format_opts.get('supports_empty_value', False) else [0, 1, 2]
        result = instance.query('SELECT * FROM test.kafka_{format_name}_mv;'.format(format_name=format_name))

@@ -630,7 +667,7 @@ def test_kafka_formats(kafka_cluster):
0 0 AM 0.5 1 {topic_name} 0 {offset_2}
'''.format(topic_name=topic_name, offset_0=offsets[0], offset_1=offsets[1], offset_2=offsets[2])
        assert TSV(result) == TSV(expected), 'Proper result for format: {}'.format(format_name)
        kafka_delete_topic(admin_client, topic_name)

# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
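# A hedged sketch of such a check loop (helper name and timings are
# illustrative, not part of this commit; instance, TSV and time come from
# this test module):
def check_result_with_retry(query, expected, max_tries=30):
    result = instance.query(query)
    while TSV(result) != TSV(expected) and max_tries > 0:
        time.sleep(1)
        max_tries -= 1
        result = instance.query(query)
    return result
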
@@ -771,11 +808,10 @@ def test_kafka_issue4116(kafka_cluster):

def test_kafka_consumer_hang(kafka_cluster):
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="consumer_hang", num_partitions=8, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "consumer_hang"
    kafka_create_topic(admin_client, topic_name, num_partitions=8)

    instance.query('''
    instance.query(f'''
        DROP TABLE IF EXISTS test.kafka;
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

@@ -783,8 +819,8 @@ def test_kafka_consumer_hang(kafka_cluster):
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang',
                     kafka_group_name = 'consumer_hang',
                     kafka_topic_list = '{topic_name}',
                     kafka_group_name = '{topic_name}',
                     kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 8;
        CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory();

@@ -818,13 +854,28 @@ def test_kafka_consumer_hang(kafka_cluster):
    # 'dr'||'op' to avoid the check query matching itself
    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0

    # clean up unread messages so Kafka does not wait for consumers to read them before deleting the topic
    instance.query(f'''
        CREATE TABLE test.kafka (key UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = '{topic_name}',
                     kafka_group_name = '{topic_name}',
                     kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 8;
    ''')
    num_read = int(instance.query('SELECT count() FROM test.kafka'))
    logging.debug(f"read {num_read} from {topic_name} before delete")
    instance.query('DROP TABLE test.kafka')
    kafka_delete_topic(admin_client, topic_name)

def test_kafka_consumer_hang2(kafka_cluster):
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="consumer_hang2", num_partitions=1, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "consumer_hang2"
    kafka_create_topic(admin_client, topic_name)

    instance.query('''
        DROP TABLE IF EXISTS test.kafka;

@@ -864,6 +915,7 @@ def test_kafka_consumer_hang2(kafka_cluster):
    # from a user perspective: we expect no hanging 'drop' queries
    # 'dr'||'op' to avoid the check query matching itself
    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
    kafka_delete_topic(admin_client, topic_name)

def test_kafka_csv_with_delimiter(kafka_cluster):

@@ -916,21 +968,21 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):

def test_kafka_select_empty(kafka_cluster):
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="empty", num_partitions=1, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "empty"
    kafka_create_topic(admin_client, topic_name)

    instance.query('''
    instance.query(f'''
        CREATE TABLE test.kafka (key UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'empty',
                     kafka_group_name = 'empty',
                     kafka_topic_list = '{topic_name}',
                     kafka_group_name = '{topic_name}',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        ''')

    assert int(instance.query('SELECT count() FROM test.kafka')) == 0
    kafka_delete_topic(admin_client, topic_name)

def test_kafka_json_without_delimiter(kafka_cluster):

@@ -1153,9 +1205,8 @@ def test_kafka_recreate_kafka_table(kafka_cluster):
    # admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="recreate_kafka_table", num_partitions=6, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "recreate_kafka_table"
    kafka_create_topic(admin_client, topic_name, num_partitions=6)

    instance.query('''
        DROP TABLE IF EXISTS test.view;

@@ -1211,6 +1262,7 @@ def test_kafka_recreate_kafka_table(kafka_cluster):
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')
    kafka_delete_topic(admin_client, topic_name)

def test_librdkafka_compression(kafka_cluster):

@@ -1253,12 +1305,9 @@ def test_librdkafka_compression(kafka_cluster):
        logging.debug(('Check compression {}'.format(compression_type)))

        topic_name = 'test_librdkafka_compression_{}'.format(compression_type)
        admin_client = admin.AdminClient({'bootstrap.servers': "localhost:{}".format(kafka_cluster.kafka_port)})
        admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

        topic = admin.NewTopic(topic=topic_name, num_partitions=1, replication_factor=1, config={
            'compression.type': compression_type,
        })
        admin_client.create_topics(new_topics=[topic], validate_only=False)
        kafka_create_topic(admin_client, topic_name, config={'compression.type': compression_type})

        instance.query('''
            CREATE TABLE test.kafka (key UInt64, value String)

@@ -1281,6 +1330,8 @@ def test_librdkafka_compression(kafka_cluster):
        instance.query('DROP TABLE test.kafka SYNC')
        instance.query('DROP TABLE test.consumer SYNC')

        kafka_delete_topic(admin_client, topic_name)

def test_kafka_materialized_view_with_subquery(kafka_cluster):
    instance.query('''

@@ -1652,11 +1703,9 @@ def test_kafka_commit_on_block_write(kafka_cluster):

def test_kafka_virtual_columns2(kafka_cluster):
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="virt2_0", num_partitions=2, replication_factor=1))
    topic_list.append(NewTopic(name="virt2_1", num_partitions=2, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    kafka_create_topic(admin_client, "virt2_0", num_partitions=2)
    kafka_create_topic(admin_client, "virt2_1", num_partitions=2)

    instance.query('''
        CREATE TABLE test.kafka (value UInt64)

@@ -1715,14 +1764,16 @@ def test_kafka_virtual_columns2(kafka_cluster):

    assert TSV(result) == TSV(expected)

    kafka_delete_topic(admin_client, "virt2_0")
    kafka_delete_topic(admin_client, "virt2_1")

def test_kafka_produce_key_timestamp(kafka_cluster):
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="insert3", num_partitions=1, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "insert3"
    kafka_create_topic(admin_client, topic_name)

    instance.query('''
        DROP TABLE IF EXISTS test.view;

@@ -1774,12 +1825,13 @@ def test_kafka_produce_key_timestamp(kafka_cluster):

    assert TSV(result) == TSV(expected)

    kafka_delete_topic(admin_client, topic_name)

def test_kafka_flush_by_time(kafka_cluster):
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="flush_by_time", num_partitions=1, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "flush_by_time"
    kafka_create_topic(admin_client, topic_name)

    instance.query('''
        DROP TABLE IF EXISTS test.view;

@@ -1833,6 +1885,7 @@ def test_kafka_flush_by_time(kafka_cluster):
    ''')

    assert TSV(result) == TSV('1 1')
    kafka_delete_topic(admin_client, topic_name)

def test_kafka_flush_by_block_size(kafka_cluster):

@@ -1892,13 +1945,11 @@ def test_kafka_flush_by_block_size(kafka_cluster):
    assert int(
        result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!'

def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="topic_with_multiple_partitions2", num_partitions=10, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "topic_with_multiple_partitions2"
    kafka_create_topic(admin_client, topic_name, num_partitions=10)

    instance.query('''
        DROP TABLE IF EXISTS test.view;

@@ -1938,6 +1989,7 @@ def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')
    kafka_delete_topic(admin_client, topic_name)

def test_kafka_rebalance(kafka_cluster):

@@ -1964,9 +2016,8 @@ def test_kafka_rebalance(kafka_cluster):
    # time.sleep(2)

    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
    topic_list = []
    topic_list.append(NewTopic(name="topic_with_multiple_partitions", num_partitions=11, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "topic_with_multiple_partitions"
    kafka_create_topic(admin_client, topic_name, num_partitions=11)

    cancel = threading.Event()

@@ -2070,6 +2121,7 @@ def test_kafka_rebalance(kafka_cluster):
    kafka_thread.join()

    assert result == 1, 'Messages from kafka get duplicated!'
    kafka_delete_topic(admin_client, topic_name)

def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):

@@ -2645,6 +2697,7 @@ def test_kafka_engine_put_errors_to_stream_with_random_malformed_json(kafka_cluster):

def test_kafka_formats_with_broken_message(kafka_cluster):
    # data was dumped from clickhouse itself in the following manner
    # clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g'
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    all_formats = {
        ## Text formats ##

@@ -2814,8 +2867,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
    topic_name_prefix = 'format_tests_4_stream_'
    for format_name, format_opts in list(all_formats.items()):
        print(('Set up {}'.format(format_name)))
        topic_name = topic_name_prefix + '{}'.format(format_name)
        logging.debug(f'Set up {format_name}')
        topic_name = f"{topic_name_prefix}{format_name}"
        data_sample = format_opts['data_sample']
        data_prefix = []
        raw_message = '_raw_message'

@@ -2855,8 +2908,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
            extra_settings=format_opts.get('extra_settings') or ''))

    for format_name, format_opts in list(all_formats.items()):
        print(('Checking {}'.format(format_name)))
        topic_name = topic_name_prefix + '{}'.format(format_name)
        logging.debug(f'Checking {format_name}')
        topic_name = f"{topic_name_prefix}{format_name}"
        # shift offsets by 1 if format supports empty value
        offsets = [1, 2, 3] if format_opts.get('supports_empty_value', False) else [0, 1, 2]
        result = instance.query('SELECT * FROM test.kafka_data_{format_name}_mv;'.format(format_name=format_name))

@@ -2886,6 +2939,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
        # print(errors_result.strip())
        # print(errors_expected.strip())
        assert errors_result.strip() == errors_expected.strip(), 'Proper errors for format: {}'.format(format_name)
        kafka_delete_topic(admin_client, topic_name)

def wait_for_new_data(table_name, prev_count = 0, max_retries = 120):
    retries = 0

@@ -2901,14 +2955,12 @@ def wait_for_new_data(table_name, prev_count = 0, max_retries = 120):
    raise Exception("No new data :(")

def test_kafka_consumer_failover(kafka_cluster):
    # for backporting:
    # admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))

    topic_list = []
    topic_list.append(NewTopic(name="kafka_consumer_failover", num_partitions=2, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    topic_name = "kafka_consumer_failover"
    kafka_create_topic(admin_client, topic_name, num_partitions=2)

    instance.query('''
        DROP TABLE IF EXISTS test.kafka;

@@ -3019,6 +3071,7 @@ def test_kafka_consumer_failover(kafka_cluster):
    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key':8,'value': 8}), partition=1)
    producer.flush()
    prev_count = wait_for_new_data('test.destination', prev_count)
    kafka_delete_topic(admin_client, topic_name)

if __name__ == '__main__':

View File

@@ -46,6 +46,8 @@ def test_simple_select(started_cluster):
    assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n'
    assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
    node.query("DROP TABLE simple_mongo_table")
    simple_mongo_table.drop()

@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])

@@ -67,6 +69,8 @@ def test_complex_data_type(started_cluster):
    assert node.query("SELECT sum(key) FROM incomplete_mongo_table") == str(sum(range(0, 100))) + '\n'
    assert node.query("SELECT data from incomplete_mongo_table where key = 42") == hex(42 * 42) + '\n'
    node.query("DROP TABLE incomplete_mongo_table")
    incomplete_mongo_table.drop()

@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])

@@ -95,7 +99,9 @@ def test_incorrect_data_type(started_cluster):
    with pytest.raises(QueryRuntimeException):
        node.query("SELECT bbbb FROM strange_mongo_table2")

    node.query("DROP TABLE strange_mongo_table")
    node.query("DROP TABLE strange_mongo_table2")
    strange_mongo_table.drop()

@pytest.mark.parametrize('started_cluster', [True], indirect=['started_cluster'])
def test_secure_connection(started_cluster):

@@ -116,3 +122,5 @@ def test_secure_connection(started_cluster):
    assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n'
    assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
    node.query("DROP TABLE simple_mongo_table")
    simple_mongo_table.drop()

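The MongoDB hunks pair each ClickHouse-side DROP TABLE with a drop of the
backing collection, so every test leaves the shared Mongo instance clean. A
minimal sketch of that setup/teardown pattern, assuming pymongo (host, port,
and names are illustrative):

    from pymongo import MongoClient

    connection = MongoClient("localhost", 27017)
    db = connection["test"]
    simple_mongo_table = db["simple_table"]  # handle only; the collection appears on first insert
    simple_mongo_table.insert_many([{"key": i, "data": hex(i * i)} for i in range(100)])
    try:
        pass  # query the ClickHouse table backed by this collection
    finally:
        simple_mongo_table.drop()  # remove the collection so later tests start clean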