dbms: better [#METR-19586].

This commit is contained in:
Alexey Milovidov 2016-01-10 07:43:30 +03:00
parent 47191fbb00
commit a5b8166541
7 changed files with 82 additions and 564 deletions

View File

@ -17,23 +17,7 @@ namespace DB
class ReadBuffer;
class WriteBuffer;
class StorageReplicatedMergeTree;
/** Добавляет кусок в множество future_parts; в деструкторе убирает.
* future_parts - множество кусков, которые будут созданы после выполнения
* выполняющихся в данный момент элементов очереди.
*/
struct FuturePartTagger
{
String part;
StorageReplicatedMergeTree & storage;
FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_);
~FuturePartTagger();
};
typedef Poco::SharedPtr<FuturePartTagger> FuturePartTaggerPtr;
class ReplicatedMergeTreeQueue;
/// Запись о том, что нужно сделать. Только данные (их можно копировать).
@ -81,12 +65,12 @@ struct ReplicatedMergeTreeLogEntryData
/// Нужно переносить из директории unreplicated, а не detached.
bool attach_unreplicated = false;
/// Доступ под queue_mutex.
/// Доступ под queue_mutex, см. ReplicatedMergeTreeQueue.
bool currently_executing = false; /// Выполняется ли действие сейчас.
/// Эти несколько полей имеют лишь информационный характер (для просмотра пользователем с помощью системных таблиц).
/// Доступ под queue_mutex.
/// Доступ под queue_mutex, см. ReplicatedMergeTreeQueue.
size_t num_tries = 0; /// Количество попыток выполнить действие (с момента старта сервера; включая выполняющееся).
std::exception_ptr exception; /// Последний эксепшен, в случае безуспешной попытки выполнить действие.
std::exception_ptr exception; /// Последний эксепшен, в случае безуспешной попытки выполнить действие.
time_t last_attempt_time = 0; /// Время начала последней попытки выполнить действие.
size_t num_postponed = 0; /// Количество раз, когда действие было отложено.
String postpone_reason; /// Причина, по которой действие было отложено, если оно отложено.
@ -104,12 +88,8 @@ struct ReplicatedMergeTreeLogEntry : ReplicatedMergeTreeLogEntryData
{
typedef Poco::SharedPtr<ReplicatedMergeTreeLogEntry> Ptr;
FuturePartTaggerPtr future_part_tagger;
std::condition_variable execution_complete; /// Пробуждается когда currently_executing становится false.
void addResultToVirtualParts(StorageReplicatedMergeTree & storage);
void tagPartAsFuture(StorageReplicatedMergeTree & storage);
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);

View File

