János Benjamin Antal 2024-09-19 01:53:56 +02:00 committed by GitHub
commit 54238110a9
11 changed files with 5893 additions and 5700 deletions

View File

@@ -4,3 +4,6 @@
./runner ' --setup-plan' | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' | sort -u > all_tests.txt
# 2. Filter known tests that are currently not run in parallel
cat all_tests.txt | grep '^test_replicated_database\|^test_disabled_mysql_server\|^test_distributed_ddl\|^test_distributed_ddl\|^test_quorum_inserts_parallel\|^test_ddl_worker_non_leader\|^test_consistent_parts_after_clone_replica\|^test_materialized_mysql_database\|^test_atomic_drop_table\|^test_distributed_respect_user_timeouts\|^test_storage_kafka\|^test_replace_partition\|^test_replicated_fetches_timeouts\|^test_system_clusters_actual_information\|^test_delayed_replica_failover\|^test_limited_replicated_fetches\|^test_hedged_requests\|^test_insert_into_distributed\|^test_insert_into_distributed_through_materialized_view\|^test_drop_replica\|^test_attach_without_fetching\|^test_system_replicated_fetches\|^test_cross_replication\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_insert_into_distributed_sync_async\|^test_hedged_requests_parallel\|^test_dictionaries_update_field\|^test_broken_part_during_merge\|^test_random_inserts\|^test_reload_clusters_config\|^test_parts_delete_zookeeper\|^test_polymorphic_parts\|^test_keeper_multinode_simple\|^test_https_replication\|^test_storage_kerberized_kafka\|^test_cleanup_dir_after_bad_zk_conn\|^test_system_metrics\|^test_keeper_multinode_blocade_leader' | awk '{$1=$1;print}' | jq -R -n '[inputs] | .' > parallel_skip.json
# To easily add all tests from a directory, the following command lists every test in it, e.g. in test_storage_kafka:
pytest test_storage_kafka --collect-only -q

View File

@@ -99,74 +99,6 @@
"test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_table_detach",
"test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_zookeeper_disconnect",
"test_storage_kafka/test.py::test_kafka_column_types",
"test_storage_kafka/test.py::test_kafka_settings_old_syntax",
"test_storage_kafka/test.py::test_kafka_settings_new_syntax",
"test_storage_kafka/test.py::test_kafka_settings_predefined_macros",
"test_storage_kafka/test.py::test_kafka_json_as_string",
"test_storage_kafka/test.py::test_kafka_formats",
"test_storage_kafka/test.py::test_kafka_issue11308",
"test_storage_kafka/test.py::test_kafka_issue4116",
"test_storage_kafka/test.py::test_kafka_consumer_hang",
"test_storage_kafka/test.py::test_kafka_consumer_hang2",
"test_storage_kafka/test.py::test_kafka_read_consumers_in_parallel",
"test_storage_kafka/test.py::test_kafka_csv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_tsv_with_delimiter",
"test_storage_kafka/test.py::test_kafka_select_empty",
"test_storage_kafka/test.py::test_kafka_json_without_delimiter",
"test_storage_kafka/test.py::test_kafka_protobuf",
"test_storage_kafka/test.py::test_kafka_string_field_on_first_position_in_protobuf",
"test_storage_kafka/test.py::test_kafka_protobuf_no_delimiter",
"test_storage_kafka/test.py::test_kafka_materialized_view",
"test_storage_kafka/test.py::test_kafka_recreate_kafka_table",
"test_storage_kafka/test.py::test_librdkafka_compression",
"test_storage_kafka/test.py::test_kafka_materialized_view_with_subquery",
"test_storage_kafka/test.py::test_kafka_many_materialized_views",
"test_storage_kafka/test.py::test_kafka_flush_on_big_message",
"test_storage_kafka/test.py::test_kafka_virtual_columns",
"test_storage_kafka/test.py::test_kafka_virtual_columns_with_materialized_view",
"test_storage_kafka/test.py::test_kafka_insert",
"test_storage_kafka/test.py::test_kafka_produce_consume",
"test_storage_kafka/test.py::test_kafka_commit_on_block_write",
"test_storage_kafka/test.py::test_kafka_virtual_columns2",
"test_storage_kafka/test.py::test_kafka_producer_consumer_separate_settings",
"test_storage_kafka/test.py::test_kafka_produce_key_timestamp",
"test_storage_kafka/test.py::test_kafka_insert_avro",
"test_storage_kafka/test.py::test_kafka_produce_consume_avro",
"test_storage_kafka/test.py::test_kafka_flush_by_time",
"test_storage_kafka/test.py::test_kafka_flush_by_block_size",
"test_storage_kafka/test.py::test_kafka_lot_of_partitions_partial_commit_of_bulk",
"test_storage_kafka/test.py::test_kafka_rebalance",
"test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed",
"test_storage_kafka/test.py::test_exception_from_destructor",
"test_storage_kafka/test.py::test_commits_of_unprocessed_messages_on_drop",
"test_storage_kafka/test.py::test_bad_reschedule",
"test_storage_kafka/test.py::test_kafka_duplicates_when_commit_failed",
"test_storage_kafka/test.py::test_premature_flush_on_eof",
"test_storage_kafka/test.py::test_kafka_unavailable",
"test_storage_kafka/test.py::test_kafka_issue14202",
"test_storage_kafka/test.py::test_kafka_csv_with_thread_per_consumer",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream",
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream_with_random_malformed_json",
"test_storage_kafka/test.py::test_kafka_formats_with_broken_message",
"test_storage_kafka/test.py::test_kafka_consumer_failover",
"test_storage_kafka/test.py::test_kafka_predefined_configuration",
"test_storage_kafka/test.py::test_issue26643",
"test_storage_kafka/test.py::test_num_consumers_limit",
"test_storage_kafka/test.py::test_format_with_prefix_and_suffix",
"test_storage_kafka/test.py::test_max_rows_per_message",
"test_storage_kafka/test.py::test_row_based_formats",
"test_storage_kafka/test.py::test_block_based_formats_1",
"test_storage_kafka/test.py::test_block_based_formats_2",
"test_storage_kafka/test.py::test_system_kafka_consumers",
"test_storage_kafka/test.py::test_system_kafka_consumers_rebalance",
"test_storage_kafka/test.py::test_system_kafka_consumers_rebalance_mv",
"test_storage_kafka/test.py::test_formats_errors",
"test_storage_kafka/test.py::test_multiple_read_in_materialized_views",
"test_storage_kafka/test.py::test_kafka_null_message",
"test_storage_kafka/test_produce_http_interface.py::test_kafka_produce_http_interface_row_based_format",
"test_storage_kerberized_kafka/test.py::test_kafka_json_as_string",
"test_storage_kerberized_kafka/test.py::test_kafka_json_as_string_request_new_ticket_after_expiration",
"test_storage_kerberized_kafka/test.py::test_kafka_json_as_string_no_kdc",

View File

@@ -0,0 +1,93 @@
from typing import Iterator
import pytest
import logging
import time
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from test_storage_kafka.kafka_tests_utils import get_admin_client
pytestmark = pytest.mark.skip
KAFKA_TOPIC_OLD = "old_t"
KAFKA_CONSUMER_GROUP_OLD = "old_cg"
KAFKA_TOPIC_NEW = "new_t"
KAFKA_CONSUMER_GROUP_NEW = "new_cg"
# We have to initialize these lazily, otherwise WORKER_FREE_PORTS is not populated yet, which makes the PortPoolManager fail.
# It would be nicer to initialize them eagerly, but the fixtures have to live in conftest.py to be applied automatically,
# so they "cannot be forgotten". We could move these variables and the gist of the fixture implementation into a separate
# file, but splitting them up has more disadvantages than eager initialization has advantages.
conftest_cluster = None
conftest_instance = None
def init_cluster_and_instance():
global conftest_cluster
global conftest_instance
if conftest_cluster is not None:
return
conftest_cluster = ClickHouseCluster(__file__)
conftest_instance = conftest_cluster.add_instance(
"instance",
main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
macros={
"kafka_broker": "kafka1",
"kafka_topic_old": KAFKA_TOPIC_OLD,
"kafka_group_name_old": KAFKA_CONSUMER_GROUP_OLD,
"kafka_topic_new": KAFKA_TOPIC_NEW,
"kafka_group_name_new": KAFKA_CONSUMER_GROUP_NEW,
"kafka_client_id": "instance",
"kafka_format_json_each_row": "JSONEachRow",
},
clickhouse_path_dir="clickhouse_path",
)
@pytest.fixture(scope="module")
def kafka_cluster() -> "Iterator[ClickHouseCluster]":
try:
init_cluster_and_instance()
conftest_cluster.start()
kafka_id = conftest_cluster.kafka_docker_id
logging.info(f"kafka_id is {kafka_id}")
yield conftest_cluster
finally:
conftest_cluster.shutdown()
# kafka_cluster is requested here to ensure the cluster is initialized before we yield the instance
@pytest.fixture()
def instance(kafka_cluster) -> "Iterator[ClickHouseInstance]":
yield conftest_instance
@pytest.fixture(autouse=True)
def kafka_setup_teardown(kafka_cluster, instance):
instance.query("DROP DATABASE IF EXISTS test SYNC; CREATE DATABASE test;")
admin_client = get_admin_client(kafka_cluster)
def get_topics_to_delete():
return [t for t in admin_client.list_topics() if not t.startswith("_")]
topics = get_topics_to_delete()
logging.debug(f"Deleting topics: {topics}")
result = admin_client.delete_topics(topics)
for topic, error in result.topic_error_codes:
if error != 0:
logging.warning(f"Received error {error} while deleting topic {topic}")
else:
logging.info(f"Deleted topic {topic}")
    retries = 0
    topics = get_topics_to_delete()
    while len(topics) != 0:
        logging.info(f"Existing topics: {topics}")
        if retries >= 5:
            raise Exception(f"Failed to delete topics {topics}")
        retries += 1
        time.sleep(0.5)
        # Topic deletion is asynchronous, so re-check the list before retrying.
        topics = get_topics_to_delete()
yield # run test
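# A minimal sketch of how a test module could consume these fixtures; the test
# name, body, and query below are hypothetical and not part of this commit.
def test_instance_is_alive(kafka_cluster, instance):
    # `kafka_cluster` and `instance` come from the fixtures above, and the
    # autouse `kafka_setup_teardown` recreates the `test` database and wipes
    # the Kafka topics before the test body runs.
    assert instance.query("SELECT 1").strip() == "1"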

View File

@@ -0,0 +1,444 @@
import io
import logging
import os
import socket
import time
from contextlib import contextmanager
import avro.datafile
import avro.io
import avro.schema
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from google.protobuf.internal.encoder import _VarintBytes
from helpers.client import QueryRuntimeException
from helpers.test_tools import TSV
from kafka import BrokerConnection, KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
# protoc --version
# libprotoc 3.0.0
# # to create kafka_pb2.py
# protoc --python_out=. kafka.proto
from test_storage_kafka import kafka_pb2, social_pb2
@contextmanager
def kafka_topic(
admin_client,
topic_name,
num_partitions=1,
replication_factor=1,
max_retries=50,
config=None,
):
kafka_create_topic(
admin_client,
topic_name,
num_partitions,
replication_factor,
max_retries,
config,
)
try:
yield None
finally:
        # Delete the topic once the context exits, even if the body raised.
kafka_delete_topic(admin_client, topic_name, max_retries)
@contextmanager
def existing_kafka_topic(admin_client, topic_name, max_retries=50):
try:
yield None
finally:
kafka_delete_topic(admin_client, topic_name, max_retries)
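# A brief usage sketch of the context managers above; the topic name and payload
# are illustrative, and `kafka_cluster` is assumed to be the module fixture.
# `get_admin_client` and `kafka_produce` are defined further down in this module.
admin_client = get_admin_client(kafka_cluster)
with kafka_topic(admin_client, "demo_topic", num_partitions=2):
    # The topic exists here and is deleted on exit, even if the body raises.
    kafka_produce(kafka_cluster, "demo_topic", ['{"key": 1, "value": 1}'])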
def kafka_create_topic(
admin_client,
topic_name,
num_partitions=1,
replication_factor=1,
max_retries=50,
config=None,
):
logging.debug(
f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}"
)
topics_list = [
NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs=config,
)
]
retries = 0
while True:
try:
admin_client.create_topics(new_topics=topics_list, validate_only=False)
logging.debug("Admin client succeed")
return
except Exception as e:
retries += 1
time.sleep(0.5)
if retries < max_retries:
logging.warning(f"Failed to create topic {e}")
else:
raise
def kafka_delete_topic(admin_client, topic, max_retries=50):
result = admin_client.delete_topics([topic])
for topic, e in result.topic_error_codes:
if e == 0:
logging.debug(f"Topic {topic} deleted")
else:
logging.error(f"Failed to delete topic {topic}: {e}")
retries = 0
while True:
topics_listed = admin_client.list_topics()
logging.debug(f"TOPICS LISTED: {topics_listed}")
if topic not in topics_listed:
return
else:
retries += 1
time.sleep(0.5)
if retries > max_retries:
raise Exception(f"Failed to delete topics {topic}, {result}")
def get_kafka_producer(port, serializer, retries):
errors = []
for _ in range(retries):
try:
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(port),
value_serializer=serializer,
)
logging.debug("Kafka Connection establised: localhost:{}".format(port))
return producer
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception("Connection not establised, {}".format(errors))
def producer_serializer(x):
return x.encode() if isinstance(x, str) else x
def get_admin_client(kafka_cluster):
return KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
logging.debug(
"kafka_produce server:{}:{} topic:{}".format(
"localhost", kafka_cluster.kafka_port, topic
)
)
producer = get_kafka_producer(
kafka_cluster.kafka_port, producer_serializer, retries
)
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
def kafka_consume(kafka_cluster, topic, need_decode=True, timestamp=0):
consumer = KafkaConsumer(
bootstrap_servers=f"localhost:{kafka_cluster.kafka_port}",
auto_offset_reset="earliest",
session_timeout_ms=1000,
)
consumer.subscribe(topics=(topic))
for toppar, messages in list(consumer.poll(5000).items()):
if toppar.topic == topic:
for message in messages:
assert timestamp == 0 or message.timestamp / 1000 == timestamp
if need_decode:
yield message.value.decode()
else:
yield message.value
consumer.unsubscribe()
consumer.close()
def kafka_produce_protobuf_messages(kafka_cluster, topic, start_index, num_messages):
data = b""
for i in range(start_index, start_index + num_messages):
msg = kafka_pb2.KeyValuePair()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port),
value_serializer=producer_serializer,
)
producer.send(topic=topic, value=data)
producer.flush()
logging.debug(("Produced {} messages for topic {}".format(num_messages, topic)))
def kafka_consume_with_retry(
kafka_cluster,
topic,
expected_messages,
need_decode=True,
timestamp=0,
retry_count=20,
sleep_time=0.1,
):
messages = []
try_count = 0
while try_count < retry_count:
try_count += 1
messages.extend(
kafka_consume(
kafka_cluster, topic, need_decode=need_decode, timestamp=timestamp
)
)
if len(messages) == expected_messages:
break
time.sleep(sleep_time)
if len(messages) != expected_messages:
raise Exception(f"Got only {len(messages)} messages")
return messages
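# An illustrative round trip with the helpers above; the topic name and payloads
# are hypothetical, and `kafka_cluster` is assumed to be the module fixture.
kafka_produce(kafka_cluster, "round_trip_topic", ['{"key": 1}', '{"key": 2}'])
# Polls repeatedly and raises if fewer than `expected_messages` arrive in time.
messages = kafka_consume_with_retry(kafka_cluster, "round_trip_topic", expected_messages=2)
assert len(messages) == 2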
def kafka_produce_protobuf_messages_no_delimiters(
kafka_cluster, topic, start_index, num_messages
):
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
for i in range(start_index, start_index + num_messages):
msg = kafka_pb2.KeyValuePair()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
producer.send(topic=topic, value=serialized_msg)
producer.flush()
logging.debug("Produced {} messages for topic {}".format(num_messages, topic))
def kafka_produce_protobuf_social(kafka_cluster, topic, start_index, num_messages):
data = b""
for i in range(start_index, start_index + num_messages):
msg = social_pb2.User()
msg.username = "John Doe {}".format(i)
msg.timestamp = 1000000 + i
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port),
value_serializer=producer_serializer,
)
producer.send(topic=topic, value=data)
producer.flush()
logging.debug(("Produced {} messages for topic {}".format(num_messages, topic)))
def avro_message(value):
schema = avro.schema.make_avsc_object(
{
"name": "row",
"type": "record",
"fields": [
{"name": "id", "type": "long"},
{"name": "blockNo", "type": "int"},
{"name": "val1", "type": "string"},
{"name": "val2", "type": "float"},
{"name": "val3", "type": "int"},
],
}
)
bytes_writer = io.BytesIO()
    # DataFileWriter seems to be mandatory to get the schema encoded
writer = avro.datafile.DataFileWriter(bytes_writer, avro.io.DatumWriter(), schema)
if isinstance(value, list):
for v in value:
writer.append(v)
else:
writer.append(value)
writer.flush()
raw_bytes = bytes_writer.getvalue()
writer.close()
bytes_writer.close()
return raw_bytes
def avro_confluent_message(schema_registry_client, value):
serializer = MessageSerializer(schema_registry_client)
schema = avro.schema.make_avsc_object(
{
"name": "row",
"type": "record",
"fields": [
{"name": "id", "type": "long"},
{"name": "blockNo", "type": "int"},
{"name": "val1", "type": "string"},
{"name": "val2", "type": "float"},
{"name": "val3", "type": "int"},
],
}
)
return serializer.encode_record_with_schema("test_subject", schema, value)
def create_settings_string(settings):
if settings is None:
return ""
def format_value(value):
if isinstance(value, str):
return f"'{value}'"
elif isinstance(value, bool):
return str(int(value))
return str(value)
settings_string = "SETTINGS "
keys = settings.keys()
first_key = next(iter(settings))
settings_string += str(first_key) + " = " + format_value(settings[first_key])
for key in keys:
if key == first_key:
continue
settings_string += ", " + str(key) + " = " + format_value(settings[key])
return settings_string
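# A worked example of the formatting rules above (keys and values are illustrative):
#   create_settings_string(
#       {"kafka_num_consumers": 4, "kafka_thread_per_consumer": True, "kafka_format": "JSONEachRow"}
#   )
# returns:
#   "SETTINGS kafka_num_consumers = 4, kafka_thread_per_consumer = 1, kafka_format = 'JSONEachRow'"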
def generate_old_create_table_query(
table_name,
columns_def,
database="test",
brokers="{kafka_broker}:19092",
topic_list="{kafka_topic_new}",
consumer_group="{kafka_group_name_new}",
format="{kafka_format_json_each_row}",
row_delimiter="\\n",
    keeper_path=None,  # not used with the old syntax, but keeping keeper_path and replica_name in the signature makes the two generators interchangeable
replica_name=None,
settings=None,
):
settings_string = create_settings_string(settings)
query = f"""CREATE TABLE {database}.{table_name} ({columns_def}) ENGINE = Kafka('{brokers}', '{topic_list}', '{consumer_group}', '{format}', '{row_delimiter}')
{settings_string}"""
logging.debug(f"Generated old create query: {query}")
return query
def generate_new_create_table_query(
table_name,
columns_def,
database="test",
brokers="{kafka_broker}:19092",
topic_list="{kafka_topic_new}",
consumer_group="{kafka_group_name_new}",
format="{kafka_format_json_each_row}",
row_delimiter="\\n",
keeper_path=None,
replica_name=None,
settings=None,
):
if settings is None:
settings = {}
if keeper_path is None:
keeper_path = f"/clickhouse/{{database}}/{table_name}"
if replica_name is None:
replica_name = "r1"
settings["kafka_keeper_path"] = keeper_path
settings["kafka_replica_name"] = replica_name
settings_string = create_settings_string(settings)
query = f"""CREATE TABLE {database}.{table_name} ({columns_def}) ENGINE = Kafka('{brokers}', '{topic_list}', '{consumer_group}', '{format}', '{row_delimiter}')
{settings_string}
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1"""
logging.debug(f"Generated new create query: {query}")
return query
def must_use_thread_per_consumer(generator):
if generator == generate_old_create_table_query:
return False
if generator == generate_new_create_table_query:
return True
raise Exception("Unexpected generator")
def get_topic_postfix(generator):
if generator == generate_old_create_table_query:
return "old"
if generator == generate_new_create_table_query:
return "new"
raise Exception("Unexpected generator")
# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
def kafka_check_result(result, check=False, ref_file="test_kafka_json.reference"):
fpath = os.path.join(os.path.dirname(__file__), ref_file)
with open(fpath) as reference:
if check:
assert TSV(result) == TSV(reference)
else:
return TSV(result) == TSV(reference)
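# Following the comment above, a polling loop around kafka_check_result could
# look like this; `instance`, `test.view`, and the 60-iteration budget are assumptions.
for _ in range(60):
    result = instance.query("SELECT * FROM test.view")
    if kafka_check_result(result):
        break
    time.sleep(0.5)
kafka_check_result(result, check=True)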
# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(kafka_cluster, name):
client = BrokerConnection("localhost", kafka_cluster.kafka_port, socket.AF_INET)
client.connect_blocking()
list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
future = client.send(list_members_in_groups)
while not future.is_done:
for resp, f in client.recv():
f.success(resp)
(
error_code,
group_id,
state,
protocol_type,
protocol,
members,
) = future.value.groups[0]
res = []
for member in members:
(member_id, client_id, client_host, member_metadata, member_assignment) = member
member_info = {}
member_info["member_id"] = member_id
member_info["client_id"] = client_id
member_info["client_host"] = client_host
member_topics_assignment = []
for topic, partitions in MemberAssignment.decode(member_assignment).assignment:
member_topics_assignment.append({"topic": topic, "partitions": partitions})
member_info["assignment"] = member_topics_assignment
res.append(member_info)
return res
def insert_with_retry(instance, values, table_name="kafka", max_try_count=5):
try_count = 0
while True:
logging.debug(f"Inserting, try_count is {try_count}")
try:
try_count += 1
instance.query(f"INSERT INTO test.{table_name} VALUES {values}")
break
except QueryRuntimeException as e:
if "Local: Timed out." in str(e) and try_count < max_try_count:
continue
else:
raise

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long

View File

@@ -1,109 +1,21 @@
import time
import logging
import pytest
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from helpers.cluster import is_arm
from helpers.test_tools import TSV
from kafka.admin import KafkaAdminClient
from test_storage_kafka.kafka_tests_utils import (
kafka_create_topic,
kafka_delete_topic,
)
from test_storage_kafka.conftest import conftest_cluster, init_cluster_and_instance
# Skip all tests on ARM
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
macros={
"kafka_broker": "kafka1",
"kafka_topic_old": "old",
"kafka_group_name_old": "old",
"kafka_topic_new": "new",
"kafka_group_name_new": "new",
"kafka_client_id": "instance",
"kafka_format_json_each_row": "JSONEachRow",
},
clickhouse_path_dir="clickhouse_path",
)
@pytest.fixture(scope="module")
def kafka_cluster():
try:
cluster.start()
kafka_id = instance.cluster.kafka_docker_id
print(("kafka_id is {}".format(kafka_id)))
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
# logging.debug("kafka is available - running test")
yield # run test
def kafka_create_topic(
admin_client,
topic_name,
num_partitions=1,
replication_factor=1,
max_retries=50,
config=None,
):
logging.debug(
f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}"
)
topics_list = [
NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs=config,
)
]
retries = 0
while True:
try:
admin_client.create_topics(new_topics=topics_list, validate_only=False)
logging.debug("Admin client succeed")
return
except Exception as e:
retries += 1
time.sleep(0.5)
if retries < max_retries:
logging.warning(f"Failed to create topic {e}")
else:
raise
def kafka_delete_topic(admin_client, topic, max_retries=50):
result = admin_client.delete_topics([topic])
for topic, e in result.topic_error_codes:
if e == 0:
logging.debug(f"Topic {topic} deleted")
else:
logging.error(f"Failed to delete topic {topic}: {e}")
retries = 0
while True:
topics_listed = admin_client.list_topics()
logging.debug(f"TOPICS LISTED: {topics_listed}")
if topic not in topics_listed:
return
else:
retries += 1
time.sleep(0.5)
if retries > max_retries:
raise Exception(f"Failed to delete topics {topic}, {result}")
def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
def test_kafka_produce_http_interface_row_based_format(kafka_cluster, instance):
# reproduction of #61060 with validating the written messages
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
@@ -182,7 +94,7 @@ def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
"MsgPack",
]
for format in formats_to_test:
logging.debug(f"Creating tables and writing messages to {format}")
logging.debug(f"Creating tables for {format}")
topic = topic_prefix + format
kafka_create_topic(admin_client, topic)
@@ -212,6 +124,10 @@ def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
SELECT key, value FROM test.kafka_{topic};
"""
)
for format in formats_to_test:
logging.debug(f"Inserting data to {format}")
topic = topic_prefix + format
instance.http_query(
insert_query_template.format(table_name="test.kafka_writer_" + topic),
method="POST",
@@ -238,6 +154,7 @@ def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
if __name__ == "__main__":
cluster.start()
init_cluster_and_instance()
conftest_cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()
conftest_cluster.shutdown()