Merge pull request #55424 from ClickHouse/update-rabbitq

Updated RabbitMQ image and fixed log retrieval in failed tests.
This commit is contained in:
Alexey Milovidov 2023-10-10 03:32:21 +02:00 committed by GitHub
commit 68ce6b9b00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 145 additions and 57 deletions

View File

@ -2,15 +2,13 @@ version: '2.3'
services:
rabbitmq1:
image: rabbitmq:3.8-management-alpine
image: rabbitmq:3.12.6-management-alpine
hostname: rabbitmq1
expose:
- ${RABBITMQ_PORT:-5672}
environment:
RABBITMQ_DEFAULT_USER: "root"
RABBITMQ_DEFAULT_PASS: "clickhouse"
RABBITMQ_LOG_BASE: /rabbitmq_logs/
volumes:
- type: ${RABBITMQ_LOGS_FS:-tmpfs}
source: ${RABBITMQ_LOGS:-}
target: /rabbitmq_logs/
- "${RABBITMQ_COOKIE_FILE}:/var/lib/rabbitmq/.erlang.cookie"
- /misc/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf

View File

@ -0,0 +1,8 @@
# RabbitMQ broker configuration for the integration-test container.
# NOTE(review): key semantics below follow the standard rabbitmq.conf schema —
# confirm against the RabbitMQ configuration reference for the image version.
# Allow the "guest"-style loopback restriction to be lifted for remote logins.
loopback_users.guest = false
# AMQP listener port (the compose file exposes ${RABBITMQ_PORT:-5672}).
listeners.tcp.default = 5672
# Default credentials; must match what the test harness connects with.
default_pass = clickhouse
default_user = root
# HTTP management plugin port.
management.tcp.port = 15672
# Log to a file under /rabbitmq_logs/ (bind-mounted by docker-compose) so
# failed tests can retrieve broker logs; debug level for maximum detail.
log.file = /rabbitmq_logs/rabbit.log
log.file.level = debug

View File

@ -239,15 +239,71 @@ def check_postgresql_java_client_is_available(postgresql_java_client_id):
return p.returncode == 0
def check_rabbitmq_is_available(rabbitmq_id):
def check_rabbitmq_is_available(rabbitmq_id, cookie):
p = subprocess.Popen(
("docker", "exec", "-i", rabbitmq_id, "rabbitmqctl", "await_startup"),
(
"docker",
"exec",
"-e",
f"RABBITMQ_ERLANG_COOKIE={cookie}",
"-i",
rabbitmq_id,
"rabbitmqctl",
"await_startup",
),
stdout=subprocess.PIPE,
)
p.communicate()
return p.returncode == 0
def rabbitmq_debuginfo(rabbitmq_id, cookie):
    """Dump RabbitMQ diagnostics from a container to aid debugging.

    Runs ``rabbitmq-diagnostics status``, ``listeners`` and ``environment``
    inside the given container via ``docker exec``. The Erlang cookie is
    passed through RABBITMQ_ERLANG_COOKIE so the CLI tool can authenticate
    against the broker node.

    :param rabbitmq_id: docker container id/name of the RabbitMQ instance
    :param cookie: Erlang cookie shared with the broker
    """
    # The original code repeated the identical Popen call three times,
    # differing only in the diagnostics subcommand; loop instead.
    for subcommand in ("status", "listeners", "environment"):
        p = subprocess.Popen(
            (
                "docker",
                "exec",
                "-e",
                f"RABBITMQ_ERLANG_COOKIE={cookie}",
                "-i",
                rabbitmq_id,
                "rabbitmq-diagnostics",
                subcommand,
            ),
            stdout=subprocess.PIPE,
        )
        # Wait for completion; output is captured and discarded, matching
        # the original behavior (the call exists for its side effects /
        # exit status only — TODO(review): consider logging the output).
        p.communicate()
