mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
fix
This commit is contained in:
parent
872e36c207
commit
609c9b272f
@ -186,7 +186,6 @@ class ClickHouseCluster:
|
||||
self.kafka_host = "kafka1"
|
||||
self.kafka_port = get_open_port()
|
||||
self.kafka_docker_id = None
|
||||
self.schema_registry_client = None
|
||||
self.schema_registry_host = "schema-registry"
|
||||
self.schema_registry_port = get_open_port()
|
||||
self.kafka_docker_id = self.get_instance_docker_id(self.kafka_host)
|
||||
@ -805,7 +804,7 @@ class ClickHouseCluster:
|
||||
if self.with_kafka and self.base_kafka_cmd:
|
||||
print('Setup Kafka')
|
||||
subprocess_check_call(self.base_kafka_cmd + common_opts + ['--renew-anon-volumes'])
|
||||
self.schema_registry_client = self.wait_schema_registry_to_start(30)
|
||||
self.wait_schema_registry_to_start(30)
|
||||
|
||||
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
|
||||
print('Setup kerberized kafka')
|
||||
|
@ -42,7 +42,7 @@ cluster = ClickHouseCluster(__file__)
|
||||
instance = cluster.add_instance('instance',
|
||||
main_configs=['configs/kafka.xml', 'configs/log_conf.xml'],
|
||||
with_kafka=True,
|
||||
# with_zookeeper=True,
|
||||
with_zookeeper=True, # For Replicated Table
|
||||
macros={"kafka_broker":"kafka1",
|
||||
"kafka_topic_old":"old",
|
||||
"kafka_group_name_old":"old",
|
||||
@ -130,7 +130,7 @@ def kafka_produce_protobuf_messages(kafka_cluster, topic, start_index, num_messa
|
||||
|
||||
def kafka_produce_protobuf_messages_no_delimeters(kafka_cluster, topic, start_index, num_messages):
|
||||
data = ''
|
||||
producer = KafkaProducer(bootstrap_servers="localhost:".format(kafka_cluster.kafka_port))
|
||||
producer = KafkaProducer(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
for i in range(start_index, start_index + num_messages):
|
||||
msg = kafka_pb2.KeyValuePair()
|
||||
msg.key = i
|
||||
@ -1477,7 +1477,7 @@ def test_kafka_insert(kafka_cluster):
|
||||
|
||||
messages = []
|
||||
while True:
|
||||
messages.extend(kafka_consume('insert1'))
|
||||
messages.extend(kafka_consume(kafka_cluster, 'insert1'))
|
||||
if len(messages) == 50:
|
||||
break
|
||||
|
||||
@ -1942,7 +1942,7 @@ def test_kafka_rebalance(kafka_cluster):
|
||||
|
||||
# time.sleep(2)
|
||||
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:".format(kafka_cluster.kafka_port))
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="topic_with_multiple_partitions", num_partitions=11, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
|
Loading…
Reference in New Issue
Block a user