This commit is contained in:
Michael Kolupaev 2014-04-14 14:56:06 +04:00
parent b9c4a3419a
commit ee6a647245
2 changed files with 48 additions and 13 deletions

View File

@ -246,9 +246,10 @@ private:
/// Поток, выбирающий куски для слияния. /// Поток, выбирающий куски для слияния.
std::thread merge_selecting_thread; std::thread merge_selecting_thread;
/// Поток, удаляющий информацию о старых блоках из ZooKeeper.
std::thread clear_old_blocks_thread;
/// Когда последний раз выбрасывали старые данные из ZooKeeper. /// Когда последний раз выбрасывали старые логи из ZooKeeper.
time_t clear_old_blocks_time = 0;
time_t clear_old_logs_time = 0; time_t clear_old_logs_time = 0;
Logger * log; Logger * log;
@ -347,6 +348,10 @@ private:
*/ */
void mergeSelectingThread(); void mergeSelectingThread();
/** В бесконечном цикле вызывает clearOldBlocks.
*/
void clearOldBlocksThread();
/// Вызывается во время выбора кусков для слияния. /// Вызывается во время выбора кусков для слияния.
bool canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right); bool canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right);

View File

@ -369,10 +369,15 @@ void StorageReplicatedMergeTree::clearOldBlocks()
if (!zookeeper.exists(zookeeper_path + "/blocks", &stat)) if (!zookeeper.exists(zookeeper_path + "/blocks", &stat))
throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.getnumChildren();
/// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно. /// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно.
if (static_cast<double>(stat.getnumChildren()) < data.settings.replicated_deduplication_window * 1.1) if (static_cast<double>(children_count) < data.settings.replicated_deduplication_window * 1.1)
return; return;
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
<< " old blocks from ZooKeeper");
Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks"); Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks");
std::vector<std::pair<Int64, String> > timed_blocks; std::vector<std::pair<Int64, String> > timed_blocks;
@ -384,10 +389,14 @@ void StorageReplicatedMergeTree::clearOldBlocks()
timed_blocks.push_back(std::make_pair(stat.getczxid(), block)); timed_blocks.push_back(std::make_pair(stat.getczxid(), block));
} }
std::sort(timed_blocks.begin(), timed_blocks.end()); std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i) for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
{ {
zookeeper.remove(zookeeper_path + "/blocks/" + timed_blocks[i].second); zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
zookeeper.multi(ops);
} }
LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper"); LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
@ -882,21 +891,38 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
if (shutdown_called) if (shutdown_called || !is_leader_node)
break; break;
/// Каждую минуту выбрасываем старые блоки.
if (time(0) - clear_old_blocks_time > 60)
{
clear_old_blocks_time = time(0);
clearOldBlocks();
}
if (!success) if (!success)
std::this_thread::sleep_for(MERGE_SELECTING_SLEEP); std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
} }
} }
void StorageReplicatedMergeTree::clearOldBlocksThread()
{
while (!shutdown_called && is_leader_node)
{
try
{
clearOldBlocks();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду.
/// TODO: Лучше во всех подобных местах использовать condition variable.
for (size_t i = 0; i < 60; ++i)
{
if (shutdown_called || !is_leader_node)
break;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{ {
if (currently_merging.count(left->name) || currently_merging.count(right->name)) if (currently_merging.count(left->name) || currently_merging.count(right->name))
@ -925,6 +951,7 @@ void StorageReplicatedMergeTree::becomeLeader()
LOG_INFO(log, "Became leader"); LOG_INFO(log, "Became leader");
is_leader_node = true; is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this); merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
clear_old_blocks_thread = std::thread(&StorageReplicatedMergeTree::clearOldBlocksThread, this);
} }
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active) String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
@ -992,7 +1019,10 @@ void StorageReplicatedMergeTree::shutdown()
LOG_TRACE(log, "Waiting for threads to finish"); LOG_TRACE(log, "Waiting for threads to finish");
if (is_leader_node) if (is_leader_node)
{
merge_selecting_thread.join(); merge_selecting_thread.join();
clear_old_blocks_thread.join();
}
queue_updating_thread.join(); queue_updating_thread.join();
for (auto & thread : queue_threads) for (auto & thread : queue_threads)
thread.join(); thread.join();