diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 8a6bf10b3c5..84f22d4d201 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -13,7 +13,6 @@ import avro.io import avro.datafile from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient from confluent_kafka.avro.serializer.message_serializer import MessageSerializer -from confluent_kafka import admin import kafka.errors import pytest @@ -71,6 +70,43 @@ def get_kafka_producer(port, serializer, retries): def producer_serializer(x): return x.encode() if isinstance(x, str) else x +def kafka_create_topic(admin_client, topic_name, num_partitions=1, replication_factor=1, max_retries=50, config=None): + logging.debug(f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}") + topics_list = [NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor, topic_configs=config)] + retries = 0 + while True: + try: + admin_client.create_topics(new_topics=topics_list, validate_only=False) + logging.debug("Admin client succeed") + return + except Exception as e: + retries += 1 + time.sleep(0.5) + if retries < max_retries: + logging.warning(f"Failed to create topic {e}") + else: + raise + +def kafka_delete_topic(admin_client, topic, max_retries=50): + result = admin_client.delete_topics([topic]) + for (topic, e) in result.topic_error_codes: + if e == 0: + logging.debug(f"Topic {topic} deleted") + else: + logging.error(f"Failed to delete topic {topic}: {e}") + + retries = 0 + while True: + topics_listed = admin_client.list_topics() + logging.debug(f"TOPICS LISTED: {topics_listed}") + if topic not in topics_listed: + return + else: + retries += 1 + time.sleep(0.5) + if retries > max_retries: + raise Exception(f"Failed to delete topics {topic}, {result}") + def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15): logging.debug("kafka_produce server:{}:{} topic:{}".format("localhost", kafka_cluster.kafka_port, topic)) producer = get_kafka_producer(kafka_cluster.kafka_port, producer_serializer, retries) @@ -289,6 +325,7 @@ def test_kafka_json_as_string(kafka_cluster): def test_kafka_formats(kafka_cluster): schema_registry_client = CachedSchemaRegistryClient('http://localhost:{}'.format(kafka_cluster.schema_registry_port)) + admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) # data was dumped from clickhouse itself in a following manner # clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g' @@ -606,7 +643,7 @@ def test_kafka_formats(kafka_cluster): for format_name, format_opts in list(all_formats.items()): logging.debug(('Checking {}'.format(format_name))) - topic_name = 'format_tests_{}'.format(format_name) + topic_name = f'format_tests_{format_name}' # shift offsets by 1 if format supports empty value offsets = [1, 2, 3] if format_opts.get('supports_empty_value', False) else [0, 1, 2] result = instance.query('SELECT * FROM test.kafka_{format_name}_mv;'.format(format_name=format_name)) @@ -630,7 +667,7 @@ def test_kafka_formats(kafka_cluster): 0 0 AM 0.5 1 {topic_name} 0 {offset_2} '''.format(topic_name=topic_name, offset_0=offsets[0], offset_1=offsets[1], offset_2=offsets[2]) assert TSV(result) == TSV(expected), 'Proper result for format: {}'.format(format_name) - + kafka_delete_topic(admin_client, topic_name) # Since everything is async and shaky when receiving messages from Kafka, # we may want to try and check results multiple times in a loop. @@ -771,11 +808,10 @@ def test_kafka_issue4116(kafka_cluster): def test_kafka_consumer_hang(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="consumer_hang", num_partitions=8, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "consumer_hang" + kafka_create_topic(admin_client, topic_name, num_partitions=8) - instance.query(''' + instance.query(f''' DROP TABLE IF EXISTS test.kafka; DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.consumer; @@ -783,8 +819,8 @@ def test_kafka_consumer_hang(kafka_cluster): CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'consumer_hang', - kafka_group_name = 'consumer_hang', + kafka_topic_list = '{topic_name}', + kafka_group_name = '{topic_name}', kafka_format = 'JSONEachRow', kafka_num_consumers = 8; CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory(); @@ -818,13 +854,28 @@ def test_kafka_consumer_hang(kafka_cluster): # 'dr'||'op' to avoid self matching assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0 + # cleanup unread messages so kafka will not wait reading consumers to delete topic + instance.query(f''' + CREATE TABLE test.kafka (key UInt64) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = '{topic_name}', + kafka_group_name = '{topic_name}', + kafka_format = 'JSONEachRow', + kafka_num_consumers = 8; + ''') + + num_read = int(instance.query('SELECT count() FROM test.kafka')) + logging.debug(f"read {num_read} from {topic_name} before delete") + instance.query('DROP TABLE test.kafka') + kafka_delete_topic(admin_client, topic_name) + def test_kafka_consumer_hang2(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="consumer_hang2", num_partitions=1, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "consumer_hang2" + kafka_create_topic(admin_client, topic_name) instance.query(''' DROP TABLE IF EXISTS test.kafka; @@ -864,6 +915,7 @@ def test_kafka_consumer_hang2(kafka_cluster): # from a user perspective: we expect no hanging 'drop' queries # 'dr'||'op' to avoid self matching assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0 + kafka_delete_topic(admin_client, topic_name) def test_kafka_csv_with_delimiter(kafka_cluster): @@ -916,21 +968,21 @@ def test_kafka_tsv_with_delimiter(kafka_cluster): def test_kafka_select_empty(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="empty", num_partitions=1, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "empty" + kafka_create_topic(admin_client, topic_name) - instance.query(''' + instance.query(f''' CREATE TABLE test.kafka (key UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'empty', - kafka_group_name = 'empty', + kafka_topic_list = '{topic_name}', + kafka_group_name = '{topic_name}', kafka_format = 'TSV', kafka_row_delimiter = '\\n'; ''') assert int(instance.query('SELECT count() FROM test.kafka')) == 0 + kafka_delete_topic(admin_client, topic_name) def test_kafka_json_without_delimiter(kafka_cluster): @@ -1153,9 +1205,8 @@ def test_kafka_recreate_kafka_table(kafka_cluster): # admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092") admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="recreate_kafka_table", num_partitions=6, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "recreate_kafka_table" + kafka_create_topic(admin_client, topic_name, num_partitions=6) instance.query(''' DROP TABLE IF EXISTS test.view; @@ -1211,6 +1262,7 @@ def test_kafka_recreate_kafka_table(kafka_cluster): DROP TABLE test.consumer; DROP TABLE test.view; ''') + kafka_delete_topic(admin_client, topic_name) def test_librdkafka_compression(kafka_cluster): @@ -1253,12 +1305,9 @@ def test_librdkafka_compression(kafka_cluster): logging.debug(('Check compression {}'.format(compression_type))) topic_name = 'test_librdkafka_compression_{}'.format(compression_type) - admin_client = admin.AdminClient({'bootstrap.servers': "localhost:{}".format(kafka_cluster.kafka_port)}) + admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic = admin.NewTopic(topic=topic_name, num_partitions=1, replication_factor=1, config={ - 'compression.type': compression_type, - }) - admin_client.create_topics(new_topics=[topic], validate_only=False) + kafka_create_topic(admin_client, topic_name, config={'compression.type': compression_type}) instance.query(''' CREATE TABLE test.kafka (key UInt64, value String) @@ -1281,6 +1330,8 @@ def test_librdkafka_compression(kafka_cluster): instance.query('DROP TABLE test.kafka SYNC') instance.query('DROP TABLE test.consumer SYNC') + kafka_delete_topic(admin_client, topic_name) + def test_kafka_materialized_view_with_subquery(kafka_cluster): instance.query(''' @@ -1652,11 +1703,9 @@ def test_kafka_commit_on_block_write(kafka_cluster): def test_kafka_virtual_columns2(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="virt2_0", num_partitions=2, replication_factor=1)) - topic_list.append(NewTopic(name="virt2_1", num_partitions=2, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + kafka_create_topic(admin_client, "virt2_0", num_partitions=2) + kafka_create_topic(admin_client, "virt2_1", num_partitions=2) instance.query(''' CREATE TABLE test.kafka (value UInt64) @@ -1715,14 +1764,16 @@ def test_kafka_virtual_columns2(kafka_cluster): assert TSV(result) == TSV(expected) + kafka_delete_topic(admin_client, "virt2_0") + kafka_delete_topic(admin_client, "virt2_1") + def test_kafka_produce_key_timestamp(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="insert3", num_partitions=1, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "insert3" + kafka_create_topic(admin_client, topic_name) instance.query(''' DROP TABLE IF EXISTS test.view; @@ -1774,12 +1825,13 @@ def test_kafka_produce_key_timestamp(kafka_cluster): assert TSV(result) == TSV(expected) + kafka_delete_topic(admin_client, topic_name) + def test_kafka_flush_by_time(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="flush_by_time", num_partitions=1, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "flush_by_time" + kafka_create_topic(admin_client, topic_name) instance.query(''' DROP TABLE IF EXISTS test.view; @@ -1833,6 +1885,7 @@ def test_kafka_flush_by_time(kafka_cluster): ''') assert TSV(result) == TSV('1 1') + kafka_delete_topic(admin_client, topic_name) def test_kafka_flush_by_block_size(kafka_cluster): @@ -1892,13 +1945,11 @@ def test_kafka_flush_by_block_size(kafka_cluster): assert int( result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!' - def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="topic_with_multiple_partitions2", num_partitions=10, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "topic_with_multiple_partitions2" + kafka_create_topic(admin_client, topic_name, num_partitions=10) instance.query(''' DROP TABLE IF EXISTS test.view; @@ -1938,6 +1989,7 @@ def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster): DROP TABLE test.consumer; DROP TABLE test.view; ''') + kafka_delete_topic(admin_client, topic_name) def test_kafka_rebalance(kafka_cluster): @@ -1964,9 +2016,8 @@ def test_kafka_rebalance(kafka_cluster): # time.sleep(2) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="topic_with_multiple_partitions", num_partitions=11, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "topic_with_multiple_partitions" + kafka_create_topic(admin_client, topic_name, num_partitions=11) cancel = threading.Event() @@ -2070,6 +2121,7 @@ def test_kafka_rebalance(kafka_cluster): kafka_thread.join() assert result == 1, 'Messages from kafka get duplicated!' + kafka_delete_topic(admin_client, topic_name) def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster): @@ -2645,6 +2697,7 @@ def test_kafka_engine_put_errors_to_stream_with_random_malformed_json(kafka_clus def test_kafka_formats_with_broken_message(kafka_cluster): # data was dumped from clickhouse itself in a following manner # clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g' + admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) all_formats = { ## Text formats ## @@ -2814,8 +2867,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster): topic_name_prefix = 'format_tests_4_stream_' for format_name, format_opts in list(all_formats.items()): - print(('Set up {}'.format(format_name))) - topic_name = topic_name_prefix + '{}'.format(format_name) + logging.debug(f'Set up {format_name}') + topic_name = f"{topic_name_prefix}{format_name}" data_sample = format_opts['data_sample'] data_prefix = [] raw_message = '_raw_message' @@ -2855,8 +2908,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster): extra_settings=format_opts.get('extra_settings') or '')) for format_name, format_opts in list(all_formats.items()): - print(('Checking {}'.format(format_name))) - topic_name = topic_name_prefix + '{}'.format(format_name) + logging.debug('Checking {format_name}') + topic_name = f"{topic_name_prefix}{format_name}" # shift offsets by 1 if format supports empty value offsets = [1, 2, 3] if format_opts.get('supports_empty_value', False) else [0, 1, 2] result = instance.query('SELECT * FROM test.kafka_data_{format_name}_mv;'.format(format_name=format_name)) @@ -2886,6 +2939,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster): # print(errors_result.strip()) # print(errors_expected.strip()) assert errors_result.strip() == errors_expected.strip(), 'Proper errors for format: {}'.format(format_name) + kafka_delete_topic(admin_client, topic_name) def wait_for_new_data(table_name, prev_count = 0, max_retries = 120): retries = 0 @@ -2901,14 +2955,12 @@ def wait_for_new_data(table_name, prev_count = 0, max_retries = 120): raise Exception("No new data :(") def test_kafka_consumer_failover(kafka_cluster): - # for backporting: # admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092") admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) - topic_list = [] - topic_list.append(NewTopic(name="kafka_consumer_failover", num_partitions=2, replication_factor=1)) - admin_client.create_topics(new_topics=topic_list, validate_only=False) + topic_name = "kafka_consumer_failover" + kafka_create_topic(admin_client, topic_name, num_partitions=2) instance.query(''' DROP TABLE IF EXISTS test.kafka; @@ -3019,6 +3071,7 @@ def test_kafka_consumer_failover(kafka_cluster): producer.send(topic='kafka_consumer_failover', value=json.dumps({'key':8,'value': 8}), partition=1) producer.flush() prev_count = wait_for_new_data('test.destination', prev_count) + kafka_delete_topic(admin_client, topic_name) if __name__ == '__main__': diff --git a/tests/integration/test_storage_mongodb/test.py b/tests/integration/test_storage_mongodb/test.py index 415f1c1cb33..aeb8419b734 100644 --- a/tests/integration/test_storage_mongodb/test.py +++ b/tests/integration/test_storage_mongodb/test.py @@ -46,6 +46,8 @@ def test_simple_select(started_cluster): assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n' assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n' + node.query("DROP TABLE simple_mongo_table") + simple_mongo_table.drop() @pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster']) @@ -67,6 +69,8 @@ def test_complex_data_type(started_cluster): assert node.query("SELECT sum(key) FROM incomplete_mongo_table") == str(sum(range(0, 100))) + '\n' assert node.query("SELECT data from incomplete_mongo_table where key = 42") == hex(42 * 42) + '\n' + node.query("DROP TABLE incomplete_mongo_table") + incomplete_mongo_table.drop() @pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster']) @@ -95,7 +99,9 @@ def test_incorrect_data_type(started_cluster): with pytest.raises(QueryRuntimeException): node.query("SELECT bbbb FROM strange_mongo_table2") - + node.query("DROP TABLE strange_mongo_table") + node.query("DROP TABLE strange_mongo_table2") + strange_mongo_table.drop() @pytest.mark.parametrize('started_cluster', [True], indirect=['started_cluster']) def test_secure_connection(started_cluster): @@ -116,3 +122,5 @@ def test_secure_connection(started_cluster): assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n' assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n' + node.query("DROP TABLE simple_mongo_table") + simple_mongo_table.drop()