load mutations from ZK

This commit is contained in:
Alexey Zatelepin 2018-04-19 17:20:18 +03:00
parent 21e03324af
commit 0e6ba2caca
7 changed files with 130 additions and 4 deletions

View File

@ -36,12 +36,15 @@ String ReplicatedMergeTreeMutationEntry::toString() const
return out.str();
}
/// Builds a mutation entry from its textual representation `str`.
/// In the two-argument form, `znode_name` is stored in the returned entry before the text is read.
ReplicatedMergeTreeMutationEntry ReplicatedMergeTreeMutationEntry::parse(const String & str)
ReplicatedMergeTreeMutationEntry ReplicatedMergeTreeMutationEntry::parse(const String & str, String znode_name)
{
ReadBufferFromString in(str);
ReplicatedMergeTreeMutationEntry res;
/// znode_name is taken by value, so it can be moved into the result without an extra copy.
res.znode_name = std::move(znode_name);
ReadBufferFromString in(str);
res.readText(in);
/// NOTE(review): presumably fails if unparsed data remains after the entry - confirm against assertEOF.
assertEOF(in);
return res;
}

View File

@ -18,7 +18,7 @@ struct ReplicatedMergeTreeMutationEntry
void readText(ReadBuffer & in);
String toString() const;
static ReplicatedMergeTreeMutationEntry parse(const String & str);
static ReplicatedMergeTreeMutationEntry parse(const String & str, String znode_name);
String znode_name;

View File