@ -6,6 +6,7 @@
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
@ -153,23 +154,14 @@ public:
bool is_leader;
bool is_readonly;
bool is_session_expired;
UInt32 future_parts;
ReplicatedMergeTreeQueue::Status queue;
UInt32 parts_to_check;
String zookeeper_path;
String replica_name;
String replica_path;
Int32 columns_version;
UInt32 queue_size;
UInt32 inserts_in_queue;
UInt32 merges_in_queue;
UInt32 queue_oldest_time;
UInt32 inserts_oldest_time;
UInt32 merges_oldest_time;
String oldest_part_to_get;
String oldest_part_to_merge_to;
UInt64 log_max_index;
UInt64 log_pointer;
UInt32 last_queue_update;
UInt8 total_replicas;
UInt8 active_replicas;
};
@ -189,15 +181,12 @@ private:
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreeCleanupThread;
friend struct ReplicatedMergeTreeLogEntry;
friend struct FuturePartTagger;
typedef ReplicatedMergeTreeLogEntry LogEntry;
typedef LogEntry::Ptr LogEntryPtr;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
typedef std::list<LogEntryPtr> LogEntries;
typedef std::set<String> StringSet;
typedef std::list<String> StringList;
using StringSet = std::set<String>;
using StringList = std::list<String>;
Context & context;
@ -219,21 +208,6 @@ private:
/// Если true, таблица в офлайновом режиме, и в нее нельзя писать.
bool is_readonly = false;
/// Каким будет множество активных кусков после выполнения всей текущей очереди.
ActiveDataPartSet virtual_parts;
/** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/).
* В ZK записи в хронологическом порядке. Здесь - не обязательно.
*/
LogEntries queue;
time_t last_queue_update = 0;
std::mutex queue_mutex;
/** Куски, которые появятся в результате действий, выполняемых прямо сейчас фоновыми потоками (этих действий нет в очереди).
* Использовать под залоченным queue_mutex.
*/
StringSet future_parts;
/** Куски, для которых нужно проверить одно из двух:
* - Если кусок у нас есть, сверить, его данные с его контрольными суммами, а их с ZooKeeper.
* - Если куска у нас нет, проверить, есть ли он (или покрывающий его кусок) хоть у кого-то.
@ -251,6 +225,11 @@ private:
String replica_name;
String replica_path;
/** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/).
* В ZK записи в хронологическом порядке. Здесь - не обязательно.
*/
ReplicatedMergeTreeQueue queue;
/** /replicas/me/is_active.
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
@ -360,10 +339,6 @@ private:
*/
void checkParts(bool skip_sanity_checks);
/// Положить все куски из data в virtual_parts.
void initVirtualParts();
/** Проверить, что чексумма куска совпадает с чексуммой того же куска на какой-нибудь другой реплике.
* Если ни у кого нет такого куска, ничего не проверяет.
* Не очень надежно: если две реплики добавляют кусок почти одновременно, ни одной проверки не произойдет.
@ -380,20 +355,11 @@ private:
/// Выполнение заданий из очереди.
/** Кладет в queue записи из ZooKeeper (/replicas/me/queue/).
*/
void loadQueue();
/** Копирует новые записи из логов всех реплик в очередь этой реплики.
* Если next_update_event != nullptr, вызовет это событие, когда в логе появятся новые записи.
*/
void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr);
/** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое.
* Вызывается под queue_mutex.
*/
bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason);
/** Выполнить действие из очереди. Бросает исключение, если что-то не так.
* Возвращает, получилось ли выполнить. Если не получилось, запись нужно положить в конец очереди.
*/
@ -455,16 +421,6 @@ private:
/** Дождаться, пока указанная реплика выполнит указанное действие из лога.
*/
void waitForReplicaToProcessLogEntry(const String & replica_name, const LogEntry & entry);
/** Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper.
* Поддерживаются также отрицательные числа - для них имя ноды выглядит несколько глупо
* и не соответствует никакой автоинкрементной ноде в ZK.
*/
static String padIndex(Int64 index)
{
String index_str = toString(index);
return std::string(10 - index_str.size(), '0') + index_str;
}
};
}

View File

@ -448,7 +448,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
* затем попадаем сюда.
* Ситуация - было заменено M > N кусков тоже нормальная.
*
* Хотя это должно предотвращаться проверкой в методе StorageReplicatedMergeTree::shouldExecuteLogEntry.
* Хотя это должно предотвращаться проверкой в методе ReplicatedMergeTreeQueue::shouldExecuteLogEntry.
*/
LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());

View File

@ -124,7 +124,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
/// Не будем трогать последние replicated_logs_to_keep записей.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end());
/// Не будем трогать записи, не меньшие min_pointer.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + storage.padIndex(min_pointer)), entries.end());
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end());
if (entries.empty())
return;

View File

