From b830e0b4fe3d14c7a4e25baa0b41360b748a4f47 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Mon, 26 May 2014 15:40:22 +0400 Subject: [PATCH] Merge --- .../DB/Storages/StorageReplicatedMergeTree.h | 54 ++--------- dbms/src/Storages/ITableDeclaration.cpp | 22 ++--- .../Storages/StorageReplicatedMergeTree.cpp | 91 ++++++++++++------- 3 files changed, 75 insertions(+), 92 deletions(-) diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 9e1fa62b5d2..e873865f0b3 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -72,45 +72,6 @@ public: private: friend class ReplicatedMergeTreeBlockOutputStream; - /// Добавляет куски в множество currently_merging. - struct CurrentlyMergingPartsTagger - { - Strings parts; - StorageReplicatedMergeTree & storage; - - CurrentlyMergingPartsTagger(const Strings & parts_, StorageReplicatedMergeTree & storage_) - : parts(parts_), storage(storage_) - { - Poco::ScopedLock lock(storage.currently_merging_mutex); - for (const auto & name : parts) - { - if (storage.currently_merging.count(name)) - throw Exception("Tagging alreagy tagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); - } - storage.currently_merging.insert(parts.begin(), parts.end()); - } - - ~CurrentlyMergingPartsTagger() - { - try - { - Poco::ScopedLock lock(storage.currently_merging_mutex); - for (const auto & name : parts) - { - if (!storage.currently_merging.count(name)) - throw Exception("Untagging already untagged part " + name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); - storage.currently_merging.erase(name); - } - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - }; - - typedef Poco::SharedPtr CurrentlyMergingPartsTaggerPtr; - /// Добавляет кусок в множество future_parts. struct FuturePartTagger { @@ -156,13 +117,12 @@ private: String new_part_name; Strings parts_to_merge; - CurrentlyMergingPartsTaggerPtr currently_merging_tagger; FuturePartTaggerPtr future_part_tagger; - void tagPartsAsCurrentlyMerging(StorageReplicatedMergeTree & storage) + void addResultToVirtualParts(StorageReplicatedMergeTree & storage) { - if (type == MERGE_PARTS) - currently_merging_tagger = new CurrentlyMergingPartsTagger(parts_to_merge, storage); + if (type == MERGE_PARTS || type == GET_PART) + storage.virtual_parts.add(new_part_name); } void tagPartAsFuture(StorageReplicatedMergeTree & storage) @@ -205,9 +165,8 @@ private: /// Если true, таблица в офлайновом режиме, и в нее нельзя писать. bool is_read_only = false; - /// Куски, для которых в очереди есть задание на слияние. - StringSet currently_merging; - Poco::FastMutex currently_merging_mutex; + /// Каким будет множество активных кусков после выполнения всей текущей очереди. + ActiveDataPartSet virtual_parts; /** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/). * В ZK записи в хронологическом порядке. Здесь - не обязательно. @@ -319,6 +278,9 @@ private: */ void checkParts(); + /// Положить все куски из data в virtual_parts. + void initVirtualParts(); + /// Запустить или остановить фоновые потоки. Используется для частичной переинициализации при пересоздании сессии в ZooKeeper. void startup(); void partialShutdown(); diff --git a/dbms/src/Storages/ITableDeclaration.cpp b/dbms/src/Storages/ITableDeclaration.cpp index 8df9b195b73..385f246ab17 100644 --- a/dbms/src/Storages/ITableDeclaration.cpp +++ b/dbms/src/Storages/ITableDeclaration.cpp @@ -37,7 +37,7 @@ NameAndTypePair ITableDeclaration::getRealColumn(const String & column_name) con for (auto & it : real_columns) if (it.first == column_name) return it; - throw Exception("There is no column " + column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); } @@ -60,7 +60,7 @@ const DataTypePtr ITableDeclaration::getDataTypeByName(const String & column_nam if (it->first == column_name) return it->second; - throw Exception("There is no column " + column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); } @@ -115,8 +115,7 @@ void ITableDeclaration::check(const Names & column_names) const const NamesAndTypesList & available_columns = getColumnsList(); if (column_names.empty()) - throw Exception("Empty list of columns queried for table " + getTableName() - + ". There are columns: " + listOfColumns(available_columns), + throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns), ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED); const NamesAndTypesMap & columns_map = getColumnsMap(available_columns); @@ -128,12 +127,11 @@ void ITableDeclaration::check(const Names & column_names) const for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it) { if (columns_map.end() == columns_map.find(*it)) - throw Exception("There is no column with name " + *it + " in table " + getTableName() - + ". There are columns: " + listOfColumns(available_columns), + throw Exception("There is no column with name " + *it + " in table. There are columns: " + listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (unique_names.end() != unique_names.find(*it)) - throw Exception("Column " + *it + " queried more than once in table " + getTableName(), + throw Exception("Column " + *it + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE); unique_names.insert(*it); } @@ -160,14 +158,12 @@ void ITableDeclaration::check(const Block & block, bool need_all) const NamesAndTypesMap::const_iterator it = columns_map.find(column.name); if (columns_map.end() == it) - throw Exception("There is no column with name " + column.name + " in table " + getTableName() - + ". There are columns: " + listOfColumns(available_columns), - ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + throw Exception("There is no column with name " + column.name + ". There are columns: " + + listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (column.type->getName() != it->second->getName()) - throw Exception("Type mismatch for column " + column.name + " in table " + getTableName() - + ". Column has type " + it->second->getName() + ", got type " + column.type->getName(), - ErrorCodes::TYPE_MISMATCH); + throw Exception("Type mismatch for column " + column.name + ". Column has type " + + it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH); } if (need_all && names_in_block.size() < columns_map.size()) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index f19cb113dbb..ad23b76b5e7 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -67,6 +67,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( checkParts(); } + initVirtualParts(); loadQueue(); String unreplicated_path = full_path + "unreplicated/"; @@ -381,6 +382,15 @@ void StorageReplicatedMergeTree::checkParts() } } +void StorageReplicatedMergeTree::initVirtualParts() +{ + auto parts = data.getDataParts(); + for (const auto & part : parts) + { + virtual_parts.add(part->name); + } +} + void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops) { String another_replica = findReplicaHavingPart(part->name, false); @@ -503,7 +513,7 @@ void StorageReplicatedMergeTree::loadQueue() String s = zookeeper->get(replica_path + "/queue/" + child); LogEntry entry = LogEntry::parse(s); entry.znode_name = child; - entry.tagPartsAsCurrentlyMerging(*this); + entry.addResultToVirtualParts(*this); queue.push_back(entry); } } @@ -592,7 +602,7 @@ void StorageReplicatedMergeTree::pullLogsToQueue() String path_created = dynamic_cast((*results)[0]).getPathCreated(); entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); - entry.tagPartsAsCurrentlyMerging(*this); + entry.addResultToVirtualParts(*this); queue.push_back(entry); ++iterator.index; @@ -863,7 +873,6 @@ void StorageReplicatedMergeTree::queueThread() if (success) { - entry.currently_merging_tagger = nullptr; std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP); } else @@ -874,7 +883,6 @@ void StorageReplicatedMergeTree::queueThread() Poco::ScopedLock lock(queue_mutex); queue.push_back(entry); } - entry.currently_merging_tagger = nullptr; std::this_thread::sleep_for(QUEUE_ERROR_SLEEP); } } @@ -891,13 +899,35 @@ void StorageReplicatedMergeTree::mergeSelectingThread() try { size_t merges_queued = 0; + /// Есть ли в очереди мердж крупных кусков. + /// TODO: Если мердж уже выполняется, его нет в очереди, но здесь нужно все равно как-то о нем узнать. + bool has_big_merge = false; { Poco::ScopedLock lock(queue_mutex); for (const auto & entry : queue) + { if (entry.type == LogEntry::MERGE_PARTS) + { ++merges_queued; + + if (!has_big_merge) + { + for (const String & name : entry.parts_to_merge) + { + MergeTreeData::DataPartPtr part = data.getContainingPart(name); + if (!part || part->name != name) + continue; + if (part->size * data.index_granularity > 25 * 1024 * 1024) + { + has_big_merge = true; + break; + } + } + } + } + } } if (merges_queued >= data.settings.merging_threads) @@ -906,35 +936,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread() continue; } - /// Есть ли активный мердж крупных кусков. - bool has_big_merge = false; - - { - Poco::ScopedLock lock(currently_merging_mutex); - - for (const auto & name : currently_merging) - { - MergeTreeData::DataPartPtr part = data.getContainingPart(name); - if (!part) - continue; - if (part->name != name) - { - LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in " << part->name); - continue; - } - if (part->size * data.index_granularity > 25 * 1024 * 1024) - { - has_big_merge = true; - break; - } - } - } - MergeTreeData::DataPartsVector parts; { - Poco::ScopedLock lock(currently_merging_mutex); - String merged_name; auto can_merge = std::bind( &StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2); @@ -963,7 +967,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() if (success) { /// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния. - /// (чтобы куски пометились как currently_merging). + /// (чтобы кусок добавился в virtual_parts). pullLogsToQueue(); String month_name = parts[0]->name.substr(0, 6); @@ -1021,7 +1025,9 @@ void StorageReplicatedMergeTree::clearOldBlocksThread() bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) { - if (currently_merging.count(left->name) || currently_merging.count(right->name)) + /// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать. + if (virtual_parts.getContainingPart(left->name) != left->name || + virtual_parts.getContainingPart(right->name) != right->name) return false; String month_name = left->name.substr(0, 6); @@ -1109,7 +1115,11 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin void StorageReplicatedMergeTree::shutdown() { if (permanent_shutdown_called) + { + if (restarting_thread.joinable()) + restarting_thread.join(); return; + } permanent_shutdown_called = true; restarting_thread.join(); } @@ -1152,9 +1162,24 @@ void StorageReplicatedMergeTree::goReadOnly() leader_election = nullptr; replica_is_active_node = nullptr; merger.cancelAll(); - is_leader_node = false; endpoint_holder = nullptr; + + LOG_TRACE(log, "Waiting for threads to finish"); + if (is_leader_node) + { + is_leader_node = false; + if (merge_selecting_thread.joinable()) + merge_selecting_thread.join(); + if (clear_old_blocks_thread.joinable()) + clear_old_blocks_thread.join(); + } + if (queue_updating_thread.joinable()) + queue_updating_thread.join(); + for (auto & thread : queue_threads) + thread.join(); + queue_threads.clear(); + LOG_TRACE(log, "Threads finished"); } void StorageReplicatedMergeTree::startup()