@ -261,7 +261,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event)
{
std::lock_guard<std::mutex> lock(pull_logs_to_queue_mutex);
std::lock_guard lock(pull_logs_to_queue_mutex);
std::set<Int64> new_ephemeral_block_numbers;
{
@ -282,6 +282,12 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log", nullptr, next_update_event);
/// We update mutations after we have loaded the list of log entries, but before we insert them
/// in the queue.
/// With this we ensure that if you read the queue state Q1 and then the state of mutations M1,
/// then Q1 "happened-before" M1.
updateMutations(zookeeper, nullptr);
if (index_str.empty())
{
/// If we do not already have a pointer to the log, put a pointer to the first entry in it.
@ -418,6 +424,82 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
}
/// Brings the local list of mutations in line with the children of the "/mutations" node in ZooKeeper.
///  - Local entries whose name sorts before the oldest entry left in ZK are dropped
///    (all of them, if ZK has no entries at all).
///  - Entries in ZK whose name sorts after the newest locally known entry are fetched and appended.
/// next_update_event is passed through to getChildren() (presumably it is signalled when the list
/// of children changes - confirm against the ZooKeeper wrapper).
/// Returns true if at least one new mutation entry was loaded.
bool ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event)
{
    /// Held for the whole procedure so that only one thread updates mutations at a time.
    std::lock_guard update_lock(update_mutations_mutex);

    Strings entries_in_zk = zookeeper->getChildren(zookeeper_path + "/mutations", nullptr, next_update_event);
    std::sort(entries_in_zk.begin(), entries_in_zk.end());

    /// Reconcile with the local state: forget obsolete entries and work out which ones are new.
    Strings entries_to_load;
    {
        std::lock_guard state_lock(mutex);

        if (entries_in_zk.empty())
        {
            mutations_by_block_number.clear();
            mutations.clear();
        }
        else
        {
            /// Both sequences are ordered by name, so obsolete entries form a prefix of the local list.
            const String & oldest_in_zk = entries_in_zk.front();
            while (!mutations.empty() && mutations.front().znode_name < oldest_in_zk)
            {
                const ReplicatedMergeTreeMutationEntry & obsolete = mutations.front();
                LOG_DEBUG(log, "Removing obsolete mutation " + obsolete.znode_name + " from local state.");
                mutations_by_block_number.erase(obsolete.block_number);
                mutations.pop_front();
            }
        }

        if (mutations.empty())
        {
            entries_to_load = std::move(entries_in_zk);
        }
        else
        {
            /// New entries are the suffix of the sorted ZK list that follows the newest known entry.
            const String & newest_known = mutations.back().znode_name;

            size_t first_new = entries_in_zk.size();
            while (first_new > 0 && entries_in_zk[first_new - 1] > newest_known)
                --first_new;

            for (size_t pos = first_new; pos < entries_in_zk.size(); ++pos)
                entries_to_load.push_back(std::move(entries_in_zk[pos]));
        }
    }

    if (entries_to_load.empty())
        return false;

    LOG_INFO(log, "Loading " + toString(entries_to_load.size()) + " mutation entries: "
        + entries_to_load.front() + " - " + entries_to_load.back());

    /// Issue all reads first and collect the results afterwards; the state mutex is not held meanwhile.
    std::vector<std::future<zkutil::GetResponse>> futures;
    for (const String & entry_name : entries_to_load)
        futures.emplace_back(zookeeper->asyncGet(zookeeper_path + "/mutations/" + entry_name));

    std::list<ReplicatedMergeTreeMutationEntry> new_mutations;
    auto pending = futures.begin();
    for (const String & entry_name : entries_to_load)
    {
        new_mutations.push_back(
            ReplicatedMergeTreeMutationEntry::parse(pending->get().data, entry_name));
        ++pending;
    }

    {
        std::lock_guard state_lock(mutex);

        /// The index stores pointers to list elements; splice() relinks the nodes into `mutations`
        /// without copying them, so these pointers stay valid.
        for (const ReplicatedMergeTreeMutationEntry & loaded : new_mutations)
            mutations_by_block_number.emplace(loaded.block_number, &loaded);

        mutations.splice(mutations.end(), new_mutations);
    }

    return true;
}
ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name)
{
std::lock_guard<std::mutex> lock(mutex);

View File

@ -3,6 +3,7 @@
#include <optional>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -68,6 +69,9 @@ private:
/// Ensures that only one call to pullLogsToQueue runs at a time.
std::mutex pull_logs_to_queue_mutex;
/// Ensures that only one thread is simultaneously updating mutations.
std::mutex update_mutations_mutex;
/** What will be the set of active parts after running the entire current queue - adding new parts and performing merges.
* Used to determine which merges have already been assigned:
* - if there is a part in this set, then the smaller parts inside its range are not made.
@ -79,6 +83,9 @@ private:
std::set<Int64> ephemeral_block_numbers;
std::list<ReplicatedMergeTreeMutationEntry> mutations;
std::map<Int64, const ReplicatedMergeTreeMutationEntry *> mutations_by_block_number;
Logger * log = nullptr;
@ -165,6 +172,8 @@ public:
*/
bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
/** Load new mutation entries from the "/mutations" node in ZK and drop obsolete ones from the local state.
  * next_update_event is passed to the getChildren request for that node.
  * Returns true if any new entries were loaded.
  */
bool updateMutations(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
* And also wait for the completion of their execution, if they are now being executed.
*/

View File

@ -205,6 +205,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.shutdown_event.reset();
storage.queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, &storage);
storage.mutations_updating_thread = std::thread(&StorageReplicatedMergeTree::mutationsUpdatingThread, &storage);
storage.part_check_thread.start();
storage.alter_thread = std::make_unique<ReplicatedMergeTreeAlterThread>(storage);
storage.cleanup_thread = std::make_unique<ReplicatedMergeTreeCleanupThread>(storage);
@ -350,6 +351,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.shutdown_event.set();
storage.merge_selecting_event.set();
storage.queue_updating_event->set();
storage.mutations_updating_event->set();
storage.alter_query_event->set();
storage.cleanup_thread_event.set();
storage.replica_is_active_node = nullptr;
@ -361,6 +363,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
if (storage.queue_updating_thread.joinable())
storage.queue_updating_thread.join();
if (storage.mutations_updating_thread.joinable())
storage.mutations_updating_thread.join();
storage.cleanup_thread.reset();
storage.alter_thread.reset();
storage.part_check_thread.stop();

View File

@ -1626,6 +1626,28 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
}
/// Body of the thread that keeps the queue's mutations up to date.
/// Each iteration calls queue.updateMutations() and then blocks on mutations_updating_event until it
/// is set again. If anything throws, the exception is logged and the thread waits on the event for at
/// most QUEUE_UPDATE_ERROR_SLEEP_MS before retrying. The loop ends once shutdown_called is set.
void StorageReplicatedMergeTree::mutationsUpdatingThread()
{
    setThreadName("ReplMTMutUpd");

    for (;;)
    {
        if (shutdown_called)
            break;

        try
        {
            queue.updateMutations(getZooKeeper(), mutations_updating_event);
            mutations_updating_event->wait();
        }
        catch (...)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
            /// Back off before the next attempt; setting the event cuts the wait short.
            mutations_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
        }
    }

    LOG_DEBUG(log, "Mutations updating thread finished");
}
bool StorageReplicatedMergeTree::queueTask()
{
/// This object will mark the element of the queue as running.

View File

@ -259,6 +259,9 @@ private:
std::thread queue_updating_thread;
zkutil::EventPtr queue_updating_event = std::make_shared<Poco::Event>();
std::thread mutations_updating_thread;
zkutil::EventPtr mutations_updating_event = std::make_shared<Poco::Event>();
/// A task that performs actions from the queue.
BackgroundProcessingPool::TaskHandle queue_task_handle;
@ -366,6 +369,8 @@ private:
*/
void queueUpdatingThread();
/** Repeatedly calls queue.updateMutations() and waits on mutations_updating_event between calls,
  * until shutdown_called is set.
  */
void mutationsUpdatingThread();
/** Performs actions from the queue.
*/
bool queueTask();