2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
|
|
|
|
#include <Storages/StorageReplicatedMergeTree.h>
|
|
|
|
#include <Common/setThreadName.h>
|
2017-07-24 20:12:59 +00:00
|
|
|
#include <Poco/Timestamp.h>
|
2014-10-15 01:22:06 +00:00
|
|
|
|
2018-04-06 19:43:37 +00:00
|
|
|
#include <random>
|
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
/// Error codes referenced by the cleanup thread. They are only declared here (`extern`).
namespace ErrorCodes
{
    extern const int ALL_REPLICAS_LOST;
    extern const int NOT_FOUND_NODE;
    extern const int REPLICA_STATUS_CHANGED;
}
|
|
|
|
|
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
/// Binds the cleanup worker to a replicated table and creates (but does not schedule here)
/// the background task that calls run(). The logger and the task share the same name,
/// built from the database and table names.
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage_.database_name + "." + storage_.table_name + " (ReplicatedMergeTreeCleanupThread)")
    , log(&Logger::get(log_name))
{
    auto && schedule_pool = storage.context.getSchedulePool();
    task = schedule_pool.createTask(log_name, [this] { run(); });
}
|
2014-10-15 01:22:06 +00:00
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::run()
|
|
|
|
{
|
2018-04-06 19:44:55 +00:00
|
|
|
const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000
|
|
|
|
+ std::uniform_int_distribution<UInt64>(0, storage.data.settings.cleanup_delay_period_random_add * 1000)(rng);
|
2014-10-15 01:22:06 +00:00
|
|
|
|
2017-12-29 22:32:04 +00:00
|
|
|
try
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-12-29 22:32:04 +00:00
|
|
|
iterate();
|
|
|
|
}
|
2018-04-24 17:11:59 +00:00
|
|
|
catch (const zkutil::KeeperException & e)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log, __PRETTY_FUNCTION__);
|
2017-12-21 18:17:06 +00:00
|
|
|
|
2018-04-24 17:11:59 +00:00
|
|
|
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
|
|
|
|
return;
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2017-12-29 22:32:04 +00:00
|
|
|
catch (...)
|
|
|
|
{
|
2018-04-10 13:20:14 +00:00
|
|
|
tryLogCurrentException(log, __PRETTY_FUNCTION__);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2014-10-15 01:22:06 +00:00
|
|
|
|
2018-05-31 13:05:05 +00:00
|
|
|
task->scheduleAfter(CLEANUP_SLEEP_MS);
|
2014-10-15 01:22:06 +00:00
|
|
|
}
|
|
|
|
|
2017-12-21 18:17:06 +00:00
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
void ReplicatedMergeTreeCleanupThread::iterate()
|
|
|
|
{
|
2017-10-06 11:30:57 +00:00
|
|
|
storage.clearOldPartsAndRemoveFromZK();
|
2017-04-01 07:20:54 +00:00
|
|
|
storage.data.clearOldTemporaryDirectories();
|
|
|
|
|
2017-11-19 21:17:58 +00:00
|
|
|
/// This is loose condition: no problem if we actually had lost leadership at this moment
|
|
|
|
/// and two replicas will try to do cleanup simultaneously.
|
2018-04-06 16:06:07 +00:00
|
|
|
if (storage.is_leader)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
clearOldLogs();
|
|
|
|
clearOldBlocks();
|
2018-07-31 11:36:08 +00:00
|
|
|
clearOldMutations();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2014-10-15 01:22:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
auto zookeeper = storage.getZooKeeper();
|
|
|
|
|
|
|
|
zkutil::Stat stat;
|
|
|
|
if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat))
|
|
|
|
throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);
|
|
|
|
|
|
|
|
int children_count = stat.numChildren;
|
|
|
|
|
|
|
|
/// We will wait for 1.1 times more records to accumulate than necessary.
|
2018-08-07 15:21:42 +00:00
|
|
|
if (static_cast<double>(children_count) < storage.data.settings.min_replicated_logs_to_keep * 1.1)
|
2017-04-01 07:20:54 +00:00
|
|
|
return;
|
|
|
|
|
|
|
|
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
|
2018-08-23 13:55:59 +00:00
|
|
|
UInt64 min_saved_log_pointer = std::numeric_limits<UInt64>::max();
|
2018-08-27 12:09:22 +00:00
|
|
|
UInt64 min_inactive_log_pointer = std::numeric_limits<UInt64>::max();
|
2018-08-07 15:21:42 +00:00
|
|
|
|
|
|
|
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
|
|
|
|
|
|
|
|
if (entries.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
std::sort(entries.begin(), entries.end());
|
|
|
|
|
2018-08-08 14:07:39 +00:00
|
|
|
String min_saved_record_log_str = entries[entries.size() > storage.data.settings.max_replicated_logs_to_keep.value
|
|
|
|
? entries.size() - storage.data.settings.max_replicated_logs_to_keep.value
|
|
|
|
: 0];
|
2018-07-30 16:31:14 +00:00
|
|
|
|
2018-08-27 12:09:22 +00:00
|
|
|
std::unordered_set<String> recovering_replicas;
|
|
|
|
|
2018-08-23 13:55:59 +00:00
|
|
|
std::unordered_map<String, UInt32> host_versions_inactive_replicas;
|
2018-08-23 15:58:29 +00:00
|
|
|
std::unordered_map<String, String> log_pointers_lost_replicas;
|
|
|
|
|
|
|
|
size_t replicas_were_marked_is_lost = 0;
|
2018-08-09 15:06:39 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
for (const String & replica : replicas)
|
|
|
|
{
|
2018-08-23 15:58:29 +00:00
|
|
|
zkutil::Stat host_stat;
|
2018-08-20 13:31:24 +00:00
|
|
|
zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat);
|
2018-08-23 14:04:53 +00:00
|
|
|
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
|
2018-07-31 13:04:35 +00:00
|
|
|
if (pointer.empty())
|
2017-04-01 07:20:54 +00:00
|
|
|
return;
|
2018-08-07 15:21:42 +00:00
|
|
|
|
2018-07-30 16:31:14 +00:00
|
|
|
UInt32 log_pointer = parse<UInt64>(pointer);
|
2018-08-23 13:55:59 +00:00
|
|
|
|
2018-07-30 16:31:14 +00:00
|
|
|
/// Check status of replica (active or not).
|
2018-08-07 15:21:42 +00:00
|
|
|
/// If replica was not active, we could check when it's log_pointer locates.
|
2018-08-27 12:09:22 +00:00
|
|
|
|
|
|
|
String res;
|
|
|
|
|
|
|
|
bool new_version_of_replica = zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", res);
|
|
|
|
|
2018-08-23 13:55:59 +00:00
|
|
|
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
|
2018-08-27 12:09:22 +00:00
|
|
|
if (new_version_of_replica && res == "1")
|
|
|
|
recovering_replicas.insert(replica);
|
|
|
|
else
|
|
|
|
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
2018-07-30 16:31:14 +00:00
|
|
|
else
|
2018-08-07 15:21:42 +00:00
|
|
|
{
|
2018-08-27 12:09:22 +00:00
|
|
|
if (!new_version_of_replica)
|
|
|
|
{
|
|
|
|
/// Only to support old versions CH.
|
|
|
|
/// If replica did not have "/is_lost" we must save it's log_pointer.
|
|
|
|
/// Because old version CH can not work with recovering.
|
2018-08-23 13:55:59 +00:00
|
|
|
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
2018-08-27 12:09:22 +00:00
|
|
|
}
|
2018-08-23 13:55:59 +00:00
|
|
|
else
|
|
|
|
if (res == "0")
|
2018-08-22 14:01:54 +00:00
|
|
|
{
|
2018-08-23 13:55:59 +00:00
|
|
|
String log_pointer_str = "log-" + padIndex(log_pointer);
|
|
|
|
if (log_pointer_str >= min_saved_record_log_str)
|
|
|
|
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
|
|
|
else
|
|
|
|
{
|
|
|
|
host_versions_inactive_replicas[replica] = host_stat.version;
|
2018-08-23 15:58:29 +00:00
|
|
|
log_pointers_lost_replicas[replica] = log_pointer_str;
|
2018-08-27 12:16:52 +00:00
|
|
|
min_inactive_log_pointer = std::min(min_inactive_log_pointer, log_pointer);
|
2018-08-23 13:55:59 +00:00
|
|
|
}
|
2018-08-22 14:01:54 +00:00
|
|
|
}
|
2018-08-23 13:55:59 +00:00
|
|
|
else
|
2018-08-23 17:19:05 +00:00
|
|
|
{
|
2018-08-23 15:58:29 +00:00
|
|
|
++replicas_were_marked_is_lost;
|
2018-08-23 13:55:59 +00:00
|
|
|
host_versions_inactive_replicas[replica] = host_stat.version;
|
2018-08-23 17:19:05 +00:00
|
|
|
}
|
2018-08-07 15:21:42 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
2018-08-27 12:09:22 +00:00
|
|
|
/// We must check log_pointer recovering replicas at the end.
|
|
|
|
/// Because log pointer recovering replicas can move backward.
|
|
|
|
for (const String & replica : recovering_replicas)
|
|
|
|
{
|
|
|
|
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
|
|
|
|
UInt32 log_pointer = parse<UInt64>(pointer);
|
|
|
|
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (recovering_replicas.size() != 0)
|
|
|
|
min_saved_log_pointer = std::min(min_saved_log_pointer, min_inactive_log_pointer);
|
|
|
|
|
2018-08-07 15:21:42 +00:00
|
|
|
/// We will not touch the last `min_replicated_logs_to_keep` records.
|
|
|
|
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.min_replicated_logs_to_keep.value), entries.end());
|
|
|
|
/// We will not touch records that are no less than `min_pointer_active_replica`.
|
2018-08-23 13:55:59 +00:00
|
|
|
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
|
2018-08-07 15:21:42 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (entries.empty())
|
|
|
|
return;
|
|
|
|
|
2018-08-23 15:58:29 +00:00
|
|
|
markLostReplicas(host_versions_inactive_replicas, log_pointers_lost_replicas, replicas.size() - replicas_were_marked_is_lost, zookeeper);
|
2018-08-09 15:06:39 +00:00
|
|
|
|
2018-03-24 00:45:04 +00:00
|
|
|
zkutil::Requests ops;
|
2017-04-01 07:20:54 +00:00
|
|
|
for (size_t i = 0; i < entries.size(); ++i)
|
|
|
|
{
|
2018-03-24 00:45:04 +00:00
|
|
|
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
|
2018-08-23 14:20:13 +00:00
|
|
|
|
2017-08-10 15:19:36 +00:00
|
|
|
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2018-08-23 16:36:54 +00:00
|
|
|
/// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer.
|
|
|
|
for (auto host_version: host_versions_inactive_replicas)
|
|
|
|
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second));
|
2018-08-23 15:58:29 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
|
2018-03-24 00:45:04 +00:00
|
|
|
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
|
2017-04-01 07:20:54 +00:00
|
|
|
zookeeper->multi(ops);
|
|
|
|
ops.clear();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
|
2014-10-15 01:22:06 +00:00
|
|
|
}
|
|
|
|
|
2017-08-10 15:19:36 +00:00
|
|
|
|
2018-08-23 13:55:59 +00:00
|
|
|
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & host_versions_inactive_replicas,
|
2018-08-23 15:58:29 +00:00
|
|
|
const std::unordered_map<String, String> & log_pointers_lost_replicas,
|
|
|
|
size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper)
|
2018-08-09 15:06:39 +00:00
|
|
|
{
|
2018-08-23 16:37:28 +00:00
|
|
|
struct LostReplicaInfo
|
|
|
|
{
|
2018-08-23 16:36:54 +00:00
|
|
|
String name;
|
|
|
|
zkutil::Requests requests;
|
2018-08-23 15:58:29 +00:00
|
|
|
};
|
|
|
|
|
2018-08-22 14:01:54 +00:00
|
|
|
std::vector<zkutil::Requests> requests;
|
2018-08-23 15:58:29 +00:00
|
|
|
std::vector<LostReplicaInfo> lost_replicas_info;
|
|
|
|
std::vector<std::pair<LostReplicaInfo, zkutil::ZooKeeper::FutureMulti>> info_and_future;
|
2018-08-09 15:06:39 +00:00
|
|
|
|
2018-08-23 15:58:29 +00:00
|
|
|
for (auto pair : log_pointers_lost_replicas)
|
2018-08-09 15:06:39 +00:00
|
|
|
{
|
|
|
|
String replica = pair.first;
|
2018-08-23 15:58:29 +00:00
|
|
|
zkutil::Requests ops;
|
|
|
|
/// If host changed version we can not mark replicas, because replica started to be active.
|
|
|
|
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica)));
|
|
|
|
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
|
|
|
|
lost_replicas_info.push_back(LostReplicaInfo{replica, ops});
|
2018-08-09 15:06:39 +00:00
|
|
|
}
|
2018-08-20 13:31:24 +00:00
|
|
|
|
2018-08-23 15:58:29 +00:00
|
|
|
if (lost_replicas_info.size() == replicas_count)
|
2018-08-23 13:55:59 +00:00
|
|
|
throw Exception("All replicas wiil be lost", ErrorCodes::ALL_REPLICAS_LOST);
|
2018-08-22 14:01:54 +00:00
|
|
|
|
2018-08-23 15:58:29 +00:00
|
|
|
for (auto & replica_info : lost_replicas_info)
|
|
|
|
info_and_future.emplace_back(replica_info, zookeeper->tryAsyncMulti(replica_info.requests));
|
2018-08-09 15:06:39 +00:00
|
|
|
|
2018-08-23 15:58:29 +00:00
|
|
|
for (auto & pair : info_and_future)
|
2018-08-20 13:31:24 +00:00
|
|
|
{
|
2018-08-23 15:58:29 +00:00
|
|
|
auto multi_responses = pair.second.get();
|
2018-08-23 13:55:59 +00:00
|
|
|
if (multi_responses.responses[0]->error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
|
2018-08-23 15:58:29 +00:00
|
|
|
throw Exception(pair.first.name + " became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED);
|
2018-08-23 16:02:19 +00:00
|
|
|
zkutil::KeeperMultiException::check(multi_responses.error, pair.first.requests, multi_responses.responses);
|
2018-08-20 13:31:24 +00:00
|
|
|
}
|
2018-08-09 15:06:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-08-09 21:09:44 +00:00
|
|
|
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
|
|
|
|
{
|
|
|
|
String node;
|
2017-11-15 16:32:47 +00:00
|
|
|
Int64 ctime = 0;
|
2017-08-09 21:09:44 +00:00
|
|
|
|
2017-11-15 16:32:47 +00:00
|
|
|
NodeWithStat(String node_, Int64 ctime_) : node(std::move(node_)), ctime(ctime_) {}
|
2017-08-09 21:09:44 +00:00
|
|
|
|
2017-11-15 16:32:47 +00:00
|
|
|
static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs)
|
2017-08-09 21:09:44 +00:00
|
|
|
{
|
2017-11-15 16:32:47 +00:00
|
|
|
return std::forward_as_tuple(lhs.ctime, lhs.node) > std::forward_as_tuple(rhs.ctime, rhs.node);
|
2017-08-09 21:09:44 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
auto zookeeper = storage.getZooKeeper();
|
2014-12-12 20:50:32 +00:00
|
|
|
|
2017-08-09 21:09:44 +00:00
|
|
|
std::vector<NodeWithStat> timed_blocks;
|
2017-11-15 16:32:47 +00:00
|
|
|
getBlocksSortedByTime(*zookeeper, timed_blocks);
|
2017-08-09 21:09:44 +00:00
|
|
|
|
|
|
|
if (timed_blocks.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
|
2017-11-15 16:32:47 +00:00
|
|
|
Int64 current_time = timed_blocks.front().ctime;
|
2017-09-01 17:21:03 +00:00
|
|
|
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage.data.settings.replicated_deduplication_window_seconds));
|
2017-08-10 15:19:36 +00:00
|
|
|
|
|
|
|
/// Virtual node, all nodes that are "greater" than this one will be deleted
|
2017-11-15 16:32:47 +00:00
|
|
|
NodeWithStat block_threshold{{}, time_threshold};
|
2017-08-09 21:09:44 +00:00
|
|
|
|
2017-09-20 14:41:07 +00:00
|
|
|
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window.value);
|
2017-08-09 21:09:44 +00:00
|
|
|
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
|
|
|
|
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
|
|
|
|
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
|
|
|
|
|
2018-05-10 15:56:38 +00:00
|
|
|
zkutil::AsyncResponses<zkutil::RemoveResponse> try_remove_futures;
|
2017-08-09 21:09:44 +00:00
|
|
|
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
|
|
|
|
{
|
|
|
|
String path = storage.zookeeper_path + "/blocks/" + it->node;
|
2017-11-15 16:32:47 +00:00
|
|
|
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
|
2017-08-09 21:09:44 +00:00
|
|
|
}
|
|
|
|
|
2017-11-15 16:32:47 +00:00
|
|
|
for (auto & pair : try_remove_futures)
|
2017-08-09 21:09:44 +00:00
|
|
|
{
|
2017-11-15 16:32:47 +00:00
|
|
|
const String & path = pair.first;
|
2018-03-24 20:00:16 +00:00
|
|
|
int32_t rc = pair.second.get().error;
|
2018-03-24 00:45:04 +00:00
|
|
|
if (rc == ZooKeeperImpl::ZooKeeper::ZNOTEMPTY)
|
2017-11-15 16:32:47 +00:00
|
|
|
{
|
2017-12-21 17:43:32 +00:00
|
|
|
/// Can happen if there are leftover block nodes with children created by previous server versions.
|
2017-11-15 16:32:47 +00:00
|
|
|
zookeeper->removeRecursive(path);
|
|
|
|
}
|
2018-03-24 01:00:12 +00:00
|
|
|
else if (rc)
|
2017-11-15 16:32:47 +00:00
|
|
|
LOG_WARNING(log,
|
|
|
|
"Error while deleting ZooKeeper path `" << path << "`: " + zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
|
2017-08-09 21:09:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
|
2017-11-15 16:32:47 +00:00
|
|
|
if (num_nodes_to_delete)
|
|
|
|
LOG_TRACE(log, "Cleared " << num_nodes_to_delete << " old blocks from ZooKeeper");
|
2017-08-09 21:09:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-11-15 16:32:47 +00:00
|
|
|
/// Fills `timed_blocks` with the children of `<zookeeper_path>/blocks` and their creation times,
/// sorted with NodeWithStat::greaterByTime (newest first). Any previous contents of `timed_blocks` are discarded.
/// Creation times are kept in `cached_block_stats`, so only blocks that are not in the cache are queried.
/// Throws NOT_FOUND_NODE if listing the children returns a non-zero code.
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks)
{
    timed_blocks.clear();

    Strings blocks;
    zkutil::Stat stat;
    if (zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
        throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

    /// Clear already deleted blocks from the cache, cached_block_ctime should be subset of blocks
    {
        NameSet existing_blocks(blocks.begin(), blocks.end());

        auto cached_it = cached_block_stats.begin();
        while (cached_it != cached_block_stats.end())
        {
            if (existing_blocks.count(cached_it->first))
                ++cached_it;
            else
                cached_it = cached_block_stats.erase(cached_it);
        }
    }

    auto not_cached_blocks = stat.numChildren - cached_block_stats.size();
    if (not_cached_blocks)
    {
        LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
            << " to clear old ones from ZooKeeper.");
    }

    zkutil::AsyncResponses<zkutil::ExistsResponse> exists_futures;
    for (const String & block : blocks)
    {
        auto cached = cached_block_stats.find(block);
        if (cached != cached_block_stats.end())
        {
            /// Cached block
            timed_blocks.emplace_back(block, cached->second);
        }
        else
        {
            /// New block. Fetch its stat asynchronously.
            exists_futures.emplace_back(block, zookeeper.asyncExists(storage.zookeeper_path + "/blocks/" + block));
        }
    }

    /// Put fetched stats into the cache
    for (auto & name_and_future : exists_futures)
    {
        auto status = name_and_future.second.get();

        /// Blocks reported as ZNONODE are left out of both the cache and the result.
        if (status.error == ZooKeeperImpl::ZooKeeper::ZNONODE)
            continue;

        cached_block_stats.emplace(name_and_future.first, status.stat.ctime);
        timed_blocks.emplace_back(name_and_future.first, status.stat.ctime);
    }

    std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
}
|
2017-07-24 20:12:59 +00:00
|
|
|
|
2018-07-31 11:36:08 +00:00
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
|
|
|
|
{
|
|
|
|
if (!storage.data.settings.finished_mutations_to_keep)
|
|
|
|
return;
|
|
|
|
|
|
|
|
if (storage.queue.countFinishedMutations() <= storage.data.settings.finished_mutations_to_keep)
|
|
|
|
{
|
|
|
|
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
|
|
|
|
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto zookeeper = storage.getZooKeeper();
|
|
|
|
|
|
|
|
zkutil::Stat replicas_stat;
|
|
|
|
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &replicas_stat);
|
|
|
|
|
|
|
|
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
|
|
|
|
for (const String & replica : replicas)
|
|
|
|
{
|
|
|
|
String pointer;
|
|
|
|
zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/mutation_pointer", pointer);
|
|
|
|
if (pointer.empty())
|
|
|
|
return; /// One replica hasn't done anything yet so we can't delete any mutations.
|
|
|
|
min_pointer = std::min(parse<UInt64>(pointer), min_pointer);
|
|
|
|
}
|
|
|
|
|
|
|
|
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/mutations");
|
|
|
|
std::sort(entries.begin(), entries.end());
|
|
|
|
|
|
|
|
/// Do not remove entries that are greater than `min_pointer` (they are not done yet).
|
|
|
|
entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
|
|
|
|
/// Do not remove last `storage.data.settings.finished_mutations_to_keep` entries.
|
|
|
|
if (entries.size() <= storage.data.settings.finished_mutations_to_keep)
|
|
|
|
return;
|
|
|
|
entries.erase(entries.end() - storage.data.settings.finished_mutations_to_keep, entries.end());
|
|
|
|
|
|
|
|
if (entries.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
zkutil::Requests ops;
|
|
|
|
size_t batch_start_i = 0;
|
|
|
|
for (size_t i = 0; i < entries.size(); ++i)
|
|
|
|
{
|
|
|
|
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/mutations/" + entries[i], -1));
|
|
|
|
|
|
|
|
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
|
|
|
|
{
|
|
|
|
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
|
|
|
|
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", replicas_stat.version));
|
|
|
|
zookeeper->multi(ops);
|
|
|
|
LOG_DEBUG(log, "Removed " << (i + 1 - batch_start_i) << " old mutation entries: " << entries[batch_start_i] << " - " << entries[i]);
|
|
|
|
batch_start_i = i + 1;
|
|
|
|
ops.clear();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
}
|