2014-10-15 01:22:06 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
|
|
|
|
|
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
2015-09-24 18:54:21 +00:00
|
|
|
|
#include <DB/Common/setThreadName.h>
|
2014-10-15 01:22:06 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
/** Binds the cleanup thread to its table.
  * The logger is named "<database>.<table> (StorageReplicatedMergeTree, CleanupThread)",
  * and `thread` is constructed with a callable that invokes run() on this object.
  */
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
	: storage(storage_)
	, log(&Logger::get(storage_.database_name + "." + storage_.table_name + " (StorageReplicatedMergeTree, CleanupThread)"))
	, thread([this] { this->run(); })
{
}
|
2014-10-15 01:22:06 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::run()
|
|
|
|
|
{
|
2015-09-24 18:54:21 +00:00
|
|
|
|
setThreadName("ReplMTCleanup");
|
|
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
|
const auto CLEANUP_SLEEP_MS = 30 * 1000;
|
|
|
|
|
|
|
|
|
|
while (!storage.shutdown_called)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
iterate();
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
storage.shutdown_event.tryWait(CLEANUP_SLEEP_MS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Cleanup thread finished");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::iterate()
|
|
|
|
|
{
|
|
|
|
|
clearOldParts();
|
|
|
|
|
|
|
|
|
|
if (storage.unreplicated_data)
|
|
|
|
|
storage.unreplicated_data->clearOldParts();
|
|
|
|
|
|
|
|
|
|
if (storage.is_leader_node)
|
|
|
|
|
{
|
|
|
|
|
clearOldLogs();
|
|
|
|
|
clearOldBlocks();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::clearOldParts()
|
|
|
|
|
{
|
|
|
|
|
auto table_lock = storage.lockStructure(false);
|
2014-12-12 20:50:32 +00:00
|
|
|
|
auto zookeeper = storage.getZooKeeper();
|
2014-10-15 01:22:06 +00:00
|
|
|
|
|
|
|
|
|
MergeTreeData::DataPartsVector parts = storage.data.grabOldParts();
|
|
|
|
|
size_t count = parts.size();
|
|
|
|
|
|
|
|
|
|
if (!count)
|
|
|
|
|
{
|
|
|
|
|
LOG_TRACE(log, "No old parts");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
while (!parts.empty())
|
|
|
|
|
{
|
|
|
|
|
MergeTreeData::DataPartPtr & part = parts.back();
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removing " << part->name);
|
|
|
|
|
|
|
|
|
|
zkutil::Ops ops;
|
2015-09-20 11:02:59 +00:00
|
|
|
|
storage.removePartFromZooKeeper(part->name, ops);
|
2014-12-12 20:50:32 +00:00
|
|
|
|
auto code = zookeeper->tryMulti(ops);
|
2014-10-15 01:22:06 +00:00
|
|
|
|
if (code != ZOK)
|
|
|
|
|
LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
|
|
|
|
|
|
|
|
|
|
part->remove();
|
|
|
|
|
parts.pop_back();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
|
storage.data.addOldParts(parts);
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removed " << count << " old parts");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
auto zookeeper = storage.getZooKeeper();
|
|
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
|
zkutil::Stat stat;
|
2014-12-12 20:50:32 +00:00
|
|
|
|
if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat))
|
2014-10-15 01:22:06 +00:00
|
|
|
|
throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);
|
|
|
|
|
|
|
|
|
|
int children_count = stat.numChildren;
|
|
|
|
|
|
|
|
|
|
/// Будем ждать, пока накопятся в 1.1 раза больше записей, чем нужно.
|
|
|
|
|
if (static_cast<double>(children_count) < storage.data.settings.replicated_logs_to_keep * 1.1)
|
|
|
|
|
return;
|
|
|
|
|
|
2014-12-12 20:50:32 +00:00
|
|
|
|
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
|
2014-10-15 01:22:06 +00:00
|
|
|
|
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
|
|
|
|
|
for (const String & replica : replicas)
|
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
|
2014-10-15 01:22:06 +00:00
|
|
|
|
if (pointer.empty())
|
|
|
|
|
return;
|
|
|
|
|
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
|
|
|
|
|
}
|
|
|
|
|
|
2014-12-12 20:50:32 +00:00
|
|
|
|
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
|
2014-10-15 01:22:06 +00:00
|
|
|
|
std::sort(entries.begin(), entries.end());
|
|
|
|
|
|
|
|
|
|
/// Не будем трогать последние replicated_logs_to_keep записей.
|
|
|
|
|
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end());
|
|
|
|
|
/// Не будем трогать записи, не меньшие min_pointer.
|
|
|
|
|
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + storage.padIndex(min_pointer)), entries.end());
|
|
|
|
|
|
|
|
|
|
if (entries.empty())
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
zkutil::Ops ops;
|
|
|
|
|
for (size_t i = 0; i < entries.size(); ++i)
|
|
|
|
|
{
|
|
|
|
|
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/log/" + entries[i], -1));
|
|
|
|
|
|
|
|
|
|
if (ops.size() > 400 || i + 1 == entries.size())
|
|
|
|
|
{
|
|
|
|
|
/// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик.
|
|
|
|
|
ops.push_back(new zkutil::Op::Check(storage.zookeeper_path + "/replicas", stat.version));
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->multi(ops);
|
2014-10-15 01:22:06 +00:00
|
|
|
|
ops.clear();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
auto zookeeper = storage.getZooKeeper();
|
|
|
|
|
|
2014-10-15 01:22:06 +00:00
|
|
|
|
zkutil::Stat stat;
|
2014-12-12 20:50:32 +00:00
|
|
|
|
if (!zookeeper->exists(storage.zookeeper_path + "/blocks", &stat))
|
2014-10-15 01:22:06 +00:00
|
|
|
|
throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
|
|
|
|
|
|
|
|
|
|
int children_count = stat.numChildren;
|
|
|
|
|
|
|
|
|
|
/// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно.
|
|
|
|
|
if (static_cast<double>(children_count) < storage.data.settings.replicated_deduplication_window * 1.1)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - storage.data.settings.replicated_deduplication_window
|
|
|
|
|
<< " old blocks from ZooKeeper. This might take several minutes.");
|
|
|
|
|
|
2014-12-12 20:50:32 +00:00
|
|
|
|
Strings blocks = zookeeper->getChildren(storage.zookeeper_path + "/blocks");
|
2014-10-15 01:22:06 +00:00
|
|
|
|
|
|
|
|
|
std::vector<std::pair<Int64, String> > timed_blocks;
|
|
|
|
|
|
|
|
|
|
for (const String & block : blocks)
|
|
|
|
|
{
|
|
|
|
|
zkutil::Stat stat;
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->exists(storage.zookeeper_path + "/blocks/" + block, &stat);
|
2014-10-15 01:22:06 +00:00
|
|
|
|
timed_blocks.push_back(std::make_pair(stat.czxid, block));
|
|
|
|
|
}
|
|
|
|
|
|
2015-09-25 07:51:00 +00:00
|
|
|
|
//zkutil::Ops ops;
|
2014-10-15 01:22:06 +00:00
|
|
|
|
std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
|
2015-09-25 07:51:00 +00:00
|
|
|
|
for (size_t i = storage.data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
|
2014-10-15 01:22:06 +00:00
|
|
|
|
{
|
2015-09-24 05:47:17 +00:00
|
|
|
|
/// Устаревшие ноды. Этот код можно будет убрать через пол года.
|
|
|
|
|
zookeeper->tryRemove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns");
|
|
|
|
|
zookeeper->tryRemove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums");
|
2015-09-25 07:51:00 +00:00
|
|
|
|
zookeeper->tryRemove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksum");
|
|
|
|
|
zookeeper->tryRemove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number");
|
|
|
|
|
zookeeper->tryRemove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second);
|
2015-09-24 05:47:17 +00:00
|
|
|
|
|
2015-09-25 07:51:00 +00:00
|
|
|
|
/*ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
|
2015-09-24 05:47:17 +00:00
|
|
|
|
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksum", -1));
|
2014-10-15 01:22:06 +00:00
|
|
|
|
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
|
|
|
|
|
|
|
|
|
|
if (ops.size() > 400 || i + 1 == timed_blocks.size())
|
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->multi(ops);
|
2014-10-15 01:22:06 +00:00
|
|
|
|
ops.clear();
|
2015-09-25 07:51:00 +00:00
|
|
|
|
}*/
|
2014-10-15 01:22:06 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_TRACE(log, "Cleared " << blocks.size() - storage.data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|