2016-01-10 04:44:12 +00:00
|
|
|
|
#include <DB/IO/ReadHelpers.h>
|
|
|
|
|
#include <DB/IO/WriteHelpers.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2016-01-12 19:45:51 +00:00
|
|
|
|
namespace ErrorCodes
{
    /// Thrown by pullLogsToQueue when a child of <zookeeper_path>/log does not start with "log-".
    extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
}
|
|
|
|
|
|
2016-01-10 04:44:12 +00:00
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeQueue::initVirtualParts(const MergeTreeData::DataParts & parts)
{
    /// Seed virtual_parts with the names of all parts the table currently has,
    /// so that coverage checks account for already-existing parts.
    std::lock_guard<std::mutex> guard(mutex);

    for (const auto & data_part : parts)
    {
        virtual_parts.add(data_part->name);
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Reads all entries of this replica's queue from ZooKeeper and rebuilds the in-memory queue.
/// Called during initialization; holds the queue mutex for the whole load.
void ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
{
    auto queue_path = replica_path + "/queue";
    LOG_DEBUG(log, "Loading queue from " << queue_path);

    std::lock_guard<std::mutex> lock(mutex);

    Strings children = zookeeper->getChildren(queue_path);
    LOG_DEBUG(log, "Having " << children.size() << " queue entries to load.");

    /// Process znodes in name order (queue-NNN names are sequential).
    std::sort(children.begin(), children.end());

    /// Issue all reads asynchronously first, then collect the results,
    /// to avoid one round trip to ZooKeeper per entry.
    std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
    futures.reserve(children.size());

    for (const String & child : children)
        futures.emplace_back(child, zookeeper->asyncGet(queue_path + "/" + child));

    for (auto & future : futures)
    {
        zkutil::ZooKeeper::ValueAndStat res = future.second.get();
        LogEntryPtr entry = LogEntry::parse(res.value, res.stat);

        entry->znode_name = future.first;
        insertUnlocked(entry);
    }

    /// Publish the recomputed min_unprocessed_insert_time (updated by insertUnlocked) to ZooKeeper.
    updateTimesInZooKeeper(zookeeper, true, false);

    LOG_TRACE(log, "Loaded queue");
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeQueue::initialize(
|
|
|
|
|
const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
|
|
|
|
|
const MergeTreeData::DataParts & parts, zkutil::ZooKeeperPtr zookeeper)
|
|
|
|
|
{
|
|
|
|
|
zookeeper_path = zookeeper_path_;
|
|
|
|
|
replica_path = replica_path_;
|
|
|
|
|
logger_name = logger_name_;
|
|
|
|
|
log = &Logger::get(logger_name);
|
|
|
|
|
|
|
|
|
|
initVirtualParts(parts);
|
|
|
|
|
load(zookeeper);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Appends an entry to the in-memory queue. Caller must hold `mutex`.
/// For inserts (GET_PART) also maintains inserts_by_time and min_unprocessed_insert_time.
void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry)
{
    /// Register the part this entry will produce, and enqueue the entry itself.
    virtual_parts.add(entry->new_part_name);
    queue.push_back(entry);

    if (entry->type != LogEntry::GET_PART)
        return;

    /// Inserts are additionally indexed by creation time.
    inserts_by_time.insert(entry);

    const time_t created = entry->create_time;
    const bool lowers_minimum = created && (min_unprocessed_insert_time == 0 || created < min_unprocessed_insert_time);
    if (lowers_minimum)
        min_unprocessed_insert_time = created;
}
|
|
|
|
|
|
|
|
|
|
|
2016-01-17 13:00:42 +00:00
|
|
|
|
/// Adds an entry to the queue and, if the minimum unprocessed insert time moved,
/// persists the new value to ZooKeeper.
void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
{
    time_t min_time_before_insert;

    {
        std::lock_guard<std::mutex> guard(mutex);
        min_time_before_insert = min_unprocessed_insert_time;
        insertUnlocked(entry);
    }

    /// NOTE: min_unprocessed_insert_time is re-read here without the lock, matching the
    /// tolerated-race policy described in updateTimesInZooKeeper.
    const bool min_time_moved = (min_unprocessed_insert_time != min_time_before_insert);
    if (min_time_moved)
        updateTimesInZooKeeper(zookeeper, true, false);
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeQueue::updateTimesOnRemoval(
|
|
|
|
|
const LogEntryPtr & entry,
|
|
|
|
|
bool & min_unprocessed_insert_time_changed,
|
|
|
|
|
bool & max_processed_insert_time_changed)
|
|
|
|
|
{
|
|
|
|
|
if (entry->type != LogEntry::GET_PART)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
inserts_by_time.erase(entry);
|
|
|
|
|
|
|
|
|
|
if (inserts_by_time.empty())
|
|
|
|
|
{
|
|
|
|
|
min_unprocessed_insert_time = 0;
|
|
|
|
|
min_unprocessed_insert_time_changed = true;
|
|
|
|
|
}
|
|
|
|
|
else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time)
|
|
|
|
|
{
|
|
|
|
|
min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time;
|
|
|
|
|
min_unprocessed_insert_time_changed = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (entry->create_time > max_processed_insert_time)
|
|
|
|
|
{
|
|
|
|
|
max_processed_insert_time = entry->create_time;
|
|
|
|
|
max_processed_insert_time_changed = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Writes the current insert-time watermarks to this replica's ZooKeeper nodes.
/// Failures are logged, not thrown — the values are informational.
void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
    zkutil::ZooKeeperPtr zookeeper,
    bool min_unprocessed_insert_time_changed,
    bool max_processed_insert_time_changed)
{
    /// A race condition is possible here (when different removals run concurrently).
    /// We consider it insignificant: for a short time a slightly different time value
    /// will be stored in ZK. The min_unprocessed_insert_time and max_processed_insert_time
    /// members are also read without synchronization, by the same reasoning.
    zkutil::Ops operations;

    if (min_unprocessed_insert_time_changed)
        operations.push_back(new zkutil::Op::SetData(
            replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1));

    if (max_processed_insert_time_changed)
        operations.push_back(new zkutil::Op::SetData(
            replica_path + "/max_processed_insert_time", toString(max_processed_insert_time), -1));

    if (operations.empty())
        return;

    const auto rc = zookeeper->tryMulti(operations);

    if (rc != ZOK)
        LOG_ERROR(log, "Couldn't set value of nodes for insert times ("
            << replica_path << "/min_unprocessed_insert_time, max_processed_insert_time)" << ": "
            << zkutil::ZooKeeper::error2string(rc) + ". This shouldn't happen often.");
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Removes a processed entry: first its znode in ZooKeeper (best effort), then the
/// in-memory copy, then updates the insert-time watermarks in ZK if they moved.
void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
{
    auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);

    if (code != ZOK)
        LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": "
            << zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often.");

    bool min_unprocessed_insert_time_changed = false;
    bool max_processed_insert_time_changed = false;

    {
        std::lock_guard<std::mutex> lock(mutex);

        /// Remove the entry from the in-memory queue.
        /// We can't simply use a previously saved iterator, because someone else may have
        /// already removed the entry.
        /// Why do we scan the queue from the end?
        /// - because an entry picked for execution is first moved to the end of the queue,
        ///   so that on failure it stays at the end.
        for (Queue::iterator it = queue.end(); it != queue.begin();)
        {
            --it;
            if (*it == entry)
            {
                queue.erase(it);
                break;
            }
        }

        updateTimesOnRemoval(entry, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
    }

    updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Removes the first queue entry that produces `part_name`, both in memory and in ZooKeeper.
/// Returns false if no such entry exists.
bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name)
{
    LogEntryPtr removed_entry;

    bool min_time_changed = false;
    bool max_time_changed = false;

    {
        std::lock_guard<std::mutex> guard(mutex);

        auto pos = std::find_if(queue.begin(), queue.end(),
            [&part_name](const LogEntryPtr & candidate) { return candidate->new_part_name == part_name; });

        if (pos != queue.end())
        {
            removed_entry = *pos;
            queue.erase(pos);
            updateTimesOnRemoval(removed_entry, min_time_changed, max_time_changed);
        }
    }

    if (!removed_entry)
        return false;

    /// Best-effort removal of the znode; the in-memory state is already updated.
    zookeeper->tryRemove(replica_path + "/queue/" + removed_entry->znode_name);
    updateTimesInZooKeeper(zookeeper, min_time_changed, max_time_changed);

    return true;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Copies new entries from the shared replication log (<zookeeper_path>/log) into this
/// replica's queue, advancing the replica's log pointer atomically with the copy.
/// Returns true if any entries were available past the current log pointer.
bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event)
{
    /// Serialize concurrent pulls; the queue mutex itself is taken only for the in-memory update.
    std::lock_guard<std::mutex> lock(pull_logs_to_queue_mutex);

    String index_str = zookeeper->get(replica_path + "/log_pointer");
    UInt64 index;

    Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");

    if (index_str.empty())
    {
        /// If we don't have a log pointer yet, point it at the first entry in the log.
        index = log_entries.empty() ? 0 : parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-")));

        zookeeper->set(replica_path + "/log_pointer", toString(index));
    }
    else
    {
        index = parse<UInt64>(index_str);
    }

    String min_log_entry = "log-" + padIndex(index);

    /// The set of log entries that should be copied into the queue:
    /// drop everything before the current log pointer.

    log_entries.erase(
        std::remove_if(log_entries.begin(), log_entries.end(), [&min_log_entry](const String & entry) { return entry < min_log_entry; }),
        log_entries.end());

    if (!log_entries.empty())
    {
        std::sort(log_entries.begin(), log_entries.end());

        /// ZK imposes a limit on the number or total size of operations in a multi request.
        /// Exceeding the limit simply breaks the connection.
        /// The constant is chosen with a margin. The default limit in ZK is 1 MB of data in total.
        /// The average node value size in this case is less than a kilobyte.
        static constexpr auto MAX_MULTI_OPS = 1000;

        /// Copy in batches of at most MAX_MULTI_OPS entries per multi request.
        for (size_t i = 0, size = log_entries.size(); i < size; i += MAX_MULTI_OPS)
        {
            auto begin = log_entries.begin() + i;
            auto end = i + MAX_MULTI_OPS >= log_entries.size()
                ? log_entries.end()
                : (begin + MAX_MULTI_OPS);
            auto last = end - 1;

            String last_entry = *last;
            if (0 != last_entry.compare(0, strlen("log-"), "log-"))
                throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log",
                    ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER);

            UInt64 last_entry_index = parse<UInt64>(last_entry.substr(strlen("log-")));

            LOG_DEBUG(log, "Pulling " << (end - begin) << " entries to queue: " << *begin << " - " << *last);

            /// Read all batch entries asynchronously to avoid a round trip per entry.
            std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
            futures.reserve(end - begin);

            for (auto it = begin; it != end; ++it)
                futures.emplace_back(*it, zookeeper->asyncGet(zookeeper_path + "/log/" + *it));

            /// Simultaneously add all new entries to the queue and advance the log pointer.

            zkutil::Ops ops;
            std::vector<LogEntryPtr> copied_entries;
            copied_entries.reserve(end - begin);

            bool min_unprocessed_insert_time_changed = false;

            for (auto & future : futures)
            {
                zkutil::ZooKeeper::ValueAndStat res = future.second.get();
                copied_entries.emplace_back(LogEntry::parse(res.value, res.stat));

                ops.push_back(new zkutil::Op::Create(
                    replica_path + "/queue/queue-", res.value, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));

                const auto & entry = *copied_entries.back();
                if (entry.type == LogEntry::GET_PART)
                {
                    /// Track the oldest unprocessed insert to publish it together with the batch.
                    if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time))
                    {
                        min_unprocessed_insert_time = entry.create_time;
                        min_unprocessed_insert_time_changed = true;
                    }
                }
            }

            ops.push_back(new zkutil::Op::SetData(
                replica_path + "/log_pointer", toString(last_entry_index + 1), -1));

            if (min_unprocessed_insert_time_changed)
                ops.push_back(new zkutil::Op::SetData(
                    replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1));

            auto results = zookeeper->multi(ops);

            /// We have now successfully updated the queue in ZooKeeper. Update it in memory.

            try
            {
                std::lock_guard<std::mutex> lock(mutex);

                for (size_t i = 0, size = copied_entries.size(); i < size; ++i)
                {
                    /// The first copied_entries.size() ops are the Creates, in the same order.
                    String path_created = dynamic_cast<zkutil::Op::Create &>(ops[i]).getPathCreated();
                    copied_entries[i]->znode_name = path_created.substr(path_created.find_last_of('/') + 1);

                    insertUnlocked(copied_entries[i]);
                }

                last_queue_update = time(0);
            }
            catch (...)
            {
                /// If this fails, the in-memory data is inconsistent. To avoid possible further
                /// corruption of data in ZK, kill ourselves.
                /// Getting here is only possible in case of an unknown logical error.
                std::terminate();
            }

            if (!copied_entries.empty())
                LOG_DEBUG(log, "Pulled " << copied_entries.size() << " entries to queue.");
        }
    }

    if (next_update_event)
    {
        /// Set a watch on the next log entry so the caller is woken when new entries appear.
        if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
            next_update_event->set();
    }

    return !log_entries.empty();
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Finds the merge that includes `part_name`, and moves the queue entries producing the
/// merge's other source parts to the end of the queue (after the merge entry's position).
/// Returns the set of parts participating in that merge (empty if none found).
ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name)
{
    std::lock_guard<std::mutex> lock(mutex);

    /// Find the action that merges this part with others. Remember the other parts.
    StringSet parts_for_merge;
    Queue::iterator merge_entry;
    for (Queue::iterator it = queue.begin(); it != queue.end(); ++it)
    {
        if ((*it)->type == LogEntry::MERGE_PARTS)
        {
            if (std::find((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end(), part_name)
                != (*it)->parts_to_merge.end())
            {
                parts_for_merge = StringSet((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end());
                merge_entry = it;
                break;
            }
        }
    }

    /// NOTE: merge_entry is only initialized when parts_for_merge is non-empty;
    /// the comparison below is guarded accordingly.
    if (!parts_for_merge.empty())
    {
        /// Move the actions that produce parts_for_merge to the end of the queue.
        for (Queue::iterator it = queue.begin(); it != queue.end();)
        {
            /// Advance before a possible splice: splice moves [it0, it), so `it` stays valid.
            auto it0 = it;
            ++it;

            /// Stop at the merge entry itself — entries past it need not be reordered.
            if (it0 == merge_entry)
                break;

            if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART)
                && parts_for_merge.count((*it0)->new_part_name))
            {
                queue.splice(queue.end(), queue, it0, it);
            }
        }
    }

    return parts_for_merge;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Removes from the queue all GET_PART / MERGE_PARTS entries whose result part is contained
/// in the range of `part_name`, then waits for any of them that were currently executing.
void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr zookeeper, const String & part_name)
{
    Queue to_wait;
    size_t removed_entries = 0;
    bool min_unprocessed_insert_time_changed = false;
    bool max_processed_insert_time_changed = false;

    /// Remove from the queue the operations on parts contained in the range being deleted.
    /// unique_lock (not lock_guard) because the condition-variable wait below needs it.
    std::unique_lock<std::mutex> lock(mutex);
    for (Queue::iterator it = queue.begin(); it != queue.end();)
    {
        if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) &&
            ActiveDataPartSet::contains(part_name, (*it)->new_part_name))
        {
            /// An executing entry is removed from the queue anyway, but we must wait for it
            /// to finish before returning.
            if ((*it)->currently_executing)
                to_wait.push_back(*it);
            auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
            if (code != ZOK)
                LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": "
                    << zkutil::ZooKeeper::error2string(code));

            updateTimesOnRemoval(*it, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
            queue.erase(it++);
            ++removed_entries;
        }
        else
            ++it;
    }

    updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);

    LOG_DEBUG(log, "Removed " << removed_entries << " entries from queue. "
        "Waiting for " << to_wait.size() << " entries that are currently executing.");

    /// Wait for the completion of operations on the parts contained in the range being deleted.
    for (LogEntryPtr & entry : to_wait)
        entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Decides whether a queue entry can be executed now. On refusal, stores the human-readable
/// reason in out_postpone_reason. Must be called with the queue mutex held.
bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger)
{
    /// queue_mutex is already locked. This function is called only from selectEntryToProcess.

    if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
    {
        /// Check whether the same part is currently being created by another action.
        if (future_parts.count(entry.new_part_name))
        {
            String reason = "Not executing log entry for part " + entry.new_part_name
                + " because another log entry for the same part is being processed. This shouldn't happen often.";
            LOG_DEBUG(log, reason);
            out_postpone_reason = reason;
            return false;

            /** When the corresponding action finishes, shouldExecuteLogEntry will succeed next time,
              * and the queue entry will be processed. Right away, executeLogEntry will find out that
              * we already have the part, and the entry will immediately be considered processed.
              */
        }

        /// A more complex check: whether another action is currently creating a part that will cover this one.
        /// NOTE: the check above is redundant, but it is kept for a more convenient log message.
        ActiveDataPartSet::Part result_part;
        ActiveDataPartSet::parsePartName(entry.new_part_name, result_part);

        /// This can be slow if future_parts is large. But it can't be large, since it is bounded by BackgroundProcessingPool.
        for (const auto & future_part_name : future_parts)
        {
            ActiveDataPartSet::Part future_part;
            ActiveDataPartSet::parsePartName(future_part_name, future_part);

            if (future_part.contains(result_part))
            {
                String reason = "Not executing log entry for part " + entry.new_part_name
                    + " because another log entry for covering part " + future_part_name + " is being processed.";
                LOG_DEBUG(log, reason);
                out_postpone_reason = reason;
                return false;
            }
        }
    }

    if (entry.type == LogEntry::MERGE_PARTS)
    {
        /** If any of the required parts is now being transferred or merged, wait for that operation to finish.
          * Otherwise, even if not all the parts needed for the merge are present, we should attempt the merge.
          * If some parts are missing, instead of the merge there will be an attempt to download the part.
          * Such a situation is possible if fetching some part failed, and it was moved to the end of the queue.
          */
        for (const auto & name : entry.parts_to_merge)
        {
            if (future_parts.count(name))
            {
                String reason = "Not merging into part " + entry.new_part_name
                    + " because part " + name + " is not ready yet (log entry for that part is being processed).";
                LOG_TRACE(log, reason);
                out_postpone_reason = reason;
                return false;
            }
        }

        if (merger.isCancelled())
        {
            String reason = "Not executing log entry for part " + entry.new_part_name + " because merges are cancelled now.";
            LOG_DEBUG(log, reason);
            out_postpone_reason = reason;
            return false;
        }
    }

    return true;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Picks the first executable entry from the queue (skipping entries already executing and
/// entries postponed by shouldExecuteLogEntry), moves it to the end of the queue, and returns it.
/// Returns nullptr if nothing can be executed right now.
ReplicatedMergeTreeQueue::LogEntryPtr ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMerger & merger)
{
    std::lock_guard<std::mutex> lock(mutex);

    LogEntryPtr entry;

    for (Queue::iterator it = queue.begin(); it != queue.end(); ++it)
    {
        if ((*it)->currently_executing)
            continue;

        if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger))
        {
            entry = *it;
            /// Move the selected entry to the end so that, on failure, it stays at the end
            /// (remove() relies on this when scanning from the back).
            queue.splice(queue.end(), queue, it);
            break;
        }
        else
        {
            /// Record postponement statistics for observability.
            ++(*it)->num_postponed;
            (*it)->last_postpone_time = time(0);
        }
    }

    return entry;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// RAII marker for a queue entry being executed: the constructor tags the entry and its
/// future part under the queue mutex; the destructor untags them and wakes waiters
/// (see removeGetsAndMergesInRange, which waits on execution_complete).
class CurrentlyExecuting
{
private:
    ReplicatedMergeTreeQueue::LogEntryPtr & entry;
    ReplicatedMergeTreeQueue & queue;

public:
    CurrentlyExecuting(ReplicatedMergeTreeQueue::LogEntryPtr & entry, ReplicatedMergeTreeQueue & queue)
        : entry(entry), queue(queue)
    {
        std::lock_guard<std::mutex> lock(queue.mutex);

        entry->currently_executing = true;
        ++entry->num_tries;
        entry->last_attempt_time = time(0);

        /// Double-tagging would mean two executors are producing the same part — a logic bug.
        if (!queue.future_parts.insert(entry->new_part_name).second)
            throw Exception("Tagging already tagged future part " + entry->new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
    }

    ~CurrentlyExecuting()
    {
        std::lock_guard<std::mutex> lock(queue.mutex);

        entry->currently_executing = false;
        entry->execution_complete.notify_all();

        /// Only log (don't throw) — throwing from a destructor would terminate the program.
        if (!queue.future_parts.erase(entry->new_part_name))
            LOG_ERROR(queue.log, "Untagging already untagged future part " + entry->new_part_name + ". This is a bug.");
    }
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Executes `func` on the entry while it is marked as currently executing.
/// On success (func returns true) the entry is removed from the queue.
/// Returns false if func threw; the exception is stored on the entry.
bool ReplicatedMergeTreeQueue::processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func)
{
    /// RAII: tags the entry for the whole duration of the call and untags it on any exit path.
    CurrentlyExecuting executing_marker(entry, *this);

    std::exception_ptr caught;

    try
    {
        const bool done = func(entry);
        if (done)
            remove(zookeeper, entry);
    }
    catch (...)
    {
        caught = std::current_exception();
    }

    if (!caught)
        return true;

    /// Store the failure on the entry so it can be reported (e.g. in status queries).
    std::lock_guard<std::mutex> guard(mutex);
    entry->exception = caught;
    return false;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool ReplicatedMergeTreeQueue::partWillBeMergedOrMergesDisabled(const String & part_name) const
|
|
|
|
|
{
|
|
|
|
|
return virtual_parts.getContainingPart(part_name) != part_name;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
{
    /// Adding the covering name to virtual_parts makes every part in its range report a
    /// different containing part (see partWillBeMergedOrMergesDisabled), which blocks merges there.
    virtual_parts.add(part_name);
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Builds a snapshot of queue statistics (sizes, counts by entry type, and the oldest
/// creation times) under the queue mutex.
ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus()
{
    std::lock_guard<std::mutex> guard(mutex);

    Status status;

    status.future_parts = future_parts.size();
    status.queue_size = queue.size();
    status.last_queue_update = last_queue_update;

    status.inserts_in_queue = 0;
    status.merges_in_queue = 0;
    status.queue_oldest_time = 0;
    status.inserts_oldest_time = 0;
    status.merges_oldest_time = 0;

    for (const LogEntryPtr & item : queue)
    {
        const time_t created = item->create_time;

        /// Oldest entry of any type (entries without a creation time are skipped).
        if (created && (status.queue_oldest_time == 0 || created < status.queue_oldest_time))
            status.queue_oldest_time = created;

        if (item->type == LogEntry::GET_PART)
        {
            ++status.inserts_in_queue;

            if (created && (status.inserts_oldest_time == 0 || created < status.inserts_oldest_time))
            {
                status.inserts_oldest_time = created;
                status.oldest_part_to_get = item->new_part_name;
            }
        }

        if (item->type == LogEntry::MERGE_PARTS)
        {
            ++status.merges_in_queue;

            if (created && (status.merges_oldest_time == 0 || created < status.merges_oldest_time))
            {
                status.merges_oldest_time = created;
                status.oldest_part_to_merge_to = item->new_part_name;
            }
        }
    }

    return status;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Copies every queue entry by value into `res`, so callers can inspect the snapshot
/// without holding the queue mutex.
void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res)
{
    res.clear();

    std::lock_guard<std::mutex> guard(mutex);
    res.reserve(queue.size());

    for (const auto & queued : queue)
    {
        res.emplace_back(*queued);
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeQueue::countMerges(size_t & all_merges, size_t & big_merges, size_t max_big_merges,
|
|
|
|
|
const std::function<bool(const String &)> & is_part_big)
|
|
|
|
|
{
|
|
|
|
|
all_merges = 0;
|
|
|
|
|
big_merges = 0;
|
|
|
|
|
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
|
|
|
|
|
for (const auto & entry : queue)
|
|
|
|
|
{
|
|
|
|
|
if (entry->type == LogEntry::MERGE_PARTS)
|
|
|
|
|
{
|
|
|
|
|
++all_merges;
|
|
|
|
|
|
|
|
|
|
if (big_merges + big_merges < max_big_merges)
|
|
|
|
|
{
|
|
|
|
|
for (const String & name : entry->parts_to_merge)
|
|
|
|
|
{
|
|
|
|
|
if (is_part_big(name))
|
|
|
|
|
{
|
|
|
|
|
++big_merges;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2016-01-17 13:00:42 +00:00
|
|
|
|
void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const
|
|
|
|
|
{
|
|
|
|
|
out_min_unprocessed_insert_time = min_unprocessed_insert_time;
|
|
|
|
|
out_max_processed_insert_time = max_processed_insert_time;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2016-01-10 04:44:12 +00:00
|
|
|
|
/// Formats a log index as a zero-padded, 10-digit string ("log-0000000042" style suffix),
/// so that lexicographic order of znode names matches numeric order.
String padIndex(Int64 index)
{
    String index_str = toString(index);

    /// Guard: `10 - index_str.size()` uses unsigned (size_t) arithmetic. For an index wider
    /// than 10 digits (or a negative index, whose '-' sign lengthens the string) it would
    /// underflow to a huge count and make the std::string fill constructor throw/abort.
    /// Return the unpadded representation instead — lexicographic order is already correct
    /// among equal-width strings, and such indices exceed the 10-digit naming scheme anyway.
    if (index_str.size() >= 10)
        return index_str;

    return std::string(10 - index_str.size(), '0') + index_str;
}
|
|
|
|
|
|
|
|
|
|
}
|