@ -1,48 +1,14 @@
#include <zkutil/Types.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/Operators.h>
#include <DB/IO/ReadBufferFromString.h>
namespace DB
{
FuturePartTagger::FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_)
: part(part_), storage(storage_)
{
if (!storage.future_parts.insert(part).second)
throw Exception("Tagging already tagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
FuturePartTagger::~FuturePartTagger()
{
try
{
std::unique_lock<std::mutex> lock(storage.queue_mutex);
if (!storage.future_parts.erase(part))
throw Exception("Untagging already untagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ReplicatedMergeTreeLogEntry::addResultToVirtualParts(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE || type == ATTACH_PART)
storage.virtual_parts.add(new_part_name);
}
void ReplicatedMergeTreeLogEntry::tagPartAsFuture(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART || type == ATTACH_PART)
future_part_tagger = new FuturePartTagger(new_part_name, storage);
}
void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const
{
out << "format version: 3\n"

View File

@ -164,8 +164,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
createNewZooKeeperNodes();
initVirtualParts();
String unreplicated_path = full_path + "unreplicated/";
if (Poco::File(unreplicated_path).exists())
{
@ -189,7 +187,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
}
loadQueue();
queue.initialize(
zookeeper_path, replica_path,
database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
data.getDataParts(), current_zookeeper);
/// В этом потоке реплика будет активирована.
restarting_thread.reset(new ReplicatedMergeTreeRestartingThread(*this));
@ -527,7 +528,7 @@ void StorageReplicatedMergeTree::createReplica()
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
}
/// Далее оно будет загружено в переменную queue в методе loadQueue.
/// Далее оно будет загружено в переменную queue в методе queue.load.
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
}
@ -666,7 +667,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
log_entry.new_part_name = name;
log_entry.create_time = tryGetPartCreateTime(zookeeper, replica_path, name);
/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
/// Полагаемся, что это происходит до загрузки очереди (queue.load).
zkutil::Ops ops;
removePartFromZooKeeper(name, ops);
ops.push_back(new zkutil::Op::Create(
@ -683,14 +684,6 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
}
void StorageReplicatedMergeTree::initVirtualParts()
{
auto parts = data.getDataParts();
for (const auto & part : parts)
virtual_parts.add(part->name);
}
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name)
{
auto zookeeper = getZooKeeper();
@ -762,171 +755,13 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData:
}
void StorageReplicatedMergeTree::loadQueue()
{
auto zookeeper = getZooKeeper();
std::lock_guard<std::mutex> lock(queue_mutex);
Strings children = zookeeper->getChildren(replica_path + "/queue");
std::sort(children.begin(), children.end());
std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
futures.reserve(children.size());
for (const String & child : children)
futures.emplace_back(child, zookeeper->asyncGet(replica_path + "/queue/" + child));
for (auto & future : futures)
{
zkutil::ZooKeeper::ValueAndStat res = future.second.get();
LogEntryPtr entry = LogEntry::parse(res.value, res.stat);
entry->znode_name = future.first;
entry->addResultToVirtualParts(*this);
queue.push_back(entry);
}
}
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
{
auto zookeeper = getZooKeeper();
std::lock_guard<std::mutex> lock(queue_mutex);
String index_str = zookeeper->get(replica_path + "/log_pointer");
UInt64 index;
if (index_str.empty())
if (queue.pullLogsToQueue(getZooKeeper(), next_update_event))
{
/// Если у нас еще нет указателя на лог, поставим указатель на первую запись в нем.
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
index = entries.empty() ? 0 : parse<UInt64>(std::min_element(entries.begin(), entries.end())->substr(strlen("log-")));
zookeeper->set(replica_path + "/log_pointer", toString(index));
if (queue_task_handle)
queue_task_handle->wake();
}
else
{
index = parse<UInt64>(index_str);
}
UInt64 first_index = index;
size_t count = 0;
String entry_str;
zkutil::Stat stat;
while (zookeeper->tryGet(zookeeper_path + "/log/log-" + padIndex(index), entry_str, &stat))
{
++count;
++index;
LogEntryPtr entry = LogEntry::parse(entry_str, stat);
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointer", toString(index), -1));
auto results = zookeeper->multi(ops);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
entry->addResultToVirtualParts(*this);
queue.push_back(entry);
}
last_queue_update = time(0);
if (next_update_event)
{
if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
next_update_event->set();
}
if (!count)
return;
if (queue_task_handle)
queue_task_handle->wake();
LOG_DEBUG(log, "Pulled " << count << " entries to queue: log-" << padIndex(first_index) << " - log-" << padIndex(index - 1));
}
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason)
{
/// queue_mutex уже захвачен. Функция вызывается только из queueTask.
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
{
/// Проверим, не создаётся ли сейчас этот же кусок другим действием.
if (future_parts.count(entry.new_part_name))
{
String reason = "Not executing log entry for part " + entry.new_part_name
+ " because another log entry for the same part is being processed. This shouldn't happen often.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
return false;
/** Когда соответствующее действие завершится, то shouldExecuteLogEntry, в следующий раз, пройдёт успешно,
* и элемент очереди будет обработан. Сразу же в функции executeLogEntry будет выяснено, что кусок у нас уже есть,
* и элемент очереди будет сразу считаться обработанным.
*/
}
/// Более сложная проверка - не создаётся ли сейчас другим действием кусок, который покроет этот кусок.
/// NOTE То, что выше - избыточно, но оставлено ради более удобного сообщения в логе.
ActiveDataPartSet::Part result_part;
ActiveDataPartSet::parsePartName(entry.new_part_name, result_part);
/// Оно может тормозить при большом размере future_parts. Но он не может быть большим, так как ограничен BackgroundProcessingPool.
for (const auto & future_part_name : future_parts)
{
ActiveDataPartSet::Part future_part;
ActiveDataPartSet::parsePartName(future_part_name, future_part);
if (future_part.contains(result_part))
{
String reason = "Not executing log entry for part " + entry.new_part_name
+ " because another log entry for covering part " + future_part_name + " is being processed.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
return false;
}
}
}
if (entry.type == LogEntry::MERGE_PARTS)
{
/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
* Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
* Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
* Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
*/
for (const auto & name : entry.parts_to_merge)
{
if (future_parts.count(name))
{
String reason = "Not merging into part " + entry.new_part_name
+ " because part " + name + " is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, reason);
out_postpone_reason = reason;
return false;
}
}
if (merger.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges are cancelled now.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
return false;
}
}
return true;
}
@ -1227,51 +1062,12 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
*/
try
{
std::lock_guard<std::mutex> lock(queue_mutex);
auto parts_for_merge = queue.moveSiblingPartsForMergeToEndOfQueue(entry.new_part_name);
/// Найдем действие по объединению этого куска с другими. Запомним других.
StringSet parts_for_merge;
LogEntries::iterator merge_entry;
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
if (!parts_for_merge.empty() && replica.empty())
{
if ((*it)->type == LogEntry::MERGE_PARTS)
{
if (std::find((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end(), entry.new_part_name)
!= (*it)->parts_to_merge.end())
{
parts_for_merge = StringSet((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end());
merge_entry = it;
break;
}
}
}
if (!parts_for_merge.empty())
{
/// Переместим в конец очереди действия, получающие parts_for_merge.
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
auto it0 = it;
++it;
if (it0 == merge_entry)
break;
if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART)
&& parts_for_merge.count((*it0)->new_part_name))
{
queue.splice(queue.end(), queue, it0, it);
}
}
/** Если этого куска ни у кого нет, но в очереди упоминается мердж с его участием, то наверно этот кусок такой старый,
* что его все померджили и удалили. Не будем бросать исключение, чтобы queueTask лишний раз не спала.
*/
if (replica.empty())
{
LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
return false;
}
LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
return false;
}
/** Если ни у какой активной реплики нет куска, и в очереди нет слияний с его участием,
@ -1299,37 +1095,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
{
LogEntries to_wait;
size_t removed_entries = 0;
/// Удалим из очереди операции с кусками, содержащимися в удаляемом диапазоне.
std::unique_lock<std::mutex> lock(queue_mutex);
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) &&
ActiveDataPartSet::contains(entry.new_part_name, (*it)->new_part_name))
{
if ((*it)->currently_executing)
to_wait.push_back(*it);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
if (code != ZOK)
LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code));
queue.erase(it++);
++removed_entries;
}
else
++it;
}
LOG_DEBUG(log, "Removed " << removed_entries << " entries from queue. "
"Waiting for " << to_wait.size() << " entries that are currently executing.");
/// Дождемся завершения операций с кусками, содержащимися в удаляемом диапазоне.
for (LogEntryPtr & entry : to_wait)
entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
}
queue.removeGetsAndMergesInRange(zookeeper, entry.new_part_name);
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0;
@ -1445,32 +1211,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
try
{
std::lock_guard<std::mutex> lock(queue_mutex);
bool empty = queue.empty();
if (!empty)
{
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if ((*it)->currently_executing)
continue;
if (shouldExecuteLogEntry(**it, (*it)->postpone_reason))
{
entry = *it;
entry->tagPartAsFuture(*this);
queue.splice(queue.end(), queue, it);
entry->currently_executing = true;
++entry->num_tries;
entry->last_attempt_time = time(0);
break;
}
else
{
++(*it)->num_postponed;
(*it)->last_postpone_time = time(0);
}
}
}
entry = queue.selectEntryToProcess(merger);
}
catch (...)
{
@ -1480,79 +1221,37 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
if (!entry)
return false;
bool was_exception = true;
bool success = false;
std::exception_ptr saved_exception;
try
bool res = queue.processEntry(getZooKeeper(), entry, [&](LogEntryPtr & entry)
{
try
{
if (executeLogEntry(*entry, pool_context))
return executeLogEntry(*entry, pool_context);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
{
auto zookeeper = getZooKeeper();
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
success = true;
/// Если ни у кого нет нужного куска, наверно, просто не все реплики работают; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
}
else if (e.code() == ErrorCodes::ABORTED)
{
/// Прерванный мердж или скачивание куска - не ошибка.
LOG_INFO(log, e.message());
}
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
saved_exception = std::current_exception();
throw;
}
was_exception = false;
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
{
/// Если ни у кого нет нужного куска, наверно, просто не все реплики работают; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
}
else if (e.code() == ErrorCodes::ABORTED)
{
/// Прерванный мердж или скачивание куска - не ошибка.
LOG_INFO(log, e.message());
}
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
entry->future_part_tagger = nullptr;
std::lock_guard<std::mutex> lock(queue_mutex);
entry->currently_executing = false;
entry->exception = saved_exception;
entry->execution_complete.notify_all();
if (success)
{
/// Удалим задание из очереди.
/// Нельзя просто обратиться по заранее сохраненному итератору, потому что задание мог успеть удалить кто-то другой.
for (LogEntries::iterator it = queue.end(); it != queue.begin();)
{
--it;
if (*it == entry)
{
queue.erase(it);
break;
}
}
}
return false;
});
/// Если не было исключения, не нужно спать.
return !was_exception;
return res;
}
@ -1580,8 +1279,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) -> bool
{
/// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать.
if (virtual_parts.getContainingPart(left->name) != left->name ||
virtual_parts.getContainingPart(right->name) != right->name)
if (queue.partWillBeMergedOrMergesDisabled(left->name) || queue.partWillBeMergedOrMergesDisabled(right->name))
return false;
auto key = std::make_pair(left->name, right->name);
@ -1635,8 +1333,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
if (need_pull)
{
/// Нужно загрузить новую запись в очередь перед тем, как выбирать куски для слияния.
/// (чтобы кусок добавился в virtual_parts).
/// Нужно загрузить новые записи в очередь перед тем, как выбирать куски для слияния.
/// (чтобы мы знали, какие куски уже собираются сливать).
pullLogsToQueue();
need_pull = false;
}
@ -1653,31 +1351,15 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
if (big_merges_current < max_number_of_big_merges)
{
std::lock_guard<std::mutex> lock(queue_mutex);
for (const auto & entry : queue)
{
if (entry->type == LogEntry::MERGE_PARTS)
queue.countMerges(merges_queued, big_merges_queued, max_number_of_big_merges - big_merges_current,
[&](const String & name)
{
++merges_queued;
MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
if (!part || part->name != name)
return false;
if (big_merges_current + big_merges_queued < max_number_of_big_merges)
{
for (const String & name : entry->parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
if (!part || part->name != name)
continue;
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
{
++big_merges_queued;
break;
}
}
}
}
}
return part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small;
});
}
bool only_small = big_merges_current + big_merges_queued >= max_number_of_big_merges;
@ -1691,8 +1373,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
do
{
auto zookeeper = getZooKeeper();
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges (" << merges_queued
@ -1713,6 +1393,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
break;
}
auto zookeeper = getZooKeeper();
bool all_in_zk = true;
for (const auto & part : parts)
{
@ -2018,14 +1700,9 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
auto results = zookeeper->multi(ops);
{
std::lock_guard<std::mutex> lock(queue_mutex);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
log_entry->addResultToVirtualParts(*this);
queue.push_back(log_entry);
}
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
queue.insert(log_entry);
}
@ -2131,24 +1808,7 @@ void StorageReplicatedMergeTree::searchForMissingPart(const String & part_name)
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Есть ли он в очереди репликации? Если есть - удалим, так как задачу невозможно обработать.
bool was_in_queue = false;
{
std::lock_guard<std::mutex> lock(queue_mutex);
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
if ((*it)->new_part_name == part_name)
{
zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
queue.erase(it++);
was_in_queue = true;
}
else
++it;
}
}
if (!was_in_queue)
if (!queue.remove(zookeeper, part_name))
{
/// Куска не было в нашей очереди. С чего бы это?
LOG_ERROR(log, "Checker: Missing part " << part_name << " is not in our queue.");
@ -2915,13 +2575,13 @@ void StorageReplicatedMergeTree::dropPartition(ASTPtr query, const Field & field
String fake_part_name = getFakePartNameForDrop(month_name, 0, right);
/** Запретим выбирать для слияния удаляемые куски - сделаем вид, что их всех уже собираются слить в fake_part_name.
/** Запретим выбирать для слияния удаляемые куски.
* Инвариант: после появления в логе записи DROP_RANGE, в логе не появятся слияния удаляемых кусков.
*/
{
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
virtual_parts.add(fake_part_name);
queue.disableMergesInRange(fake_part_name);
}
/// Наконец, добившись нужных инвариантов, можно положить запись в лог.
@ -3274,46 +2934,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
res.is_readonly = is_readonly;
res.is_session_expired = !zookeeper || zookeeper->expired();
{
std::lock_guard<std::mutex> lock(queue_mutex);
res.future_parts = future_parts.size();
res.queue_size = queue.size();
res.last_queue_update = last_queue_update;
res.inserts_in_queue = 0;
res.merges_in_queue = 0;
res.queue_oldest_time = 0;
res.inserts_oldest_time = 0;
res.merges_oldest_time = 0;
for (const LogEntryPtr & entry : queue)
{
if (entry->create_time && (!res.queue_oldest_time || entry->create_time < res.queue_oldest_time))
res.queue_oldest_time = entry->create_time;
if (entry->type == LogEntry::GET_PART)
{
++res.inserts_in_queue;
if (entry->create_time && (!res.inserts_oldest_time || entry->create_time < res.inserts_oldest_time))
{
res.inserts_oldest_time = entry->create_time;
res.oldest_part_to_get = entry->new_part_name;
}
}
if (entry->type == LogEntry::MERGE_PARTS)
{
++res.merges_in_queue;
if (entry->create_time && (!res.merges_oldest_time || entry->create_time < res.merges_oldest_time))
{
res.merges_oldest_time = entry->create_time;
res.oldest_part_to_merge_to = entry->new_part_name;
}
}
}
}
res.queue = queue.getStatus();
{
std::lock_guard<std::mutex> lock(parts_to_check_mutex);
@ -3362,13 +2983,8 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica_name_)
{
res.clear();
replica_name_ = replica_name;
std::lock_guard<std::mutex> lock(queue_mutex);
res.reserve(queue.size());
for (const auto & entry : queue)
res.emplace_back(*entry);
queue.getEntries(res);
}

View File

@ -149,23 +149,23 @@ BlockInputStreams StorageSystemReplicas::read(
col_is_leader .column->insert(UInt64(status.is_leader));
col_is_readonly .column->insert(UInt64(status.is_readonly));
col_is_session_expired .column->insert(UInt64(status.is_session_expired));
col_future_parts .column->insert(UInt64(status.future_parts));
col_future_parts .column->insert(UInt64(status.queue.future_parts));
col_parts_to_check .column->insert(UInt64(status.parts_to_check));
col_zookeeper_path .column->insert(status.zookeeper_path);
col_replica_name .column->insert(status.replica_name);
col_replica_path .column->insert(status.replica_path);
col_columns_version .column->insert(Int64(status.columns_version));
col_queue_size .column->insert(UInt64(status.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.merges_in_queue));
col_queue_oldest_time .column->insert(UInt64(status.queue_oldest_time));
col_inserts_oldest_time .column->insert(UInt64(status.inserts_oldest_time));
col_merges_oldest_time .column->insert(UInt64(status.merges_oldest_time));
col_oldest_part_to_get .column->insert(status.oldest_part_to_get);
col_oldest_part_to_merge_to.column->insert(status.oldest_part_to_merge_to);
col_queue_size .column->insert(UInt64(status.queue.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.queue.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.queue.merges_in_queue));
col_queue_oldest_time .column->insert(UInt64(status.queue.queue_oldest_time));
col_inserts_oldest_time .column->insert(UInt64(status.queue.inserts_oldest_time));
col_merges_oldest_time .column->insert(UInt64(status.queue.merges_oldest_time));
col_oldest_part_to_get .column->insert(status.queue.oldest_part_to_get);
col_oldest_part_to_merge_to.column->insert(status.queue.oldest_part_to_merge_to);
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
col_last_queue_update .column->insert(UInt64(status.last_queue_update));
col_last_queue_update .column->insert(UInt64(status.queue.last_queue_update));
col_total_replicas .column->insert(UInt64(status.total_replicas));
col_active_replicas .column->insert(UInt64(status.active_replicas));
}