From 8142941b422302245844f69c666e8208d312137d Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 25 Jul 2014 16:11:06 +0400 Subject: [PATCH] Merge --- .../Storages/StorageReplicatedMergeTree.cpp | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index a401736bed6..ca48f791875 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -163,6 +163,7 @@ void StorageReplicatedMergeTree::createTable() zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent); zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent); zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent); + zookeeper->create(zookeeper_path + "/flags", "", zkutil::CreateMode::Persistent); /// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все остальные ноды не созданы. zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent); } @@ -582,8 +583,6 @@ void StorageReplicatedMergeTree::clearOldLogs() size_t removed = 0; zkutil::Ops ops; - /// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик. - ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version)); for (const String & entry : entries) { UInt64 index = parse(entry.substr(strlen("log-"))); @@ -591,13 +590,24 @@ void StorageReplicatedMergeTree::clearOldLogs() break; ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/log/" + entry, -1)); ++removed; + if (ops.size() > 400) + { + /// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик. + ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version)); + zookeeper->multi(ops); + ops.clear(); + } + } + + if (!ops.empty()) + { + ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version)); + zookeeper->multi(ops); } if (removed == 0) return; - zookeeper->multi(ops); - LOG_DEBUG(log, "Removed " << removed << " old log entries"); } @@ -627,16 +637,22 @@ void StorageReplicatedMergeTree::clearOldBlocks() timed_blocks.push_back(std::make_pair(stat.czxid, block)); } + zkutil::Ops ops; std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater>()); for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i) { - zkutil::Ops ops; ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1)); - zookeeper->multi(ops); + if (ops.size() > 400) + { + zookeeper->multi(ops); + ops.clear(); + } } + if (!ops.empty()) + zookeeper->multi(ops); LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper"); }