async def check_nats_is_available(nats_port, ssl_ctx=None):
nc = await nats_connect_ssl(
nats_port, user="click", password="house", ssl_ctx=ssl_ctx
@ -271,11 +327,13 @@ async def nats_connect_ssl(nats_port, user, password, ssl_ctx=None):
return nc
def enable_consistent_hash_plugin(rabbitmq_id):
def enable_consistent_hash_plugin(rabbitmq_id, cookie):
p = subprocess.Popen(
(
"docker",
"exec",
"-e",
f"RABBITMQ_ERLANG_COOKIE={cookie}",
"-i",
rabbitmq_id,
"rabbitmq-plugins",
@ -527,7 +585,9 @@ class ClickHouseCluster:
self.rabbitmq_ip = None
self.rabbitmq_port = 5672
self.rabbitmq_dir = p.abspath(p.join(self.instances_dir, "rabbitmq"))
self.rabbitmq_cookie_file = os.path.join(self.rabbitmq_dir, "erlang.cookie")
self.rabbitmq_logs_dir = os.path.join(self.rabbitmq_dir, "logs")
self.rabbitmq_cookie = self.get_instance_docker_id(self.rabbitmq_host)
self.nats_host = "nats1"
self.nats_port = 4444
@ -1250,6 +1310,8 @@ class ClickHouseCluster:
env_variables["RABBITMQ_PORT"] = str(self.rabbitmq_port)
env_variables["RABBITMQ_LOGS"] = self.rabbitmq_logs_dir
env_variables["RABBITMQ_LOGS_FS"] = "bind"
env_variables["RABBITMQ_COOKIE_FILE"] = self.rabbitmq_cookie_file
env_variables["RABBITMQ_COOKIE_FILE_FS"] = "bind"
self.base_cmd.extend(
["--file", p.join(docker_compose_yml_dir, "docker_compose_rabbitmq.yml")]
@ -2222,25 +2284,36 @@ class ClickHouseCluster:
time.sleep(0.5)
raise Exception("Cannot wait PostgreSQL Java Client container")
def wait_rabbitmq_to_start(self, timeout=180, throw=True):
def wait_rabbitmq_to_start(self, timeout=30):
self.print_all_docker_pieces()
self.rabbitmq_ip = self.get_instance_ip(self.rabbitmq_host)
start = time.time()
while time.time() - start < timeout:
try:
if check_rabbitmq_is_available(self.rabbitmq_docker_id):
if check_rabbitmq_is_available(
self.rabbitmq_docker_id, self.rabbitmq_cookie
):
logging.debug("RabbitMQ is available")
if enable_consistent_hash_plugin(self.rabbitmq_docker_id):
if enable_consistent_hash_plugin(
self.rabbitmq_docker_id, self.rabbitmq_cookie
):
logging.debug("RabbitMQ consistent hash plugin is available")
return True
return True
time.sleep(0.5)
except Exception as ex:
logging.debug("Can't connect to RabbitMQ " + str(ex))
time.sleep(0.5)
if throw:
raise Exception("Cannot wait RabbitMQ container")
return False
try:
with open(os.path.join(self.rabbitmq_dir, "docker.log"), "w+") as f:
subprocess.check_call( # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
self.base_rabbitmq_cmd + ["logs"], stdout=f
)
rabbitmq_debuginfo(self.rabbitmq_docker_id, self.rabbitmq_cookie)
except Exception as e:
logging.debug("Unable to get logs from docker.")
raise Exception("Cannot wait RabbitMQ container")
def wait_nats_is_available(self, max_retries=5):
retries = 0
@ -2749,15 +2822,18 @@ class ClickHouseCluster:
os.makedirs(self.rabbitmq_logs_dir)
os.chmod(self.rabbitmq_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
for i in range(5):
subprocess_check_call(
self.base_rabbitmq_cmd + common_opts + ["--renew-anon-volumes"]
)
self.up_called = True
self.rabbitmq_docker_id = self.get_instance_docker_id("rabbitmq1")
logging.debug(f"RabbitMQ checking container try: {i}")
if self.wait_rabbitmq_to_start(throw=(i == 4)):
break
with open(self.rabbitmq_cookie_file, "w") as f:
f.write(self.rabbitmq_cookie)
os.chmod(self.rabbitmq_cookie_file, stat.S_IRUSR)
subprocess_check_call(
self.base_rabbitmq_cmd + common_opts + ["--renew-anon-volumes"]
)
self.up_called = True
self.rabbitmq_docker_id = self.get_instance_docker_id("rabbitmq1")
time.sleep(2)
logging.debug(f"RabbitMQ checking container try")
self.wait_rabbitmq_to_start()
if self.with_nats and self.base_nats_cmd:
logging.debug("Setup NATS")

View File

@ -49,12 +49,12 @@ def rabbitmq_check_result(result, check=False, ref_file="test_rabbitmq_json.refe
return TSV(result) == TSV(reference)
def wait_rabbitmq_to_start(rabbitmq_docker_id, timeout=180):
def wait_rabbitmq_to_start(rabbitmq_docker_id, cookie, timeout=180):
logging.getLogger("pika").propagate = False
start = time.time()
while time.time() - start < timeout:
try:
if check_rabbitmq_is_available(rabbitmq_docker_id):
if check_rabbitmq_is_available(rabbitmq_docker_id, cookie):
logging.debug("RabbitMQ is available")
return
time.sleep(0.5)
@ -69,10 +69,10 @@ def kill_rabbitmq(rabbitmq_id):
return p.returncode == 0
def revive_rabbitmq(rabbitmq_id):
def revive_rabbitmq(rabbitmq_id, cookie):
p = subprocess.Popen(("docker", "start", rabbitmq_id), stdout=subprocess.PIPE)
p.communicate()
wait_rabbitmq_to_start(rabbitmq_id)
wait_rabbitmq_to_start(rabbitmq_id, cookie)
# Fixtures
@ -93,7 +93,7 @@ def rabbitmq_cluster():
@pytest.fixture(autouse=True)
def rabbitmq_setup_teardown():
print("RabbitMQ is available - running test")
logging.debug("RabbitMQ is available - running test")
yield # run test
instance.query("DROP DATABASE test SYNC")
instance.query("CREATE DATABASE test")
@ -398,7 +398,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
while time.monotonic() < deadline:
result = instance.query("SELECT * FROM test.view2 ORDER BY key")
print(f"Result: {result}")
logging.debug(f"Result: {result}")
if rabbitmq_check_result(result):
break
time.sleep(1)
@ -630,7 +630,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
expected = messages_num * threads_num
if int(result1) == expected:
break
print(f"Result {result1} / {expected}")
logging.debug(f"Result {result1} / {expected}")
result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.view")
@ -724,7 +724,7 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster):
expected = messages_num * threads_num * NUM_MV
if int(result) == expected:
break
print(f"Result: {result} / {expected}")
logging.debug(f"Result: {result} / {expected}")
time.sleep(1)
for thread in threads:
@ -938,7 +938,7 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster):
while True:
result = instance.query("SELECT count() FROM test.view_many")
print(result, messages_num * threads_num)
logging.debug(result, messages_num * threads_num)
if int(result) == messages_num * threads_num:
break
time.sleep(1)
@ -1029,7 +1029,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
expected = messages_num * threads_num
if int(result) == expected:
break
print(f"Result: {result} / {expected}")
logging.debug(f"Result: {result} / {expected}")
time.sleep(1)
instance.query(
@ -1060,7 +1060,7 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster):
num_tables = 5
for consumer_id in range(num_tables):
print(("Setting up table {}".format(consumer_id)))
logging.debug(("Setting up table {}".format(consumer_id)))
instance.query(
"""
DROP TABLE IF EXISTS test.direct_exchange_{0};
@ -1153,7 +1153,7 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
num_tables = 5
for consumer_id in range(num_tables):
print(("Setting up table {}".format(consumer_id)))
logging.debug(("Setting up table {}".format(consumer_id)))
instance.query(
"""
DROP TABLE IF EXISTS test.fanout_exchange_{0};
@ -1241,7 +1241,7 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
num_tables = 5
for consumer_id in range(num_tables):
print(("Setting up table {}".format(consumer_id)))
logging.debug(("Setting up table {}".format(consumer_id)))
instance.query(
"""
DROP TABLE IF EXISTS test.topic_exchange_{0};
@ -1266,7 +1266,7 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
)
for consumer_id in range(num_tables):
print(("Setting up table {}".format(num_tables + consumer_id)))
logging.debug(("Setting up table {}".format(num_tables + consumer_id)))
instance.query(
"""
DROP TABLE IF EXISTS test.topic_exchange_{0};
@ -1366,7 +1366,7 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster):
num_tables = 4
for consumer_id in range(num_tables):
table_name = "rabbitmq_consumer{}".format(consumer_id)
print(("Setting up {}".format(table_name)))
logging.debug(("Setting up {}".format(table_name)))
instance.query(
"""
DROP TABLE IF EXISTS test.{0};
@ -1557,7 +1557,7 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
num_tables_to_receive = 2
for consumer_id in range(num_tables_to_receive):
print(("Setting up table {}".format(consumer_id)))
logging.debug(("Setting up table {}".format(consumer_id)))
instance.query(
"""
DROP TABLE IF EXISTS test.headers_exchange_{0};
@ -1582,7 +1582,9 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
num_tables_to_ignore = 2
for consumer_id in range(num_tables_to_ignore):
print(("Setting up table {}".format(consumer_id + num_tables_to_receive)))
logging.debug(
("Setting up table {}".format(consumer_id + num_tables_to_receive))
)
instance.query(
"""
DROP TABLE IF EXISTS test.headers_exchange_{0};
@ -1814,7 +1816,7 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
num_tables = 4
for table_id in range(num_tables):
print(("Setting up table {}".format(table_id)))
logging.debug(("Setting up table {}".format(table_id)))
instance.query(
"""
DROP TABLE IF EXISTS test.many_consumers_{0};
@ -1966,7 +1968,9 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
time.sleep(4)
revive_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
revive_rabbitmq(
rabbitmq_cluster.rabbitmq_docker_id, rabbitmq_cluster.rabbitmq_cookie
)
while True:
result = instance.query("SELECT count(DISTINCT key) FROM test.view")
@ -2036,12 +2040,14 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
)
while int(instance.query("SELECT count() FROM test.view")) == 0:
print(3)
logging.debug(3)
time.sleep(0.1)
kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
time.sleep(8)
revive_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
revive_rabbitmq(
rabbitmq_cluster.rabbitmq_docker_id, rabbitmq_cluster.rabbitmq_cookie
)
# while int(instance.query('SELECT count() FROM test.view')) == 0:
# time.sleep(0.1)
@ -2054,7 +2060,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
result = instance.query("SELECT count(DISTINCT key) FROM test.view").strip()
if int(result) == messages_num:
break
print(f"Result: {result} / {messages_num}")
logging.debug(f"Result: {result} / {messages_num}")
time.sleep(1)
instance.query(
@ -2599,7 +2605,7 @@ def test_rabbitmq_issue_30691(rabbitmq_cluster):
result = ""
while True:
result = instance.query("SELECT * FROM test.rabbitmq_drop", ignore_error=True)
print(result)
logging.debug(result)
if result != "":
break
assert (
@ -2649,7 +2655,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
while True:
res = instance.query("SELECT COUNT(*) FROM test.view")
print(f"Current count (1): {res}")
logging.debug(f"Current count (1): {res}")
if int(res) == 20:
break
else:
@ -2674,7 +2680,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
while True:
result = instance.query("SELECT count() FROM test.view")
print(f"Current count (2): {result}")
logging.debug(f"Current count (2): {result}")
if int(result) == 50:
break
time.sleep(1)
@ -2921,7 +2927,7 @@ def test_format_with_prefix_and_suffix(rabbitmq_cluster):
def onReceived(channel, method, properties, body):
message = body.decode()
insert_messages.append(message)
print(f"Received {len(insert_messages)} message: {message}")
logging.debug(f"Received {len(insert_messages)} message: {message}")
if len(insert_messages) == 2:
channel.stop_consuming()
@ -3029,7 +3035,7 @@ def test_row_based_formats(rabbitmq_cluster):
"RowBinaryWithNamesAndTypes",
"MsgPack",
]:
print(format_name)
logging.debug(format_name)
instance.query(
f"""
@ -3176,7 +3182,7 @@ def test_block_based_formats_2(rabbitmq_cluster):
"ORC",
"JSONCompactColumns",
]:
print(format_name)
logging.debug(format_name)
instance.query(
f"""
@ -3296,7 +3302,7 @@ def test_rabbitmq_flush_by_block_size(rabbitmq_cluster):
body=json.dumps({"key": 0, "value": 0}),
)
except e:
print(f"Got error: {str(e)}")
logging.debug(f"Got error: {str(e)}")
produce_thread = threading.Thread(target=produce)
produce_thread.start()
@ -3371,10 +3377,10 @@ def test_rabbitmq_flush_by_time(rabbitmq_cluster):
routing_key="",
body=json.dumps({"key": 0, "value": 0}),
)
print("Produced a message")
logging.debug("Produced a message")
time.sleep(0.8)
except e:
print(f"Got error: {str(e)}")
logging.debug(f"Got error: {str(e)}")
produce_thread = threading.Thread(target=produce)
produce_thread.start()
@ -3391,13 +3397,13 @@ def test_rabbitmq_flush_by_time(rabbitmq_cluster):
count = instance.query(
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view'"
)
print(f"kssenii total count: {count}")
logging.debug(f"kssenii total count: {count}")
count = int(
instance.query(
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'"
)
)
print(f"kssenii count: {count}")
logging.debug(f"kssenii count: {count}")
if count > 0:
break