diff --git a/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h b/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h index 70a942e9616..09e7a67c653 100644 --- a/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h +++ b/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h @@ -96,6 +96,11 @@ public: return UNLOCKED; } + static void createAbandonedIfNotExists(const String & path, zkutil::ZooKeeper & zookeeper) + { + zookeeper.createIfNotExists(path, ""); + } + private: zkutil::ZooKeeper & zookeeper; String path_prefix; diff --git a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h index 96630fbcdce..9543aa991e9 100644 --- a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h @@ -73,6 +73,8 @@ public: /// Кладет в DataPart данные из имени кусочка. static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr); + static bool contains(const String & outer_part_name, const String & inner_part_name); + private: typedef std::set Parts; diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h index 67f4f0b2d2a..f0300a5ab8b 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h @@ -10,52 +10,19 @@ namespace DB { +class StorageReplicatedMergeTree; + class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint { public: - ReplicatedMergeTreePartsServer(MergeTreeData & data_, StoragePtr owned_storage_) : data(data_), - owned_storage(owned_storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {} + ReplicatedMergeTreePartsServer(MergeTreeData & data_, StorageReplicatedMergeTree & storage_) : data(data_), + storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {} - void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override - { - String part_name = params.get("part"); - LOG_TRACE(log, "Sending part " << part_name); - - auto storage_lock = owned_storage->lockStructure(false); - - MergeTreeData::DataPartPtr part = findPart(part_name); - - Poco::ScopedReadRWLock part_lock(part->columns_lock); - - /// Список файлов возьмем из списка контрольных сумм. - MergeTreeData::DataPart::Checksums checksums = part->checksums; - /// Добавим файлы, которых нет в списке контрольных сумм. - checksums.files["checksums.txt"]; - checksums.files["columns.txt"]; - - writeBinary(checksums.files.size(), out); - for (const auto & it : checksums.files) - { - String path = data.getFullPath() + part_name + "/" + it.first; - UInt64 size = Poco::File(path).getSize(); - - writeStringBinary(it.first, out); - writeBinary(size, out); - - ReadBufferFromFile file_in(path); - HashingWriteBuffer hashing_out(out); - copyData(file_in, hashing_out); - - if (hashing_out.count() != size) - throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART); - - writeBinary(hashing_out.getHash(), out); - } - } + void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override; private: MergeTreeData & data; - StoragePtr owned_storage; + StorageReplicatedMergeTree & storage; Logger * log; @@ -78,60 +45,7 @@ public: const String & part_name, const String & replica_path, const String & host, - int port) - { - ReadBufferFromHTTP::Params params = { - std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path), - std::make_pair("part", part_name), - std::make_pair("compress", "false")}; - ReadBufferFromHTTP in(host, port, params); - - String part_path = data.getFullPath() + "tmp_" + part_name + "/"; - if (!Poco::File(part_path).createDirectory()) - throw Exception("Directory " + part_path + " already exists"); - - MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); - new_data_part->name = "tmp_" + part_name; - new_data_part->is_temp = true; - - size_t files; - readBinary(files, in); - MergeTreeData::DataPart::Checksums checksums; - for (size_t i = 0; i < files; ++i) - { - String file_name; - UInt64 file_size; - - readStringBinary(file_name, in); - readBinary(file_size, in); - - WriteBufferFromFile file_out(part_path + file_name); - HashingWriteBuffer hashing_out(file_out); - copyData(in, hashing_out, file_size); - - uint128 expected_hash; - readBinary(expected_hash, in); - - if (expected_hash != hashing_out.getHash()) - throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path); - - if (file_name != "checksums.txt" && - file_name != "columns.txt") - checksums.addFile(file_name, file_size, expected_hash); - } - - assertEOF(in); - - ActiveDataPartSet::parsePartName(part_name, *new_data_part); - new_data_part->modification_time = time(0); - new_data_part->loadColumns(); - new_data_part->loadChecksums(); - new_data_part->loadIndex(); - - new_data_part->checksums.checkEqual(checksums, false); - - return new_data_part; - } + int port); private: MergeTreeData & data; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index e79bbea0a99..30954786b7e 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -70,6 +70,9 @@ public: bool supportsIndexForIn() const override { return true; } + /// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке. + void enqueuePartForCheck(const String & name); + private: friend class ReplicatedMergeTreeBlockOutputStream; @@ -186,7 +189,7 @@ private: */ StringSet parts_to_check_set; StringList parts_to_check_queue; - Poco::FastMuterx parts_to_check_mutex; + Poco::FastMutex parts_to_check_mutex; Poco::Event parts_to_check_event; String database_name; @@ -326,8 +329,8 @@ private: */ void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops); - /// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке. - void enqueuePartForCheck(const String & name); + /// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками. + void removePartAndEnqueueFetch(const String & part_name); void clearOldParts(); diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp index e6a116543df..8f6bd3f8eef 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -140,4 +140,12 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con part.right_month = date_lut.toFirstDayNumOfMonth(part.right_date); } +bool ActiveDataPartSet::contains(const String & outer_part_name, const String & inner_part_name) +{ + Part outer, inner; + parsePartName(outer_part_name, outer); + parsePartName(inner_part_name, inner); + return outer.contains(inner); +} + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index a392c4ae85d..9e1b6de2c06 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -532,6 +532,9 @@ void MergeTreeData::AlterDataPartTransaction::commit() mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path); + /// TODO: можно не сбрасывать кеши при добавлении столбца. + data_part->storage.context.resetCaches(); + clear(); } catch (...) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp new file mode 100644 index 00000000000..5a81a47cb58 --- /dev/null +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartsExchange.cpp @@ -0,0 +1,122 @@ +#include +#include + + +namespace DB +{ + +void ReplicatedMergeTreePartsServer::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) +{ + String part_name = params.get("part"); + LOG_TRACE(log, "Sending part " << part_name); + + try + { + auto storage_lock = storage.lockStructure(false); + + MergeTreeData::DataPartPtr part = findPart(part_name); + + Poco::ScopedReadRWLock part_lock(part->columns_lock); + + /// Список файлов возьмем из списка контрольных сумм. + MergeTreeData::DataPart::Checksums checksums = part->checksums; + /// Добавим файлы, которых нет в списке контрольных сумм. + checksums.files["checksums.txt"]; + checksums.files["columns.txt"]; + + MergeTreeData::DataPart::Checksums data_checksums; + + writeBinary(checksums.files.size(), out); + for (const auto & it : checksums.files) + { + String file_name = it.first; + + String path = data.getFullPath() + part_name + "/" + file_name; + UInt64 size = Poco::File(path).getSize(); + + writeStringBinary(it.first, out); + writeBinary(size, out); + + ReadBufferFromFile file_in(path); + HashingWriteBuffer hashing_out(out); + copyData(file_in, hashing_out); + + if (hashing_out.count() != size) + throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART); + + writeBinary(hashing_out.getHash(), out); + + if (file_name != "checksums.txt" && + file_name != "columns.txt") + checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash()); + } + + part->checksums.checkEqual(checksums, false); + } + catch (...) + { + storage.enqueuePartForCheck(part_name); + throw; + } +} + +MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart( + const String & part_name, + const String & replica_path, + const String & host, + int port) +{ + ReadBufferFromHTTP::Params params = { + std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path), + std::make_pair("part", part_name), + std::make_pair("compress", "false")}; + ReadBufferFromHTTP in(host, port, params); + + String part_path = data.getFullPath() + "tmp_" + part_name + "/"; + if (!Poco::File(part_path).createDirectory()) + throw Exception("Directory " + part_path + " already exists"); + + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); + new_data_part->name = "tmp_" + part_name; + new_data_part->is_temp = true; + + size_t files; + readBinary(files, in); + MergeTreeData::DataPart::Checksums checksums; + for (size_t i = 0; i < files; ++i) + { + String file_name; + UInt64 file_size; + + readStringBinary(file_name, in); + readBinary(file_size, in); + + WriteBufferFromFile file_out(part_path + file_name); + HashingWriteBuffer hashing_out(file_out); + copyData(in, hashing_out, file_size); + + uint128 expected_hash; + readBinary(expected_hash, in); + + if (expected_hash != hashing_out.getHash()) + throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path); + + if (file_name != "checksums.txt" && + file_name != "columns.txt") + checksums.addFile(file_name, file_size, expected_hash); + } + + assertEOF(in); + + ActiveDataPartSet::parsePartName(part_name, *new_data_part); + new_data_part->modification_time = time(0); + new_data_part->loadColumns(); + new_data_part->loadChecksums(); + new_data_part->loadIndex(); + + new_data_part->checksums.checkEqual(checksums, false); + + return new_data_part; +} + +} diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 49b2992d59c..9786eeed435 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -119,7 +120,7 @@ StoragePtr StorageReplicatedMergeTree::create( if (!res->is_read_only) { String endpoint_name = "ReplicatedMergeTree:" + res->replica_path; - InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr); + InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, *res); res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler()); } return res_ptr; @@ -515,15 +516,6 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP zkutil::CreateMode::Persistent)); } -void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name) -{ - Poco::ScopedLock lock(parts_to_check_mutex); - if (parts_to_check_set.count(name)) - return; - parts_to_check_queue.push_back(name); - parts_to_check_set.insert(name); -} - void StorageReplicatedMergeTree::clearOldParts() { Strings parts = data.clearOldParts(); @@ -1091,10 +1083,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() /// Уберем больше не нужные отметки о несуществующих блоках. for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number) { - String number_str = toString(number); - while (number_str.size() < 10) - number_str = '0' + number_str; - String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str; + String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number); zookeeper->tryRemove(path); } @@ -1257,6 +1246,45 @@ void StorageReplicatedMergeTree::alterThread() LOG_DEBUG(log, "alter thread finished"); } +void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name) +{ + String part_path = replica_path + "/parts/" + part_name; + + LogEntry log_entry; + log_entry.type = LogEntry::GET_PART; + log_entry.source_replica = ""; + log_entry.new_part_name = part_name; + + zkutil::Ops ops; + ops.push_back(new zkutil::Op::Create( + replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), + zkutil::CreateMode::PersistentSequential)); + ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1)); + ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1)); + ops.push_back(new zkutil::Op::Remove(part_path, -1)); + auto results = zookeeper->multi(ops); + + { + Poco::ScopedLock lock(queue_mutex); + + String path_created = dynamic_cast(ops[0]).getPathCreated(); + log_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); + log_entry.addResultToVirtualParts(*this); + queue.push_back(log_entry); + } +} + +void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name) +{ + Poco::ScopedLock lock(parts_to_check_mutex); + + if (parts_to_check_set.count(name)) + return; + parts_to_check_queue.push_back(name); + parts_to_check_set.insert(name); + parts_to_check_event.set(); +} + void StorageReplicatedMergeTree::partCheckThread() { while (!shutdown_called) @@ -1300,39 +1328,80 @@ void StorageReplicatedMergeTree::partCheckThread() LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. " "Removing from ZooKeeper and queueing a fetch."); - LogEntry log_entry; - log_entry.type = LogEntry::GET_PART; - log_entry.source_replica = ""; - log_entry.new_part_name = part_name; - - zkutil::Ops ops; - ops.push_back(new zkutil::Op::Create( - replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), - zkutil::CreateMode::PersistentSequential)); - ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1)); - ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1)); - ops.push_back(new zkutil::Op::Remove(part_path, -1)); - auto results = zookeeper->multi(ops); - - { - Poco::ScopedLock lock(queue_mutex); - - String path_created = dynamic_cast(ops[0]).getPathCreated(); - log_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); - log_entry.addResultToVirtualParts(*this); - queue.push_back(entry); - } + removePartAndEnqueueFetch(part_name); } /// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то. else { LOG_WARNING(log, "Checking if anyone has part covering " << part_name << "."); - asdqwe; - /// Если ни у кого нет такого куска, удалим его из нашей очереди и добавим его в block_numbers. - //не получится надежно удалить из очереди :( Можно попробовать полагаться на block_numbers, но их могут удалить - LOG_ERROR(log, - //asdqwe; + bool found = false; + Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); + for (const String & replica : replicas) + { + Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts"); + for (const String & part_on_replica : parts) + { + if (ActiveDataPartSet::contains(part_on_replica, part_name)) + { + found = true; + LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica); + break; + } + } + if (found) + break; + } + + if (!found) + { + /** Такая ситуация возможна при нормальной работе, без потери данных, например, так: + * ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK, + * получил operation timeout, удалил локальный кусок и бросил исключение, + * а на самом деле, кусок добавился в ZK. + */ + LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. " + << "There might or might not be a data loss."); + + /// Если ни у кого нет такого куска, удалим его из нашей очереди и добавим его в block_numbers. + + String month_name = part_name.substr(0, 6); + zookeeper->createIfNotExists(zookeeper_path + "/block_numbers/" + month_name, ""); + + ActiveDataPartSet::Part part_info; + ActiveDataPartSet::parsePartName(part_name, part_info); + + if (part_info.left != part_info.right) + LOG_ERROR(log, "Lost part " << part_name << " is a result of a merge. " + "This means some data is definitely lost (or there's a bug)."); + + for (size_t index = part_info.left; index <= part_info.right; ++index) + { + AbandonableLockInZooKeeper::createAbandonedIfNotExists( + zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(index), *zookeeper); + } + + { + Poco::ScopedLock lock(queue_mutex); + + /** NOTE: Не удалятся записи в очереди, которые сейчас выполняются. + * Они пофейлятся и положат кусок снова в очередь на проверку. + * Расчитываем, что это редкая ситуация. + */ + for (LogEntries::iterator it = queue.begin(); it != queue.end(); ) + { + if (it->new_part_name == part_name) + { + zookeeper->remove(replica_path + "/queue/" + it->znode_name); + queue.erase(it++); + } + else + { + ++it; + } + } + } + } } } /// У нас есть этот кусок, и он активен. @@ -1341,11 +1410,35 @@ void StorageReplicatedMergeTree::partCheckThread() /// Если кусок есть в ZooKeeper, сверим его данные с его чексуммами, а их с ZooKeeper. if (zookeeper->exists(replica_path + "/parts/" + part_name)) { - asdqwe; + LOG_WARNING(log, "Checking data of part " << part_name << "."); - /// Если кусок сломан, одновременно удалим его из ZK и добавим в очередь задание забрать этот кусок у другой реплики. - /// И удалим кусок локально. - asdqwe; + try + { + auto zk_checksums = MergeTreeData::DataPart::Checksums::parse( + zookeeper->get(replica_path + "/parts/" + part_name + "/checksums")); + zk_checksums.checkEqual(part->checksums, true); + + auto zk_columns = NamesAndTypesList::parse( + zookeeper->get(replica_path + "/parts/" + part_name + "/columns"), context.getDataTypeFactory()); + if (part->columns != zk_columns) + throw Exception("Columns of local part " + part_name + " are different from ZooKeeper"); + + MergeTreePartChecker::checkDataPart( + data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory()); + + LOG_INFO(log, "Part " << part_name << " looks good."); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + + LOG_INFO(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch."); + + removePartAndEnqueueFetch(part_name); + + /// Удалим кусок локально. + data.deletePart(part); + } } /// Если куска нет в ZooKeeper, удалим его локально. else @@ -1353,11 +1446,12 @@ void StorageReplicatedMergeTree::partCheckThread() /// Если этот кусок еще и получен в результате слияния, это уже чересчур странно. if (part->left != part->right) { - LOG_ERROR(log, ); + LOG_ERROR(log, "Unexpected part " << part_name << " is a result of a merge. You have to resolve this manually."); } else { - asdqwe; + LOG_ERROR(log, "Unexpected part " << part_name << ". Removing."); + data.deletePart(part); } } } @@ -1407,10 +1501,7 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr /// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам. for (UInt64 number = left->right + 1; number <= right->left - 1; ++number) { - String number_str = toString(number); - while (number_str.size() < 10) - number_str = '0' + number_str; - String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str; + String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number); if (AbandonableLockInZooKeeper::check(path, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED) { @@ -1512,6 +1603,7 @@ void StorageReplicatedMergeTree::partialShutdown() queue_updating_event->set(); alter_thread_event->set(); alter_query_event->set(); + parts_to_check_event.set(); replica_is_active_node = nullptr; merger.cancelAll(); @@ -1531,6 +1623,8 @@ void StorageReplicatedMergeTree::partialShutdown() cleanup_thread.join(); if (alter_thread.joinable()) alter_thread.join(); + if (part_check_thread.joinable()) + part_check_thread.join(); if (queue_task_handle) context.getBackgroundPool().removeTask(queue_task_handle); queue_task_handle.reset(); @@ -1565,6 +1659,7 @@ void StorageReplicatedMergeTree::startup() queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this); cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this); alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, this); + part_check_thread = std::thread(&StorageReplicatedMergeTree::partCheckThread, this); queue_task_handle = context.getBackgroundPool().addTask( std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1)); }