mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge branch 'ClickHouse:master' into time_buckets_impl
This commit is contained in:
commit
eb540da247
@ -3,6 +3,8 @@
|
|||||||
|
|
||||||
FROM alpine:3.18
|
FROM alpine:3.18
|
||||||
RUN apk add --no-cache -U iproute2 \
|
RUN apk add --no-cache -U iproute2 \
|
||||||
&& for bin in iptables iptables-restore iptables-save; \
|
&& for bin in \
|
||||||
|
iptables iptables-restore iptables-save \
|
||||||
|
ip6tables ip6tables-restore ip6tables-save; \
|
||||||
do ln -sf xtables-nft-multi "/sbin/$bin"; \
|
do ln -sf xtables-nft-multi "/sbin/$bin"; \
|
||||||
done
|
done
|
||||||
|
@ -24,9 +24,11 @@ DELETE FROM hits WHERE Title LIKE '%hello%';
|
|||||||
|
|
||||||
## Lightweight `DELETE` does not delete data immediately
|
## Lightweight `DELETE` does not delete data immediately
|
||||||
|
|
||||||
Lightweight `DELETE` is implemented as a [mutation](/en/sql-reference/statements/alter#mutations), which is executed asynchronously in the background by default. The statement is going to return almost immediately, but the data can still be visible to queries until the mutation is finished.
|
Lightweight `DELETE` is implemented as a [mutation](/en/sql-reference/statements/alter#mutations) that marks rows as deleted but does not immediately physically delete them.
|
||||||
|
|
||||||
The mutation marks rows as deleted, and at that point, they will no longer show up in query results. It does not physically delete the data, this will happen during the next merge. As a result, it is possible that for an unspecified period, data is not actually deleted from storage and is only marked as deleted.
|
By default, `DELETE` statements wait until marking the rows as deleted is completed before returning. This can take a long time if the amount of data is large. Alternatively, you can run it asynchronously in the background using the setting [`lightweight_deletes_sync`](/en/operations/settings/settings#lightweight_deletes_sync). If disabled, the `DELETE` statement is going to return immediately, but the data can still be visible to queries until the background mutation is finished.
|
||||||
|
|
||||||
|
The mutation does not physically delete the rows that have been marked as deleted, this will only happen during the next merge. As a result, it is possible that for an unspecified period, data is not actually deleted from storage and is only marked as deleted.
|
||||||
|
|
||||||
If you need to guarantee that your data is deleted from storage in a predictable time, consider using the table setting [`min_age_to_force_merge_seconds`](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings#min_age_to_force_merge_seconds). Or you can use the [ALTER TABLE ... DELETE](/en/sql-reference/statements/alter/delete) command. Note that deleting data using `ALTER TABLE ... DELETE` may consume significant resources as it recreates all affected parts.
|
If you need to guarantee that your data is deleted from storage in a predictable time, consider using the table setting [`min_age_to_force_merge_seconds`](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings#min_age_to_force_merge_seconds). Or you can use the [ALTER TABLE ... DELETE](/en/sql-reference/statements/alter/delete) command. Note that deleting data using `ALTER TABLE ... DELETE` may consume significant resources as it recreates all affected parts.
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ public:
|
|||||||
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
|
size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);
|
||||||
|
|
||||||
using Entry = IConnectionPool::Entry;
|
using Entry = IConnectionPool::Entry;
|
||||||
using PoolWithFailoverBase<IConnectionPool>::isTryResultInvalid;
|
using PoolWithFailoverBase<IConnectionPool>::checkTryResultIsValid;
|
||||||
|
|
||||||
/** Allocates connection to work. */
|
/** Allocates connection to work. */
|
||||||
Entry get(const ConnectionTimeouts & timeouts) override;
|
Entry get(const ConnectionTimeouts & timeouts) override;
|
||||||
|
@ -122,6 +122,14 @@ public:
|
|||||||
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
|
return result.entry.isNull() || !result.is_usable || (skip_read_only_replicas && result.is_readonly);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void checkTryResultIsValid(const TryResult & result, bool skip_read_only_replicas) const
|
||||||
|
{
|
||||||
|
if (isTryResultInvalid(result, skip_read_only_replicas))
|
||||||
|
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
|
||||||
|
"Got an invalid connection result: entry.isNull {}, is_usable {}, is_up_to_date {}, delay {}, is_readonly {}, skip_read_only_replicas {}",
|
||||||
|
result.entry.isNull(), result.is_usable, result.is_up_to_date, result.delay, result.is_readonly, skip_read_only_replicas);
|
||||||
|
}
|
||||||
|
|
||||||
size_t getPoolSize() const { return nested_pools.size(); }
|
size_t getPoolSize() const { return nested_pools.size(); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -28,7 +28,6 @@ namespace ErrorCodes
|
|||||||
extern const int TOO_MANY_PARTITIONS;
|
extern const int TOO_MANY_PARTITIONS;
|
||||||
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
|
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
|
||||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||||
extern const int LOGICAL_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Can the batch be split and send files from batch one-by-one instead?
|
/// Can the batch be split and send files from batch one-by-one instead?
|
||||||
@ -244,9 +243,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
|
|||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
||||||
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = results.front();
|
||||||
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
|
parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
connection = std::move(result.entry);
|
connection = std::move(result.entry);
|
||||||
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
||||||
|
|
||||||
@ -306,9 +303,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
|
|||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
||||||
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
auto results = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = results.front();
|
||||||
if (parent.pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
|
parent.pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
auto connection = std::move(result.entry);
|
auto connection = std::move(result.entry);
|
||||||
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
||||||
|
|
||||||
|
@ -416,9 +416,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
|
|||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
|
||||||
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
auto results = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = results.front();
|
||||||
if (pool->isTryResultInvalid(result, insert_settings.distributed_insert_skip_read_only_replicas))
|
pool->checkTryResultIsValid(result, insert_settings.distributed_insert_skip_read_only_replicas);
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
auto connection = std::move(result.entry);
|
auto connection = std::move(result.entry);
|
||||||
|
|
||||||
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
|
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
|
||||||
|
@ -347,7 +347,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
|
|||||||
}
|
}
|
||||||
|
|
||||||
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
|
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
|
||||||
const Settings & settings = context->getSettingsRef();
|
const Settings settings = context->getSettingsCopy();
|
||||||
|
|
||||||
size_t rows = shard_block.rows();
|
size_t rows = shard_block.rows();
|
||||||
|
|
||||||
@ -378,9 +378,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
|
|||||||
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
|
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
|
||||||
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
|
||||||
auto result = results.front();
|
auto result = results.front();
|
||||||
if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
|
shard_info.pool->checkTryResultIsValid(result, settings.distributed_insert_skip_read_only_replicas);
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
|
|
||||||
|
|
||||||
job.connection_entry = std::move(result.entry);
|
job.connection_entry = std::move(result.entry);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -2112,6 +2112,7 @@ class ClickHouseCluster:
|
|||||||
self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name]
|
self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name]
|
||||||
)
|
)
|
||||||
node.ip_address = self.get_instance_ip(node.name)
|
node.ip_address = self.get_instance_ip(node.name)
|
||||||
|
node.ipv6_address = self.get_instance_global_ipv6(node.name)
|
||||||
node.client = Client(node.ip_address, command=self.client_bin_path)
|
node.client = Client(node.ip_address, command=self.client_bin_path)
|
||||||
|
|
||||||
logging.info("Restart node with ip change")
|
logging.info("Restart node with ip change")
|
||||||
@ -3182,6 +3183,7 @@ class ClickHouseCluster:
|
|||||||
for instance in self.instances.values():
|
for instance in self.instances.values():
|
||||||
instance.docker_client = self.docker_client
|
instance.docker_client = self.docker_client
|
||||||
instance.ip_address = self.get_instance_ip(instance.name)
|
instance.ip_address = self.get_instance_ip(instance.name)
|
||||||
|
instance.ipv6_address = self.get_instance_global_ipv6(instance.name)
|
||||||
|
|
||||||
logging.debug(
|
logging.debug(
|
||||||
f"Waiting for ClickHouse start in {instance.name}, ip: {instance.ip_address}..."
|
f"Waiting for ClickHouse start in {instance.name}, ip: {instance.ip_address}..."
|
||||||
|
@ -3,6 +3,7 @@ import subprocess
|
|||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
import docker
|
import docker
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
|
||||||
class PartitionManager:
|
class PartitionManager:
|
||||||
@ -26,23 +27,74 @@ class PartitionManager:
|
|||||||
self._check_instance(instance)
|
self._check_instance(instance)
|
||||||
|
|
||||||
self._add_rule(
|
self._add_rule(
|
||||||
{"source": instance.ip_address, "destination_port": 2181, "action": action}
|
{
|
||||||
|
"source": instance.ip_address,
|
||||||
|
"destination_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
self._add_rule(
|
self._add_rule(
|
||||||
{"destination": instance.ip_address, "source_port": 2181, "action": action}
|
{
|
||||||
|
"destination": instance.ip_address,
|
||||||
|
"source_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if instance.ipv6_address:
|
||||||
|
self._add_rule(
|
||||||
|
{
|
||||||
|
"source": instance.ipv6_address,
|
||||||
|
"destination_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
self._add_rule(
|
||||||
|
{
|
||||||
|
"destination": instance.ipv6_address,
|
||||||
|
"source_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
def dump_rules(self):
|
def dump_rules(self):
|
||||||
return _NetworkManager.get().dump_rules()
|
v4 = _NetworkManager.get().dump_rules()
|
||||||
|
v6 = _NetworkManager.get().dump_v6_rules()
|
||||||
|
|
||||||
|
return v4 + v6
|
||||||
|
|
||||||
def restore_instance_zk_connections(self, instance, action="DROP"):
|
def restore_instance_zk_connections(self, instance, action="DROP"):
|
||||||
self._check_instance(instance)
|
self._check_instance(instance)
|
||||||
|
|
||||||
self._delete_rule(
|
self._delete_rule(
|
||||||
{"source": instance.ip_address, "destination_port": 2181, "action": action}
|
{
|
||||||
|
"source": instance.ip_address,
|
||||||
|
"destination_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
self._delete_rule(
|
self._delete_rule(
|
||||||
{"destination": instance.ip_address, "source_port": 2181, "action": action}
|
{
|
||||||
|
"destination": instance.ip_address,
|
||||||
|
"source_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if instance.ipv6_address:
|
||||||
|
self._delete_rule(
|
||||||
|
{
|
||||||
|
"source": instance.ipv6_address,
|
||||||
|
"destination_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
self._delete_rule(
|
||||||
|
{
|
||||||
|
"destination": instance.ipv6_address,
|
||||||
|
"source_port": 2181,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
def partition_instances(self, left, right, port=None, action="DROP"):
|
def partition_instances(self, left, right, port=None, action="DROP"):
|
||||||
@ -59,15 +111,33 @@ class PartitionManager:
|
|||||||
rule["destination_port"] = port
|
rule["destination_port"] = port
|
||||||
return rule
|
return rule
|
||||||
|
|
||||||
|
def create_rule_v6(src, dst):
|
||||||
|
rule = {
|
||||||
|
"source": src.ipv6_address,
|
||||||
|
"destination": dst.ipv6_address,
|
||||||
|
"action": action,
|
||||||
|
}
|
||||||
|
if port is not None:
|
||||||
|
rule["destination_port"] = port
|
||||||
|
return rule
|
||||||
|
|
||||||
self._add_rule(create_rule(left, right))
|
self._add_rule(create_rule(left, right))
|
||||||
self._add_rule(create_rule(right, left))
|
self._add_rule(create_rule(right, left))
|
||||||
|
|
||||||
|
if left.ipv6_address and right.ipv6_address:
|
||||||
|
self._add_rule(create_rule_v6(left, right))
|
||||||
|
self._add_rule(create_rule_v6(right, left))
|
||||||
|
|
||||||
def add_network_delay(self, instance, delay_ms):
|
def add_network_delay(self, instance, delay_ms):
|
||||||
self._add_tc_netem_delay(instance, delay_ms)
|
self._add_tc_netem_delay(instance, delay_ms)
|
||||||
|
|
||||||
def heal_all(self):
|
def heal_all(self):
|
||||||
while self._iptables_rules:
|
while self._iptables_rules:
|
||||||
rule = self._iptables_rules.pop()
|
rule = self._iptables_rules.pop()
|
||||||
|
|
||||||
|
if self._is_ipv6_rule(rule):
|
||||||
|
_NetworkManager.get().delete_ip6tables_rule(**rule)
|
||||||
|
else:
|
||||||
_NetworkManager.get().delete_iptables_rule(**rule)
|
_NetworkManager.get().delete_iptables_rule(**rule)
|
||||||
|
|
||||||
while self._netem_delayed_instances:
|
while self._netem_delayed_instances:
|
||||||
@ -90,11 +160,26 @@ class PartitionManager:
|
|||||||
if instance.ip_address is None:
|
if instance.ip_address is None:
|
||||||
raise Exception("Instance + " + instance.name + " is not launched!")
|
raise Exception("Instance + " + instance.name + " is not launched!")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_ipv6_rule(rule):
|
||||||
|
if rule.get("source"):
|
||||||
|
return ipaddress.ip_address(rule["source"]).version == 6
|
||||||
|
if rule.get("destination"):
|
||||||
|
return ipaddress.ip_address(rule["destination"]).version == 6
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def _add_rule(self, rule):
|
def _add_rule(self, rule):
|
||||||
|
if self._is_ipv6_rule(rule):
|
||||||
|
_NetworkManager.get().add_ip6tables_rule(**rule)
|
||||||
|
else:
|
||||||
_NetworkManager.get().add_iptables_rule(**rule)
|
_NetworkManager.get().add_iptables_rule(**rule)
|
||||||
self._iptables_rules.append(rule)
|
self._iptables_rules.append(rule)
|
||||||
|
|
||||||
def _delete_rule(self, rule):
|
def _delete_rule(self, rule):
|
||||||
|
if self._is_ipv6_rule(rule):
|
||||||
|
_NetworkManager.get().delete_ip6tables_rule(**rule)
|
||||||
|
else:
|
||||||
_NetworkManager.get().delete_iptables_rule(**rule)
|
_NetworkManager.get().delete_iptables_rule(**rule)
|
||||||
self._iptables_rules.remove(rule)
|
self._iptables_rules.remove(rule)
|
||||||
|
|
||||||
@ -150,35 +235,65 @@ class _NetworkManager:
|
|||||||
cls._instance = cls(**kwargs)
|
cls._instance = cls(**kwargs)
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
|
def setup_ip6tables_docker_user_chain(self):
|
||||||
|
_rules = subprocess.check_output(f"ip6tables-save", shell=True)
|
||||||
|
if "DOCKER-USER" in _rules.decode("utf-8"):
|
||||||
|
return
|
||||||
|
|
||||||
|
setup_cmds = [
|
||||||
|
["ip6tables", "--wait", "-N", "DOCKER-USER"],
|
||||||
|
["ip6tables", "--wait", "-I", "FORWARD", "-j", "DOCKER-USER"],
|
||||||
|
["ip6tables", "--wait", "-A", "DOCKER-USER", "-j", "RETURN"],
|
||||||
|
]
|
||||||
|
for cmd in setup_cmds:
|
||||||
|
self._exec_run(cmd, privileged=True)
|
||||||
|
|
||||||
def add_iptables_rule(self, **kwargs):
|
def add_iptables_rule(self, **kwargs):
|
||||||
cmd = ["iptables", "--wait", "-I", "DOCKER-USER", "1"]
|
cmd = ["iptables", "--wait", "-I", "DOCKER-USER", "1"]
|
||||||
cmd.extend(self._iptables_cmd_suffix(**kwargs))
|
cmd.extend(self._iptables_cmd_suffix(**kwargs))
|
||||||
self._exec_run(cmd, privileged=True)
|
self._exec_run(cmd, privileged=True)
|
||||||
|
|
||||||
|
def add_ip6tables_rule(self, **kwargs):
|
||||||
|
self.setup_ip6tables_docker_user_chain()
|
||||||
|
|
||||||
|
cmd = ["ip6tables", "--wait", "-I", "DOCKER-USER", "1"]
|
||||||
|
cmd.extend(self._iptables_cmd_suffix(**kwargs))
|
||||||
|
self._exec_run(cmd, privileged=True)
|
||||||
|
|
||||||
def delete_iptables_rule(self, **kwargs):
|
def delete_iptables_rule(self, **kwargs):
|
||||||
cmd = ["iptables", "--wait", "-D", "DOCKER-USER"]
|
cmd = ["iptables", "--wait", "-D", "DOCKER-USER"]
|
||||||
cmd.extend(self._iptables_cmd_suffix(**kwargs))
|
cmd.extend(self._iptables_cmd_suffix(**kwargs))
|
||||||
self._exec_run(cmd, privileged=True)
|
self._exec_run(cmd, privileged=True)
|
||||||
|
|
||||||
|
def delete_ip6tables_rule(self, **kwargs):
|
||||||
|
cmd = ["ip6tables", "--wait", "-D", "DOCKER-USER"]
|
||||||
|
cmd.extend(self._iptables_cmd_suffix(**kwargs))
|
||||||
|
self._exec_run(cmd, privileged=True)
|
||||||
|
|
||||||
def dump_rules(self):
|
def dump_rules(self):
|
||||||
cmd = ["iptables", "-L", "DOCKER-USER"]
|
cmd = ["iptables", "-L", "DOCKER-USER"]
|
||||||
return self._exec_run(cmd, privileged=True)
|
return self._exec_run(cmd, privileged=True)
|
||||||
|
|
||||||
|
def dump_v6_rules(self):
|
||||||
|
cmd = ["ip6tables", "-L", "DOCKER-USER"]
|
||||||
|
return self._exec_run(cmd, privileged=True)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def clean_all_user_iptables_rules():
|
def clean_all_user_iptables_rules():
|
||||||
|
for iptables in ("iptables", "ip6tables"):
|
||||||
for i in range(1000):
|
for i in range(1000):
|
||||||
iptables_iter = i
|
iptables_iter = i
|
||||||
# when rules will be empty, it will return error
|
# when rules will be empty, it will return error
|
||||||
res = subprocess.run("iptables --wait -D DOCKER-USER 1", shell=True)
|
res = subprocess.run(f"{iptables} --wait -D DOCKER-USER 1", shell=True)
|
||||||
|
|
||||||
if res.returncode != 0:
|
if res.returncode != 0:
|
||||||
logging.info(
|
logging.info(
|
||||||
"All iptables rules cleared, "
|
f"All {iptables} rules cleared, "
|
||||||
+ str(iptables_iter)
|
+ str(iptables_iter)
|
||||||
+ " iterations, last error: "
|
+ " iterations, last error: "
|
||||||
+ str(res.stderr)
|
+ str(res.stderr)
|
||||||
)
|
)
|
||||||
return
|
break
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _iptables_cmd_suffix(
|
def _iptables_cmd_suffix(
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
<secret_access_key>minio123</secret_access_key>
|
<secret_access_key>minio123</secret_access_key>
|
||||||
</s3_snapshot>
|
</s3_snapshot>
|
||||||
|
<use_cluster>false</use_cluster>
|
||||||
<tcp_port>9181</tcp_port>
|
<tcp_port>9181</tcp_port>
|
||||||
<server_id>1</server_id>
|
<server_id>1</server_id>
|
||||||
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
<secret_access_key>minio123</secret_access_key>
|
<secret_access_key>minio123</secret_access_key>
|
||||||
</s3_snapshot>
|
</s3_snapshot>
|
||||||
|
<use_cluster>false</use_cluster>
|
||||||
<tcp_port>9181</tcp_port>
|
<tcp_port>9181</tcp_port>
|
||||||
<server_id>2</server_id>
|
<server_id>2</server_id>
|
||||||
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
<secret_access_key>minio123</secret_access_key>
|
<secret_access_key>minio123</secret_access_key>
|
||||||
</s3_snapshot>
|
</s3_snapshot>
|
||||||
|
<use_cluster>false</use_cluster>
|
||||||
<tcp_port>9181</tcp_port>
|
<tcp_port>9181</tcp_port>
|
||||||
<server_id>3</server_id>
|
<server_id>3</server_id>
|
||||||
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||||
|
@ -2,6 +2,9 @@ import pytest
|
|||||||
from helpers.cluster import ClickHouseCluster
|
from helpers.cluster import ClickHouseCluster
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from retry import retry
|
from retry import retry
|
||||||
|
from multiprocessing.dummy import Pool
|
||||||
|
import helpers.keeper_utils as keeper_utils
|
||||||
|
from minio.deleteobjects import DeleteObject
|
||||||
|
|
||||||
from kazoo.client import KazooClient
|
from kazoo.client import KazooClient
|
||||||
|
|
||||||
@ -75,7 +78,18 @@ def wait_node(node):
|
|||||||
raise Exception("Can't wait node", node.name, "to become ready")
|
raise Exception("Can't wait node", node.name, "to become ready")
|
||||||
|
|
||||||
|
|
||||||
|
def delete_keeper_snapshots_logs(nodex):
|
||||||
|
nodex.exec_in_container(
|
||||||
|
[
|
||||||
|
"bash",
|
||||||
|
"-c",
|
||||||
|
"rm -rf /var/lib/clickhouse/coordination/log /var/lib/clickhouse/coordination/snapshots",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_s3_upload(started_cluster):
|
def test_s3_upload(started_cluster):
|
||||||
|
|
||||||
node1_zk = get_fake_zk(node1.name)
|
node1_zk = get_fake_zk(node1.name)
|
||||||
|
|
||||||
# we defined in configs snapshot_distance as 50
|
# we defined in configs snapshot_distance as 50
|
||||||
@ -89,6 +103,11 @@ def test_s3_upload(started_cluster):
|
|||||||
for obj in list(cluster.minio_client.list_objects("snapshots"))
|
for obj in list(cluster.minio_client.list_objects("snapshots"))
|
||||||
]
|
]
|
||||||
|
|
||||||
|
def delete_s3_snapshots():
|
||||||
|
snapshots = cluster.minio_client.list_objects("snapshots")
|
||||||
|
for s in snapshots:
|
||||||
|
cluster.minio_client.remove_object("snapshots", s.object_name)
|
||||||
|
|
||||||
# Keeper sends snapshots asynchornously, hence we need to retry.
|
# Keeper sends snapshots asynchornously, hence we need to retry.
|
||||||
@retry(AssertionError, tries=10, delay=2)
|
@retry(AssertionError, tries=10, delay=2)
|
||||||
def _check_snapshots():
|
def _check_snapshots():
|
||||||
@ -125,3 +144,26 @@ def test_s3_upload(started_cluster):
|
|||||||
)
|
)
|
||||||
|
|
||||||
destroy_zk_client(node2_zk)
|
destroy_zk_client(node2_zk)
|
||||||
|
node2.stop_clickhouse()
|
||||||
|
delete_keeper_snapshots_logs(node2)
|
||||||
|
node3.stop_clickhouse()
|
||||||
|
delete_keeper_snapshots_logs(node3)
|
||||||
|
delete_keeper_snapshots_logs(node1)
|
||||||
|
p = Pool(3)
|
||||||
|
waiters = []
|
||||||
|
|
||||||
|
def start_clickhouse(node):
|
||||||
|
node.start_clickhouse()
|
||||||
|
|
||||||
|
waiters.append(p.apply_async(start_clickhouse, args=(node1,)))
|
||||||
|
waiters.append(p.apply_async(start_clickhouse, args=(node2,)))
|
||||||
|
waiters.append(p.apply_async(start_clickhouse, args=(node3,)))
|
||||||
|
|
||||||
|
delete_s3_snapshots() # for next iteration
|
||||||
|
|
||||||
|
for waiter in waiters:
|
||||||
|
waiter.wait()
|
||||||
|
|
||||||
|
keeper_utils.wait_until_connected(cluster, node1)
|
||||||
|
keeper_utils.wait_until_connected(cluster, node2)
|
||||||
|
keeper_utils.wait_until_connected(cluster, node3)
|
||||||
|
@ -629,5 +629,6 @@ def test_roles_cache():
|
|||||||
check()
|
check()
|
||||||
|
|
||||||
instance.query("DROP USER " + ", ".join(users))
|
instance.query("DROP USER " + ", ".join(users))
|
||||||
|
if roles:
|
||||||
instance.query("DROP ROLE " + ", ".join(roles))
|
instance.query("DROP ROLE " + ", ".join(roles))
|
||||||
instance.query("DROP TABLE tbl")
|
instance.query("DROP TABLE tbl")
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
-- Tags: no-fasttest
|
-- Tags: no-fasttest, no-debug, no-tsan, no-msan, no-asan
|
||||||
|
|
||||||
SET min_execution_speed = 100000000000, timeout_before_checking_execution_speed = 0;
|
SET min_execution_speed = 100000000000, timeout_before_checking_execution_speed = 0;
|
||||||
SELECT count() FROM system.numbers; -- { serverError TOO_SLOW }
|
SELECT count() FROM system.numbers; -- { serverError TOO_SLOW }
|
||||||
|
Loading…
Reference in New Issue
Block a user