Add replicated_deduplication_window_seconds merge_tree parameter. [#CLICKHOUSE-3173]

Vitaliy Lyudvichenko 2017-07-24 23:12:59 +03:00 committed by alexey-milovidov
parent dcc4d3bff0
commit cd5bb4d921
4 changed files with 39 additions and 25 deletions

View File

@@ -54,6 +54,7 @@ struct MergeTreeSettings
/// How many last blocks of hashes should be kept in ZooKeeper.
size_t replicated_deduplication_window = 100;
/// How many seconds must pass between an INSERT and the most recent INSERT before the older block hash may be deleted from ZooKeeper.
size_t replicated_deduplication_window_seconds = 7 * 24 * 60 * 60; /// one week
/// Keep about this number of last records in ZooKeeper log, even if they are obsolete.
@@ -96,6 +97,9 @@ struct MergeTreeSettings
/// Period to check replication delay and compare with other replicas.
size_t check_delay_period = 60;
/// Period to clean old queue logs, block hashes and parts.
size_t cleanup_delay_period = 30;
/// Minimal delay from other replicas to yield leadership. Here and below, 0 means unlimited.
size_t min_relative_delay_to_yield_leadership = 120;
@@ -139,6 +143,7 @@ struct MergeTreeSettings
SET(parts_to_throw_insert, getUInt64);
SET(max_delay_to_insert, getUInt64);
SET(replicated_deduplication_window, getUInt64);
SET(replicated_deduplication_window_seconds, getUInt64);
SET(replicated_logs_to_keep, getUInt64);
SET(prefer_fetch_merged_part_time_threshold, getUInt64);
SET(prefer_fetch_merged_part_size_threshold, getUInt64);
@@ -153,6 +158,7 @@ struct MergeTreeSettings
SET(replicated_can_become_leader, getBool);
SET(zookeeper_session_expiration_check_period, getUInt64);
SET(check_delay_period, getUInt64);
SET(cleanup_delay_period, getUInt64);
SET(min_relative_delay_to_yield_leadership, getUInt64);
SET(min_relative_delay_to_close, getUInt64);
SET(min_absolute_delay_to_close, getUInt64);
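
The SET(name, getter) lines above copy each setting from the server configuration into the struct, keeping the compiled-in default when the key is absent. The macro definition itself is outside this hunk; the following is a minimal, self-contained sketch of how such a macro typically works, with FakeConfig standing in for a Poco-style configuration object and "merge_tree" assumed as the enclosing config section — everything except the setting names is illustrative, not the actual MergeTreeSettings code.

// Hypothetical sketch: mapping "<config_elem>.<setting_name>" keys onto settings members.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct FakeConfig                     /// stand-in for a Poco-style AbstractConfiguration
{
    std::map<std::string, uint64_t> values;

    uint64_t getUInt64(const std::string & key, uint64_t def) const
    {
        auto it = values.find(key);
        return it != values.end() ? it->second : def;
    }
};

struct Settings
{
    uint64_t replicated_deduplication_window = 100;
    uint64_t replicated_deduplication_window_seconds = 7 * 24 * 60 * 60;  /// one week
    uint64_t cleanup_delay_period = 30;

    void loadFromConfig(const std::string & config_elem, const FakeConfig & config)
    {
    #define SET(NAME, GETTER) NAME = config.GETTER(config_elem + "." #NAME, NAME)
        SET(replicated_deduplication_window, getUInt64);
        SET(replicated_deduplication_window_seconds, getUInt64);
        SET(cleanup_delay_period, getUInt64);
    #undef SET
    }
};

int main()
{
    FakeConfig config{{{"merge_tree.replicated_deduplication_window_seconds", 3600}}};
    Settings settings;
    settings.loadFromConfig("merge_tree", config);
    std::cout << settings.replicated_deduplication_window_seconds << "\n";  /// prints 3600; the other settings keep their defaults
}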

View File

@@ -1,6 +1,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/setThreadName.h>
#include <Poco/Timestamp.h>
namespace DB
@@ -22,7 +23,7 @@ void ReplicatedMergeTreeCleanupThread::run()
{
setThreadName("ReplMTCleanup");
const auto CLEANUP_SLEEP_MS = 30 * 1000;
const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000;
while (!storage.shutdown_called)
{
@@ -116,18 +117,13 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
if (!zookeeper->exists(storage.zookeeper_path + "/blocks", &stat))
throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// To make "asymptotically" fewer `exists` requests, we will wait for 1.1 times more blocks to accumulate than necessary.
if (static_cast<double>(children_count) < storage.data.settings.replicated_deduplication_window * 1.1)
return;
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - storage.data.settings.replicated_deduplication_window
<< " old blocks from ZooKeeper. This might take several minutes.");
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks to clear old ones from ZooKeeper. This might take several minutes.");
Strings blocks = zookeeper->getChildren(storage.zookeeper_path + "/blocks");
std::vector<std::pair<Int64, String> > timed_blocks;
using TimedBlock = std::pair<Int64, String>;
using TimedBlocksComparator = std::greater<TimedBlock>;
std::vector<TimedBlock> timed_blocks;
for (const String & block : blocks)
{
@@ -136,14 +132,27 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
timed_blocks.push_back(std::make_pair(stat.ctime, block));
}
std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
for (size_t i = storage.data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
if (timed_blocks.empty())
return;
std::sort(timed_blocks.begin(), timed_blocks.end(), TimedBlocksComparator());
/// Use the ctime of the newest block (the first one after the descending sort) as the "current" time.
Int64 current_time = timed_blocks.front().first;
Int64 time_threshold = std::max(0L, current_time - static_cast<Int64>(storage.data.settings.replicated_deduplication_window_seconds));
TimedBlock block_threshold(time_threshold, "");
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + storage.data.settings.replicated_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, TimedBlocksComparator());
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
/// TODO: After about half a year, we could replace this with a multi op, because by then there will be no obsolete child nodes.
zookeeper->removeRecursive(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second);
zookeeper->removeRecursive(storage.zookeeper_path + "/blocks/" + it->second);
}
LOG_TRACE(log, "Cleared " << blocks.size() - storage.data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
LOG_TRACE(log, "Cleared " << timed_blocks.end() - first_outdated_block << " old blocks from ZooKeeper");
}
}
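
Taken together, the two iterators above mean that a block hash survives only while it is both among the newest replicated_deduplication_window entries and no more than replicated_deduplication_window_seconds older than the newest block: std::min picks whichever threshold is crossed first, and everything past it is removed. Below is a small standalone sketch with made-up ctime values (not the cleanup-thread code itself) that shows the same upper_bound over a descending-sorted (ctime, name) vector.

#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main()
{
    using TimedBlock = std::pair<int64_t, std::string>;   /// (ZooKeeper ctime, block node name)

    const size_t window = 3;             /// replicated_deduplication_window
    const int64_t window_seconds = 100;  /// replicated_deduplication_window_seconds

    std::vector<TimedBlock> timed_blocks = {{800, "c"}, {1000, "e"}, {600, "a"}, {950, "d"}, {700, "b"}};

    /// Newest block first, as in clearOldBlocks().
    std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<TimedBlock>());

    /// The "current" time is the ctime of the newest block.
    const int64_t time_threshold = std::max<int64_t>(0, timed_blocks.front().first - window_seconds);

    /// First block beyond the fixed-size window.
    auto first_outdated_by_count = timed_blocks.begin() + std::min(window, timed_blocks.size());
    /// First block strictly older than the time threshold (same comparator as the sort).
    auto first_outdated_by_time = std::upper_bound(
        timed_blocks.begin(), timed_blocks.end(), TimedBlock(time_threshold, ""), std::greater<TimedBlock>());

    /// Everything from the earlier of the two positions onward would be removed.
    for (auto it = std::min(first_outdated_by_count, first_outdated_by_time); it != timed_blocks.end(); ++it)
        std::cout << "would remove /blocks/" << it->second << " (ctime " << it->first << ")\n";

    /// Prints c, b and a: although "c" (ctime 800) is still among the 3 newest blocks,
    /// it is more than 100 seconds older than the newest block (ctime 1000).
}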

View File

@@ -12,24 +12,23 @@ class Client:
def query(self, sql, stdin=None, timeout=None):
return QueryRequest(self, sql, stdin, timeout).get_answer()
return self.get_query_request(sql, stdin, timeout).get_answer()
def get_query_request(self, sql, stdin=None, timeout=None):
return QueryRequest(self, sql, stdin, timeout)
command = self.command[:]
class QueryRequest:
def __init__(self, client, sql, stdin=None, timeout=None):
self.client = client
command = self.client.command[:]
if stdin is None:
command += ['--multiquery']
stdin = sql
else:
command += ['--query', sql]
return CommandRequest(command, stdin, timeout)
class CommandRequest:
def __init__(self, command, stdin=None, timeout=None):
# Write data to tmp file to avoid PIPEs and execution blocking
stdin_file = tempfile.TemporaryFile()
stdin_file.write(stdin)
@@ -37,7 +36,7 @@ class QueryRequest:
self.stdout_file = tempfile.TemporaryFile()
self.stderr_file = tempfile.TemporaryFile()
#print " ".join(command), "\nQuery:", sql
#print " ".join(command)
self.process = sp.Popen(command, stdin=stdin_file, stdout=self.stdout_file, stderr=self.stderr_file)
@@ -46,8 +45,8 @@ class QueryRequest:
if timeout is not None:
def kill_process():
if self.process.poll() is None:
self.process.kill()
self.process_finished_before_timeout = False
self.process.kill()
self.timer = Timer(timeout, kill_process)
self.timer.start()

View File

@@ -13,7 +13,7 @@ import xml.dom.minidom
import docker
from .client import Client
from .client import Client, CommandRequest
HELPERS_DIR = p.dirname(__file__)