From 0b7928f6a9bbfe42959d2ef37181fcb0ac4ea3a4 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Wed, 7 May 2014 17:58:20 +0400 Subject: [PATCH] Merge --- .../ReplicatedMergeTreeBlockOutputStream.h | 7 +++- .../Storages/StorageReplicatedMergeTree.cpp | 34 +++++++++++-------- libs/libzkutil/include/zkutil/ZooKeeper.h | 1 + libs/libzkutil/src/ZooKeeper.cpp | 6 ++++ 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 8a2eece1c45..edf2f2152ff 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -20,9 +21,13 @@ public: { ++block_index; String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); + time_t min_date_time = DateLUTSingleton::instance().fromDayNum(DayNum_t(current_block.min_date)); + String month_name = toString(Date2OrderedIdentifier(min_date_time) / 100); + + storage.zookeeper.tryCreate(storage.zookeeper_path + "/block_numbers/" + month_name, "", zkutil::CreateMode::Persistent); AbandonableLockInZooKeeper block_number_lock( - storage.zookeeper_path + "/block_numbers/block-", + storage.zookeeper_path + "/block_numbers/" + month_name + "/block-", storage.zookeeper_path + "/temp", storage.zookeeper); UInt64 part_number = block_number_lock.getNumber(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c523e571435..022ed361416 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -869,21 +869,25 @@ void StorageReplicatedMergeTree::mergeSelectingThread() } } - /// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния. - /// (чтобы куски пометились как currently_merging). - pullLogsToQueue(); - - for (size_t i = 0; i + 1 < parts.size(); ++i) + if (success) { - /// Уберем больше не нужные отметки о несуществующих блоках. - for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number) - { - String number_str = toString(number); - while (number_str.size() < 10) - number_str = '0' + number_str; - String path = zookeeper_path + "/block_numbers/block-" + number_str; + /// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния. + /// (чтобы куски пометились как currently_merging). + pullLogsToQueue(); - zookeeper.tryRemove(path); + String month_name = parts[0]->name.substr(0, 6); + for (size_t i = 0; i + 1 < parts.size(); ++i) + { + /// Уберем больше не нужные отметки о несуществующих блоках. + for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number) + { + String number_str = toString(number); + while (number_str.size() < 10) + number_str = '0' + number_str; + String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str; + + zookeeper.tryRemove(path); + } } } } @@ -929,13 +933,15 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr if (currently_merging.count(left->name) || currently_merging.count(right->name)) return false; + String month_name = left->name.substr(0, 6); + /// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам. for (UInt64 number = left->right + 1; number <= right->left - 1; ++number) { String number_str = toString(number); while (number_str.size() < 10) number_str = '0' + number_str; - String path = zookeeper_path + "/block_numbers/block-" + number_str; + String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str; if (AbandonableLockInZooKeeper::check(path, zookeeper) != AbandonableLockInZooKeeper::ABANDONED) { diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h index 55abf4401e0..b5649b8d398 100644 --- a/libs/libzkutil/include/zkutil/ZooKeeper.h +++ b/libs/libzkutil/include/zkutil/ZooKeeper.h @@ -61,6 +61,7 @@ public: * При остальных ошибках бросает исключение. */ ReturnCode::type tryCreate(const std::string & path, const std::string & data, CreateMode::type mode, std::string & pathCreated); + ReturnCode::type tryCreate(const std::string & path, const std::string & data, CreateMode::type mode); /** Удалить ноду, если ее версия равна version (если -1, подойдет любая версия). */ diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp index f332265dc73..b614df136e9 100644 --- a/libs/libzkutil/src/ZooKeeper.cpp +++ b/libs/libzkutil/src/ZooKeeper.cpp @@ -195,6 +195,12 @@ ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::strin return code; } +ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::string & data, CreateMode::type mode) +{ + std::string path_created; + return tryCreate(path, data, mode, path_created); +} + void ZooKeeper::remove(const std::string & path, int32_t version) { checkNotExpired();