This commit is contained in:
Michael Kolupaev 2014-07-15 18:37:49 +04:00
parent a6e31b55d5
commit 86fb1eee37
4 changed files with 119 additions and 171 deletions

View File

@ -106,8 +106,12 @@ struct MergeTreeSettings
/// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний.
double insert_delay_step = 1.1;
/// Для скольки блоков, вставленных с непустым insert ID, хранить хеши в ZooKeeper.
/// Для скольки последних блоков хранить хеши в ZooKeeper.
size_t replicated_deduplication_window = 10000;
/// Хранить примерно столько последних записей в логе в ZooKeeper, даже если они никому уже не нужны.
/// Не влияет на работу таблиц; используется только чтобы успеть посмотреть на лог в ZooKeeper глазами прежде, чем его очистят.
size_t replicated_logs_to_keep = 100;
};
class MergeTreeData : public ITableDeclaration

View File

@ -80,7 +80,7 @@ public:
}
storage.checkPartAndAddToZooKeeper(part, ops);
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/log/log-",
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));

View File

@ -218,6 +218,7 @@ private:
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;
zkutil::EventPtr queue_updating_event = zkutil::EventPtr(new Poco::Event);
/// Задание, выполняющее действия из очереди.
BackgroundProcessingPool::TaskHandle queue_task_handle;
@ -226,8 +227,8 @@ private:
std::thread merge_selecting_thread;
Poco::Event merge_selecting_event;
/// Поток, удаляющий информацию о старых блоках из ZooKeeper.
std::thread clear_old_blocks_thread;
/// Поток, удаляющий старые куски, записи в логе и блоки.
std::thread cleanup_thread;
/// Поток, обрабатывающий переподключение к ZooKeeper при истечении сессии (очень маловероятное событие).
std::thread restarting_thread;
@ -323,8 +324,9 @@ private:
void loadQueue();
/** Копирует новые записи из логов всех реплик в очередь этой реплики.
* Если next_update_event != nullptr, вызовет это событие, когда в логе появятся новые записи.
*/
void pullLogsToQueue();
void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr);
/** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое.
* Вызывается под queue_mutex.
@ -353,7 +355,7 @@ private:
/** В бесконечном цикле вызывает clearOldBlocks.
*/
void clearOldBlocksThread();
void cleanupThread();
/** В бесконечном цикле проверяет, не протухла ли сессия в ZooKeeper.
*/

View File

