diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 6d4cd248ae7..22ae1aa56b0 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -54,6 +54,7 @@ struct MergeTreeSettings /// How many last blocks of hashes should be kept in ZooKeeper. size_t replicated_deduplication_window = 100; + /// How many seconds from an INSERT to the last INSERT should pass to delete the block hash from ZooKeeper. size_t replicated_deduplication_window_seconds = 7 * 24 * 60 * 60; /// one week /// Keep about this number of last records in ZooKeeper log, even if they are obsolete. @@ -96,6 +97,9 @@ struct MergeTreeSettings /// Period to check replication delay and compare with other replicas. size_t check_delay_period = 60; + /// Period (in seconds) between cleanups of old queue logs, block hashes and parts. + size_t cleanup_delay_period = 30; + /// Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. 
size_t min_relative_delay_to_yield_leadership = 120; @@ -139,6 +143,7 @@ struct MergeTreeSettings SET(parts_to_throw_insert, getUInt64); SET(max_delay_to_insert, getUInt64); SET(replicated_deduplication_window, getUInt64); + SET(replicated_deduplication_window_seconds, getUInt64); SET(replicated_logs_to_keep, getUInt64); SET(prefer_fetch_merged_part_time_threshold, getUInt64); SET(prefer_fetch_merged_part_size_threshold, getUInt64); @@ -153,6 +158,7 @@ struct MergeTreeSettings SET(replicated_can_become_leader, getBool); SET(zookeeper_session_expiration_check_period, getUInt64); SET(check_delay_period, getUInt64); + SET(cleanup_delay_period, getUInt64); SET(min_relative_delay_to_yield_leadership, getUInt64); SET(min_relative_delay_to_close, getUInt64); SET(min_absolute_delay_to_close, getUInt64); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 4eb38f3e850..60b4fbd916c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace DB @@ -22,7 +23,7 @@ void ReplicatedMergeTreeCleanupThread::run() { setThreadName("ReplMTCleanup"); - const auto CLEANUP_SLEEP_MS = 30 * 1000; + const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000; while (!storage.shutdown_called) { @@ -116,18 +117,13 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() if (!zookeeper->exists(storage.zookeeper_path + "/blocks", &stat)) throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); - int children_count = stat.numChildren; - - /// To make "asymptotically" fewer `exists` requests, we will wait for 1.1 times more blocks to accumulate than necessary. 
- if (static_cast(children_count) < storage.data.settings.replicated_deduplication_window * 1.1) - return; - - LOG_TRACE(log, "Clearing about " << static_cast(children_count) - storage.data.settings.replicated_deduplication_window - << " old blocks from ZooKeeper. This might take several minutes."); + LOG_TRACE(log, "Checking " << stat.numChildren << " blocks to clear old ones from ZooKeeper. This might take several minutes."); Strings blocks = zookeeper->getChildren(storage.zookeeper_path + "/blocks"); - std::vector > timed_blocks; + using TimedBlock = std::pair; + using TimedBlocksComparator = std::greater; + std::vector timed_blocks; for (const String & block : blocks) { @@ -136,14 +132,27 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() timed_blocks.push_back(std::make_pair(stat.ctime, block)); } - std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater>()); - for (size_t i = storage.data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i) + if (timed_blocks.empty()) + return; + + std::sort(timed_blocks.begin(), timed_blocks.end(), TimedBlocksComparator()); + + /// Use the ctime of the newest node as "current" time. ZooKeeper reports ctime in milliseconds, so the window in seconds is scaled by 1000. 
+ Int64 current_time = timed_blocks.front().first; + Int64 time_threshold = std::max(0L, current_time - static_cast(1000 * storage.data.settings.replicated_deduplication_window_seconds)); + TimedBlock block_threshold(time_threshold, ""); + + auto first_outdated_block_fixed_threshold = timed_blocks.begin() + std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window); + auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, TimedBlocksComparator()); + auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold); + + for (auto it = first_outdated_block; it != timed_blocks.end(); ++it) { /// TODO After about half a year, we could replace this to multi op, because there will be no obsolete children nodes. - zookeeper->removeRecursive(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second); + zookeeper->removeRecursive(storage.zookeeper_path + "/blocks/" + it->second); } - LOG_TRACE(log, "Cleared " << blocks.size() - storage.data.settings.replicated_deduplication_window << " old blocks from ZooKeeper"); + LOG_TRACE(log, "Cleared " << timed_blocks.end() - first_outdated_block << " old blocks from ZooKeeper"); } } diff --git a/dbms/tests/integration/helpers/client.py b/dbms/tests/integration/helpers/client.py index a7479efde36..bd87e680f71 100644 --- a/dbms/tests/integration/helpers/client.py +++ b/dbms/tests/integration/helpers/client.py @@ -12,24 +12,23 @@ class Client: def query(self, sql, stdin=None, timeout=None): - return QueryRequest(self, sql, stdin, timeout).get_answer() + return self.get_query_request(sql, stdin, timeout).get_answer() def get_query_request(self, sql, stdin=None, timeout=None): - return QueryRequest(self, sql, stdin, timeout) + command = self.command[:] - -class QueryRequest: - def __init__(self, client, sql, stdin=None, timeout=None): - self.client = client - - command = self.client.command[:] if stdin is None: command += ['--multiquery'] stdin = 
sql else: command += ['--query', sql] + return CommandRequest(command, stdin, timeout) + + +class CommandRequest: + def __init__(self, command, stdin=None, timeout=None): # Write data to tmp file to avoid PIPEs and execution blocking stdin_file = tempfile.TemporaryFile() stdin_file.write(stdin) @@ -37,7 +36,7 @@ class QueryRequest: self.stdout_file = tempfile.TemporaryFile() self.stderr_file = tempfile.TemporaryFile() - #print " ".join(command), "\nQuery:", sql + #print " ".join(command) self.process = sp.Popen(command, stdin=stdin_file, stdout=self.stdout_file, stderr=self.stderr_file) @@ -46,8 +45,8 @@ class QueryRequest: if timeout is not None: def kill_process(): if self.process.poll() is None: - self.process.kill() self.process_finished_before_timeout = False + self.process.kill() self.timer = Timer(timeout, kill_process) self.timer.start() diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py index 2abad55ea5a..43e844e6879 100644 --- a/dbms/tests/integration/helpers/cluster.py +++ b/dbms/tests/integration/helpers/cluster.py @@ -13,7 +13,7 @@ import xml.dom.minidom import docker -from .client import Client +from .client import Client, CommandRequest HELPERS_DIR = p.dirname(__file__)