This commit is contained in:
Michael Kolupaev 2014-05-26 22:14:52 +04:00
parent f137d7b84d
commit c1b265d4ac
4 changed files with 105 additions and 21 deletions

View File

@ -61,7 +61,9 @@ public:
ActiveDataPartSet();
void add(const String & name);
String getContainingPart(const String & name);
String getContainingPart(const String & name) const;
Strings getParts() const;
static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
@ -74,7 +76,7 @@ public:
private:
typedef std::set<Part> Parts;
Poco::Mutex mutex;
mutable Poco::Mutex mutex;
Parts parts;
};

View File

@ -253,13 +253,12 @@ private:
/// Инициализация.
/** Проверяет, что в ZooKeeper в таблице нет данных.
*/
bool isTableEmpty();
/** Создает минимальный набор нод в ZooKeeper.
*/
void createTable();
/** Создает реплику в ZooKeeper и добавляет в очередь все, что нужно, чтобы догнать остальные реплики.
*/
void createReplica();
/** Отметить в ZooKeeper, что эта реплика сейчас активна.

View File

@ -41,7 +41,7 @@ void ActiveDataPartSet::add(const String & name)
parts.insert(part);
}
String ActiveDataPartSet::getContainingPart(const String & part_name)
String ActiveDataPartSet::getContainingPart(const String & part_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
@ -69,6 +69,19 @@ String ActiveDataPartSet::getContainingPart(const String & part_name)
return "";
}
Strings ActiveDataPartSet::getParts() const
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
Strings res;
for (const Part & part : parts)
{
res.push_back(part.name);
}
return res;
}
String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)

View File

@ -55,9 +55,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if (!zookeeper->exists(zookeeper_path))
createTable();
if (!isTableEmpty())
throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);
checkTableStructure();
createReplica();
}
@ -197,12 +194,96 @@ void StorageReplicatedMergeTree::checkTableStructure()
void StorageReplicatedMergeTree::createReplica()
{
/** Запомним список других реплик.
* NOTE: Здесь есть race condition. Если почти одновременно добавить нескольких реплик, сразу же начиная в них писать,
* небольшая часть данных может не реплицироваться.
*/
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
/// Создадим пустую реплику.
zookeeper->create(replica_path, "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
/// Если таблица пуста, больше ничего делать не нужно.
if (replicas.empty())
return;
/// "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатели на логи.
String source_replica = replicas[0];
/** Дождемся, пока все активные реплики заметят появление этой реплики.
* Это не даст им удалять записи из своих логов, пока эта реплика их не скопирует.
*/
for (const String & replica : replicas)
{
bool active = true;
while(true)
{
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{
active = false;
break;
}
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name))
break;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
/// Будем предпочитать активную реплику в качестве эталонной.
if (active)
source_replica = replica;
}
String source_path = zookeeper_path + "/replicas/" + source_replica;
/// Порядок следующих трех действий важен. Записи в логе могут продублироваться, но не могут потеряться.
/// Скопируем у эталонной реплики ссылки на все логи.
for (const String & replica : replicas)
{
String pointer = zookeeper->get(source_path + "/log_pointers/" + replica);
zookeeper->create(replica_path + "/log_pointers/" + replica, pointer, zkutil::CreateMode::Persistent);
}
/// Запомним очередь эталонной реплики.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
std::sort(source_queue_names.begin(), source_queue_names.end());
Strings source_queue;
for (const String & entry_name : source_queue_names)
{
String entry;
if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry))
continue;
source_queue.push_back(entry);
}
/// Добавим в очередь задания на получение всех активных кусков, которые есть у эталонной реплики.
Strings parts = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet active_parts_set;
for (const String & part : parts)
{
active_parts_set.add(part);
}
Strings active_parts = active_parts_set.getParts();
for (const String & name : active_parts)
{
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = name;
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
}
/// Добавим в очередь содержимое очереди эталонной реплики.
for (const String & entry : source_queue)
{
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
}
}
void StorageReplicatedMergeTree::activateReplica()
@ -241,17 +322,6 @@ void StorageReplicatedMergeTree::activateReplica()
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", *zookeeper);
}
bool StorageReplicatedMergeTree::isTableEmpty()
{
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const auto & replica : replicas)
{
if (!zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
return false;
}
return true;
}
void StorageReplicatedMergeTree::checkParts()
{
Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");