@ -11,11 +11,9 @@ namespace DB
{
const auto QUEUE_UPDATE_SLEEP_MS = 5 * 1000;
const auto QUEUE_NO_WORK_SLEEP = std::chrono::seconds(5);
const auto QUEUE_ERROR_SLEEP = std::chrono::seconds(1);
const auto QUEUE_AFTER_WORK_SLEEP = std::chrono::seconds(0);
const auto ERROR_SLEEP_MS = 1000;
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
const auto CLEANUP_SLEEP_MS = 30 * 1000;
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
@ -155,11 +153,12 @@ void StorageReplicatedMergeTree::createTable()
zookeeper->create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/columns", data.getColumnsList().toString(), zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
/// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все эти ноды не созданы.
/// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все остальные ноды не созданы.
zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
}
@ -222,47 +221,23 @@ void StorageReplicatedMergeTree::createReplica()
/// Создадим пустую реплику.
zookeeper->create(replica_path, "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
/// Если таблица пуста, больше ничего делать не нужно.
/** Нужно изменить данные ноды /replicas на что угодно, чтобы поток, удаляющий старые записи в логе,
* споткнулся об это изменение и не удалил записи, которые мы еще не прочитали.
*/
zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name);
if (replicas.empty())
{
LOG_DEBUG(log, "No other replicas");
return;
}
/// "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатели на логи.
String source_replica = replicas[0];
/** Дождемся, пока все активные реплики заметят появление этой реплики.
* Это не даст им удалять записи из своих логов, пока эта реплика их не скопирует.
*/
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to acknowledge me");
bool active = true;
while(true)
{
zkutil::EventPtr event = new Poco::Event;
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active", nullptr, event))
{
active = false;
break;
}
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, nullptr, event))
break;
event->wait();
}
/// Будем предпочитать активную реплику в качестве эталонной.
if (active)
source_replica = replica;
}
/// "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатель на лог.
String source_replica = replicas[rand() % replicas.size()];
LOG_INFO(log, "Will mimic " << source_replica);
@ -270,13 +245,8 @@ void StorageReplicatedMergeTree::createReplica()
/// Порядок следующих трех действий важен. Записи в логе могут продублироваться, но не могут потеряться.
/// Скопируем у эталонной реплики ссылки на все логи.
for (const String & replica : replicas)
{
String pointer;
if (zookeeper->tryGet(source_path + "/log_pointers/" + replica, pointer))
zookeeper->create(replica_path + "/log_pointers/" + replica, pointer, zkutil::CreateMode::Persistent);
}
/// Скопируем у эталонной реплики ссылку на лог.
zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));
/// Запомним очередь эталонной реплики.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
@ -565,30 +535,47 @@ void StorageReplicatedMergeTree::clearOldParts()
void StorageReplicatedMergeTree::clearOldLogs()
{
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
zkutil::Stat stat;
if (!zookeeper->exists(zookeeper_path + "/log", &stat))
throw Exception(zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int children_count = stat.numChildren;
/// Будем ждать, пока накопятся в 1.1 раза больше записей, чем нужно.
if (static_cast<double>(children_count) < data.settings.replicated_logs_to_keep * 1.1)
return;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer;
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
String pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
if (pointer.empty())
return;
min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
}
Strings entries = zookeeper->getChildren(replica_path + "/log");
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
std::sort(entries.begin(), entries.end());
size_t removed = 0;
zkutil::Ops ops;
/// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик.
ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version));
for (const String & entry : entries)
{
UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
if (index >= min_pointer)
break;
zookeeper->remove(replica_path + "/log/" + entry);
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/log/" + entry, -1));
++removed;
}
if (removed > 0)
if (removed == 0)
return;
zookeeper->multi(ops);
LOG_DEBUG(log, "Removed " << removed << " old log entries");
}
@ -605,7 +592,7 @@ void StorageReplicatedMergeTree::clearOldBlocks()
return;
LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
<< " old blocks from ZooKeeper");
<< " old blocks from ZooKeeper. This might take several minutes.");
Strings blocks = zookeeper->getChildren(zookeeper_path + "/blocks");
@ -648,104 +635,67 @@ void StorageReplicatedMergeTree::loadQueue()
}
}
void StorageReplicatedMergeTree::pullLogsToQueue()
/// Преобразовать число в формате суффиксов автоинкрементных нод в ZooKeeper.
static String padIndex(UInt64 index)
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/// Сольем все логи в хронологическом порядке.
struct LogIterator
{
String replica; /// Имя реплики.
UInt64 index; /// Номер записи в логе (суффикс имени ноды).
Int64 timestamp; /// Время (czxid) создания записи в логе.
String entry_str; /// Сама запись.
bool operator<(const LogIterator & rhs) const
{
/// Нужно доставать из очереди минимальный timestamp.
return timestamp > rhs.timestamp;
}
bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
{
String index_str = toString(index);
while (index_str.size() < 10)
index_str = '0' + index_str;
zkutil::Stat stat;
if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
return false;
timestamp = stat.czxid;
return true;
}
};
return index_str;
}
typedef std::priority_queue<LogIterator> PriorityQueue;
PriorityQueue priority_queue;
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
String index_str;
String index_str = zookeeper->get(replica_path + "/log_pointer");
UInt64 index;
if (zookeeper->tryGet(replica_path + "/log_pointers/" + replica, index_str))
if (index_str.empty())
{
index = parse<UInt64>(index_str);
/// Если у нас еще нет указателя на лог, поставим указатель на первую запись в нем.
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
index = entries.empty() ? 0 : parse<UInt64>(std::min_element(entries.begin(), entries.end())->substr(strlen("log-")));
zookeeper->set(replica_path + "/log_pointer", toString(index), zkutil::CreateMode::Persistent);
}
else
{
/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
Strings entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/log");
std::sort(entries.begin(), entries.end());
index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
zookeeper->create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
index = parse<UInt64>(index_str);
}
LogIterator iterator;
iterator.replica = replica;
iterator.index = index;
if (iterator.readEntry(*zookeeper, zookeeper_path))
priority_queue.push(iterator);
}
if (priority_queue.empty())
return;
size_t count = 0;
while (!priority_queue.empty())
String entry_str;
while (zookeeper->tryGet(zookeeper_path + "/log/log-" + padIndex(index), entry_str))
{
LogIterator iterator = priority_queue.top();
priority_queue.pop();
++count;
++index;
LogEntry entry = LogEntry::parse(iterator.entry_str);
LogEntry entry = LogEntry::parse(entry_str);
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", iterator.entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
replica_path + "/queue/queue-", entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
replica_path + "/log_pointer", toString(index), -1));
auto results = zookeeper->multi(ops);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
entry.addResultToVirtualParts(*this);
queue.push_back(entry);
}
++iterator.index;
if (iterator.readEntry(*zookeeper, zookeeper_path))
priority_queue.push(iterator);
if (next_update_event)
{
if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
next_update_event->set();
}
if (queue_task_handle)
queue_task_handle->wake();
LOG_DEBUG(log, "Pulled " << count << " entries to queue");
}
@ -943,24 +893,19 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
{
try
{
pullLogsToQueue();
pullLogsToQueue(queue_updating_event);
clearOldParts();
/// Каждую минуту выбрасываем ненужные записи из лога.
if (time(0) - clear_old_logs_time > 60)
{
clear_old_logs_time = time(0);
clearOldLogs();
}
queue_updating_event->wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
queue_updating_event->tryWait(ERROR_SLEEP_MS);
}
}
shutdown_event.tryWait(QUEUE_UPDATE_SLEEP_MS);
}
LOG_DEBUG(log, "queue updating thread finished");
}
bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & pool_context)
@ -1101,7 +1046,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
entry.parts_to_merge.push_back(part->name);
}
zookeeper->create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
/// (чтобы кусок добавился в virtual_parts).
@ -1137,23 +1082,33 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
if (!success)
merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
}
LOG_DEBUG(log, "merge selecting thread finished");
}
void StorageReplicatedMergeTree::clearOldBlocksThread()
void StorageReplicatedMergeTree::cleanupThread()
{
while (!shutdown_called && is_leader_node)
while (!shutdown_called)
{
try
{
clearOldParts();
if (is_leader_node)
{
clearOldLogs();
clearOldBlocks();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
shutdown_event.tryWait(60 * 1000);
shutdown_event.tryWait(CLEANUP_SLEEP_MS);
}
LOG_DEBUG(log, "cleanup thread finished");
}
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
@ -1193,7 +1148,6 @@ void StorageReplicatedMergeTree::becomeLeader()
LOG_INFO(log, "Became leader");
is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
clear_old_blocks_thread = std::thread(&StorageReplicatedMergeTree::clearOldBlocksThread, this);
}
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
@ -1262,9 +1216,12 @@ void StorageReplicatedMergeTree::shutdown()
restarting_thread.join();
return;
}
permanent_shutdown_called = true;
permanent_shutdown_event.set();
restarting_thread.join();
endpoint_holder = nullptr;
}
void StorageReplicatedMergeTree::partialShutdown()
@ -1272,6 +1229,9 @@ void StorageReplicatedMergeTree::partialShutdown()
leader_election = nullptr;
shutdown_called = true;
shutdown_event.set();
merge_selecting_event.set();
queue_updating_event->set();
alter_thread_event->set();
replica_is_active_node = nullptr;
merger.cancelAll();
@ -1282,14 +1242,15 @@ void StorageReplicatedMergeTree::partialShutdown()
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_event.set();
if (merge_selecting_thread.joinable())
merge_selecting_thread.join();
if (clear_old_blocks_thread.joinable())
clear_old_blocks_thread.join();
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
if (cleanup_thread.joinable())
cleanup_thread.join();
if (alter_thread.joinable())
alter_thread.join();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
@ -1303,31 +1264,8 @@ void StorageReplicatedMergeTree::goReadOnly()
is_read_only = true;
permanent_shutdown_called = true;
permanent_shutdown_event.set();
shutdown_called = true;
shutdown_event.set();
leader_election = nullptr;
replica_is_active_node = nullptr;
merger.cancelAll();
endpoint_holder = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
if (is_leader_node)
{
is_leader_node = false;
merge_selecting_event.set();
if (merge_selecting_thread.joinable())
merge_selecting_thread.join();
if (clear_old_blocks_thread.joinable())
clear_old_blocks_thread.join();
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
LOG_TRACE(log, "Threads finished");
partialShutdown();
}
void StorageReplicatedMergeTree::startup()
@ -1345,6 +1283,7 @@ void StorageReplicatedMergeTree::startup()
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this);
queue_task_handle = context.getBackgroundPool().addTask(
std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1));
}
@ -1381,6 +1320,7 @@ void StorageReplicatedMergeTree::restartingThread()
tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
LOG_ERROR(log, "Exception in restartingThread. The storage will be read-only until server restart.");
goReadOnly();
LOG_DEBUG(log, "restarting thread finished");
return;
}
@ -1393,6 +1333,8 @@ void StorageReplicatedMergeTree::restartingThread()
{
tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
}
LOG_DEBUG(log, "restarting thread finished");
}
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()