From a5b8166541f635fb8a54501f547d2ed44f7635e4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 10 Jan 2016 07:43:30 +0300 Subject: [PATCH] dbms: better [#METR-19586]. --- .../MergeTree/ReplicatedMergeTreeLogEntry.h | 28 +- .../DB/Storages/StorageReplicatedMergeTree.h | 66 +-- .../MergeTree/MergeTreeDataMerger.cpp | 2 +- .../ReplicatedMergeTreeCleanupThread.cpp | 2 +- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 36 +- .../Storages/StorageReplicatedMergeTree.cpp | 492 ++---------------- .../Storages/System/StorageSystemReplicas.cpp | 20 +- 7 files changed, 82 insertions(+), 564 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 9bef6d7a98f..e8fc1041b08 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -17,23 +17,7 @@ namespace DB class ReadBuffer; class WriteBuffer; -class StorageReplicatedMergeTree; - - -/** Добавляет кусок в множество future_parts; в деструкторе убирает. - * future_parts - множество кусков, которые будут созданы после выполнения - * выполняющихся в данный момент элементов очереди. - */ -struct FuturePartTagger -{ - String part; - StorageReplicatedMergeTree & storage; - - FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_); - ~FuturePartTagger(); -}; - -typedef Poco::SharedPtr FuturePartTaggerPtr; +class ReplicatedMergeTreeQueue; /// Запись о том, что нужно сделать. Только данные (их можно копировать). @@ -81,12 +65,12 @@ struct ReplicatedMergeTreeLogEntryData /// Нужно переносить из директории unreplicated, а не detached. bool attach_unreplicated = false; - /// Доступ под queue_mutex. + /// Доступ под queue_mutex, см. ReplicatedMergeTreeQueue. bool currently_executing = false; /// Выполняется ли действие сейчас. /// Эти несколько полей имеют лишь информационный характер (для просмотра пользователем с помощью системных таблиц). - /// Доступ под queue_mutex. + /// Доступ под queue_mutex, см. ReplicatedMergeTreeQueue. size_t num_tries = 0; /// Количество попыток выполнить действие (с момента старта сервера; включая выполняющееся). - std::exception_ptr exception; /// Последний эксепшен, в случае безуспешной попытки выполнить действие. + std::exception_ptr exception; /// Последний эксепшен, в случае безуспешной попытки выполнить действие. time_t last_attempt_time = 0; /// Время начала последней попытки выполнить действие. size_t num_postponed = 0; /// Количество раз, когда действие было отложено. String postpone_reason; /// Причина, по которой действие было отложено, если оно отложено. @@ -104,12 +88,8 @@ struct ReplicatedMergeTreeLogEntry : ReplicatedMergeTreeLogEntryData { typedef Poco::SharedPtr Ptr; - FuturePartTaggerPtr future_part_tagger; std::condition_variable execution_complete; /// Пробуждается когда currently_executing становится false. - void addResultToVirtualParts(StorageReplicatedMergeTree & storage); - void tagPartAsFuture(StorageReplicatedMergeTree & storage); - void writeText(WriteBuffer & out) const; void readText(ReadBuffer & in); diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 0c7ed307d2c..e73ad1e999f 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -153,23 +154,14 @@ public: bool is_leader; bool is_readonly; bool is_session_expired; - UInt32 future_parts; + ReplicatedMergeTreeQueue::Status queue; UInt32 parts_to_check; String zookeeper_path; String replica_name; String replica_path; Int32 columns_version; - UInt32 queue_size; - UInt32 inserts_in_queue; - UInt32 merges_in_queue; - UInt32 queue_oldest_time; - UInt32 inserts_oldest_time; - UInt32 merges_oldest_time; - String oldest_part_to_get; - String oldest_part_to_merge_to; UInt64 log_max_index; UInt64 log_pointer; - UInt32 last_queue_update; UInt8 total_replicas; UInt8 active_replicas; }; @@ -189,15 +181,12 @@ private: friend class ReplicatedMergeTreeRestartingThread; friend class ReplicatedMergeTreeCleanupThread; friend struct ReplicatedMergeTreeLogEntry; - friend struct FuturePartTagger; - typedef ReplicatedMergeTreeLogEntry LogEntry; - typedef LogEntry::Ptr LogEntryPtr; + using LogEntry = ReplicatedMergeTreeLogEntry; + using LogEntryPtr = LogEntry::Ptr; - typedef std::list LogEntries; - - typedef std::set StringSet; - typedef std::list StringList; + using StringSet = std::set; + using StringList = std::list; Context & context; @@ -219,21 +208,6 @@ private: /// Если true, таблица в офлайновом режиме, и в нее нельзя писать. bool is_readonly = false; - /// Каким будет множество активных кусков после выполнения всей текущей очереди. - ActiveDataPartSet virtual_parts; - - /** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/). - * В ZK записи в хронологическом порядке. Здесь - не обязательно. - */ - LogEntries queue; - time_t last_queue_update = 0; - std::mutex queue_mutex; - - /** Куски, которые появятся в результате действий, выполняемых прямо сейчас фоновыми потоками (этих действий нет в очереди). - * Использовать под залоченным queue_mutex. - */ - StringSet future_parts; - /** Куски, для которых нужно проверить одно из двух: * - Если кусок у нас есть, сверить, его данные с его контрольными суммами, а их с ZooKeeper. * - Если куска у нас нет, проверить, есть ли он (или покрывающий его кусок) хоть у кого-то. @@ -251,6 +225,11 @@ private: String replica_name; String replica_path; + /** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/). + * В ZK записи в хронологическом порядке. Здесь - не обязательно. + */ + ReplicatedMergeTreeQueue queue; + /** /replicas/me/is_active. */ zkutil::EphemeralNodeHolderPtr replica_is_active_node; @@ -360,10 +339,6 @@ private: */ void checkParts(bool skip_sanity_checks); - /// Положить все куски из data в virtual_parts. - void initVirtualParts(); - - /** Проверить, что чексумма куска совпадает с чексуммой того же куска на какой-нибудь другой реплике. * Если ни у кого нет такого куска, ничего не проверяет. * Не очень надежно: если две реплики добавляют кусок почти одновременно, ни одной проверки не произойдет. @@ -380,20 +355,11 @@ private: /// Выполнение заданий из очереди. - /** Кладет в queue записи из ZooKeeper (/replicas/me/queue/). - */ - void loadQueue(); - /** Копирует новые записи из логов всех реплик в очередь этой реплики. * Если next_update_event != nullptr, вызовет это событие, когда в логе появятся новые записи. */ void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr); - /** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое. - * Вызывается под queue_mutex. - */ - bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason); - /** Выполнить действие из очереди. Бросает исключение, если что-то не так. * Возвращает, получилось ли выполнить. Если не получилось, запись нужно положить в конец очереди. */ @@ -455,16 +421,6 @@ private: /** Дождаться, пока указанная реплика выполнит указанное действие из лога. */ void waitForReplicaToProcessLogEntry(const String & replica_name, const LogEntry & entry); - - /** Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper. - * Поддерживаются также отрицательные числа - для них имя ноды выглядит несколько глупо - * и не соответствует никакой автоинкрементной ноде в ZK. - */ - static String padIndex(Int64 index) - { - String index_str = toString(index); - return std::string(10 - index_str.size(), '0') + index_str; - } }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index b827de1aeff..3a71396aec1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -448,7 +448,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts( * затем попадаем сюда. * Ситуация - было заменено M > N кусков тоже нормальная. * - * Хотя это должно предотвращаться проверкой в методе StorageReplicatedMergeTree::shouldExecuteLogEntry. + * Хотя это должно предотвращаться проверкой в методе ReplicatedMergeTreeQueue::shouldExecuteLogEntry. */ LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size() << " instead of " << parts.size()); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index dda615db792..428e8f6ad05 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -124,7 +124,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() /// Не будем трогать последние replicated_logs_to_keep записей. entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end()); /// Не будем трогать записи, не меньшие min_pointer. - entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + storage.padIndex(min_pointer)), entries.end()); + entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end()); if (entries.empty()) return; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 9d1a40b3f69..eb995b626de 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -1,48 +1,14 @@ #include #include -#include #include +#include namespace DB { -FuturePartTagger::FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_) - : part(part_), storage(storage_) -{ - if (!storage.future_parts.insert(part).second) - throw Exception("Tagging already tagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); -} - -FuturePartTagger::~FuturePartTagger() -{ - try - { - std::unique_lock lock(storage.queue_mutex); - if (!storage.future_parts.erase(part)) - throw Exception("Untagging already untagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } -} - - -void ReplicatedMergeTreeLogEntry::addResultToVirtualParts(StorageReplicatedMergeTree & storage) -{ - if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE || type == ATTACH_PART) - storage.virtual_parts.add(new_part_name); -} - -void ReplicatedMergeTreeLogEntry::tagPartAsFuture(StorageReplicatedMergeTree & storage) -{ - if (type == MERGE_PARTS || type == GET_PART || type == ATTACH_PART) - future_part_tagger = new FuturePartTagger(new_part_name, storage); -} - void ReplicatedMergeTreeLogEntry::writeText(WriteBuffer & out) const { out << "format version: 3\n" diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 4f1e8ccc630..49a610128a2 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -164,8 +164,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( createNewZooKeeperNodes(); - initVirtualParts(); - String unreplicated_path = full_path + "unreplicated/"; if (Poco::File(unreplicated_path).exists()) { @@ -189,7 +187,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } } - loadQueue(); + queue.initialize( + zookeeper_path, replica_path, + database_name + "." + table_name + " (ReplicatedMergeTreeQueue)", + data.getDataParts(), current_zookeeper); /// В этом потоке реплика будет активирована. restarting_thread.reset(new ReplicatedMergeTreeRestartingThread(*this)); @@ -527,7 +528,7 @@ void StorageReplicatedMergeTree::createReplica() zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential); } - /// Далее оно будет загружено в переменную queue в методе loadQueue. + /// Далее оно будет загружено в переменную queue в методе queue.load. LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries"); } @@ -666,7 +667,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) log_entry.new_part_name = name; log_entry.create_time = tryGetPartCreateTime(zookeeper, replica_path, name); - /// Полагаемся, что это происходит до загрузки очереди (loadQueue). + /// Полагаемся, что это происходит до загрузки очереди (queue.load). zkutil::Ops ops; removePartFromZooKeeper(name, ops); ops.push_back(new zkutil::Op::Create( @@ -683,14 +684,6 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) } -void StorageReplicatedMergeTree::initVirtualParts() -{ - auto parts = data.getDataParts(); - for (const auto & part : parts) - virtual_parts.add(part->name); -} - - void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name) { auto zookeeper = getZooKeeper(); @@ -762,171 +755,13 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData: } -void StorageReplicatedMergeTree::loadQueue() -{ - auto zookeeper = getZooKeeper(); - - std::lock_guard lock(queue_mutex); - - Strings children = zookeeper->getChildren(replica_path + "/queue"); - std::sort(children.begin(), children.end()); - - std::vector> futures; - futures.reserve(children.size()); - - for (const String & child : children) - futures.emplace_back(child, zookeeper->asyncGet(replica_path + "/queue/" + child)); - - for (auto & future : futures) - { - zkutil::ZooKeeper::ValueAndStat res = future.second.get(); - LogEntryPtr entry = LogEntry::parse(res.value, res.stat); - - entry->znode_name = future.first; - entry->addResultToVirtualParts(*this); - queue.push_back(entry); - } -} - - void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event) { - auto zookeeper = getZooKeeper(); - - std::lock_guard lock(queue_mutex); - - String index_str = zookeeper->get(replica_path + "/log_pointer"); - UInt64 index; - - if (index_str.empty()) + if (queue.pullLogsToQueue(getZooKeeper(), next_update_event)) { - /// Если у нас еще нет указателя на лог, поставим указатель на первую запись в нем. - Strings entries = zookeeper->getChildren(zookeeper_path + "/log"); - index = entries.empty() ? 0 : parse(std::min_element(entries.begin(), entries.end())->substr(strlen("log-"))); - - zookeeper->set(replica_path + "/log_pointer", toString(index)); + if (queue_task_handle) + queue_task_handle->wake(); } - else - { - index = parse(index_str); - } - - UInt64 first_index = index; - - size_t count = 0; - String entry_str; - zkutil::Stat stat; - while (zookeeper->tryGet(zookeeper_path + "/log/log-" + padIndex(index), entry_str, &stat)) - { - ++count; - ++index; - - LogEntryPtr entry = LogEntry::parse(entry_str, stat); - - /// Одновременно добавим запись в очередь и продвинем указатель на лог. - zkutil::Ops ops; - ops.push_back(new zkutil::Op::Create( - replica_path + "/queue/queue-", entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential)); - ops.push_back(new zkutil::Op::SetData( - replica_path + "/log_pointer", toString(index), -1)); - auto results = zookeeper->multi(ops); - - String path_created = dynamic_cast(ops[0]).getPathCreated(); - entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1); - entry->addResultToVirtualParts(*this); - queue.push_back(entry); - } - - last_queue_update = time(0); - - if (next_update_event) - { - if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event)) - next_update_event->set(); - } - - if (!count) - return; - - if (queue_task_handle) - queue_task_handle->wake(); - - LOG_DEBUG(log, "Pulled " << count << " entries to queue: log-" << padIndex(first_index) << " - log-" << padIndex(index - 1)); -} - - -bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason) -{ - /// queue_mutex уже захвачен. Функция вызывается только из queueTask. - - if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART) - { - /// Проверим, не создаётся ли сейчас этот же кусок другим действием. - if (future_parts.count(entry.new_part_name)) - { - String reason = "Not executing log entry for part " + entry.new_part_name - + " because another log entry for the same part is being processed. This shouldn't happen often."; - LOG_DEBUG(log, reason); - out_postpone_reason = reason; - return false; - - /** Когда соответствующее действие завершится, то shouldExecuteLogEntry, в следующий раз, пройдёт успешно, - * и элемент очереди будет обработан. Сразу же в функции executeLogEntry будет выяснено, что кусок у нас уже есть, - * и элемент очереди будет сразу считаться обработанным. - */ - } - - /// Более сложная проверка - не создаётся ли сейчас другим действием кусок, который покроет этот кусок. - /// NOTE То, что выше - избыточно, но оставлено ради более удобного сообщения в логе. - ActiveDataPartSet::Part result_part; - ActiveDataPartSet::parsePartName(entry.new_part_name, result_part); - - /// Оно может тормозить при большом размере future_parts. Но он не может быть большим, так как ограничен BackgroundProcessingPool. - for (const auto & future_part_name : future_parts) - { - ActiveDataPartSet::Part future_part; - ActiveDataPartSet::parsePartName(future_part_name, future_part); - - if (future_part.contains(result_part)) - { - String reason = "Not executing log entry for part " + entry.new_part_name - + " because another log entry for covering part " + future_part_name + " is being processed."; - LOG_DEBUG(log, reason); - out_postpone_reason = reason; - return false; - } - } - } - - if (entry.type == LogEntry::MERGE_PARTS) - { - /** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции. - * Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж. - * Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок. - * Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди. - */ - for (const auto & name : entry.parts_to_merge) - { - if (future_parts.count(name)) - { - String reason = "Not merging into part " + entry.new_part_name - + " because part " + name + " is not ready yet (log entry for that part is being processed)."; - LOG_TRACE(log, reason); - out_postpone_reason = reason; - return false; - } - } - - if (merger.isCancelled()) - { - String reason = "Not executing log entry for part " + entry.new_part_name + " because merges are cancelled now."; - LOG_DEBUG(log, reason); - out_postpone_reason = reason; - return false; - } - } - - return true; } @@ -1227,51 +1062,12 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro */ try { - std::lock_guard lock(queue_mutex); + auto parts_for_merge = queue.moveSiblingPartsForMergeToEndOfQueue(entry.new_part_name); - /// Найдем действие по объединению этого куска с другими. Запомним других. - StringSet parts_for_merge; - LogEntries::iterator merge_entry; - for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it) + if (!parts_for_merge.empty() && replica.empty()) { - if ((*it)->type == LogEntry::MERGE_PARTS) - { - if (std::find((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end(), entry.new_part_name) - != (*it)->parts_to_merge.end()) - { - parts_for_merge = StringSet((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end()); - merge_entry = it; - break; - } - } - } - - if (!parts_for_merge.empty()) - { - /// Переместим в конец очереди действия, получающие parts_for_merge. - for (LogEntries::iterator it = queue.begin(); it != queue.end();) - { - auto it0 = it; - ++it; - - if (it0 == merge_entry) - break; - - if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART) - && parts_for_merge.count((*it0)->new_part_name)) - { - queue.splice(queue.end(), queue, it0, it); - } - } - - /** Если этого куска ни у кого нет, но в очереди упоминается мердж с его участием, то наверно этот кусок такой старый, - * что его все померджили и удалили. Не будем бросать исключение, чтобы queueTask лишний раз не спала. - */ - if (replica.empty()) - { - LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead."); - return false; - } + LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead."); + return false; } /** Если ни у какой активной реплики нет куска, и в очереди нет слияний с его участием, @@ -1299,37 +1095,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << "."); - { - LogEntries to_wait; - size_t removed_entries = 0; - - /// Удалим из очереди операции с кусками, содержащимися в удаляемом диапазоне. - std::unique_lock lock(queue_mutex); - for (LogEntries::iterator it = queue.begin(); it != queue.end();) - { - if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) && - ActiveDataPartSet::contains(entry.new_part_name, (*it)->new_part_name)) - { - if ((*it)->currently_executing) - to_wait.push_back(*it); - auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name); - if (code != ZOK) - LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": " - << zkutil::ZooKeeper::error2string(code)); - queue.erase(it++); - ++removed_entries; - } - else - ++it; - } - - LOG_DEBUG(log, "Removed " << removed_entries << " entries from queue. " - "Waiting for " << to_wait.size() << " entries that are currently executing."); - - /// Дождемся завершения операций с кусками, содержащимися в удаляемом диапазоне. - for (LogEntryPtr & entry : to_wait) - entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; }); - } + queue.removeGetsAndMergesInRange(zookeeper, entry.new_part_name); LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts."); size_t removed_parts = 0; @@ -1445,32 +1211,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p try { - std::lock_guard lock(queue_mutex); - bool empty = queue.empty(); - if (!empty) - { - for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it) - { - if ((*it)->currently_executing) - continue; - - if (shouldExecuteLogEntry(**it, (*it)->postpone_reason)) - { - entry = *it; - entry->tagPartAsFuture(*this); - queue.splice(queue.end(), queue, it); - entry->currently_executing = true; - ++entry->num_tries; - entry->last_attempt_time = time(0); - break; - } - else - { - ++(*it)->num_postponed; - (*it)->last_postpone_time = time(0); - } - } - } + entry = queue.selectEntryToProcess(merger); } catch (...) { @@ -1480,79 +1221,37 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p if (!entry) return false; - bool was_exception = true; - bool success = false; - std::exception_ptr saved_exception; - - try + bool res = queue.processEntry(getZooKeeper(), entry, [&](LogEntryPtr & entry) { try { - if (executeLogEntry(*entry, pool_context)) + return executeLogEntry(*entry, pool_context); + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART) { - auto zookeeper = getZooKeeper(); - auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); - - if (code != ZOK) - LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry->znode_name << ": " - << zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often."); - - success = true; + /// Если ни у кого нет нужного куска, наверно, просто не все реплики работают; не будем писать в лог с уровнем Error. + LOG_INFO(log, e.displayText()); } + else if (e.code() == ErrorCodes::ABORTED) + { + /// Прерванный мердж или скачивание куска - не ошибка. + LOG_INFO(log, e.message()); + } + else + tryLogCurrentException(__PRETTY_FUNCTION__); } catch (...) { - saved_exception = std::current_exception(); - throw; - } - - was_exception = false; - } - catch (const Exception & e) - { - if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART) - { - /// Если ни у кого нет нужного куска, наверно, просто не все реплики работают; не будем писать в лог с уровнем Error. - LOG_INFO(log, e.displayText()); - } - else if (e.code() == ErrorCodes::ABORTED) - { - /// Прерванный мердж или скачивание куска - не ошибка. - LOG_INFO(log, e.message()); - } - else tryLogCurrentException(__PRETTY_FUNCTION__); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - - entry->future_part_tagger = nullptr; - - std::lock_guard lock(queue_mutex); - - entry->currently_executing = false; - entry->exception = saved_exception; - entry->execution_complete.notify_all(); - - if (success) - { - /// Удалим задание из очереди. - /// Нельзя просто обратиться по заранее сохраненному итератору, потому что задание мог успеть удалить кто-то другой. - for (LogEntries::iterator it = queue.end(); it != queue.begin();) - { - --it; - if (*it == entry) - { - queue.erase(it); - break; - } } - } + + return false; + }); /// Если не было исключения, не нужно спать. - return !was_exception; + return res; } @@ -1580,8 +1279,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) -> bool { /// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать. - if (virtual_parts.getContainingPart(left->name) != left->name || - virtual_parts.getContainingPart(right->name) != right->name) + if (queue.partWillBeMergedOrMergesDisabled(left->name) || queue.partWillBeMergedOrMergesDisabled(right->name)) return false; auto key = std::make_pair(left->name, right->name); @@ -1635,8 +1333,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread() if (need_pull) { - /// Нужно загрузить новую запись в очередь перед тем, как выбирать куски для слияния. - /// (чтобы кусок добавился в virtual_parts). + /// Нужно загрузить новые записи в очередь перед тем, как выбирать куски для слияния. + /// (чтобы мы знали, какие куски уже собираются сливать). pullLogsToQueue(); need_pull = false; } @@ -1653,31 +1351,15 @@ void StorageReplicatedMergeTree::mergeSelectingThread() if (big_merges_current < max_number_of_big_merges) { - std::lock_guard lock(queue_mutex); - - for (const auto & entry : queue) - { - if (entry->type == LogEntry::MERGE_PARTS) + queue.countMerges(merges_queued, big_merges_queued, max_number_of_big_merges - big_merges_current, + [&](const String & name) { - ++merges_queued; + MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name); + if (!part || part->name != name) + return false; - if (big_merges_current + big_merges_queued < max_number_of_big_merges) - { - for (const String & name : entry->parts_to_merge) - { - MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name); - if (!part || part->name != name) - continue; - - if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small) - { - ++big_merges_queued; - break; - } - } - } - } - } + return part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small; + }); } bool only_small = big_merges_current + big_merges_queued >= max_number_of_big_merges; @@ -1691,8 +1373,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread() do { - auto zookeeper = getZooKeeper(); - if (merges_queued >= data.settings.max_replicated_merges_in_queue) { LOG_TRACE(log, "Number of queued merges (" << merges_queued @@ -1713,6 +1393,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread() break; } + auto zookeeper = getZooKeeper(); + bool all_in_zk = true; for (const auto & part : parts) { @@ -2018,14 +1700,9 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n auto results = zookeeper->multi(ops); - { - std::lock_guard lock(queue_mutex); - - String path_created = dynamic_cast(ops[0]).getPathCreated(); - log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1); - log_entry->addResultToVirtualParts(*this); - queue.push_back(log_entry); - } + String path_created = dynamic_cast(ops[0]).getPathCreated(); + log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1); + queue.insert(log_entry); } @@ -2131,24 +1808,7 @@ void StorageReplicatedMergeTree::searchForMissingPart(const String & part_name) ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); /// Есть ли он в очереди репликации? Если есть - удалим, так как задачу невозможно обработать. - bool was_in_queue = false; - { - std::lock_guard lock(queue_mutex); - - for (LogEntries::iterator it = queue.begin(); it != queue.end();) - { - if ((*it)->new_part_name == part_name) - { - zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name); - queue.erase(it++); - was_in_queue = true; - } - else - ++it; - } - } - - if (!was_in_queue) + if (!queue.remove(zookeeper, part_name)) { /// Куска не было в нашей очереди. С чего бы это? LOG_ERROR(log, "Checker: Missing part " << part_name << " is not in our queue."); @@ -2915,13 +2575,13 @@ void StorageReplicatedMergeTree::dropPartition(ASTPtr query, const Field & field String fake_part_name = getFakePartNameForDrop(month_name, 0, right); - /** Запретим выбирать для слияния удаляемые куски - сделаем вид, что их всех уже собираются слить в fake_part_name. + /** Запретим выбирать для слияния удаляемые куски. * Инвариант: после появления в логе записи DROP_RANGE, в логе не появятся слияния удаляемых кусков. */ { std::lock_guard merge_selecting_lock(merge_selecting_mutex); - virtual_parts.add(fake_part_name); + queue.disableMergesInRange(fake_part_name); } /// Наконец, добившись нужных инвариантов, можно положить запись в лог. @@ -3274,46 +2934,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) res.is_readonly = is_readonly; res.is_session_expired = !zookeeper || zookeeper->expired(); - { - std::lock_guard lock(queue_mutex); - res.future_parts = future_parts.size(); - res.queue_size = queue.size(); - res.last_queue_update = last_queue_update; - - res.inserts_in_queue = 0; - res.merges_in_queue = 0; - res.queue_oldest_time = 0; - res.inserts_oldest_time = 0; - res.merges_oldest_time = 0; - - for (const LogEntryPtr & entry : queue) - { - if (entry->create_time && (!res.queue_oldest_time || entry->create_time < res.queue_oldest_time)) - res.queue_oldest_time = entry->create_time; - - if (entry->type == LogEntry::GET_PART) - { - ++res.inserts_in_queue; - - if (entry->create_time && (!res.inserts_oldest_time || entry->create_time < res.inserts_oldest_time)) - { - res.inserts_oldest_time = entry->create_time; - res.oldest_part_to_get = entry->new_part_name; - } - } - - if (entry->type == LogEntry::MERGE_PARTS) - { - ++res.merges_in_queue; - - if (entry->create_time && (!res.merges_oldest_time || entry->create_time < res.merges_oldest_time)) - { - res.merges_oldest_time = entry->create_time; - res.oldest_part_to_merge_to = entry->new_part_name; - } - } - } - } + res.queue = queue.getStatus(); { std::lock_guard lock(parts_to_check_mutex); @@ -3362,13 +2983,8 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica_name_) { - res.clear(); replica_name_ = replica_name; - - std::lock_guard lock(queue_mutex); - res.reserve(queue.size()); - for (const auto & entry : queue) - res.emplace_back(*entry); + queue.getEntries(res); } diff --git a/dbms/src/Storages/System/StorageSystemReplicas.cpp b/dbms/src/Storages/System/StorageSystemReplicas.cpp index 8f8cc799b8a..2d1f64bb112 100644 --- a/dbms/src/Storages/System/StorageSystemReplicas.cpp +++ b/dbms/src/Storages/System/StorageSystemReplicas.cpp @@ -149,23 +149,23 @@ BlockInputStreams StorageSystemReplicas::read( col_is_leader .column->insert(UInt64(status.is_leader)); col_is_readonly .column->insert(UInt64(status.is_readonly)); col_is_session_expired .column->insert(UInt64(status.is_session_expired)); - col_future_parts .column->insert(UInt64(status.future_parts)); + col_future_parts .column->insert(UInt64(status.queue.future_parts)); col_parts_to_check .column->insert(UInt64(status.parts_to_check)); col_zookeeper_path .column->insert(status.zookeeper_path); col_replica_name .column->insert(status.replica_name); col_replica_path .column->insert(status.replica_path); col_columns_version .column->insert(Int64(status.columns_version)); - col_queue_size .column->insert(UInt64(status.queue_size)); - col_inserts_in_queue .column->insert(UInt64(status.inserts_in_queue)); - col_merges_in_queue .column->insert(UInt64(status.merges_in_queue)); - col_queue_oldest_time .column->insert(UInt64(status.queue_oldest_time)); - col_inserts_oldest_time .column->insert(UInt64(status.inserts_oldest_time)); - col_merges_oldest_time .column->insert(UInt64(status.merges_oldest_time)); - col_oldest_part_to_get .column->insert(status.oldest_part_to_get); - col_oldest_part_to_merge_to.column->insert(status.oldest_part_to_merge_to); + col_queue_size .column->insert(UInt64(status.queue.queue_size)); + col_inserts_in_queue .column->insert(UInt64(status.queue.inserts_in_queue)); + col_merges_in_queue .column->insert(UInt64(status.queue.merges_in_queue)); + col_queue_oldest_time .column->insert(UInt64(status.queue.queue_oldest_time)); + col_inserts_oldest_time .column->insert(UInt64(status.queue.inserts_oldest_time)); + col_merges_oldest_time .column->insert(UInt64(status.queue.merges_oldest_time)); + col_oldest_part_to_get .column->insert(status.queue.oldest_part_to_get); + col_oldest_part_to_merge_to.column->insert(status.queue.oldest_part_to_merge_to); col_log_max_index .column->insert(status.log_max_index); col_log_pointer .column->insert(status.log_pointer); - col_last_queue_update .column->insert(UInt64(status.last_queue_update)); + col_last_queue_update .column->insert(UInt64(status.queue.last_queue_update)); col_total_replicas .column->insert(UInt64(status.total_replicas)); col_active_replicas .column->insert(UInt64(status.active_replicas)); }