mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge 4b96ca960e
into 7fd2207626
This commit is contained in:
commit
b96fc82af9
@ -4,3 +4,6 @@
|
||||
./runner ' --setup-plan' | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' | sort -u > all_tests.txt
|
||||
# 2. Filter known tests that are currently not run in parallel
|
||||
cat all_tests.txt | grep '^test_replicated_database\|^test_disabled_mysql_server\|^test_distributed_ddl\|^test_distributed_ddl\|^test_quorum_inserts_parallel\|^test_ddl_worker_non_leader\|^test_consistent_parts_after_clone_replica\|^test_materialized_mysql_database\|^test_atomic_drop_table\|^test_distributed_respect_user_timeouts\|^test_storage_kafka\|^test_replace_partition\|^test_replicated_fetches_timeouts\|^test_system_clusters_actual_information\|^test_delayed_replica_failover\|^test_limited_replicated_fetches\|^test_hedged_requests\|^test_insert_into_distributed\|^test_insert_into_distributed_through_materialized_view\|^test_drop_replica\|^test_attach_without_fetching\|^test_system_replicated_fetches\|^test_cross_replication\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_insert_into_distributed_sync_async\|^test_hedged_requests_parallel\|^test_dictionaries_update_field\|^test_broken_part_during_merge\|^test_random_inserts\|^test_reload_clusters_config\|^test_parts_delete_zookeeper\|^test_polymorphic_parts\|^test_keeper_multinode_simple\|^test_https_replication\|^test_storage_kerberized_kafka\|^test_cleanup_dir_after_bad_zk_conn\|^test_system_metrics\|^test_keeper_multinode_blocade_leader' | awk '{$1=$1;print}' | jq -R -n '[inputs] | .' > parallel_skip.json
|
||||
|
||||
# To easily add all tests from a directory the following command can be used to list all tests in a directory, e.g in test_storage_kafka:
|
||||
pytest test_storage_kafka --collect-only -q
|
||||
|
@ -99,71 +99,71 @@
|
||||
"test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_table_detach",
|
||||
"test_ttl_move/test.py::TestCancelBackgroundMoving::test_cancel_background_moving_on_zookeeper_disconnect",
|
||||
|
||||
"test_storage_kafka/test.py::test_kafka_column_types",
|
||||
"test_storage_kafka/test.py::test_kafka_settings_old_syntax",
|
||||
"test_storage_kafka/test.py::test_kafka_settings_new_syntax",
|
||||
"test_storage_kafka/test.py::test_kafka_settings_predefined_macros",
|
||||
"test_storage_kafka/test.py::test_kafka_json_as_string",
|
||||
"test_storage_kafka/test.py::test_kafka_formats",
|
||||
"test_storage_kafka/test.py::test_kafka_issue11308",
|
||||
"test_storage_kafka/test.py::test_kafka_issue4116",
|
||||
"test_storage_kafka/test.py::test_kafka_consumer_hang",
|
||||
"test_storage_kafka/test.py::test_kafka_consumer_hang2",
|
||||
"test_storage_kafka/test.py::test_kafka_read_consumers_in_parallel",
|
||||
"test_storage_kafka/test.py::test_kafka_csv_with_delimiter",
|
||||
"test_storage_kafka/test.py::test_kafka_tsv_with_delimiter",
|
||||
"test_storage_kafka/test.py::test_kafka_select_empty",
|
||||
"test_storage_kafka/test.py::test_kafka_json_without_delimiter",
|
||||
"test_storage_kafka/test.py::test_kafka_protobuf",
|
||||
"test_storage_kafka/test.py::test_kafka_string_field_on_first_position_in_protobuf",
|
||||
"test_storage_kafka/test.py::test_kafka_protobuf_no_delimiter",
|
||||
"test_storage_kafka/test.py::test_kafka_materialized_view",
|
||||
"test_storage_kafka/test.py::test_kafka_recreate_kafka_table",
|
||||
"test_storage_kafka/test.py::test_librdkafka_compression",
|
||||
"test_storage_kafka/test.py::test_kafka_materialized_view_with_subquery",
|
||||
"test_storage_kafka/test.py::test_kafka_many_materialized_views",
|
||||
"test_storage_kafka/test.py::test_kafka_flush_on_big_message",
|
||||
"test_storage_kafka/test.py::test_kafka_virtual_columns",
|
||||
"test_storage_kafka/test.py::test_kafka_virtual_columns_with_materialized_view",
|
||||
"test_storage_kafka/test.py::test_kafka_insert",
|
||||
"test_storage_kafka/test.py::test_kafka_produce_consume",
|
||||
"test_storage_kafka/test.py::test_kafka_commit_on_block_write",
|
||||
"test_storage_kafka/test.py::test_kafka_virtual_columns2",
|
||||
"test_storage_kafka/test.py::test_kafka_producer_consumer_separate_settings",
|
||||
"test_storage_kafka/test.py::test_kafka_produce_key_timestamp",
|
||||
"test_storage_kafka/test.py::test_kafka_insert_avro",
|
||||
"test_storage_kafka/test.py::test_kafka_produce_consume_avro",
|
||||
"test_storage_kafka/test.py::test_kafka_flush_by_time",
|
||||
"test_storage_kafka/test.py::test_kafka_flush_by_block_size",
|
||||
"test_storage_kafka/test.py::test_kafka_lot_of_partitions_partial_commit_of_bulk",
|
||||
"test_storage_kafka/test.py::test_kafka_rebalance",
|
||||
"test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed",
|
||||
"test_storage_kafka/test.py::test_exception_from_destructor",
|
||||
"test_storage_kafka/test.py::test_commits_of_unprocessed_messages_on_drop",
|
||||
"test_storage_kafka/test.py::test_bad_reschedule",
|
||||
"test_storage_kafka/test.py::test_kafka_duplicates_when_commit_failed",
|
||||
"test_storage_kafka/test.py::test_premature_flush_on_eof",
|
||||
"test_storage_kafka/test.py::test_kafka_unavailable",
|
||||
"test_storage_kafka/test.py::test_kafka_issue14202",
|
||||
"test_storage_kafka/test.py::test_kafka_csv_with_thread_per_consumer",
|
||||
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream",
|
||||
"test_storage_kafka/test.py::test_kafka_engine_put_errors_to_stream_with_random_malformed_json",
|
||||
"test_storage_kafka/test.py::test_kafka_formats_with_broken_message",
|
||||
"test_storage_kafka/test.py::test_kafka_consumer_failover",
|
||||
"test_storage_kafka/test.py::test_kafka_predefined_configuration",
|
||||
"test_storage_kafka/test.py::test_issue26643",
|
||||
"test_storage_kafka/test.py::test_num_consumers_limit",
|
||||
"test_storage_kafka/test.py::test_format_with_prefix_and_suffix",
|
||||
"test_storage_kafka/test.py::test_max_rows_per_message",
|
||||
"test_storage_kafka/test.py::test_row_based_formats",
|
||||
"test_storage_kafka/test.py::test_block_based_formats_1",
|
||||
"test_storage_kafka/test.py::test_block_based_formats_2",
|
||||
"test_storage_kafka/test.py::test_system_kafka_consumers",
|
||||
"test_storage_kafka/test.py::test_system_kafka_consumers_rebalance",
|
||||
"test_storage_kafka/test.py::test_system_kafka_consumers_rebalance_mv",
|
||||
"test_storage_kafka/test.py::test_formats_errors",
|
||||
"test_storage_kafka/test.py::test_multiple_read_in_materialized_views",
|
||||
"test_storage_kafka/test.py::test_kafka_null_message",
|
||||
"test_storage_kafka/test_1.py::test_kafka_consumer_failover",
|
||||
"test_storage_kafka/test_1.py::test_row_based_formats",
|
||||
"test_storage_kafka/test_1.py::test_kafka_duplicates_when_commit_failed",
|
||||
"test_storage_kafka/test_1.py::test_commits_of_unprocessed_messages_on_drop",
|
||||
"test_storage_kafka/test_1.py::test_kafka_produce_consume",
|
||||
"test_storage_kafka/test_1.py::test_kafka_materialized_view",
|
||||
"test_storage_kafka/test_1.py::test_kafka_materialized_view_with_subquery",
|
||||
"test_storage_kafka/test_1.py::test_kafka_lot_of_partitions_partial_commit_of_bulk",
|
||||
"test_storage_kafka/test_1.py::test_issue26643",
|
||||
"test_storage_kafka/test_1.py::test_kafka_csv_with_thread_per_consumer",
|
||||
"test_storage_kafka/test_1.py::test_kafka_select_empty",
|
||||
"test_storage_kafka/test_1.py::test_kafka_issue11308",
|
||||
"test_storage_kafka/test_1.py::test_kafka_protobuf",
|
||||
"test_storage_kafka/test_1.py::test_kafka_predefined_configuration",
|
||||
"test_storage_kafka/test_1.py::test_kafka_insert_avro",
|
||||
"test_storage_kafka/test_1.py::test_exception_from_destructor",
|
||||
"test_storage_kafka/test_2.py::test_kafka_unavailable",
|
||||
"test_storage_kafka/test_2.py::test_bad_reschedule",
|
||||
"test_storage_kafka/test_2.py::test_librdkafka_compression",
|
||||
"test_storage_kafka/test_2.py::test_multiple_read_in_materialized_views",
|
||||
"test_storage_kafka/test_2.py::test_kafka_virtual_columns2",
|
||||
"test_storage_kafka/test_2.py::test_kafka_produce_key_timestamp",
|
||||
"test_storage_kafka/test_2.py::test_kafka_engine_put_errors_to_stream_with_random_malformed_json",
|
||||
"test_storage_kafka/test_2.py::test_kafka_produce_consume_avro",
|
||||
"test_storage_kafka/test_2.py::test_kafka_virtual_columns_with_materialized_view",
|
||||
"test_storage_kafka/test_2.py::test_system_kafka_consumers_rebalance_mv",
|
||||
"test_storage_kafka/test_2.py::test_system_kafka_consumers",
|
||||
"test_storage_kafka/test_2.py::test_kafka_insert",
|
||||
"test_storage_kafka/test_2.py::test_kafka_settings_old_syntax",
|
||||
"test_storage_kafka/test_2.py::test_kafka_json_without_delimiter",
|
||||
"test_storage_kafka/test_2.py::test_kafka_string_field_on_first_position_in_protobuf",
|
||||
"test_storage_kafka/test_2.py::test_block_based_formats_1",
|
||||
"test_storage_kafka/test_3.py::test_formats_errors",
|
||||
"test_storage_kafka/test_3.py::test_block_based_formats_2",
|
||||
"test_storage_kafka/test_3.py::test_kafka_flush_on_big_message",
|
||||
"test_storage_kafka/test_3.py::test_kafka_recreate_kafka_table",
|
||||
"test_storage_kafka/test_3.py::test_kafka_commit_on_block_write",
|
||||
"test_storage_kafka/test_3.py::test_kafka_no_holes_when_write_suffix_failed",
|
||||
"test_storage_kafka/test_3.py::test_kafka_flush_by_block_size",
|
||||
"test_storage_kafka/test_3.py::test_max_rows_per_message",
|
||||
"test_storage_kafka/test_3.py::test_kafka_many_materialized_views",
|
||||
"test_storage_kafka/test_3.py::test_kafka_engine_put_errors_to_stream",
|
||||
"test_storage_kafka/test_3.py::test_kafka_producer_consumer_separate_settings",
|
||||
"test_storage_kafka/test_3.py::test_kafka_virtual_columns",
|
||||
"test_storage_kafka/test_3.py::test_kafka_settings_new_syntax",
|
||||
"test_storage_kafka/test_3.py::test_kafka_tsv_with_delimiter",
|
||||
"test_storage_kafka/test_3.py::test_kafka_csv_with_delimiter",
|
||||
"test_storage_kafka/test_3.py::test_format_with_prefix_and_suffix",
|
||||
"test_storage_kafka/test_4.py::test_kafka_formats_with_broken_message",
|
||||
"test_storage_kafka/test_4.py::test_kafka_flush_by_time",
|
||||
"test_storage_kafka/test_4.py::test_system_kafka_consumers_rebalance",
|
||||
"test_storage_kafka/test_4.py::test_kafka_consumer_hang",
|
||||
"test_storage_kafka/test_4.py::test_kafka_null_message",
|
||||
"test_storage_kafka/test_4.py::test_kafka_read_consumers_in_parallel",
|
||||
"test_storage_kafka/test_4.py::test_premature_flush_on_eof",
|
||||
"test_storage_kafka/test_4.py::test_num_consumers_limit",
|
||||
"test_storage_kafka/test_4.py::test_kafka_column_types",
|
||||
"test_storage_kafka/test_4.py::test_kafka_protobuf_no_delimiter",
|
||||
"test_storage_kafka/test_4.py::test_kafka_json_as_string",
|
||||
"test_storage_kafka/test_4.py::test_kafka_settings_predefined_macros",
|
||||
"test_storage_kafka/test_4.py::test_kafka_issue4116",
|
||||
"test_storage_kafka/test_4.py::test_kafka_issue14202",
|
||||
"test_storage_kafka/test_5.py::test_kafka_formats",
|
||||
"test_storage_kafka/test_5.py::test_kafka_rebalance",
|
||||
|
||||
|
||||
"test_storage_kafka/test_produce_http_interface.py::test_kafka_produce_http_interface_row_based_format",
|
||||
|
||||
|
93
tests/integration/test_storage_kafka/conftest.py
Normal file
93
tests/integration/test_storage_kafka/conftest.py
Normal file
@ -0,0 +1,93 @@
|
||||
from typing import Iterator
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
|
||||
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
||||
from test_storage_kafka.kafka_tests_utils import get_admin_client
|
||||
|
||||
pytestmark = pytest.mark.skip
|
||||
|
||||
KAFKA_TOPIC_OLD = "old_t"
|
||||
KAFKA_CONSUMER_GROUP_OLD = "old_cg"
|
||||
KAFKA_TOPIC_NEW = "new_t"
|
||||
KAFKA_CONSUMER_GROUP_NEW = "new_cg"
|
||||
|
||||
# We have to initialize these lazily, otherwise WORKER_FREE_PORTS is not populated making the PortPoolManager fail.
|
||||
# It would be nice to initialize these eagerly, but the fixtures has to be in conftest.py to apply them automatically,
|
||||
# so they "cannot be forgotten". We could put these variables and the gist of the fixture implementation into a separate
|
||||
# file, but I think splitting these has more disadvantages than advantages of eager initialization.
|
||||
conftest_cluster = None
|
||||
conftest_instance = None
|
||||
|
||||
|
||||
def init_cluster_and_instance():
|
||||
global conftest_cluster
|
||||
global conftest_instance
|
||||
if conftest_cluster is not None:
|
||||
return
|
||||
|
||||
conftest_cluster = ClickHouseCluster(__file__)
|
||||
conftest_instance = conftest_cluster.add_instance(
|
||||
"instance",
|
||||
main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
|
||||
user_configs=["configs/users.xml"],
|
||||
with_kafka=True,
|
||||
with_zookeeper=True, # For Replicated Table
|
||||
macros={
|
||||
"kafka_broker": "kafka1",
|
||||
"kafka_topic_old": KAFKA_TOPIC_OLD,
|
||||
"kafka_group_name_old": KAFKA_CONSUMER_GROUP_OLD,
|
||||
"kafka_topic_new": KAFKA_TOPIC_NEW,
|
||||
"kafka_group_name_new": KAFKA_CONSUMER_GROUP_NEW,
|
||||
"kafka_client_id": "instance",
|
||||
"kafka_format_json_each_row": "JSONEachRow",
|
||||
},
|
||||
clickhouse_path_dir="clickhouse_path",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def kafka_cluster() -> "Iterator[ClickHouseCluster]":
|
||||
try:
|
||||
init_cluster_and_instance()
|
||||
conftest_cluster.start()
|
||||
kafka_id = conftest_cluster.kafka_docker_id
|
||||
logging.info(f"kafka_id is {kafka_id}")
|
||||
yield conftest_cluster
|
||||
finally:
|
||||
conftest_cluster.shutdown()
|
||||
|
||||
|
||||
# kafka_cluster is requested here to ensure the cluster is initialized before we yield the instance
|
||||
@pytest.fixture()
|
||||
def instance(kafka_cluster) -> "Iterator[ClickHouseInstance]":
|
||||
yield conftest_instance
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def kafka_setup_teardown(kafka_cluster, instance):
|
||||
instance.query("DROP DATABASE IF EXISTS test SYNC; CREATE DATABASE test;")
|
||||
admin_client = get_admin_client(kafka_cluster)
|
||||
|
||||
def get_topics_to_delete():
|
||||
return [t for t in admin_client.list_topics() if not t.startswith("_")]
|
||||
|
||||
topics = get_topics_to_delete()
|
||||
logging.debug(f"Deleting topics: {topics}")
|
||||
result = admin_client.delete_topics(topics)
|
||||
for topic, error in result.topic_error_codes:
|
||||
if error != 0:
|
||||
logging.warning(f"Received error {error} while deleting topic {topic}")
|
||||
else:
|
||||
logging.info(f"Deleted topic {topic}")
|
||||
|
||||
retries = 0
|
||||
topics = get_topics_to_delete()
|
||||
while len(topics) != 0:
|
||||
logging.info(f"Existing topics: {topics}")
|
||||
if retries >= 5:
|
||||
raise Exception(f"Failed to delete topics {topics}")
|
||||
retries += 1
|
||||
time.sleep(0.5)
|
||||
yield # run test
|
444
tests/integration/test_storage_kafka/kafka_tests_utils.py
Normal file
444
tests/integration/test_storage_kafka/kafka_tests_utils.py
Normal file
@ -0,0 +1,444 @@
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
|
||||
import avro.datafile
|
||||
import avro.io
|
||||
import avro.schema
|
||||
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
|
||||
from google.protobuf.internal.encoder import _VarintBytes
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.test_tools import TSV
|
||||
from kafka import BrokerConnection, KafkaAdminClient, KafkaConsumer, KafkaProducer
|
||||
from kafka.admin import NewTopic
|
||||
from kafka.protocol.admin import DescribeGroupsRequest_v1
|
||||
from kafka.protocol.group import MemberAssignment
|
||||
|
||||
# protoc --version
|
||||
# libprotoc 3.0.0
|
||||
# # to create kafka_pb2.py
|
||||
# protoc --python_out=. kafka.proto
|
||||
from test_storage_kafka import kafka_pb2, social_pb2
|
||||
|
||||
|
||||
@contextmanager
|
||||
def kafka_topic(
|
||||
admin_client,
|
||||
topic_name,
|
||||
num_partitions=1,
|
||||
replication_factor=1,
|
||||
max_retries=50,
|
||||
config=None,
|
||||
):
|
||||
kafka_create_topic(
|
||||
admin_client,
|
||||
topic_name,
|
||||
num_partitions,
|
||||
replication_factor,
|
||||
max_retries,
|
||||
config,
|
||||
)
|
||||
try:
|
||||
yield None
|
||||
finally:
|
||||
# Code to release resource, e.g.:
|
||||
kafka_delete_topic(admin_client, topic_name, max_retries)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def existing_kafka_topic(admin_client, topic_name, max_retries=50):
|
||||
try:
|
||||
yield None
|
||||
finally:
|
||||
kafka_delete_topic(admin_client, topic_name, max_retries)
|
||||
|
||||
|
||||
def kafka_create_topic(
|
||||
admin_client,
|
||||
topic_name,
|
||||
num_partitions=1,
|
||||
replication_factor=1,
|
||||
max_retries=50,
|
||||
config=None,
|
||||
):
|
||||
logging.debug(
|
||||
f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}"
|
||||
)
|
||||
topics_list = [
|
||||
NewTopic(
|
||||
name=topic_name,
|
||||
num_partitions=num_partitions,
|
||||
replication_factor=replication_factor,
|
||||
topic_configs=config,
|
||||
)
|
||||
]
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
admin_client.create_topics(new_topics=topics_list, validate_only=False)
|
||||
logging.debug("Admin client succeed")
|
||||
return
|
||||
except Exception as e:
|
||||
retries += 1
|
||||
time.sleep(0.5)
|
||||
if retries < max_retries:
|
||||
logging.warning(f"Failed to create topic {e}")
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def kafka_delete_topic(admin_client, topic, max_retries=50):
|
||||
result = admin_client.delete_topics([topic])
|
||||
for topic, e in result.topic_error_codes:
|
||||
if e == 0:
|
||||
logging.debug(f"Topic {topic} deleted")
|
||||
else:
|
||||
logging.error(f"Failed to delete topic {topic}: {e}")
|
||||
|
||||
retries = 0
|
||||
while True:
|
||||
topics_listed = admin_client.list_topics()
|
||||
logging.debug(f"TOPICS LISTED: {topics_listed}")
|
||||
if topic not in topics_listed:
|
||||
return
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.5)
|
||||
if retries > max_retries:
|
||||
raise Exception(f"Failed to delete topics {topic}, {result}")
|
||||
|
||||
|
||||
def get_kafka_producer(port, serializer, retries):
|
||||
errors = []
|
||||
for _ in range(retries):
|
||||
try:
|
||||
producer = KafkaProducer(
|
||||
bootstrap_servers="localhost:{}".format(port),
|
||||
value_serializer=serializer,
|
||||
)
|
||||
logging.debug("Kafka Connection establised: localhost:{}".format(port))
|
||||
return producer
|
||||
except Exception as e:
|
||||
errors += [str(e)]
|
||||
time.sleep(1)
|
||||
|
||||
raise Exception("Connection not establised, {}".format(errors))
|
||||
|
||||
|
||||
def producer_serializer(x):
|
||||
return x.encode() if isinstance(x, str) else x
|
||||
|
||||
|
||||
def get_admin_client(kafka_cluster):
|
||||
return KafkaAdminClient(
|
||||
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
|
||||
)
|
||||
|
||||
|
||||
def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
|
||||
logging.debug(
|
||||
"kafka_produce server:{}:{} topic:{}".format(
|
||||
"localhost", kafka_cluster.kafka_port, topic
|
||||
)
|
||||
)
|
||||
producer = get_kafka_producer(
|
||||
kafka_cluster.kafka_port, producer_serializer, retries
|
||||
)
|
||||
for message in messages:
|
||||
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
|
||||
producer.flush()
|
||||
|
||||
|
||||
def kafka_consume(kafka_cluster, topic, need_decode=True, timestamp=0):
|
||||
consumer = KafkaConsumer(
|
||||
bootstrap_servers=f"localhost:{kafka_cluster.kafka_port}",
|
||||
auto_offset_reset="earliest",
|
||||
session_timeout_ms=1000,
|
||||
)
|
||||
consumer.subscribe(topics=(topic))
|
||||
for toppar, messages in list(consumer.poll(5000).items()):
|
||||
if toppar.topic == topic:
|
||||
for message in messages:
|
||||
assert timestamp == 0 or message.timestamp / 1000 == timestamp
|
||||
if need_decode:
|
||||
yield message.value.decode()
|
||||
else:
|
||||
yield message.value
|
||||
consumer.unsubscribe()
|
||||
consumer.close()
|
||||
|
||||
|
||||
def kafka_produce_protobuf_messages(kafka_cluster, topic, start_index, num_messages):
|
||||
data = b""
|
||||
for i in range(start_index, start_index + num_messages):
|
||||
msg = kafka_pb2.KeyValuePair()
|
||||
msg.key = i
|
||||
msg.value = str(i)
|
||||
serialized_msg = msg.SerializeToString()
|
||||
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
|
||||
producer = KafkaProducer(
|
||||
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port),
|
||||
value_serializer=producer_serializer,
|
||||
)
|
||||
producer.send(topic=topic, value=data)
|
||||
producer.flush()
|
||||
logging.debug(("Produced {} messages for topic {}".format(num_messages, topic)))
|
||||
|
||||
|
||||
def kafka_consume_with_retry(
|
||||
kafka_cluster,
|
||||
topic,
|
||||
expected_messages,
|
||||
need_decode=True,
|
||||
timestamp=0,
|
||||
retry_count=20,
|
||||
sleep_time=0.1,
|
||||
):
|
||||
messages = []
|
||||
try_count = 0
|
||||
while try_count < retry_count:
|
||||
try_count += 1
|
||||
messages.extend(
|
||||
kafka_consume(
|
||||
kafka_cluster, topic, need_decode=need_decode, timestamp=timestamp
|
||||
)
|
||||
)
|
||||
if len(messages) == expected_messages:
|
||||
break
|
||||
time.sleep(sleep_time)
|
||||
if len(messages) != expected_messages:
|
||||
raise Exception(f"Got only {len(messages)} messages")
|
||||
return messages
|
||||
|
||||
|
||||
def kafka_produce_protobuf_messages_no_delimiters(
|
||||
kafka_cluster, topic, start_index, num_messages
|
||||
):
|
||||
producer = KafkaProducer(
|
||||
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
|
||||
)
|
||||
for i in range(start_index, start_index + num_messages):
|
||||
msg = kafka_pb2.KeyValuePair()
|
||||
msg.key = i
|
||||
msg.value = str(i)
|
||||
serialized_msg = msg.SerializeToString()
|
||||
producer.send(topic=topic, value=serialized_msg)
|
||||
producer.flush()
|
||||
logging.debug("Produced {} messages for topic {}".format(num_messages, topic))
|
||||
|
||||
|
||||
def kafka_produce_protobuf_social(kafka_cluster, topic, start_index, num_messages):
|
||||
data = b""
|
||||
for i in range(start_index, start_index + num_messages):
|
||||
msg = social_pb2.User()
|
||||
msg.username = "John Doe {}".format(i)
|
||||
msg.timestamp = 1000000 + i
|
||||
serialized_msg = msg.SerializeToString()
|
||||
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
|
||||
producer = KafkaProducer(
|
||||
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port),
|
||||
value_serializer=producer_serializer,
|
||||
)
|
||||
producer.send(topic=topic, value=data)
|
||||
producer.flush()
|
||||
logging.debug(("Produced {} messages for topic {}".format(num_messages, topic)))
|
||||
|
||||
|
||||
def avro_message(value):
|
||||
schema = avro.schema.make_avsc_object(
|
||||
{
|
||||
"name": "row",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{"name": "id", "type": "long"},
|
||||
{"name": "blockNo", "type": "int"},
|
||||
{"name": "val1", "type": "string"},
|
||||
{"name": "val2", "type": "float"},
|
||||
{"name": "val3", "type": "int"},
|
||||
],
|
||||
}
|
||||
)
|
||||
bytes_writer = io.BytesIO()
|
||||
|
||||
# DataFileWrite seems to be mandatory to get schema encoded
|
||||
writer = avro.datafile.DataFileWriter(bytes_writer, avro.io.DatumWriter(), schema)
|
||||
if isinstance(value, list):
|
||||
for v in value:
|
||||
writer.append(v)
|
||||
else:
|
||||
writer.append(value)
|
||||
writer.flush()
|
||||
raw_bytes = bytes_writer.getvalue()
|
||||
|
||||
writer.close()
|
||||
bytes_writer.close()
|
||||
return raw_bytes
|
||||
|
||||
|
||||
def avro_confluent_message(schema_registry_client, value):
|
||||
serializer = MessageSerializer(schema_registry_client)
|
||||
schema = avro.schema.make_avsc_object(
|
||||
{
|
||||
"name": "row",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{"name": "id", "type": "long"},
|
||||
{"name": "blockNo", "type": "int"},
|
||||
{"name": "val1", "type": "string"},
|
||||
{"name": "val2", "type": "float"},
|
||||
{"name": "val3", "type": "int"},
|
||||
],
|
||||
}
|
||||
)
|
||||
return serializer.encode_record_with_schema("test_subject", schema, value)
|
||||
|
||||
|
||||
def create_settings_string(settings):
|
||||
if settings is None:
|
||||
return ""
|
||||
|
||||
def format_value(value):
|
||||
if isinstance(value, str):
|
||||
return f"'{value}'"
|
||||
elif isinstance(value, bool):
|
||||
return str(int(value))
|
||||
return str(value)
|
||||
|
||||
settings_string = "SETTINGS "
|
||||
keys = settings.keys()
|
||||
first_key = next(iter(settings))
|
||||
settings_string += str(first_key) + " = " + format_value(settings[first_key])
|
||||
for key in keys:
|
||||
if key == first_key:
|
||||
continue
|
||||
settings_string += ", " + str(key) + " = " + format_value(settings[key])
|
||||
return settings_string
|
||||
|
||||
|
||||
def generate_old_create_table_query(
|
||||
table_name,
|
||||
columns_def,
|
||||
database="test",
|
||||
brokers="{kafka_broker}:19092",
|
||||
topic_list="{kafka_topic_new}",
|
||||
consumer_group="{kafka_group_name_new}",
|
||||
format="{kafka_format_json_each_row}",
|
||||
row_delimiter="\\n",
|
||||
keeper_path=None, # it is not used, but it is easier to handle keeper_path and replica_name like this
|
||||
replica_name=None,
|
||||
settings=None,
|
||||
):
|
||||
settings_string = create_settings_string(settings)
|
||||
query = f"""CREATE TABLE {database}.{table_name} ({columns_def}) ENGINE = Kafka('{brokers}', '{topic_list}', '{consumer_group}', '{format}', '{row_delimiter}')
|
||||
{settings_string}"""
|
||||
logging.debug(f"Generated old create query: {query}")
|
||||
return query
|
||||
|
||||
|
||||
def generate_new_create_table_query(
|
||||
table_name,
|
||||
columns_def,
|
||||
database="test",
|
||||
brokers="{kafka_broker}:19092",
|
||||
topic_list="{kafka_topic_new}",
|
||||
consumer_group="{kafka_group_name_new}",
|
||||
format="{kafka_format_json_each_row}",
|
||||
row_delimiter="\\n",
|
||||
keeper_path=None,
|
||||
replica_name=None,
|
||||
settings=None,
|
||||
):
|
||||
if settings is None:
|
||||
settings = {}
|
||||
if keeper_path is None:
|
||||
keeper_path = f"/clickhouse/{{database}}/{table_name}"
|
||||
if replica_name is None:
|
||||
replica_name = "r1"
|
||||
settings["kafka_keeper_path"] = keeper_path
|
||||
settings["kafka_replica_name"] = replica_name
|
||||
settings_string = create_settings_string(settings)
|
||||
query = f"""CREATE TABLE {database}.{table_name} ({columns_def}) ENGINE = Kafka('{brokers}', '{topic_list}', '{consumer_group}', '{format}', '{row_delimiter}')
|
||||
{settings_string}
|
||||
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1"""
|
||||
logging.debug(f"Generated new create query: {query}")
|
||||
return query
|
||||
|
||||
|
||||
def must_use_thread_per_consumer(generator):
|
||||
if generator == generate_old_create_table_query:
|
||||
return False
|
||||
if generator == generate_new_create_table_query:
|
||||
return True
|
||||
raise Exception("Unexpected generator")
|
||||
|
||||
|
||||
def get_topic_postfix(generator):
|
||||
if generator == generate_old_create_table_query:
|
||||
return "old"
|
||||
if generator == generate_new_create_table_query:
|
||||
return "new"
|
||||
raise Exception("Unexpected generator")
|
||||
|
||||
|
||||
# Since everything is async and shaky when receiving messages from Kafka,
|
||||
# we may want to try and check results multiple times in a loop.
|
||||
def kafka_check_result(result, check=False, ref_file="test_kafka_json.reference"):
|
||||
fpath = os.path.join(os.path.dirname(__file__), ref_file)
|
||||
with open(fpath) as reference:
|
||||
if check:
|
||||
assert TSV(result) == TSV(reference)
|
||||
else:
|
||||
return TSV(result) == TSV(reference)
|
||||
|
||||
|
||||
# https://stackoverflow.com/a/57692111/1555175
|
||||
def describe_consumer_group(kafka_cluster, name):
|
||||
client = BrokerConnection("localhost", kafka_cluster.kafka_port, socket.AF_INET)
|
||||
client.connect_blocking()
|
||||
|
||||
list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
|
||||
future = client.send(list_members_in_groups)
|
||||
while not future.is_done:
|
||||
for resp, f in client.recv():
|
||||
f.success(resp)
|
||||
|
||||
(
|
||||
error_code,
|
||||
group_id,
|
||||
state,
|
||||
protocol_type,
|
||||
protocol,
|
||||
members,
|
||||
) = future.value.groups[0]
|
||||
|
||||
res = []
|
||||
for member in members:
|
||||
(member_id, client_id, client_host, member_metadata, member_assignment) = member
|
||||
member_info = {}
|
||||
member_info["member_id"] = member_id
|
||||
member_info["client_id"] = client_id
|
||||
member_info["client_host"] = client_host
|
||||
member_topics_assignment = []
|
||||
for topic, partitions in MemberAssignment.decode(member_assignment).assignment:
|
||||
member_topics_assignment.append({"topic": topic, "partitions": partitions})
|
||||
member_info["assignment"] = member_topics_assignment
|
||||
res.append(member_info)
|
||||
return res
|
||||
|
||||
|
||||
def insert_with_retry(instance, values, table_name="kafka", max_try_count=5):
|
||||
try_count = 0
|
||||
while True:
|
||||
logging.debug(f"Inserting, try_count is {try_count}")
|
||||
try:
|
||||
try_count += 1
|
||||
instance.query(f"INSERT INTO test.{table_name} VALUES {values}")
|
||||
break
|
||||
except QueryRuntimeException as e:
|
||||
if "Local: Timed out." in str(e) and try_count < max_try_count:
|
||||
continue
|
||||
else:
|
||||
raise
|
File diff suppressed because one or more lines are too long
1180
tests/integration/test_storage_kafka/test_1.py
Normal file
1180
tests/integration/test_storage_kafka/test_1.py
Normal file
File diff suppressed because it is too large
Load Diff
1224
tests/integration/test_storage_kafka/test_2.py
Normal file
1224
tests/integration/test_storage_kafka/test_2.py
Normal file
File diff suppressed because it is too large
Load Diff
1086
tests/integration/test_storage_kafka/test_3.py
Normal file
1086
tests/integration/test_storage_kafka/test_3.py
Normal file
File diff suppressed because it is too large
Load Diff
1214
tests/integration/test_storage_kafka/test_4.py
Normal file
1214
tests/integration/test_storage_kafka/test_4.py
Normal file
File diff suppressed because it is too large
Load Diff
630
tests/integration/test_storage_kafka/test_5.py
Normal file
630
tests/integration/test_storage_kafka/test_5.py
Normal file
File diff suppressed because one or more lines are too long
@ -1,109 +1,21 @@
|
||||
import time
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster, is_arm
|
||||
from helpers.test_tools import TSV
|
||||
from kafka import KafkaAdminClient
|
||||
from kafka.admin import NewTopic
|
||||
|
||||
from helpers.cluster import is_arm
|
||||
from helpers.test_tools import TSV
|
||||
from kafka.admin import KafkaAdminClient
|
||||
from test_storage_kafka.kafka_tests_utils import (
|
||||
kafka_create_topic,
|
||||
kafka_delete_topic,
|
||||
)
|
||||
from test_storage_kafka.conftest import conftest_cluster, init_cluster_and_instance
|
||||
|
||||
# Skip all tests on ARM
|
||||
if is_arm():
|
||||
pytestmark = pytest.mark.skip
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
instance = cluster.add_instance(
|
||||
"instance",
|
||||
main_configs=["configs/kafka.xml", "configs/named_collection.xml"],
|
||||
user_configs=["configs/users.xml"],
|
||||
with_kafka=True,
|
||||
with_zookeeper=True, # For Replicated Table
|
||||
macros={
|
||||
"kafka_broker": "kafka1",
|
||||
"kafka_topic_old": "old",
|
||||
"kafka_group_name_old": "old",
|
||||
"kafka_topic_new": "new",
|
||||
"kafka_group_name_new": "new",
|
||||
"kafka_client_id": "instance",
|
||||
"kafka_format_json_each_row": "JSONEachRow",
|
||||
},
|
||||
clickhouse_path_dir="clickhouse_path",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def kafka_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
kafka_id = instance.cluster.kafka_docker_id
|
||||
print(("kafka_id is {}".format(kafka_id)))
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def kafka_setup_teardown():
|
||||
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
|
||||
# logging.debug("kafka is available - running test")
|
||||
yield # run test
|
||||
|
||||
|
||||
def kafka_create_topic(
|
||||
admin_client,
|
||||
topic_name,
|
||||
num_partitions=1,
|
||||
replication_factor=1,
|
||||
max_retries=50,
|
||||
config=None,
|
||||
):
|
||||
logging.debug(
|
||||
f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}"
|
||||
)
|
||||
topics_list = [
|
||||
NewTopic(
|
||||
name=topic_name,
|
||||
num_partitions=num_partitions,
|
||||
replication_factor=replication_factor,
|
||||
topic_configs=config,
|
||||
)
|
||||
]
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
admin_client.create_topics(new_topics=topics_list, validate_only=False)
|
||||
logging.debug("Admin client succeed")
|
||||
return
|
||||
except Exception as e:
|
||||
retries += 1
|
||||
time.sleep(0.5)
|
||||
if retries < max_retries:
|
||||
logging.warning(f"Failed to create topic {e}")
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def kafka_delete_topic(admin_client, topic, max_retries=50):
|
||||
result = admin_client.delete_topics([topic])
|
||||
for topic, e in result.topic_error_codes:
|
||||
if e == 0:
|
||||
logging.debug(f"Topic {topic} deleted")
|
||||
else:
|
||||
logging.error(f"Failed to delete topic {topic}: {e}")
|
||||
|
||||
retries = 0
|
||||
while True:
|
||||
topics_listed = admin_client.list_topics()
|
||||
logging.debug(f"TOPICS LISTED: {topics_listed}")
|
||||
if topic not in topics_listed:
|
||||
return
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.5)
|
||||
if retries > max_retries:
|
||||
raise Exception(f"Failed to delete topics {topic}, {result}")
|
||||
|
||||
|
||||
def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
|
||||
def test_kafka_produce_http_interface_row_based_format(kafka_cluster, instance):
|
||||
# reproduction of #61060 with validating the written messages
|
||||
admin_client = KafkaAdminClient(
|
||||
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
|
||||
@ -182,7 +94,7 @@ def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
|
||||
"MsgPack",
|
||||
]
|
||||
for format in formats_to_test:
|
||||
logging.debug(f"Creating tables and writing messages to {format}")
|
||||
logging.debug(f"Creating tables for {format}")
|
||||
topic = topic_prefix + format
|
||||
kafka_create_topic(admin_client, topic)
|
||||
|
||||
@ -212,6 +124,10 @@ def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
|
||||
SELECT key, value FROM test.kafka_{topic};
|
||||
"""
|
||||
)
|
||||
|
||||
for format in formats_to_test:
|
||||
logging.debug(f"Inserting data to {format}")
|
||||
topic = topic_prefix + format
|
||||
instance.http_query(
|
||||
insert_query_template.format(table_name="test.kafka_writer_" + topic),
|
||||
method="POST",
|
||||
@ -238,6 +154,7 @@ def test_kafka_produce_http_interface_row_based_format(kafka_cluster):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cluster.start()
|
||||
init_cluster_and_instance()
|
||||
conftest_cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
cluster.shutdown()
|
||||
conftest_cluster.shutdown()
|
||||
|
Loading…
Reference in New Issue
Block a user