dbms: replication: writers don't block ZK session restart [#METR-12572].

This commit is contained in:
Alexey Milovidov 2014-09-05 00:26:14 +04:00
parent f8e5ae8a02
commit d6d163b5dc
2 changed files with 14 additions and 17 deletions

View File

@ -17,12 +17,12 @@ public:
void write(const Block & block) override void write(const Block & block) override
{ {
assertSessionIsNotExpired();
auto part_blocks = storage.writer.splitBlockIntoParts(block); auto part_blocks = storage.writer.splitBlockIntoParts(block);
for (auto & current_block : part_blocks) for (auto & current_block : part_blocks)
{ {
if (storage.zookeeper->expired()) assertSessionIsNotExpired();
throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER);
/// TODO Можно ли здесь не блокировать структуру таблицы? /// TODO Можно ли здесь не блокировать структуру таблицы?
storage.data.delayInsertIfNeeded(&storage.restarting_event); storage.data.delayInsertIfNeeded(&storage.restarting_event);
@ -145,6 +145,14 @@ private:
size_t block_index; size_t block_index;
Logger * log; Logger * log;
/// Позволяет проверить, что сессия в ZooKeeper ещё жива.
void assertSessionIsNotExpired()
{
if (storage.zookeeper->expired())
throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER);
}
}; };
} }

View File

@ -2026,16 +2026,9 @@ void StorageReplicatedMergeTree::restartingThread()
{ {
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
{
/// Запретим писать в таблицу, пока подменяем zookeeper.
auto structure_lock = lockDataForAlter();
partialShutdown(); partialShutdown();
zookeeper = context.getZooKeeper(); zookeeper = context.getZooKeeper();
is_read_only = true; is_read_only = true;
}
while (!permanent_shutdown_called && !tryStartup()) while (!permanent_shutdown_called && !tryStartup())
restarting_event.tryWait(10 * 1000); restarting_event.tryWait(10 * 1000);
@ -2043,12 +2036,8 @@ void StorageReplicatedMergeTree::restartingThread()
if (permanent_shutdown_called) if (permanent_shutdown_called)
break; break;
{
auto structure_lock = lockDataForAlter();
is_read_only = false; is_read_only = false;
} }
}
restarting_event.tryWait(60 * 1000); restarting_event.tryWait(60 * 1000);
} }