This commit is contained in:
Konstantin Bogdanov 2024-08-26 19:10:49 +02:00
parent dde915681c
commit cedddf6fa4
No known key found for this signature in database
2 changed files with 129 additions and 10 deletions

View File

@ -3,6 +3,8 @@
FROM alpine:3.18
RUN apk add --no-cache -U iproute2 \
&& for bin in iptables iptables-restore iptables-save; \
&& for bin in \
iptables iptables-restore iptables-save \
ip6tables ip6tables-restore ip6tables-save; \
do ln -sf xtables-nft-multi "/sbin/$bin"; \
done

View File

@ -3,6 +3,7 @@ import subprocess
import time
import logging
import docker
import ipaddress
class PartitionManager:
@ -26,25 +27,76 @@ class PartitionManager:
self._check_instance(instance)
self._add_rule(
{"source": instance.ip_address, "destination_port": 2181, "action": action}
{
"source": instance.ipv4_address,
"destination_port": 2181,
"action": action,
}
)
self._add_rule(
{"destination": instance.ip_address, "source_port": 2181, "action": action}
{
"destination": instance.ipv4_address,
"source_port": 2181,
"action": action,
}
)
if instance.ipv6_address:
self._add_rule(
{
"source": instance.ipv6_address,
"destination_port": 2181,
"action": action,
}
)
self._add_rule(
{
"destination": instance.ipv6_address,
"source_port": 2181,
"action": action,
}
)
def dump_rules(self):
return _NetworkManager.get().dump_rules()
v4 = _NetworkManager.get().dump_rules()
v6 = _NetworkManager.get().dump_v6_rules()
return v4 + v6
def restore_instance_zk_connections(self, instance, action="DROP"):
self._check_instance(instance)
self._delete_rule(
{"source": instance.ip_address, "destination_port": 2181, "action": action}
{
"source": instance.ipv4_address,
"destination_port": 2181,
"action": action,
}
)
self._delete_rule(
{"destination": instance.ip_address, "source_port": 2181, "action": action}
{
"destination": instance.ipv4_address,
"source_port": 2181,
"action": action,
}
)
if instance.ipv6_address:
self._delete_rule(
{
"source": instance.ipv6_address,
"destination_port": 2181,
"action": action,
}
)
self._delete_rule(
{
"destination": instance.ipv6_address,
"source_port": 2181,
"action": action,
}
)
def partition_instances(self, left, right, port=None, action="DROP"):
self._check_instance(left)
self._check_instance(right)
@ -59,16 +111,36 @@ class PartitionManager:
rule["destination_port"] = port
return rule
def create_rule_v6(src, dst):
rule = {
"source": src.ipv6_address,
"destination": dst.ipv6_address,
"action": action,
}
if port is not None:
rule["destination_port"] = port
return rule
self._add_rule(create_rule(left, right))
self._add_rule(create_rule(right, left))
if left.ipv6_address and right.ipv6_address:
self._add_rule(create_rule_v6(left, right))
self._add_rule(create_rule_v6(right, left))
def add_network_delay(self, instance, delay_ms):
self._add_tc_netem_delay(instance, delay_ms)
def heal_all(self):
while self._iptables_rules:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)
if self._is_ipv6_rule(rule):
_NetworkManager.get().delete_ip6tables_rule(**rule)
else:
_NetworkManager.get().delete_iptables_rule(**rule)
# _NetworkManager.get().delete_iptables_rule(**rule)
# _NetworkManager.get().delete_ip6tables_rule(**rule)
while self._netem_delayed_instances:
instance = self._netem_delayed_instances.pop()
@ -90,12 +162,29 @@ class PartitionManager:
if instance.ip_address is None:
raise Exception("Instance + " + instance.name + " is not launched!")
@staticmethod
def _is_ipv6_rule(rule):
is_ipv6 = False
if "source" in rule:
is_ipv6 = ipaddress.ip_address(rule["source"]).version == 6
if "destination" in rule:
is_ipv6 = ipaddress.ip_address(rule["source"]).version == 6
return is_ipv6
def _add_rule(self, rule):
_NetworkManager.get().add_iptables_rule(**rule)
if self._is_ipv6_rule(rule):
_NetworkManager.get().add_ip6tables_rule(**rule)
else:
_NetworkManager.get().add_iptables_rule(**rule)
self._iptables_rules.append(rule)
def _delete_rule(self, rule):
_NetworkManager.get().delete_iptables_rule(**rule)
if self._is_ipv6_rule(rule):
_NetworkManager.get().delete_ip6tables_rule(**rule)
else:
_NetworkManager.get().delete_iptables_rule(**rule)
self._iptables_rules.remove(rule)
def _add_tc_netem_delay(self, instance, delay_ms):
@ -155,15 +244,29 @@ class _NetworkManager:
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def add_ip6tables_rule(self, **kwargs):
cmd = ["ip6tables-legacy", "--wait", "-I", "DOCKER-USER", "1"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def delete_iptables_rule(self, **kwargs):
cmd = ["iptables", "--wait", "-D", "DOCKER-USER"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def delete_ip6tables_rule(self, **kwargs):
cmd = ["ip6tables-legacy", "--wait", "-D", "DOCKER-USER"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def dump_rules(self):
cmd = ["iptables", "-L", "DOCKER-USER"]
return self._exec_run(cmd, privileged=True)
def dump_v6_rules(self):
cmd = ["ip6tables-legacy", "-L", "DOCKER-USER"]
return self._exec_run(cmd, privileged=True)
@staticmethod
def clean_all_user_iptables_rules():
for i in range(1000):
@ -178,6 +281,20 @@ class _NetworkManager:
+ " iterations, last error: "
+ str(res.stderr)
)
break
for i in range(1000):
iptables_iter = i
# when rules will be empty, it will return error
res = subprocess.run("ip6tables-legacy --wait -D DOCKER-USER 1", shell=True)
if res.returncode != 0:
logging.info(
"All ip6tables rules cleared, "
+ str(iptables_iter)
+ " iterations, last error: "
+ str(res.stderr)
)
return
@staticmethod
@ -244,7 +361,7 @@ class _NetworkManager:
def _ensure_container(self):
if self._container is None or self._container_expire_time <= time.time():
image_name = "clickhouse/integration-helper:" + os.getenv(
"DOCKER_HELPER_TAG", "latest"
"DOCKER_HELPER_TAG", ""
)
for i in range(5):
if self._container is not None: