This commit is contained in:
Michael Kolupaev 2014-07-04 17:59:05 +04:00
parent d00dc44397
commit 7362ca686f
2 changed files with 15 additions and 13 deletions

View File

@ -233,8 +233,10 @@ private:
/// Нужно ли завершить фоновые потоки (кроме restarting_thread).
volatile bool shutdown_called = false;
Poco::Event shutdown_event;
/// Нужно ли завершить restarting_thread.
volatile bool permanent_shutdown_called = false;
Poco::Event permanent_shutdown_event;
StorageReplicatedMergeTree(
const String & zookeeper_path_,

View File

@ -10,7 +10,7 @@ namespace DB
{
const auto QUEUE_UPDATE_SLEEP = std::chrono::seconds(5);
const auto QUEUE_UPDATE_SLEEP_MS = 5 * 1000;
const auto QUEUE_NO_WORK_SLEEP = std::chrono::seconds(5);
const auto QUEUE_ERROR_SLEEP = std::chrono::seconds(1);
const auto QUEUE_AFTER_WORK_SLEEP = std::chrono::seconds(0);
@ -38,7 +38,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)"))
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
shutdown_event(false), permanent_shutdown_event(false)
{
if (!zookeeper)
{
@ -912,7 +913,7 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP);
shutdown_event.tryWait(QUEUE_UPDATE_SLEEP_MS);
}
}
@ -1105,14 +1106,7 @@ void StorageReplicatedMergeTree::clearOldBlocksThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду.
/// TODO: Лучше во всех подобных местах использовать condition variable.
for (size_t i = 0; i < 60; ++i)
{
if (shutdown_called || !is_leader_node)
break;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
shutdown_event.tryWait(60 * 1000);
}
}
@ -1223,6 +1217,7 @@ void StorageReplicatedMergeTree::shutdown()
return;
}
permanent_shutdown_called = true;
permanent_shutdown_event.set();
restarting_thread.join();
}
@ -1230,6 +1225,7 @@ void StorageReplicatedMergeTree::partialShutdown()
{
leader_election = nullptr;
shutdown_called = true;
shutdown_event.set();
replica_is_active_node = nullptr;
merger.cancelAll();
@ -1258,8 +1254,10 @@ void StorageReplicatedMergeTree::goReadOnly()
LOG_INFO(log, "Going to read-only mode");
is_read_only = true;
shutdown_called = true;
permanent_shutdown_called = true;
permanent_shutdown_event.set();
shutdown_called = true;
shutdown_event.set();
leader_election = nullptr;
replica_is_active_node = nullptr;
@ -1271,6 +1269,7 @@ void StorageReplicatedMergeTree::goReadOnly()
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_event.set();
if (merge_selecting_thread.joinable())
merge_selecting_thread.join();
if (clear_old_blocks_thread.joinable())
@ -1286,6 +1285,7 @@ void StorageReplicatedMergeTree::goReadOnly()
void StorageReplicatedMergeTree::startup()
{
shutdown_called = false;
shutdown_event.reset();
merger.uncancelAll();
if (unreplicated_merger)
@ -1324,7 +1324,7 @@ void StorageReplicatedMergeTree::restartingThread()
startup();
}
std::this_thread::sleep_for(std::chrono::seconds(2));
permanent_shutdown_event.tryWait(60 * 1000);
}
}
catch (...)