This commit is contained in:
Michael Kolupaev 2014-07-25 16:11:06 +04:00
parent 13b9392241
commit 8142941b42

View File

@ -163,6 +163,7 @@ void StorageReplicatedMergeTree::createTable()
zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent); zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent); zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent); zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/flags", "", zkutil::CreateMode::Persistent);
/// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все остальные ноды не созданы. /// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все остальные ноды не созданы.
zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent); zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
} }
@ -582,8 +583,6 @@ void StorageReplicatedMergeTree::clearOldLogs()
size_t removed = 0; size_t removed = 0;
zkutil::Ops ops; zkutil::Ops ops;
/// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик.
ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version));
for (const String & entry : entries) for (const String & entry : entries)
{ {
UInt64 index = parse<UInt64>(entry.substr(strlen("log-"))); UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
@ -591,13 +590,24 @@ void StorageReplicatedMergeTree::clearOldLogs()
break; break;
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/log/" + entry, -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/log/" + entry, -1));
++removed; ++removed;
if (ops.size() > 400)
{
/// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик.
ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version));
zookeeper->multi(ops);
ops.clear();
}
}
if (!ops.empty())
{
ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version));
zookeeper->multi(ops);
} }
if (removed == 0) if (removed == 0)
return; return;
zookeeper->multi(ops);
LOG_DEBUG(log, "Removed " << removed << " old log entries"); LOG_DEBUG(log, "Removed " << removed << " old log entries");
} }
@ -627,16 +637,22 @@ void StorageReplicatedMergeTree::clearOldBlocks()
timed_blocks.push_back(std::make_pair(stat.czxid, block)); timed_blocks.push_back(std::make_pair(stat.czxid, block));
} }
zkutil::Ops ops;
std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>()); std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i) for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
{ {
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
zookeeper->multi(ops); if (ops.size() > 400)
{
zookeeper->multi(ops);
ops.clear();
}
} }
if (!ops.empty())
zookeeper->multi(ops);
LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper"); LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
} }