diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 434ee375989..c8e9da0bb27 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -251,6 +251,7 @@ namespace ErrorCodes MEMORY_LIMIT_EXCEEDED, TABLE_IS_READ_ONLY, NOT_ENOUGH_SPACE, + UNEXPECTED_ZOOKEEPER_ERROR, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index bd733476768..adc6db768ed 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -428,6 +428,10 @@ public: */ void renameAndDetachPart(DataPartPtr part, const String & prefix); + /** Убрать кусок из рабочего набора. Его данные удалятся при вызове clearOldParts, когда их перестанут читать. + */ + void deletePart(DataPartPtr part); + /** Удалить неактуальные куски. Возвращает имена удаленных кусков. */ Strings clearOldParts(); diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 2f31493c2d1..7bcff41e9e5 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -45,22 +45,6 @@ public: LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id); - String expected_checksums_str; - if (!block_id.empty() && storage.zookeeper->tryGet( - storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) - { - LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it"); - - /// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять. - auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str); - expected_checksums.checkEqual(part->checksums, true); - - part->remove(); - - /// Бросаем block_number_lock. - continue; - } - storage.data.renameTempPartAndAdd(part); StorageReplicatedMergeTree::LogEntry log_entry; @@ -96,7 +80,40 @@ public: zkutil::CreateMode::PersistentSequential)); block_number_lock.getUnlockOps(ops); - storage.zookeeper->multi(ops); + auto code = storage.zookeeper->tryMulti(ops); + if (code != ZOK) + { + if (code == ZNODEEXISTS) + { + /// Если блок с таким ID уже есть в таблице, не будем его вставлять, и удалим только что записанные данные. + /// NOTE: В короткое время между renameTempPartAndAdd и deletePart в таблице на этой реплике доступны + /// продублированные данные. + String expected_checksums_str; + if (!block_id.empty() && storage.zookeeper->tryGet( + storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) + { + LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it"); + + auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str); + auto found_checksums = part->checksums; + + storage.data.deletePart(part); + + /// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение. + expected_checksums.checkEqual(part->checksums, true); + } + else + { + throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": " + + zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + } + } + else + { + throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": " + + zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + } + } } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 402d76af108..573ab857ee9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -678,6 +678,12 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix) part->renameAddPrefix(prefix); } +void MergeTreeData::deletePart(DataPartPtr part) +{ + Poco::ScopedLock lock(data_parts_mutex); + data_parts.erase(part); +} + MergeTreeData::DataParts MergeTreeData::getDataParts() { Poco::ScopedLock lock(data_parts_mutex);