dbms: little better [#METR-13153].

This commit is contained in:
Alexey Milovidov 2014-10-15 05:22:06 +04:00
parent 8d0bdffdff
commit 3d7639e5f1
6 changed files with 496 additions and 391 deletions

View File

@@ -0,0 +1,46 @@
#pragma once
#include <Yandex/logger_useful.h>
#include <thread>
namespace DB
{
class StorageReplicatedMergeTree;
/** Removes outdated data of a ReplicatedMergeTree table.
*/
class ReplicatedMergeTreeCleanupThread
{
public:
ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeCleanupThread()
{
if (thread.joinable())
thread.join();
}
private:
StorageReplicatedMergeTree & storage;
Logger * log;
/// NOTE: declared after `log`: the thread starts in the constructor and uses `log`, so `log` must be initialized first.
std::thread thread;
void run();
void iterate();
/// Remove old parts from disk and from ZooKeeper.
void clearOldParts();
/// Remove old log entries from ZooKeeper.
void clearOldLogs();
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks();
};
}
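A minimal usage sketch (it mirrors the changes to StorageReplicatedMergeTree later in this commit): the storage owns the object through a unique_ptr, so constructing it starts the thread and resetting the pointer joins it via the destructor above.

std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
/// On startup:
cleanup_thread.reset(new ReplicatedMergeTreeCleanupThread(*this));
/// On partial shutdown (assuming shutdown_called and shutdown_event have already been set;
/// otherwise the destructor's join() may wait up to a full sleep interval):
cleanup_thread.reset();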

View File

@@ -0,0 +1,78 @@
#pragma once
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Types.h>
#include <mutex>
#include <condition_variable>
namespace DB
{
class ReadBuffer;
class WriteBuffer;
class StorageReplicatedMergeTree;
/// Adds the part to the future_parts set; removes it in the destructor.
struct FuturePartTagger
{
String part;
StorageReplicatedMergeTree & storage;
FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_);
~FuturePartTagger();
};
typedef Poco::SharedPtr<FuturePartTagger> FuturePartTaggerPtr;
/// A record describing what needs to be done.
struct ReplicatedMergeTreeLogEntry
{
typedef Poco::SharedPtr<ReplicatedMergeTreeLogEntry> Ptr;
enum Type
{
GET_PART, /// Fetch the part from another replica.
MERGE_PARTS, /// Merge the parts.
DROP_RANGE, /// Delete the parts of the specified month within the specified range of block numbers.
ATTACH_PART, /// Move a part from the `detached` or `unreplicated` directory.
};
String znode_name;
Type type;
String source_replica; /// An empty string means this entry was added directly to the queue rather than copied from the log.
/// The name of the resulting part.
/// For DROP_RANGE, the name of a non-existent part. All parts covered by it must be deleted.
String new_part_name;
Strings parts_to_merge;
/// For DROP_RANGE, true means the parts should not be deleted but moved to the `detached` directory.
bool detach = false;
/// For ATTACH_PART, the name of the part in the `detached` or `unreplicated` directory.
String source_part_name;
/// Whether to move the part from the `unreplicated` directory rather than from `detached`.
bool attach_unreplicated;
FuturePartTaggerPtr future_part_tagger;
bool currently_executing = false; /// Accessed under queue_mutex.
std::condition_variable execution_complete; /// Signalled when currently_executing becomes false.
void addResultToVirtualParts(StorageReplicatedMergeTree & storage);
void tagPartAsFuture(StorageReplicatedMergeTree & storage);
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
String toString() const;
static Ptr parse(const String & s);
};
}
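A minimal round-trip sketch for the text serialization declared above (the queue node name and its content are hypothetical; `zookeeper` and `replica_path` are as in StorageReplicatedMergeTree):

/// Entries are stored as plain text in ZooKeeper and restored with parse().
String text = zookeeper->get(replica_path + "/queue/queue-0000000000"); /// hypothetical node name
ReplicatedMergeTreeLogEntry::Ptr entry = ReplicatedMergeTreeLogEntry::parse(text);
String text2 = entry->toString(); /// writes the same "format version: 1" text back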

View File

@@ -5,7 +5,9 @@
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include "MergeTree/AbandonableLockInZooKeeper.h"
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <zkutil/ZooKeeper.h>
@@ -123,107 +125,11 @@ public:
private:
friend class ReplicatedMergeTreeBlockOutputStream;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeLogEntry;
friend class FuturePartTagger;
/// Adds the part to the future_parts set.
struct FuturePartTagger
{
String part;
StorageReplicatedMergeTree & storage;
FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_)
: part(part_), storage(storage_)
{
if (!storage.future_parts.insert(part).second)
throw Exception("Tagging already tagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
~FuturePartTagger()
{
try
{
std::unique_lock<std::mutex> lock(storage.queue_mutex);
if (!storage.future_parts.erase(part))
throw Exception("Untagging already untagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
};
typedef Poco::SharedPtr<FuturePartTagger> FuturePartTaggerPtr;
struct LogEntry
{
typedef Poco::SharedPtr<LogEntry> Ptr;
enum Type
{
GET_PART, /// Fetch the part from another replica.
MERGE_PARTS, /// Merge the parts.
DROP_RANGE, /// Delete the parts of the specified month within the specified range of block numbers.
ATTACH_PART, /// Move a part from the `detached` or `unreplicated` directory.
};
String znode_name;
Type type;
String source_replica; /// An empty string means this entry was added directly to the queue rather than copied from the log.
/// The name of the resulting part.
/// For DROP_RANGE, the name of a non-existent part. All parts covered by it must be deleted.
String new_part_name;
Strings parts_to_merge;
/// For DROP_RANGE, true means the parts should not be deleted but moved to the `detached` directory.
bool detach = false;
/// For ATTACH_PART, the name of the part in the `detached` or `unreplicated` directory.
String source_part_name;
/// Whether to move the part from the `unreplicated` directory rather than from `detached`.
bool attach_unreplicated;
FuturePartTaggerPtr future_part_tagger;
bool currently_executing = false; /// Accessed under queue_mutex.
std::condition_variable execution_complete; /// Signalled when currently_executing becomes false.
void addResultToVirtualParts(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE || type == ATTACH_PART)
storage.virtual_parts.add(new_part_name);
}
void tagPartAsFuture(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART || type == ATTACH_PART)
future_part_tagger = new FuturePartTagger(new_part_name, storage);
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
String toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
static Ptr parse(const String & s)
{
ReadBufferFromString in(s);
Ptr res = new LogEntry;
res->readText(in);
assertEOF(in);
return res;
}
};
typedef ReplicatedMergeTreeLogEntry LogEntry;
typedef LogEntry::Ptr LogEntryPtr;
typedef std::list<LogEntryPtr> LogEntries;
@@ -315,7 +221,7 @@ private:
std::mutex merge_selecting_mutex; /// Acquired on every iteration of selecting parts to merge.
/// A thread that removes old parts, log entries and blocks.
std::thread cleanup_thread;
std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
/// A thread that handles reconnecting to ZooKeeper when the session expires (a very unlikely event).
std::thread restarting_thread;
@@ -402,14 +308,6 @@ private:
/// Removes the part from ZooKeeper and enqueues a task to fetch it. This is intended for broken parts.
void removePartAndEnqueueFetch(const String & part_name);
void clearOldParts();
/// Remove old log entries from ZooKeeper.
void clearOldLogs();
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks();
/// Execution of queue tasks.
/** Puts entries from ZooKeeper (/replicas/me/queue/) into the queue.
@@ -450,10 +348,6 @@ private:
*/
void mergeSelectingThread();
/** Removes outdated data.
*/
void cleanupThread();
/** Performs a local ALTER when the list of columns in ZooKeeper changes.
*/
void alterThread();
@@ -483,6 +377,14 @@ private:
* If replicas are added at the same time, it may not wait for the newly added replica.
*/
void waitForAllReplicasToProcessLogEntry(const LogEntry & entry);
/// Convert a number into a string in the format of the suffixes of auto-increment nodes in ZooKeeper.
static String padIndex(UInt64 index)
{
String index_str = toString(index);
return std::string(10 - index_str.size(), '0') + index_str;
}
};
}
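For illustration, padIndex produces the fixed-width 10-character suffixes of ZooKeeper sequential nodes; clearOldLogs relies on this when it builds names like "log-" + padIndex(min_pointer). Hypothetical values:

padIndex(31)          /// == "0000000031"
"log-" + padIndex(31) /// == "log-0000000031"

Note that, unlike the while-loop it replaces in StorageReplicatedMergeTree.cpp, the std::string(10 - index_str.size(), '0') expression assumes the index never exceeds 10 digits.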

View File

@@ -0,0 +1,191 @@
#include <DB/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
namespace DB
{
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")),
/// The thread is started last: run() uses `log`, so `log` must already be initialized.
thread([this] { run(); }) {}
void ReplicatedMergeTreeCleanupThread::run()
{
const auto CLEANUP_SLEEP_MS = 30 * 1000;
while (!storage.shutdown_called)
{
try
{
iterate();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
storage.shutdown_event.tryWait(CLEANUP_SLEEP_MS);
}
LOG_DEBUG(log, "Cleanup thread finished");
}
void ReplicatedMergeTreeCleanupThread::iterate()
{
clearOldParts();
if (storage.unreplicated_data)
storage.unreplicated_data->clearOldParts();
if (storage.is_leader_node)
{
clearOldLogs();
clearOldBlocks();
}
}
void ReplicatedMergeTreeCleanupThread::clearOldParts()
{
auto table_lock = storage.lockStructure(false);
MergeTreeData::DataPartsVector parts = storage.data.grabOldParts();
size_t count = parts.size();
if (!count)
{
LOG_TRACE(log, "No old parts");
return;
}
try
{
while (!parts.empty())
{
MergeTreeData::DataPartPtr & part = parts.back();
LOG_DEBUG(log, "Removing " << part->name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name, -1));
auto code = storage.zookeeper->tryMulti(ops);
if (code != ZOK)
LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
part->remove();
parts.pop_back();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
storage.data.addOldParts(parts);
throw;
}
LOG_DEBUG(log, "Removed " << count << " old parts");
}
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
zkutil::Stat stat;
if (!storage.zookeeper->exists(storage.zookeeper_path + "/log", &stat))
throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// Wait until 1.1 times more entries than necessary have accumulated.
if (static_cast<double>(children_count) < storage.data.settings.replicated_logs_to_keep * 1.1)
return;
Strings replicas = storage.zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer = storage.zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
if (pointer.empty())
return;
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
}
Strings entries = storage.zookeeper->getChildren(storage.zookeeper_path + "/log");
std::sort(entries.begin(), entries.end());
/// Do not touch the last replicated_logs_to_keep entries.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end());
/// Do not touch entries that are not less than min_pointer.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + storage.padIndex(min_pointer)), entries.end());
if (entries.empty())
return;
zkutil::Ops ops;
for (size_t i = 0; i < entries.size(); ++i)
{
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/log/" + entries[i], -1));
if (ops.size() > 400 || i + 1 == entries.size())
{
/// Along with cleaning the log, check that no replica has been added since we obtained the list of replicas.
ops.push_back(new zkutil::Op::Check(storage.zookeeper_path + "/replicas", stat.version));
storage.zookeeper->multi(ops);
ops.clear();
}
}
LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
}
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
zkutil::Stat stat;
if (!storage.zookeeper->exists(storage.zookeeper_path + "/blocks", &stat))
throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// To make "asymptotically" fewer exists requests, wait until 1.1 times more blocks than necessary have accumulated.
if (static_cast<double>(children_count) < storage.data.settings.replicated_deduplication_window * 1.1)
return;
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - storage.data.settings.replicated_deduplication_window
<< " old blocks from ZooKeeper. This might take several minutes.");
Strings blocks = storage.zookeeper->getChildren(storage.zookeeper_path + "/blocks");
std::vector<std::pair<Int64, String> > timed_blocks;
for (const String & block : blocks)
{
zkutil::Stat stat;
storage.zookeeper->exists(storage.zookeeper_path + "/blocks/" + block, &stat);
timed_blocks.push_back(std::make_pair(stat.czxid, block));
}
zkutil::Ops ops;
std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
for (size_t i = storage.data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
{
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
if (ops.size() > 400 || i + 1 == timed_blocks.size())
{
storage.zookeeper->multi(ops);
ops.clear();
}
}
LOG_TRACE(log, "Cleared " << blocks.size() - storage.data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}
}
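For orientation, one iteration of the cleanup thread removes the following ZooKeeper nodes (paths as used in the code above, placeholders in angle brackets):

replica_path + "/parts/<part>" with its "columns" and "checksums" children        -- clearOldParts, on every replica
zookeeper_path + "/log/log-<padded index>"                                        -- clearOldLogs, leader only
zookeeper_path + "/blocks/<block>" with "number", "columns", "checksums" children -- clearOldBlocks, leader only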

View File

@@ -0,0 +1,160 @@
#include <DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
namespace DB
{
FuturePartTagger::FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_)
: part(part_), storage(storage_)
{
if (!storage.future_parts.insert(part).second)
throw Exception("Tagging already tagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
FuturePartTagger::~FuturePartTagger()
{
try
{
std::unique_lock<std::mutex> lock(storage.queue_mutex);
if (!storage.future_parts.erase(part))
throw Exception("Untagging already untagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ReplicatedMergeTreeLogEntry::addResultToVirtualParts(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE || type == ATTACH_PART)
storage.virtual_parts.add(new_part_name);
}
void ReplicatedMergeTreeLogEntry::tagPartAsFuture(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART || type == ATTACH_PART)
future_part_tagger = new FuturePartTagger(new_part_name, storage);
}
void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
{
writeString("format version: 1\n", out);
writeString("source replica: ", out);
writeString(source_replica, out);
writeString("\n", out);
switch (type)
{
case GET_PART:
writeString("get\n", out);
writeString(new_part_name, out);
break;
case MERGE_PARTS:
writeString("merge\n", out);
for (const String & s : parts_to_merge)
{
writeString(s, out);
writeString("\n", out);
}
writeString("into\n", out);
writeString(new_part_name, out);
break;
case DROP_RANGE:
if (detach)
writeString("detach\n", out);
else
writeString("drop\n", out);
writeString(new_part_name, out);
break;
case ATTACH_PART:
writeString("attach\n", out);
if (attach_unreplicated)
writeString("unreplicated\n", out);
else
writeString("detached\n", out);
writeString(source_part_name, out);
writeString("\ninto\n", out);
writeString(new_part_name, out);
break;
}
writeString("\n", out);
}
void ReplicatedMergeTreeLogEntry::readText(ReadBuffer & in)
{
String type_str;
assertString("format version: 1\n", in);
assertString("source replica: ", in);
readString(source_replica, in);
assertString("\n", in);
readString(type_str, in);
assertString("\n", in);
if (type_str == "get")
{
type = GET_PART;
readString(new_part_name, in);
}
else if (type_str == "merge")
{
type = MERGE_PARTS;
while (true)
{
String s;
readString(s, in);
assertString("\n", in);
if (s == "into")
break;
parts_to_merge.push_back(s);
}
readString(new_part_name, in);
}
else if (type_str == "drop" || type_str == "detach")
{
type = DROP_RANGE;
detach = type_str == "detach";
readString(new_part_name, in);
}
else if (type_str == "attach")
{
type = ATTACH_PART;
String source_type;
readString(source_type, in);
if (source_type == "unreplicated")
attach_unreplicated = true;
else if (source_type == "detached")
attach_unreplicated = false;
else
throw Exception("Bad format: expected 'unreplicated' or 'detached', found '" + source_type + "'", ErrorCodes::CANNOT_PARSE_TEXT);
assertString("\n", in);
readString(source_part_name, in);
assertString("\ninto\n", in);
readString(new_part_name, in);
}
assertString("\n", in);
}
String ReplicatedMergeTreeLogEntry::toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s)
{
ReadBufferFromString in(s);
Ptr res = new ReplicatedMergeTreeLogEntry;
res->readText(in);
assertEOF(in);
return res;
}
}
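For reference, a MERGE_PARTS entry written by writeText above serializes to the following text (the replica and part names are made up):

format version: 1
source replica: replica_1
merge
20140901_20140905_0_5_1
20140906_20140910_6_10_1
into
20140901_20140910_0_10_2

readText consumes exactly this layout (including the trailing newline), so an entry can be round-tripped through toString() and parse().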

View File

@@ -16,19 +16,9 @@ namespace DB
const auto ERROR_SLEEP_MS = 1000;
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
const auto CLEANUP_SLEEP_MS = 30 * 1000;
const auto RESERVED_BLOCK_NUMBERS = 200;
/// Convert a number into a string in the format of the suffixes of auto-increment nodes in ZooKeeper.
static String padIndex(UInt64 index)
{
String index_str = toString(index);
while (index_str.size() < 10)
index_str = '0' + index_str;
return index_str;
}
/// Used to check whether the is_active node was set by us or not.
static String generateActiveNodeIdentifier()
@@ -60,10 +50,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
zookeeper_path(context.getMacros().expand(zookeeper_path_)),
replica_name(context.getMacros().expand(replica_name_)),
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true,
index_granularity_, mode_, sign_column_, settings_, database_name + "." + table_name, true,
std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")),
shutdown_event(false)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
@@ -623,143 +613,6 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData:
zkutil::CreateMode::Persistent));
}
void StorageReplicatedMergeTree::clearOldParts()
{
auto table_lock = lockStructure(false);
MergeTreeData::DataPartsVector parts = data.grabOldParts();
size_t count = parts.size();
if (!count)
{
LOG_TRACE(log, "No old parts");
return;
}
try
{
while (!parts.empty())
{
MergeTreeData::DataPartPtr & part = parts.back();
LOG_DEBUG(log, "Removing " << part->name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
auto code = zookeeper->tryMulti(ops);
if (code != ZOK)
LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
part->remove();
parts.pop_back();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
data.addOldParts(parts);
throw;
}
LOG_DEBUG(log, "Removed " << count << " old parts");
}
void StorageReplicatedMergeTree::clearOldLogs()
{
zkutil::Stat stat;
if (!zookeeper->exists(zookeeper_path + "/log", &stat))
throw Exception(zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// Wait until 1.1 times more entries than necessary have accumulated.
if (static_cast<double>(children_count) < data.settings.replicated_logs_to_keep * 1.1)
return;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
if (pointer.empty())
return;
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
}
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
std::sort(entries.begin(), entries.end());
/// Do not touch the last replicated_logs_to_keep entries.
entries.erase(entries.end() - std::min(entries.size(), data.settings.replicated_logs_to_keep), entries.end());
/// Do not touch entries that are not less than min_pointer.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end());
if (entries.empty())
return;
zkutil::Ops ops;
for (size_t i = 0; i < entries.size(); ++i)
{
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/log/" + entries[i], -1));
if (ops.size() > 400 || i + 1 == entries.size())
{
/// Along with cleaning the log, check that no replica has been added since we obtained the list of replicas.
ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version));
zookeeper->multi(ops);
ops.clear();
}
}
LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
}
void StorageReplicatedMergeTree::clearOldBlocks()
{
zkutil::Stat stat;
if (!zookeeper->exists(zookeeper_path + "/blocks", &stat))
throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// To make "asymptotically" fewer exists requests, wait until 1.1 times more blocks than necessary have accumulated.
if (static_cast<double>(children_count) < data.settings.replicated_deduplication_window * 1.1)
return;
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
<< " old blocks from ZooKeeper. This might take several minutes.");
Strings blocks = zookeeper->getChildren(zookeeper_path + "/blocks");
std::vector<std::pair<Int64, String> > timed_blocks;
for (const String & block : blocks)
{
zkutil::Stat stat;
zookeeper->exists(zookeeper_path + "/blocks/" + block, &stat);
timed_blocks.push_back(std::make_pair(stat.czxid, block));
}
zkutil::Ops ops;
std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
{
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
if (ops.size() > 400 || i + 1 == timed_blocks.size())
{
zookeeper->multi(ops);
ops.clear();
}
}
LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}
void StorageReplicatedMergeTree::loadQueue()
{
std::unique_lock<std::mutex> lock(queue_mutex);
@@ -1504,33 +1357,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
LOG_DEBUG(log, "Merge selecting thread finished");
}
void StorageReplicatedMergeTree::cleanupThread()
{
while (!shutdown_called)
{
try
{
clearOldParts();
if (unreplicated_data)
unreplicated_data->clearOldParts();
if (is_leader_node)
{
clearOldLogs();
clearOldBlocks();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
shutdown_event.tryWait(CLEANUP_SLEEP_MS);
}
LOG_DEBUG(log, "Cleanup thread finished");
}
void StorageReplicatedMergeTree::alterThread()
{
@@ -2046,8 +1872,9 @@ void StorageReplicatedMergeTree::partialShutdown()
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
if (cleanup_thread.joinable())
cleanup_thread.join();
cleanup_thread.reset();
if (alter_thread.joinable())
alter_thread.join();
if (part_check_thread.joinable())
@@ -2089,7 +1916,7 @@ bool StorageReplicatedMergeTree::tryStartup()
unreplicated_merger->uncancelAll();
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this);
cleanup_thread.reset(new ReplicatedMergeTreeCleanupThread(*this));
alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, this);
part_check_thread = std::thread(&StorageReplicatedMergeTree::partCheckThread, this);
queue_task_handle = context.getBackgroundPool().addTask(
@@ -2673,105 +2500,6 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEn
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
writeString("format version: 1\n", out);
writeString("source replica: ", out);
writeString(source_replica, out);
writeString("\n", out);
switch (type)
{
case GET_PART:
writeString("get\n", out);
writeString(new_part_name, out);
break;
case MERGE_PARTS:
writeString("merge\n", out);
for (const String & s : parts_to_merge)
{
writeString(s, out);
writeString("\n", out);
}
writeString("into\n", out);
writeString(new_part_name, out);
break;
case DROP_RANGE:
if (detach)
writeString("detach\n", out);
else
writeString("drop\n", out);
writeString(new_part_name, out);
break;
case ATTACH_PART:
writeString("attach\n", out);
if (attach_unreplicated)
writeString("unreplicated\n", out);
else
writeString("detached\n", out);
writeString(source_part_name, out);
writeString("\ninto\n", out);
writeString(new_part_name, out);
break;
}
writeString("\n", out);
}
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
String type_str;
assertString("format version: 1\n", in);
assertString("source replica: ", in);
readString(source_replica, in);
assertString("\n", in);
readString(type_str, in);
assertString("\n", in);
if (type_str == "get")
{
type = GET_PART;
readString(new_part_name, in);
}
else if (type_str == "merge")
{
type = MERGE_PARTS;
while (true)
{
String s;
readString(s, in);
assertString("\n", in);
if (s == "into")
break;
parts_to_merge.push_back(s);
}
readString(new_part_name, in);
}
else if (type_str == "drop" || type_str == "detach")
{
type = DROP_RANGE;
detach = type_str == "detach";
readString(new_part_name, in);
}
else if (type_str == "attach")
{
type = ATTACH_PART;
String source_type;
readString(source_type, in);
if (source_type == "unreplicated")
attach_unreplicated = true;
else if (source_type == "detached")
attach_unreplicated = false;
else
throw Exception("Bad format: expected 'unreplicated' or 'detached', found '" + source_type + "'", ErrorCodes::CANNOT_PARSE_TEXT);
assertString("\n", in);
readString(source_part_name, in);
assertString("\ninto\n", in);
readString(new_part_name, in);
}
assertString("\n", in);
}
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
res.is_leader = is_leader_node;