Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-22 23:52:03 +00:00)
Merge branch 'silviucpp-replicated_merge_tree_thread_pool'
This commit is contained in: commit 9456674649

dbms/src/Common/BackgroundSchedulePool.cpp (new file, 257 lines)
@@ -0,0 +1,257 @@
#include <Common/BackgroundSchedulePool.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <common/logger_useful.h>
#include <chrono>

namespace CurrentMetrics
{
    extern const Metric BackgroundSchedulePoolTask;
    extern const Metric MemoryTrackingInBackgroundSchedulePool;
}

namespace DB
{


// TaskNotification

class TaskNotification final : public Poco::Notification
{
public:
    explicit TaskNotification(const BackgroundSchedulePool::TaskHandle & task) : task(task) {}
    void execute() { task->execute(); }

private:
    BackgroundSchedulePool::TaskHandle task;
};


// BackgroundSchedulePool::TaskInfo

BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function):
    name(name),
    pool(pool),
    function(function)
{
}


bool BackgroundSchedulePool::TaskInfo::schedule()
{
    std::lock_guard<std::recursive_mutex> lock(mutex);

    if (deactivated || scheduled)
        return false;

    scheduled = true;

    if (delayed)
        pool.cancelDelayedTask(shared_from_this(), lock);

    pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
    return true;
}


bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
{
    std::lock_guard<std::recursive_mutex> lock(mutex);

    if (deactivated || scheduled)
        return false;

    pool.scheduleDelayedTask(shared_from_this(), ms, lock);
    return true;
}


void BackgroundSchedulePool::TaskInfo::deactivate()
{
    if (deactivated)
        return;

    std::lock_guard<std::recursive_mutex> lock(mutex);
    deactivated = true;
    scheduled = false;

    if (delayed)
        pool.cancelDelayedTask(shared_from_this(), lock);
}


void BackgroundSchedulePool::TaskInfo::activate()
{
    std::lock_guard<std::recursive_mutex> lock(mutex);
    deactivated = false;
}


void BackgroundSchedulePool::TaskInfo::execute()
{
    std::lock_guard<std::recursive_mutex> lock(mutex);

    if (deactivated)
        return;

    scheduled = false;
    CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};

    Stopwatch watch;
    function();
    UInt64 milliseconds = watch.elapsedMilliseconds();

    /// If the task is executed longer than specified time, it will be logged.
    static const int32_t slow_execution_threshold_ms = 50;

    if (milliseconds >= slow_execution_threshold_ms)
        LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms.");
}


// BackgroundSchedulePool

BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
    : size(size)
{
    LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");

    threads.resize(size);
    for (auto & thread : threads)
        thread = std::thread([this] { threadFunction(); });

    delayed_thread = std::thread([this] { delayExecutionThreadFunction(); });
}


BackgroundSchedulePool::~BackgroundSchedulePool()
{
    try
    {
        shutdown = true;
        wakeup_event.notify_all();
        queue.wakeUpAll();

        delayed_thread.join();

        LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish.");
        for (std::thread & thread : threads)
            thread.join();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}


BackgroundSchedulePool::TaskHandle BackgroundSchedulePool::addTask(const std::string & name, const Task & task)
{
    return std::make_shared<TaskInfo>(*this, name, task);
}


void BackgroundSchedulePool::removeTask(const TaskHandle & task)
{
    task->deactivate();
}


void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::recursive_mutex> &)
{
    Poco::Timestamp current_time;

    {
        std::lock_guard<std::mutex> lock(delayed_tasks_lock);

        if (task->delayed)
            delayed_tasks.erase(task->iterator);

        task->iterator = delayed_tasks.emplace(current_time + (ms * 1000), task);
        task->delayed = true;
    }

    wakeup_event.notify_all();
}


void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::recursive_mutex> &)
{
    {
        std::lock_guard<std::mutex> lock(delayed_tasks_lock);
        delayed_tasks.erase(task->iterator);
        task->delayed = false;
    }

    wakeup_event.notify_all();
}


void BackgroundSchedulePool::threadFunction()
{
    setThreadName("BackgrSchedPool");

    MemoryTracker memory_tracker;
    memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
    current_memory_tracker = &memory_tracker;

    while (!shutdown)
    {
        if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification())
        {
            TaskNotification & task_notification = static_cast<TaskNotification &>(*notification);
            task_notification.execute();
        }
    }

    current_memory_tracker = nullptr;
}


void BackgroundSchedulePool::delayExecutionThreadFunction()
{
    setThreadName("BckSchPoolDelay");

    while (!shutdown)
    {
        Poco::Timestamp min_time;
        TaskHandle task;

        {
            std::lock_guard<std::mutex> lock(delayed_tasks_lock);

            if (!delayed_tasks.empty())
            {
                auto t = delayed_tasks.begin();
                min_time = t->first;
                task = t->second;
            }
        }

        if (shutdown)
            break;

        if (!task)
        {
            std::unique_lock<std::mutex> lock(delayed_tasks_lock);
            wakeup_event.wait(lock);
            continue;
        }

        Poco::Timestamp current_time;
        if (min_time > current_time)
        {
            std::unique_lock<std::mutex> lock(delayed_tasks_lock);
            wakeup_event.wait_for(lock, std::chrono::microseconds(min_time - current_time));
        }
        else
        {
            task->schedule();
        }
    }
}

}
dbms/src/Common/BackgroundSchedulePool.h (new file, 116 lines)
@@ -0,0 +1,116 @@
#pragma once

#include <Poco/Notification.h>
#include <Poco/NotificationQueue.h>
#include <Poco/Timestamp.h>

#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <vector>
#include <map>
#include <functional>
#include <boost/noncopyable.hpp>


namespace DB
{

class TaskNotification;


/** Executes functions scheduled at a specific point in time.
  * Basically all tasks are added to a queue and processed by worker threads.
  *
  * The most important difference between this and BackgroundProcessingPool
  * is the guarantee that the same function is never executed by several workers at the same time.
  *
  * The usage scenario: instead of starting a separate thread for each task,
  * register a task in BackgroundSchedulePool and, when you need to run the task,
  * call the schedule or scheduleAfter(duration) method.
  */
class BackgroundSchedulePool
{
public:
    class TaskInfo;
    using TaskHandle = std::shared_ptr<TaskInfo>;
    using Tasks = std::multimap<Poco::Timestamp, TaskHandle>;
    using Task = std::function<void()>;

    class TaskInfo : public std::enable_shared_from_this<TaskInfo>, private boost::noncopyable
    {
    public:
        TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function);

        /// All these methods wait for the current execution of the task.

        /// Schedule for execution as soon as possible (if not already scheduled).
        /// If the task was already scheduled with a delay, the delay will be ignored.
        bool schedule();

        /// Schedule for execution after the specified delay.
        bool scheduleAfter(size_t ms);

        /// Further attempts to schedule become no-ops.
        void deactivate();
        void activate();

    private:
        friend class TaskNotification;
        friend class BackgroundSchedulePool;

        void execute();

        /// This mutex is recursive, because it's locked during the 'execute' method,
        /// and the task can schedule itself again during execution.
        std::recursive_mutex mutex;

        std::string name;
        bool deactivated = false;
        bool scheduled = false;
        bool delayed = false;
        BackgroundSchedulePool & pool;
        Task function;

        /// If the task is scheduled with a delay, points to an element of delayed_tasks.
        Tasks::iterator iterator;
    };

    BackgroundSchedulePool(size_t size);
    ~BackgroundSchedulePool();

    TaskHandle addTask(const std::string & name, const Task & task);
    void removeTask(const TaskHandle & task);
    size_t getNumberOfThreads() const { return size; }

private:
    using Threads = std::vector<std::thread>;

    void threadFunction();
    void delayExecutionThreadFunction();

    /// Schedule a task for execution after the specified delay from now.
    void scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::recursive_mutex> &);

    /// Remove a task that was scheduled with a delay from the schedule.
    void cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::recursive_mutex> &);

    /// Number of worker threads.
    const size_t size;
    std::atomic<bool> shutdown {false};
    Threads threads;
    Poco::NotificationQueue queue;

    /// Delayed notifications.

    std::condition_variable wakeup_event;
    std::mutex delayed_tasks_lock;
    /// Thread waiting for the next delayed task.
    std::thread delayed_thread;
    /// Tasks ordered by scheduled time.
    Tasks delayed_tasks;
};

using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;

}
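For orientation, here is a minimal usage sketch of the pool introduced above. It is illustrative only and not part of the commit; the task name, thread count, and delay are arbitrary:

    /// Minimal sketch, assuming only the API declared in the header above.
    DB::BackgroundSchedulePool pool(4);    /// 4 worker threads

    auto task = pool.addTask("ExampleTask", []
    {
        /// Do some work. The pool guarantees this function is never
        /// executed by two workers at the same time.
    });

    task->schedule();          /// run as soon as a worker is free, or:
    task->scheduleAfter(500);  /// run after a 500 ms delay (a no-op if already scheduled)

    pool.removeTask(task);     /// deactivates the task

The handle is a shared_ptr, so it stays valid while the pool still holds notifications referring to it.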
@@ -9,6 +9,7 @@
     M(ReplicatedSend) \
     M(ReplicatedChecks) \
     M(BackgroundPoolTask) \
+    M(BackgroundSchedulePoolTask) \
     M(DiskSpaceReservedForMerge) \
     M(DistributedSend) \
     M(QueryPreempted) \
@@ -25,6 +26,7 @@
     M(LeaderReplica) \
     M(MemoryTracking) \
     M(MemoryTrackingInBackgroundProcessingPool) \
+    M(MemoryTrackingInBackgroundSchedulePool) \
     M(MemoryTrackingForMerges) \
     M(LeaderElection) \
     M(EphemeralNode) \
@@ -5,6 +5,7 @@
 #include <memory>
 #include <common/logger_useful.h>
 #include <Common/CurrentMetrics.h>
+#include <Common/BackgroundSchedulePool.h>


 namespace ProfileEvents
@@ -36,9 +37,10 @@ public:
      * and existence of more than one ephemeral node with same identifier indicates an error
      * (see cleanOldEphemeralNodes).
      */
-    LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
-        : path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
+    LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
+        : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
     {
+        task_handle = pool.addTask("LeaderElection", [this] { threadFunction(); });
         createNode();
     }

@@ -51,9 +53,12 @@ public:
     ~LeaderElection()
     {
         releaseNode();
+        pool.removeTask(task_handle);
     }

 private:
+    DB::BackgroundSchedulePool & pool;
+    DB::BackgroundSchedulePool::TaskHandle task_handle;
     std::string path;
     ZooKeeper & zookeeper;
     LeadershipHandler handler;
@@ -62,15 +67,10 @@ private:
     EphemeralNodeHolderPtr node;
     std::string node_name;

-    std::thread thread;
-    std::atomic<bool> shutdown {false};
-    zkutil::EventPtr event = std::make_shared<Poco::Event>();
-
     CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};

     void createNode()
     {
-        shutdown = false;
         node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);

         std::string node_path = node->getPath();
@@ -78,7 +78,8 @@ private:

         cleanOldEphemeralNodes();

-        thread = std::thread(&LeaderElection::threadFunction, this);
+        task_handle->activate();
+        task_handle->schedule();
     }

     void cleanOldEphemeralNodes()
@@ -113,47 +114,41 @@ private:

     void releaseNode()
     {
-        shutdown = true;
-        event->set();
-        if (thread.joinable())
-            thread.join();
+        task_handle->deactivate();
         node = nullptr;
     }

     void threadFunction()
     {
-        while (!shutdown)
-        {
-            bool success = false;
-
-            try
-            {
-                Strings children = zookeeper.getChildren(path);
-                std::sort(children.begin(), children.end());
-                auto it = std::lower_bound(children.begin(), children.end(), node_name);
-                if (it == children.end() || *it != node_name)
-                    throw Poco::Exception("Assertion failed in LeaderElection");
-
-                if (it == children.begin())
-                {
-                    ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
-                    handler();
-                    return;
-                }
-
-                if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event))
-                    event->wait();
-
-                success = true;
-            }
-            catch (...)
-            {
-                DB::tryLogCurrentException("LeaderElection");
-            }
-
-            if (!success)
-                event->tryWait(10 * 1000);
-        }
+        bool success = false;
+
+        try
+        {
+            Strings children = zookeeper.getChildren(path);
+            std::sort(children.begin(), children.end());
+            auto it = std::lower_bound(children.begin(), children.end(), node_name);
+            if (it == children.end() || *it != node_name)
+                throw Poco::Exception("Assertion failed in LeaderElection");
+
+            if (it == children.begin())
+            {
+                ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
+                handler();
+                return;
+            }
+
+            if (!zookeeper.exists(path + "/" + *(it - 1), nullptr, task_handle))
+                task_handle->schedule();
+
+            success = true;
+        }
+        catch (...)
+        {
+            DB::tryLogCurrentException("LeaderElection");
+        }
+
+        if (!success)
+            task_handle->scheduleAfter(10 * 1000);
     }
 };

@@ -5,6 +5,7 @@
 #include <vector>
 #include <zookeeper.h>
 #include <Poco/Event.h>
+#include <Common/BackgroundSchedulePool.h>


 namespace zkutil
@@ -149,6 +150,7 @@ namespace CreateMode
 }

 using EventPtr = std::shared_ptr<Poco::Event>;
+using TaskHandlePtr = DB::BackgroundSchedulePool::TaskHandle; /// TODO Need to remove this dependency.

 class ZooKeeper;

@@ -197,6 +197,23 @@ WatchCallback ZooKeeper::callbackForEvent(const EventPtr & event)
     return callback;
 }

+WatchCallback ZooKeeper::callbackForTaskHandle(const TaskHandlePtr & task)
+{
+    WatchCallback callback;
+    if (task)
+    {
+        callback = [t = task](ZooKeeper &, int, int, const char *) mutable
+        {
+            if (t)
+            {
+                t->schedule();
+                t.reset(); /// The event is set only once, even if the callback can fire multiple times due to session events.
+            }
+        };
+    }
+    return callback;
+}
+
 WatchContext * ZooKeeper::createContext(WatchCallback && callback)
 {
     if (callback)
@@ -256,6 +273,7 @@ int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
     return code;
 }

+
 Strings ZooKeeper::getChildren(
     const std::string & path, Stat * stat, const EventPtr & watch)
 {
@@ -444,6 +462,11 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat_, const EventPtr &
     return existsWatch(path, stat_, callbackForEvent(watch));
 }

+bool ZooKeeper::exists(const std::string & path, Stat * stat, const TaskHandlePtr & watch)
+{
+    return existsWatch(path, stat, callbackForTaskHandle(watch));
+}
+
 bool ZooKeeper::existsWatch(const std::string & path, Stat * stat_, const WatchCallback & watch_callback)
 {
     int32_t code = retry(std::bind(&ZooKeeper::existsImpl, this, path, stat_, watch_callback));
@@ -499,6 +522,16 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr
     throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
 }

+std::string ZooKeeper::get(const std::string & path, Stat * stat, const TaskHandlePtr & watch)
+{
+    int code;
+    std::string res;
+    if (tryGetWatch(path, res, stat, callbackForTaskHandle(watch), &code))
+        return res;
+    else
+        throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
+}
+
 bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, const EventPtr & watch, int * return_code)
 {
     return tryGetWatch(path, res, stat_, callbackForEvent(watch), return_code);
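To illustrate how these overloads fit together (a sketch under assumptions, not code from the commit: it assumes a BackgroundSchedulePool named pool and a ZooKeeper pointer named zookeeper already in scope, and the node path and task name are hypothetical): passing a task handle as the watch argument installs a one-shot callback that simply re-schedules the task when the watched node changes.

    /// Hypothetical sketch: a self-re-arming watch.
    DB::BackgroundSchedulePool::TaskHandle task;
    task = pool.addTask("WatchExample", [&]
    {
        zkutil::Stat stat;
        /// get() installs the watch again on every execution, so the task
        /// wakes up each time the node changes, with no dedicated thread.
        std::string value = zookeeper->get("/example/node", &stat, task);
        /// ... react to the new value ...
    });
    task->schedule();

This is exactly the mechanism LeaderElection and the replication threads below rely on in place of Poco::Event waits.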
@@ -160,9 +160,11 @@ public:
     int32_t tryRemoveEphemeralNodeWithRetries(const std::string & path, int32_t version = -1, size_t * attempt = nullptr);

     bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
+    bool exists(const std::string & path, Stat * stat, const TaskHandlePtr & watch);
     bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback);

     std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
+    std::string get(const std::string & path, Stat * stat, const TaskHandlePtr & watch);

     /// Does not throw in the following cases:
     /// * The node doesn't exist. Returns false in this case.
@@ -370,6 +372,7 @@ private:
     void tryRemoveChildrenRecursive(const std::string & path);

     static WatchCallback callbackForEvent(const EventPtr & event);
+    static WatchCallback callbackForTaskHandle(const TaskHandlePtr & task);
     WatchContext * createContext(WatchCallback && callback);
     static void destroyContext(WatchContext * context);
     static void processCallback(zhandle_t * zh, int type, int state, const char * path, void * watcher_ctx);
@@ -75,7 +75,7 @@ Field convertNodeToField(capnp::DynamicValue::Reader value)
             throw Exception("CAPABILITY type not supported");
         case capnp::DynamicValue::ANY_POINTER:
             throw Exception("ANY_POINTER type not supported");
     }
 }

 capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
@@ -171,7 +171,7 @@ Block DictionaryBlockInputStream<DictionaryType, Key>::getBlock(size_t start, si
         }
         return (this->*fillBlockFunction)({}, columns, {}, std::move(view_columns));
     }
-    else if(!ids.empty())
+    else if (!ids.empty())
     {
         PaddedPODArray<Key> block_ids(ids.begin() + start, ids.begin() + start + length);
         return (this->*fillBlockFunction)(block_ids, {}, {}, {});
@@ -277,31 +277,31 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
     {
         code <<
             "template void Aggregator::executeSpecialized<\n"
-            "\t" << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
-            "\t" << method_typename << " &, Arena *, size_t, ConstColumnPlainPtrs &,\n"
-            "\tAggregateColumns &, const Sizes &, StringRefs &, bool, AggregateDataPtr) const;\n"
+            "    " << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
+            "    " << method_typename << " &, Arena *, size_t, ConstColumnPlainPtrs &,\n"
+            "    AggregateColumns &, const Sizes &, StringRefs &, bool, AggregateDataPtr) const;\n"
             "\n"
             "static void wrapper" << suffix << "(\n"
-            "\tconst Aggregator & aggregator,\n"
-            "\t" << method_typename << " & method,\n"
-            "\tArena * arena,\n"
-            "\tsize_t rows,\n"
-            "\tConstColumnPlainPtrs & key_columns,\n"
-            "\tAggregator::AggregateColumns & aggregate_columns,\n"
-            "\tconst Sizes & key_sizes,\n"
-            "\tStringRefs & keys,\n"
-            "\tbool no_more_keys,\n"
-            "\tAggregateDataPtr overflow_row)\n"
+            "    const Aggregator & aggregator,\n"
+            "    " << method_typename << " & method,\n"
+            "    Arena * arena,\n"
+            "    size_t rows,\n"
+            "    ConstColumnPlainPtrs & key_columns,\n"
+            "    Aggregator::AggregateColumns & aggregate_columns,\n"
+            "    const Sizes & key_sizes,\n"
+            "    StringRefs & keys,\n"
+            "    bool no_more_keys,\n"
+            "    AggregateDataPtr overflow_row)\n"
             "{\n"
-            "\taggregator.executeSpecialized<\n"
-            "\t\t" << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
-            "\t\tmethod, arena, rows, key_columns, aggregate_columns, key_sizes, keys, no_more_keys, overflow_row);\n"
+            "    aggregator.executeSpecialized<\n"
+            "        " << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
+            "        method, arena, rows, key_columns, aggregate_columns, key_sizes, keys, no_more_keys, overflow_row);\n"
             "}\n"
             "\n"
             "void * getPtr" << suffix << "() __attribute__((__visibility__(\"default\")));\n"
             "void * getPtr" << suffix << "()\n" /// Without this wrapper, it's not clear how to get the desired symbol from the compiled library.
             "{\n"
-            "\treturn reinterpret_cast<void *>(&wrapper" << suffix << ");\n"
+            "    return reinterpret_cast<void *>(&wrapper" << suffix << ");\n"
             "}\n";
     };

@@ -312,25 +312,25 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
     /// For `without_key` method.
     code <<
         "template void Aggregator::executeSpecializedWithoutKey<\n"
-        "\t" << "TypeList<" << aggregate_functions_typenames << ">>(\n"
-        "\tAggregatedDataWithoutKey &, size_t, AggregateColumns &, Arena *) const;\n"
+        "    " << "TypeList<" << aggregate_functions_typenames << ">>(\n"
+        "    AggregatedDataWithoutKey &, size_t, AggregateColumns &, Arena *) const;\n"
         "\n"
         "static void wrapper(\n"
-        "\tconst Aggregator & aggregator,\n"
-        "\tAggregatedDataWithoutKey & method,\n"
-        "\tsize_t rows,\n"
-        "\tAggregator::AggregateColumns & aggregate_columns,\n"
-        "\tArena * arena)\n"
+        "    const Aggregator & aggregator,\n"
+        "    AggregatedDataWithoutKey & method,\n"
+        "    size_t rows,\n"
+        "    Aggregator::AggregateColumns & aggregate_columns,\n"
+        "    Arena * arena)\n"
         "{\n"
-        "\taggregator.executeSpecializedWithoutKey<\n"
-        "\t\tTypeList<" << aggregate_functions_typenames << ">>(\n"
-        "\t\tmethod, rows, aggregate_columns, arena);\n"
+        "    aggregator.executeSpecializedWithoutKey<\n"
+        "        TypeList<" << aggregate_functions_typenames << ">>(\n"
+        "        method, rows, aggregate_columns, arena);\n"
         "}\n"
        "\n"
         "void * getPtr() __attribute__((__visibility__(\"default\")));\n"
         "void * getPtr()\n"
         "{\n"
-        "\treturn reinterpret_cast<void *>(&wrapper);\n"
+        "    return reinterpret_cast<void *>(&wrapper);\n"
         "}\n";
 }

@@ -343,7 +343,7 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
     "void * getPtrTwoLevel() __attribute__((__visibility__(\"default\")));\n"
     "void * getPtrTwoLevel()\n"
     "{\n"
-    "\treturn nullptr;\n"
+    "    return nullptr;\n"
     "}\n";
 }

@@ -15,6 +15,7 @@
 #include <Common/setThreadName.h>
 #include <Common/Stopwatch.h>
 #include <Common/formatReadable.h>
+#include <Common/BackgroundSchedulePool.h>
 #include <DataStreams/FormatFactory.h>
 #include <Databases/IDatabase.h>
 #include <Storages/IStorage.h>
@@ -124,6 +125,7 @@ struct ContextShared
     ConfigurationPtr users_config;                /// Config with the users, profiles and quotas sections.
     InterserverIOHandler interserver_io_handler;  /// Handler for interserver communication.
     BackgroundProcessingPoolPtr background_pool;  /// The thread pool for the background work performed by the tables.
+    BackgroundSchedulePoolPtr schedule_pool;      /// A thread pool that can run different jobs in background (used in replicated tables)
     Macros macros;                                /// Substitutions extracted from config.
     std::unique_ptr<Compiler> compiler;           /// Used for dynamic compilation of queries' parts if it is necessary.
     std::shared_ptr<DDLWorker> ddl_worker;        /// Process ddl commands from zk.
@@ -1227,6 +1229,14 @@ BackgroundProcessingPool & Context::getBackgroundPool()
     return *shared->background_pool;
 }

+BackgroundSchedulePool & Context::getSchedulePool()
+{
+    auto lock = getLock();
+    if (!shared->schedule_pool)
+        shared->schedule_pool = std::make_shared<BackgroundSchedulePool>(settings.background_schedule_pool_size);
+    return *shared->schedule_pool;
+}
+
 void Context::setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker)
 {
     auto lock = getLock();
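A sketch of the intended call site (hypothetical names; the real call sites appear in the replication threads further below): a component now borrows the lazily created shared pool from the Context instead of owning a std::thread.

    /// Hypothetical component: constructor registers and arms the task ...
    task_handle = context.getSchedulePool().addTask("MyBackgroundJob", [this] { run(); });
    task_handle->schedule();

    /// ... and the destructor removes it, which deactivates the task.
    context.getSchedulePool().removeTask(task_handle);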
@@ -38,6 +38,7 @@ class ExternalDictionaries;
 class ExternalModels;
 class InterserverIOHandler;
 class BackgroundProcessingPool;
+class BackgroundSchedulePool;
 class MergeList;
 class Cluster;
 class Compiler;
@@ -307,6 +308,7 @@ public:
     void dropCaches() const;

     BackgroundProcessingPool & getBackgroundPool();
+    BackgroundSchedulePool & getSchedulePool();

     void setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker);
     DDLWorker & getDDLWorker() const;
@@ -75,6 +75,9 @@ struct Settings
     /** Number of threads performing background work for tables (for example, merging in merge tree). \
      * TODO: Now only applies when the server is started. You can make it dynamically variable. */ \
     M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE) \
+    /** Number of threads performing background tasks for replicated tables. \
+     * TODO: Now only applies when the server is started. You can make it dynamically variable. */ \
+    M(SettingUInt64, background_schedule_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE) \
     \
     /** Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown */ \
     M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS) \
@@ -92,7 +92,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)

         if (s_after.ignore(pos, expected))
         {
-            if(!parser_name.parse(pos, params.column, expected))
+            if (!parser_name.parse(pos, params.column, expected))
                 return false;
         }

@@ -166,8 +166,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
         {"endpoint", getEndpointId(replica_path)},
         {"part", part_name},
         {"compress", "false"}
-        }
-    );
+    });

     ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST};

@@ -13,223 +13,223 @@ namespace DB
 static const auto ALTER_ERROR_SLEEP_MS = 10 * 1000;


-ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_)
-    : storage(storage_),
-    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)")),
-    thread([this] { run(); }) {}
+ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_) :
+    storage(storage_),
+    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)"))
+{
+    task_handle = storage_.context.getSchedulePool().addTask("ReplicatedMergeTreeAlterThread", [this]{ run(); });
+    task_handle->schedule();
+}
+
+ReplicatedMergeTreeAlterThread::~ReplicatedMergeTreeAlterThread()
+{
+    storage.context.getSchedulePool().removeTask(task_handle);
+}

 void ReplicatedMergeTreeAlterThread::run()
 {
-    setThreadName("ReplMTAlter");
-
     bool force_recheck_parts = true;

-    while (!need_stop)
-    {
-        try
-        {
-            /** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
-              * as well as a description of columns in local file with metadata (storage.data.getColumnsList()).
-              *
-              * If these descriptions are different - you need to do ALTER.
-              *
-              * If stored version of the node (columns_version) differs from the version in ZK,
-              * then the description of the columns in ZK does not necessarily differ from the local
-              * - this can happen with a loop from ALTER-s, which as a whole, does not change anything.
-              * In this case, you need to update the stored version number,
-              * and also check the structure of parts, and, if necessary, make ALTER.
-              *
-              * Recorded version number needs to be updated after updating the metadata, under lock.
-              * This version number is checked against the current one for INSERT.
-              * That is, we make sure to insert blocks with the correct structure.
-              *
-              * When the server starts, previous ALTER might not have been completed.
-              * Therefore, for the first time, regardless of the changes, we check the structure of all parts,
-              * (Example: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
-              * and do ALTER if necessary.
-              *
-              * TODO: Too complicated, rewrite everything.
-              */
-
-            auto zookeeper = storage.getZooKeeper();
-
-            zkutil::Stat stat;
-            const String columns_str = zookeeper->get(storage.zookeeper_path + "/columns", &stat, wakeup_event);
-            auto columns_desc = ColumnsDescription<true>::parse(columns_str);
-
-            auto & columns = columns_desc.columns;
-            auto & materialized_columns = columns_desc.materialized;
-            auto & alias_columns = columns_desc.alias;
-            auto & column_defaults = columns_desc.defaults;
-
-            bool changed_version = (stat.version != storage.columns_version);
-
-            {
-                /// If you need to lock table structure, then suspend merges.
-                ActionBlocker::BlockHolder merge_blocker;
-
-                if (changed_version || force_recheck_parts)
-                    merge_blocker = storage.merger.merges_blocker.cancel();
-
-                MergeTreeData::DataParts parts;
-
-                /// If columns description has changed, we will update table structure locally.
-                if (changed_version)
-                {
-                    /// Temporarily cancel part checks to avoid locking for long time.
-                    auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
-
-                    /// Temporarily cancel parts sending
-                    ActionBlocker::BlockHolder data_parts_exchange_blocker;
-                    if (storage.data_parts_exchange_endpoint_holder)
-                        data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
-
-                    /// Temporarily cancel part fetches
-                    auto fetches_blocker = storage.fetcher.blocker.cancel();
-
-                    LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
-
-                    auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);
-
-                    const auto columns_changed = columns != storage.data.getColumnsListNonMaterialized();
-                    const auto materialized_columns_changed = materialized_columns != storage.data.materialized_columns;
-                    const auto alias_columns_changed = alias_columns != storage.data.alias_columns;
-                    const auto column_defaults_changed = column_defaults != storage.data.column_defaults;
-
-                    if (columns_changed || materialized_columns_changed || alias_columns_changed ||
-                        column_defaults_changed)
-                    {
-                        LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
-
-                        storage.context.getDatabase(storage.database_name)->alterTable(
-                            storage.context, storage.table_name,
-                            columns, materialized_columns, alias_columns, column_defaults, {});
-
-                        if (columns_changed)
-                        {
-                            storage.data.setColumnsList(columns);
-                        }
-
-                        if (materialized_columns_changed)
-                        {
-                            storage.materialized_columns = materialized_columns;
-                            storage.data.materialized_columns = std::move(materialized_columns);
-                        }
-
-                        if (alias_columns_changed)
-                        {
-                            storage.alias_columns = alias_columns;
-                            storage.data.alias_columns = std::move(alias_columns);
-                        }
-
-                        if (column_defaults_changed)
-                        {
-                            storage.column_defaults = column_defaults;
-                            storage.data.column_defaults = std::move(column_defaults);
-                        }
-
-                        /// Reinitialize primary key because primary key column types might have changed.
-                        storage.data.initPrimaryKey();
-
-                        LOG_INFO(log, "Applied changes to table.");
-                    }
-                    else
-                    {
-                        LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
-                    }
-
-                    /// You need to get a list of parts under table lock to avoid race condition with merge.
-                    parts = storage.data.getDataParts();
-
-                    storage.columns_version = stat.version;
-                }
-
-                /// Update parts.
-                if (changed_version || force_recheck_parts)
-                {
-                    auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
-
-                    if (changed_version)
-                        LOG_INFO(log, "ALTER-ing parts");
-
-                    int changed_parts = 0;
-
-                    if (!changed_version)
-                        parts = storage.data.getDataParts();
-
-                    const auto columns_plus_materialized = storage.data.getColumnsList();
-
-                    for (const MergeTreeData::DataPartPtr & part : parts)
-                    {
-                        /// Update the part and write result to temporary files.
-                        /// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
-                        /// node /flags/force_alter.
-                        auto transaction = storage.data.alterDataPart(
-                            part, columns_plus_materialized, storage.data.primary_expr_ast, false);
-
-                        if (!transaction)
-                            continue;
-
-                        ++changed_parts;
-
-                        /// Update part metadata in ZooKeeper.
-                        zkutil::Ops ops;
-                        ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
-                            storage.replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
-                        ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
-                            storage.replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1));
-
-                        try
-                        {
-                            zookeeper->multi(ops);
-                        }
-                        catch (const zkutil::KeeperException & e)
-                        {
-                            /// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
-                            if (e.code == ZNONODE)
-                                storage.enqueuePartForCheck(part->name);
-
-                            throw;
-                        }
-
-                        /// Apply file changes.
-                        transaction->commit();
-                    }
-
-                    /// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
-                    storage.data.recalculateColumnSizes();
-
-                    /// List of columns for a specific replica.
-                    zookeeper->set(storage.replica_path + "/columns", columns_str);
-
-                    if (changed_version)
-                    {
-                        if (changed_parts != 0)
-                            LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
-                        else
-                            LOG_INFO(log, "No parts ALTER-ed");
-                    }
-
-                    force_recheck_parts = false;
-                }
-
-                /// It's important that parts and merge_blocker are destroyed before the wait.
-            }
-
-            wakeup_event->wait();
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-
-            force_recheck_parts = true;
-
-            wakeup_event->tryWait(ALTER_ERROR_SLEEP_MS);
-        }
-    }
-
-    LOG_DEBUG(log, "Alter thread finished");
+    try
+    {
+        /** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
+          * as well as a description of columns in local file with metadata (storage.data.getColumnsList()).
+          *
+          * If these descriptions are different - you need to do ALTER.
+          *
+          * If stored version of the node (columns_version) differs from the version in ZK,
+          * then the description of the columns in ZK does not necessarily differ from the local
+          * - this can happen with a loop from ALTER-s, which as a whole, does not change anything.
+          * In this case, you need to update the stored version number,
+          * and also check the structure of parts, and, if necessary, make ALTER.
+          *
+          * Recorded version number needs to be updated after updating the metadata, under lock.
+          * This version number is checked against the current one for INSERT.
+          * That is, we make sure to insert blocks with the correct structure.
+          *
+          * When the server starts, previous ALTER might not have been completed.
+          * Therefore, for the first time, regardless of the changes, we check the structure of all parts,
+          * (Example: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
+          * and do ALTER if necessary.
+          *
+          * TODO: Too complicated, rewrite everything.
+          */
+
+        auto zookeeper = storage.getZooKeeper();
+
+        zkutil::Stat stat;
+        const String columns_str = zookeeper->get(storage.zookeeper_path + "/columns", &stat, task_handle);
+        auto columns_desc = ColumnsDescription<true>::parse(columns_str);
+
+        auto & columns = columns_desc.columns;
+        auto & materialized_columns = columns_desc.materialized;
+        auto & alias_columns = columns_desc.alias;
+        auto & column_defaults = columns_desc.defaults;
+
+        bool changed_version = (stat.version != storage.columns_version);
+
+        {
+            /// If you need to lock table structure, then suspend merges.
+            ActionBlocker::BlockHolder merge_blocker;
+
+            if (changed_version || force_recheck_parts)
+                merge_blocker = storage.merger.merges_blocker.cancel();
+
+            MergeTreeData::DataParts parts;
+
+            /// If columns description has changed, we will update table structure locally.
+            if (changed_version)
+            {
+                /// Temporarily cancel part checks to avoid locking for long time.
+                auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
+
+                /// Temporarily cancel parts sending
+                ActionBlocker::BlockHolder data_parts_exchange_blocker;
+                if (storage.data_parts_exchange_endpoint_holder)
+                    data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
+
+                /// Temporarily cancel part fetches
+                auto fetches_blocker = storage.fetcher.blocker.cancel();
+
+                LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
+
+                auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);
+
+                const auto columns_changed = columns != storage.data.getColumnsListNonMaterialized();
+                const auto materialized_columns_changed = materialized_columns != storage.data.materialized_columns;
+                const auto alias_columns_changed = alias_columns != storage.data.alias_columns;
+                const auto column_defaults_changed = column_defaults != storage.data.column_defaults;
+
+                if (columns_changed || materialized_columns_changed || alias_columns_changed ||
+                    column_defaults_changed)
+                {
+                    LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
+
+                    storage.context.getDatabase(storage.database_name)->alterTable(
+                        storage.context, storage.table_name,
+                        columns, materialized_columns, alias_columns, column_defaults, {});
+
+                    if (columns_changed)
+                    {
+                        storage.data.setColumnsList(columns);
+                    }
+
+                    if (materialized_columns_changed)
+                    {
+                        storage.materialized_columns = materialized_columns;
+                        storage.data.materialized_columns = std::move(materialized_columns);
+                    }
+
+                    if (alias_columns_changed)
+                    {
+                        storage.alias_columns = alias_columns;
+                        storage.data.alias_columns = std::move(alias_columns);
+                    }
+
+                    if (column_defaults_changed)
+                    {
+                        storage.column_defaults = column_defaults;
+                        storage.data.column_defaults = std::move(column_defaults);
+                    }
+
+                    /// Reinitialize primary key because primary key column types might have changed.
+                    storage.data.initPrimaryKey();
+
+                    LOG_INFO(log, "Applied changes to table.");
+                }
+                else
+                {
+                    LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
+                }
+
+                /// You need to get a list of parts under table lock to avoid race condition with merge.
+                parts = storage.data.getDataParts();
+
+                storage.columns_version = stat.version;
+            }
+
+            /// Update parts.
+            if (changed_version || force_recheck_parts)
+            {
+                auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
+
+                if (changed_version)
+                    LOG_INFO(log, "ALTER-ing parts");
+
+                int changed_parts = 0;
+
+                if (!changed_version)
+                    parts = storage.data.getDataParts();
+
+                const auto columns_plus_materialized = storage.data.getColumnsList();
+
+                for (const MergeTreeData::DataPartPtr & part : parts)
+                {
+                    /// Update the part and write result to temporary files.
+                    /// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
+                    /// node /flags/force_alter.
+                    auto transaction = storage.data.alterDataPart(
+                        part, columns_plus_materialized, storage.data.primary_expr_ast, false);
+
+                    if (!transaction)
+                        continue;
+
+                    ++changed_parts;
+
+                    /// Update part metadata in ZooKeeper.
+                    zkutil::Ops ops;
+                    ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
+                        storage.replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
+                    ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
+                        storage.replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1));
+
+                    try
+                    {
+                        zookeeper->multi(ops);
+                    }
+                    catch (const zkutil::KeeperException & e)
+                    {
+                        /// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
+                        if (e.code == ZNONODE)
+                            storage.enqueuePartForCheck(part->name);
+
+                        throw;
+                    }
+
+                    /// Apply file changes.
+                    transaction->commit();
+                }
+
+                /// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
+                storage.data.recalculateColumnSizes();
+
+                /// List of columns for a specific replica.
+                zookeeper->set(storage.replica_path + "/columns", columns_str);
+
+                if (changed_version)
+                {
+                    if (changed_parts != 0)
+                        LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
+                    else
+                        LOG_INFO(log, "No parts ALTER-ed");
+                }
+
+                force_recheck_parts = false;
+            }
+
+            /// It's important that parts and merge_blocker are destroyed before the wait.
+        }
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+
+        force_recheck_parts = true;
+
+        task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS);
+    }
 }

 }
@@ -1,6 +1,7 @@
 #pragma once

 #include <thread>
+#include <Common/BackgroundSchedulePool.h>
 #include <Common/ZooKeeper/Types.h>
 #include <Core/Types.h>
 #include <common/logger_useful.h>
@@ -21,25 +22,14 @@ class ReplicatedMergeTreeAlterThread
 {
 public:
     ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
+    ~ReplicatedMergeTreeAlterThread();

-    ~ReplicatedMergeTreeAlterThread()
-    {
-        need_stop = true;
-        wakeup_event->set();
-        if (thread.joinable())
-            thread.join();
-    }

 private:
     void run();

     StorageReplicatedMergeTree & storage;
     Logger * log;
+    BackgroundSchedulePool::TaskHandle task_handle;
-    zkutil::EventPtr wakeup_event { std::make_shared<Poco::Event>() };
-    std::atomic<bool> need_stop { false };
-
-    std::thread thread;
 };

 }
@@ -297,7 +297,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
     if (code == ZOK)
     {
         transaction.commit();
-        storage.merge_selecting_event.set();
+        storage.merge_selecting_handle->schedule();
     }
     else if (code == ZNODEEXISTS)
     {
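
The one-line change above is the pattern this commit applies everywhere: code that used to wake a dedicated per-table thread through a Poco::Event now re-arms a BackgroundSchedulePool task. A short orientation fragment (not code from the commit; it only uses names and calls visible in these hunks):

    /// Old model: a per-table thread loops forever, sleeping on an event
    /// between iterations; other code wakes it early with set().
    storage.merge_selecting_event.set();

    /// New model: one iteration of that loop is a pool task; other code re-arms it.
    storage.merge_selecting_handle->schedule();           /// run again as soon as possible
    storage.merge_selecting_handle->scheduleAfter(1000);  /// or retry after ~1 second
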
@@ -15,36 +15,33 @@ namespace ErrorCodes

 ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
     : storage(storage_),
-    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")),
-    thread([this] { run(); })
+    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)"))
 {
+    task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeCleanupThread", [this]{ run(); });
+    task_handle->schedule();
 }

+ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
+{
+    storage.context.getSchedulePool().removeTask(task_handle);
+}
+
 void ReplicatedMergeTreeCleanupThread::run()
 {
-    setThreadName("ReplMTCleanup");
-
     const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000;

-    while (!storage.shutdown_called)
+    try
     {
-        try
-        {
-            iterate();
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-        }
-
-        storage.cleanup_thread_event.tryWait(CLEANUP_SLEEP_MS);
+        iterate();
     }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }

-    LOG_DEBUG(log, "Cleanup thread finished");
+    task_handle->scheduleAfter(CLEANUP_SLEEP_MS);
 }


 void ReplicatedMergeTreeCleanupThread::iterate()
 {
     storage.clearOldPartsAndRemoveFromZK();
@@ -231,11 +228,4 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
     std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
 }

-
-ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
-{
-    if (thread.joinable())
-        thread.join();
-}
-
 }
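
The control-flow change above is easiest to see outside ClickHouse: the `while` loop with a timed wait becomes a task that performs one iteration and then asks to be run again after the same delay. A self-contained toy model of that pattern (a detached-thread timer stands in for BackgroundSchedulePool; all names here are illustrative, not ClickHouse API):

    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <thread>

    /// Toy stand-in for the schedule pool: run a callback once, after a delay.
    /// (The real pool multiplexes many such tasks over a fixed set of threads.)
    struct ToyScheduler
    {
        void scheduleAfter(std::function<void()> fn, std::chrono::milliseconds delay)
        {
            std::thread([fn, delay] { std::this_thread::sleep_for(delay); fn(); }).detach();
        }
    };

    struct ToyCleanupTask
    {
        ToyScheduler & pool;
        std::atomic<bool> stopped{false};

        /// One invocation == one body of the former `while` loop:
        /// do the work, swallow errors, then re-arm the timer.
        void runOnce()
        {
            if (stopped)
                return;
            try
            {
                std::cout << "iterate()\n";    /// clearOldPartsAndRemoveFromZK() in the real code
            }
            catch (...) {}
            pool.scheduleAfter([this] { runOnce(); }, std::chrono::milliseconds(100));
        }
    };

    int main()
    {
        ToyScheduler pool;
        ToyCleanupTask task{pool};
        task.runOnce();                                              /// like task_handle->schedule()
        std::this_thread::sleep_for(std::chrono::milliseconds(350)); /// a few iterations run
        task.stopped = true;                                         /// like task_handle->deactivate()
        std::this_thread::sleep_for(std::chrono::milliseconds(200)); /// let the last timer fire
    }

The practical upside, and presumably the motivation for the branch being merged here, is that N replicated tables no longer cost N cleanup threads: all such tasks share the pool's worker threads.
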
@@ -3,6 +3,7 @@
 #include <Core/Types.h>
 #include <Common/ZooKeeper/Types.h>
 #include <common/logger_useful.h>
+#include <Common/BackgroundSchedulePool.h>
 #include <thread>
 #include <map>

@@ -19,13 +20,14 @@ class ReplicatedMergeTreeCleanupThread
 {
 public:
     ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);

     ~ReplicatedMergeTreeCleanupThread();

+    void schedule() { task_handle->schedule(); }
+
 private:
     StorageReplicatedMergeTree & storage;
     Logger * log;
-    std::thread thread;
+    BackgroundSchedulePool::TaskHandle task_handle;

     void run();
     void iterate();
@@ -21,34 +21,34 @@ ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageRe
     : storage(storage_),
     log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, PartCheckThread)"))
 {
+    task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreePartCheckThread", [this] { run(); });
+    task_handle->schedule();
 }

+ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread()
+{
+    stop();
+    storage.context.getSchedulePool().removeTask(task_handle);
+}
+
 void ReplicatedMergeTreePartCheckThread::start()
 {
     std::lock_guard<std::mutex> lock(start_stop_mutex);
-
-    if (need_stop)
-        need_stop = false;
-    else
-        thread = std::thread([this] { run(); });
+    need_stop = false;
+    task_handle->activate();
+    task_handle->schedule();
 }


 void ReplicatedMergeTreePartCheckThread::stop()
 {
+    /// Based on the discussion at https://github.com/yandex/ClickHouse/pull/1489#issuecomment-344756259:
+    /// with the schedule pool it is safe to call stop() twice in a row and start() multiple times.
+
     std::lock_guard<std::mutex> lock(start_stop_mutex);

     need_stop = true;
-    if (thread.joinable())
-    {
-        wakeup_event.set();
-        thread.join();
-        need_stop = false;
-    }
+    task_handle->deactivate();
 }


 void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds)
 {
     std::lock_guard<std::mutex> lock(parts_mutex);
@@ -58,7 +58,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t

     parts_queue.emplace_back(name, time(0) + delay_to_check_seconds);
     parts_set.insert(name);
-    wakeup_event.set();
+    task_handle->schedule();
 }
|
|||||||
|
|
||||||
void ReplicatedMergeTreePartCheckThread::run()
|
void ReplicatedMergeTreePartCheckThread::run()
|
||||||
{
|
{
|
||||||
setThreadName("ReplMTPartCheck");
|
if (need_stop)
|
||||||
|
return;
|
||||||
|
|
||||||
while (!need_stop)
|
try
|
||||||
{
|
{
|
||||||
try
|
time_t current_time = time(nullptr);
|
||||||
|
|
||||||
|
/// Take part from the queue for verification.
|
||||||
|
PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated
|
||||||
|
time_t min_check_time = std::numeric_limits<time_t>::max();
|
||||||
|
|
||||||
{
|
{
|
||||||
time_t current_time = time(nullptr);
|
std::lock_guard<std::mutex> lock(parts_mutex);
|
||||||
|
|
||||||
/// Take part from the queue for verification.
|
|
||||||
PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated
|
|
||||||
time_t min_check_time = std::numeric_limits<time_t>::max();
|
|
||||||
|
|
||||||
|
if (parts_queue.empty())
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(parts_mutex);
|
if (!parts_set.empty())
|
||||||
|
|
||||||
if (parts_queue.empty())
|
|
||||||
{
|
{
|
||||||
if (!parts_set.empty())
|
LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
|
||||||
{
|
parts_set.clear();
|
||||||
LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
|
|
||||||
parts_set.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
|
|
||||||
{
|
|
||||||
if (it->second <= current_time)
|
|
||||||
{
|
|
||||||
selected = it;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (it->second < min_check_time)
|
|
||||||
min_check_time = it->second;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
if (selected == parts_queue.end())
|
|
||||||
{
|
{
|
||||||
/// Poco::Event is triggered immediately if `signal` was before the `wait` call.
|
for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
|
||||||
/// We can wait a little more than we need due to the use of the old `current_time`.
|
|
||||||
|
|
||||||
if (min_check_time != std::numeric_limits<time_t>::max() && min_check_time > current_time)
|
|
||||||
wakeup_event.tryWait(1000 * (min_check_time - current_time));
|
|
||||||
else
|
|
||||||
wakeup_event.wait();
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
checkPart(selected->first);
|
|
||||||
|
|
||||||
if (need_stop)
|
|
||||||
break;
|
|
||||||
|
|
||||||
/// Remove the part from check queue.
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(parts_mutex);
|
|
||||||
|
|
||||||
if (parts_queue.empty())
|
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Someone erased cheking part from parts_queue. This is a bug.");
|
if (it->second <= current_time)
|
||||||
}
|
{
|
||||||
else
|
selected = it;
|
||||||
{
|
break;
|
||||||
parts_set.erase(selected->first);
|
}
|
||||||
parts_queue.erase(selected);
|
|
||||||
|
if (it->second < min_check_time)
|
||||||
|
min_check_time = it->second;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
|
||||||
|
if (selected == parts_queue.end())
|
||||||
|
return;
|
||||||
|
|
||||||
|
checkPart(selected->first);
|
||||||
|
|
||||||
|
if (need_stop)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/// Remove the part from check queue.
|
||||||
{
|
{
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
std::lock_guard<std::mutex> lock(parts_mutex);
|
||||||
wakeup_event.tryWait(PART_CHECK_ERROR_SLEEP_MS);
|
|
||||||
|
if (parts_queue.empty())
|
||||||
|
{
|
||||||
|
LOG_ERROR(log, "Someone erased cheking part from parts_queue. This is a bug.");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
parts_set.erase(selected->first);
|
||||||
|
parts_queue.erase(selected);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
task_handle->schedule();
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Part check thread finished");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -10,7 +10,7 @@
 #include <Poco/Event.h>
 #include <Core/Types.h>
 #include <common/logger_useful.h>
+#include <Common/BackgroundSchedulePool.h>

 namespace DB
 {
@@ -29,6 +29,7 @@ class ReplicatedMergeTreePartCheckThread
 {
 public:
     ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_);
+    ~ReplicatedMergeTreePartCheckThread();

     /// Processing of the queue to be checked is done in the background thread, which you must first start.
     void start();
@@ -65,10 +66,7 @@ public:
     /// Get the number of parts in the queue for check.
     size_t size() const;

-    ~ReplicatedMergeTreePartCheckThread()
-    {
-        stop();
-    }
-
 private:
     void run();
@@ -91,11 +89,10 @@ private:
     mutable std::mutex parts_mutex;
     StringSet parts_set;
     PartsToCheckQueue parts_queue;
-    Poco::Event wakeup_event;

     std::mutex start_stop_mutex;
     std::atomic<bool> need_stop { false };
-    std::thread thread;
+    BackgroundSchedulePool::TaskHandle task_handle;
 };

 }
@@ -229,7 +229,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
 }


-bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event)
+bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle next_update_event)
 {
     std::lock_guard<std::mutex> lock(pull_logs_to_queue_mutex);

@@ -358,7 +358,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
     if (next_update_event)
     {
         if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
-            next_update_event->set();
+            next_update_event->schedule();
     }

     return !log_entries.empty();
@@ -3,6 +3,7 @@
 #include <Storages/MergeTree/MergeTreeData.h>

 #include <Common/ZooKeeper/ZooKeeper.h>
+#include <Common/BackgroundSchedulePool.h>


 namespace DB
@@ -147,7 +148,7 @@ public:
      * If next_update_event != nullptr, will call this event when new entries appear in the log.
      * Returns true if new entries have been added.
      */
-    bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
+    bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle next_update_event);

     /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
      * And also wait for the completion of their execution, if they are now being executed.
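
The new parameter type is doing double duty: inside pullLogsToQueue() the handle is still passed to zookeeper->exists() as the watch argument (that context line is unchanged above), which suggests zkutil gains an overload accepting a TaskHandle elsewhere in this commit. The caller-side effect, as a fragment:

    /// Before: the queue woke a sleeping thread when the ZooKeeper watch fired.
    ///     queue.pullLogsToQueue(zookeeper, queue_updating_event);   // zkutil::EventPtr
    /// After: it re-arms the queue-updating task instead.
    queue.pullLogsToQueue(zookeeper, queue_updating_task_handle);     // TaskHandle
    /// ... and on new log entries: next_update_event->schedule(), not ->set().
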
@@ -28,6 +28,10 @@ namespace ErrorCodes
     extern const int REPLICA_IS_ALREADY_ACTIVE;
 }

+namespace
+{
+    constexpr auto retry_period_ms = 10 * 1000;
+}
+
 /// Used to check whether it's us who set node `is_active`, or not.
 static String generateActiveNodeIdentifier()
@@ -35,129 +39,131 @@ static String generateActiveNodeIdentifier()
     return "pid: " + toString(getpid()) + ", random: " + toString(randomSeed());
 }


 ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
     : storage(storage_),
     log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, RestartingThread)")),
-    active_node_identifier(generateActiveNodeIdentifier()),
-    thread([this] { run(); })
-{
-}
-
-
-void ReplicatedMergeTreeRestartingThread::run()
-{
-    constexpr auto retry_period_ms = 10 * 1000;
-
-    /// The frequency of checking expiration of session in ZK.
-    Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
-
-    /// Periodicity of checking lag of replica.
-    if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
-        check_period_ms = storage.data.settings.check_delay_period * 1000;
-
-    setThreadName("ReplMTRestart");
-
-    bool first_time = true;    /// Activate replica for the first time.
-    time_t prev_time_of_check_delay = 0;
-
-    /// Starts the replica when the server starts/creates a table. Restarts the replica when the session expires in ZK.
-    while (!need_stop)
-    {
-        try
-        {
-            if (first_time || storage.getZooKeeper()->expired())
-            {
-                if (first_time)
-                {
-                    LOG_DEBUG(log, "Activating replica.");
-                }
-                else
-                {
-                    LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
-
-                    if (!storage.is_readonly)
-                        CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
-                    storage.is_readonly = true;
-                    partialShutdown();
-                }
-
-                while (!need_stop)
-                {
-                    try
-                    {
-                        storage.setZooKeeper(storage.context.getZooKeeper());
-                    }
-                    catch (const zkutil::KeeperException & e)
-                    {
-                        /// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
-                        tryLogCurrentException(__PRETTY_FUNCTION__);
-
-                        wakeup_event.tryWait(retry_period_ms);
-                        continue;
-                    }
-
-                    if (!need_stop && !tryStartup())
-                    {
-                        wakeup_event.tryWait(retry_period_ms);
-                        continue;
-                    }
-
-                    break;
-                }
-
-                if (need_stop)
-                    break;
-
-                if (storage.is_readonly)
-                    CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
-                storage.is_readonly = false;
-                first_time = false;
-            }
-
-            time_t current_time = time(nullptr);
-            if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
-            {
-                /// Find out lag of replicas.
-                time_t absolute_delay = 0;
-                time_t relative_delay = 0;
-
-                storage.getReplicaDelays(absolute_delay, relative_delay);
-
-                if (absolute_delay)
-                    LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
-
-                prev_time_of_check_delay = current_time;
-
-                /// We give up leadership if the relative gap is greater than the threshold.
-                if (storage.is_leader_node
-                    && relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
-                {
-                    LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
-                        << storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
-
-                    ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
-
-                    if (storage.is_leader_node)
-                    {
-                        storage.is_leader_node = false;
-                        CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
-                        if (storage.merge_selecting_thread.joinable())
-                            storage.merge_selecting_thread.join();
-                        storage.leader_election->yield();
-                    }
-                }
-            }
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-        }
-
-        wakeup_event.tryWait(check_period_ms);
-    }
+    active_node_identifier(generateActiveNodeIdentifier())
+{
+    check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
+
+    /// Periodicity of checking lag of replica.
+    if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
+        check_period_ms = storage.data.settings.check_delay_period * 1000;
+
+    storage.queue_updating_task_handle = storage.context.getSchedulePool().addTask("queue_updating_task_handle", [this]{ storage.queueUpdatingThread(); });
+    storage.queue_updating_task_handle->deactivate();
+
+    task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeRestartingThread", [this]{ run(); });
+    task_handle->schedule();
+}
+
+
+ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
+{
+    storage.context.getSchedulePool().removeTask(task_handle);
+    completeShutdown();
+    storage.context.getSchedulePool().removeTask(storage.queue_updating_task_handle);
+}
+
+
+void ReplicatedMergeTreeRestartingThread::run()
+{
+    if (need_stop)
+        return;
+
+    try
+    {
+        if (first_time || storage.getZooKeeper()->expired())
+        {
+            startup_completed = false;
+
+            if (first_time)
+            {
+                LOG_DEBUG(log, "Activating replica.");
+            }
+            else
+            {
+                LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
+
+                if (!storage.is_readonly)
+                    CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
+                storage.is_readonly = true;
+                partialShutdown();
+            }
+
+            if (!startup_completed)
+            {
+                try
+                {
+                    storage.setZooKeeper(storage.context.getZooKeeper());
+                }
+                catch (const zkutil::KeeperException & e)
+                {
+                    /// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
+                    tryLogCurrentException(__PRETTY_FUNCTION__);
+                    task_handle->scheduleAfter(retry_period_ms);
+                    return;
+                }
+
+                if (!need_stop && !tryStartup())
+                {
+                    task_handle->scheduleAfter(retry_period_ms);
+                    return;
+                }
+
+                startup_completed = true;
+            }
+
+            if (need_stop)
+                return;
+
+            if (storage.is_readonly)
+                CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
+            storage.is_readonly = false;
+            first_time = false;
+        }
+
+        time_t current_time = time(nullptr);
+        if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
+        {
+            /// Find out lag of replicas.
+            time_t absolute_delay = 0;
+            time_t relative_delay = 0;
+
+            storage.getReplicaDelays(absolute_delay, relative_delay);
+
+            if (absolute_delay)
+                LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
+
+            prev_time_of_check_delay = current_time;
+
+            /// We give up leadership if the relative gap is greater than the threshold.
+            if (storage.is_leader_node
+                && relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
+            {
+                LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
+                    << storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
+
+                ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
+
+                if (storage.is_leader_node)
+                {
+                    storage.is_leader_node = false;
+                    CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
+                    storage.merge_selecting_handle->deactivate();
+                    storage.leader_election->yield();
+                }
+            }
+        }
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+
+    task_handle->scheduleAfter(check_period_ms);
+}
+
+
+void ReplicatedMergeTreeRestartingThread::completeShutdown()
+{
     try
     {
         storage.data_parts_exchange_endpoint_holder->cancelForever();
@@ -175,11 +181,8 @@ void ReplicatedMergeTreeRestartingThread::run()
     {
         tryLogCurrentException(__PRETTY_FUNCTION__);
     }
-
-    LOG_DEBUG(log, "Restarting thread finished");
 }


 bool ReplicatedMergeTreeRestartingThread::tryStartup()
 {
     try
@@ -190,6 +193,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()

     if (storage.data.settings.replicated_can_become_leader)
         storage.leader_election = std::make_shared<zkutil::LeaderElection>(
+            storage.context.getSchedulePool(),
             storage.zookeeper_path + "/leader_election",
             *storage.current_zookeeper,    /// current_zookeeper lives for the lifetime of leader_election,
             /// since before changing `current_zookeeper`, the `leader_election` object is destroyed in the `partialShutdown` method.
@@ -201,8 +205,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()

     storage.shutdown_called = false;
     storage.shutdown_event.reset();
+    storage.queue_updating_task_handle->activate();
+    storage.queue_updating_task_handle->schedule();

-    storage.queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, &storage);
     storage.part_check_thread.start();
     storage.alter_thread = std::make_unique<ReplicatedMergeTreeAlterThread>(storage);
     storage.cleanup_thread = std::make_unique<ReplicatedMergeTreeCleanupThread>(storage);
@@ -352,10 +357,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()

     storage.shutdown_called = true;
     storage.shutdown_event.set();
-    storage.merge_selecting_event.set();
-    storage.queue_updating_event->set();
     storage.alter_query_event->set();
-    storage.cleanup_thread_event.set();
     storage.replica_is_active_node = nullptr;

     LOG_TRACE(log, "Waiting for threads to finish");
@@ -366,12 +368,11 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
         {
             storage.is_leader_node = false;
             CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
-            if (storage.merge_selecting_thread.joinable())
-                storage.merge_selecting_thread.join();
+            storage.merge_selecting_handle->deactivate();
         }
     }

-    if (storage.queue_updating_thread.joinable())
-        storage.queue_updating_thread.join();
+    storage.queue_updating_task_handle->deactivate();

     storage.cleanup_thread.reset();
     storage.alter_thread.reset();
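
The restarting thread shows the mechanical half of every conversion in this commit: variables that used to live in run()'s stack frame across loop iterations (first_time, prev_time_of_check_delay, check_period_ms, plus the new startup_completed) become member fields, because each pool invocation is a fresh call. A minimal self-contained illustration of the same refactoring (hypothetical names, not ClickHouse code):

    #include <ctime>

    /// Before: `bool first_time = true;` and friends were locals of a
    /// `while (!need_stop)` loop and survived between iterations for free.
    /// After: one invocation == one former iteration, so any state that must
    /// outlive an invocation moves into the object.
    struct RestartingTaskState
    {
        bool first_time = true;               /// was a local in run()
        time_t prev_time_of_check_delay = 0;  /// was a local in run()
        bool startup_completed = false;       /// replaces the inner retry loop's control flow

        void runOnce()
        {
            if (first_time)
                first_time = false;           /// persists until the next invocation
            prev_time_of_check_delay = time(nullptr);
        }
    };
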
@@ -2,6 +2,7 @@

 #include <Poco/Event.h>
 #include <common/logger_useful.h>
+#include <Common/BackgroundSchedulePool.h>
 #include <Core/Types.h>
 #include <thread>
 #include <atomic>
@@ -22,16 +23,12 @@ class ReplicatedMergeTreeRestartingThread
 {
 public:
     ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);

-    ~ReplicatedMergeTreeRestartingThread()
-    {
-        if (thread.joinable())
-            thread.join();
-    }
+    ~ReplicatedMergeTreeRestartingThread();

     void wakeup()
     {
         wakeup_event.set();
+        task_handle->schedule();
     }

     Poco::Event & getWakeupEvent()
@@ -42,7 +39,7 @@ public:
     void stop()
     {
         need_stop = true;
-        wakeup();
+        wakeup_event.set();
     }

 private:
@@ -54,9 +51,14 @@ private:
     /// The random data we wrote into `/replicas/me/is_active`.
     String active_node_identifier;

-    std::thread thread;
+    BackgroundSchedulePool::TaskHandle task_handle;
+    Int64 check_period_ms;                  /// The frequency of checking expiration of session in ZK.
+    bool first_time = true;                 /// Activate replica for the first time.
+    time_t prev_time_of_check_delay = 0;
+    bool startup_completed = false;

     void run();
+    void completeShutdown();

     /// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper.
     bool tryStartup();    /// Returns false if ZooKeeper is not available.
@@ -200,7 +200,7 @@ StorageKafka::StorageKafka(
     const NamesAndTypesList & materialized_columns_,
     const NamesAndTypesList & alias_columns_,
     const ColumnDefaults & column_defaults_,
     const String & brokers_, const String & group_, const Names & topics_,
     const String & format_name_, const String & schema_name_)
     : IStorage{materialized_columns_, alias_columns_, column_defaults_},
     table_name(table_name_), database_name(database_name_), context(context_),
@@ -243,7 +243,7 @@ BlockInputStreams StorageKafka::read(
     processed_stage = QueryProcessingStage::FetchColumns;

     if (!conf)
         return BlockInputStreams();

     BlockInputStreams streams;
     streams.reserve(num_streams);
@@ -193,6 +193,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     shutdown_event(false), part_check_thread(*this),
     log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
 {
+    initMergeSelectSession();
+    merge_selecting_handle = context_.getSchedulePool().addTask("StorageReplicatedMergeTree", [this] { mergeSelectingThread(); });
+
     if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
         zookeeper_path.resize(zookeeper_path.size() - 1);
     /// If a ZooKeeper chroot prefix is used, the path should start with '/', because chroot concatenates without it.
@@ -960,7 +963,7 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(
 }


-void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
+void StorageReplicatedMergeTree::pullLogsToQueue(BackgroundSchedulePool::TaskHandle next_update_event)
 {
     if (queue.pullLogsToQueue(getZooKeeper(), next_update_event))
     {
@@ -1182,7 +1185,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
      * This is not a problem, because in this case the merge will remain in the queue, and we will try again.
      */
     transaction.commit();
-    merge_selecting_event.set();
+    merge_selecting_handle->schedule();

     ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
 }
@@ -1504,39 +1507,34 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &

 void StorageReplicatedMergeTree::queueUpdatingThread()
 {
-    setThreadName("ReplMTQueueUpd");
-
-    bool update_in_progress = false;
-    while (!shutdown_called)
-    {
-        if (!update_in_progress)
-        {
-            last_queue_update_start_time.store(time(nullptr));
-            update_in_progress = true;
-        }
-        try
-        {
-            pullLogsToQueue(queue_updating_event);
-            last_queue_update_finish_time.store(time(nullptr));
-            update_in_progress = false;
-            queue_updating_event->wait();
-        }
-        catch (const zkutil::KeeperException & e)
-        {
-            if (e.code == ZINVALIDSTATE)
-                restarting_thread->wakeup();
-
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-            queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-            queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
-        }
-    }
-
-    LOG_DEBUG(log, "Queue updating thread finished");
+    /// Most probably this check is not relevant.
+    if (shutdown_called)
+        return;
+
+    if (!queue_update_in_progress)
+    {
+        last_queue_update_start_time.store(time(nullptr));
+        queue_update_in_progress = true;
+    }
+
+    try
+    {
+        pullLogsToQueue(queue_updating_task_handle);
+        last_queue_update_finish_time.store(time(nullptr));
+        queue_update_in_progress = false;
+    }
+    catch (const zkutil::KeeperException & e)
+    {
+        if (e.code == ZINVALIDSTATE)
+            restarting_thread->wakeup();
+
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+        queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+        queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
+    }
 }


@@ -1656,7 +1654,7 @@ namespace

     return true;
 }
+}

 /** It can take a long time to determine whether it is possible to merge two adjacent parts.
  * Two adjacent parts can be merged if all block numbers between their numbers are not used (abandoned).
@@ -1738,104 +1736,103 @@ namespace
 template <typename Key> constexpr CachedMergingPredicate<Key>::clock::duration CachedMergingPredicate<Key>::Expiration::min_delay;
 template <typename Key> constexpr CachedMergingPredicate<Key>::clock::duration CachedMergingPredicate<Key>::Expiration::max_delay;
 template <typename Key> constexpr double CachedMergingPredicate<Key>::Expiration::exponent_base;
-}
-
-
-void StorageReplicatedMergeTree::mergeSelectingThread()
-{
-    setThreadName("ReplMTMergeSel");
-    LOG_DEBUG(log, "Merge selecting thread started");
-
-    bool deduplicate = false;    /// TODO: read the deduplicate option from the table config
-    bool need_pull = true;
-
-    auto uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
-    {
-        return canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data);
-    };
-
-    auto merging_predicate_args_to_key = [](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
-    {
-        return std::make_pair(left->name, right->name);
-    };
-
-    CachedMergingPredicate<std::pair<std::string, std::string>> cached_merging_predicate;
-
-    /// Will be updated below.
-    std::chrono::steady_clock::time_point now;
-
-    auto can_merge = [&]
-        (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
-    {
-        /// If any of the parts is already going to be merged into a larger one, do not agree to merge it.
-        if (queue.partWillBeMergedOrMergesDisabled(left->name)
-            || (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)))
-            return false;
-
-        return cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
-    };
-
-    while (!shutdown_called && is_leader_node)
-    {
-        bool success = false;
-
-        try
-        {
-            if (need_pull)
-            {
-                /// You need to load new entries into the queue before you select parts to merge
-                /// (so we know which parts are already going to be merged).
-                pullLogsToQueue();
-                need_pull = false;
-            }
-
-            std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
-
-            /** If many merges are already queued, then we will queue only small enough merges.
-              * Otherwise the merge queue could be filled with only large merges,
-              * and at the same time, many small parts could be created and would never be merged.
-              */
-            size_t merges_queued = queue.countMerges();
-
-            if (merges_queued >= data.settings.max_replicated_merges_in_queue)
-            {
-                LOG_TRACE(log, "Number of queued merges (" << merges_queued
-                    << ") is greater than max_replicated_merges_in_queue ("
-                    << data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
-            }
-            else
-            {
-                MergeTreeDataMerger::FuturePart future_merged_part;
-
-                size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
-
-                now = std::chrono::steady_clock::now();
-
-                if (max_parts_size_for_merge > 0
-                    && merger.selectPartsToMerge(
-                        future_merged_part, false,
-                        max_parts_size_for_merge,
-                        can_merge)
-                    && createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate))
-                {
-                    success = true;
-                    need_pull = true;
-                }
-            }
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-        }
-
-        if (shutdown_called || !is_leader_node)
-            break;
-
-        if (!success)
-            merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
-    }
-
-    LOG_DEBUG(log, "Merge selecting thread finished");
-}
+void StorageReplicatedMergeTree::initMergeSelectSession()
+{
+    merge_sel_deduplicate = false;    /// TODO: read the deduplicate option from the table config
+    merge_sel_need_pull = true;
+
+    merge_sel_uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
+    {
+        return canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data);
+    };
+
+    merge_sel_merging_predicate_args_to_key = [](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
+    {
+        return std::make_pair(left->name, right->name);
+    };
+
+    merge_sel_cached_merging_predicate.reset(new CachedMergingPredicate<std::pair<std::string, std::string>>());
+
+    /// Will be updated below.
+    merge_sel_now = std::chrono::steady_clock::time_point();
+
+    merge_sel_can_merge = [&]
+        (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
+    {
+        /// If any of the parts is already going to be merged into a larger one, do not agree to merge it.
+        if (queue.partWillBeMergedOrMergesDisabled(left->name)
+            || (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)))
+            return false;
+
+        return merge_sel_cached_merging_predicate->get(merge_sel_now, merge_sel_uncached_merging_predicate, merge_sel_merging_predicate_args_to_key, left, right);
+    };
+}
+
+
+void StorageReplicatedMergeTree::mergeSelectingThread()
+{
+    if (shutdown_called || !is_leader_node)
+        return;
+
+    bool success = false;
+
+    try
+    {
+        if (merge_sel_need_pull)
+        {
+            /// You need to load new entries into the queue before you select parts to merge
+            /// (so we know which parts are already going to be merged).
+            pullLogsToQueue();
+            merge_sel_need_pull = false;
+        }
+
+        std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
+
+        /** If many merges are already queued, then we will queue only small enough merges.
+          * Otherwise the merge queue could be filled with only large merges,
+          * and at the same time, many small parts could be created and would never be merged.
+          */
+        size_t merges_queued = queue.countMerges();
+
+        if (merges_queued >= data.settings.max_replicated_merges_in_queue)
+        {
+            LOG_TRACE(log, "Number of queued merges (" << merges_queued
+                << ") is greater than max_replicated_merges_in_queue ("
+                << data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
+        }
+        else
+        {
+            MergeTreeDataMerger::FuturePart future_merged_part;

+            size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
+
+            merge_sel_now = std::chrono::steady_clock::now();
+
+            if (max_parts_size_for_merge > 0
+                && merger.selectPartsToMerge(
+                    future_merged_part, false,
+                    max_parts_size_for_merge,
+                    merge_sel_can_merge)
+                && createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, merge_sel_deduplicate))
+            {
+                success = true;
+                merge_sel_need_pull = true;
+            }
+        }
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+
+    if (shutdown_called || !is_leader_node)
+        return;
+
+    if (!success)
+        merge_selecting_handle->scheduleAfter(MERGE_SELECTING_SLEEP_MS);
+    else
+        merge_selecting_handle->schedule();
+}
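
The same constraint explains initMergeSelectSession(): the old thread rebuilt its lambdas and the CachedMergingPredicate at function entry every time it was (re)started, and a pool task has no such entry point, so the setup moves into merge_sel_* members reset by a dedicated method. The call sites, as a fragment assembled from the hunks in this diff:

    /// In the StorageReplicatedMergeTree constructor:
    initMergeSelectSession();
    merge_selecting_handle = context_.getSchedulePool().addTask(
        "StorageReplicatedMergeTree", [this] { mergeSelectingThread(); });

    /// In becomeLeader(), before re-arming the task for a new leadership session:
    merge_selecting_handle->activate();
    initMergeSelectSession();
    is_leader_node = true;
    merge_selecting_handle->schedule();
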
@@ -1940,16 +1937,12 @@ void StorageReplicatedMergeTree::becomeLeader()
     if (shutdown_called)
         return;

-    if (merge_selecting_thread.joinable())
-    {
-        LOG_INFO(log, "Deleting old leader");
-        is_leader_node = false;    /// exit trigger inside thread
-        merge_selecting_thread.join();
-    }
-
     LOG_INFO(log, "Became leader");
+    is_leader_node = false;
+    merge_selecting_handle->activate();
+    initMergeSelectSession();
     is_leader_node = true;
-    merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
+    merge_selecting_handle->schedule();
 }
@@ -2122,7 +2115,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
     {
         LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
         /// Force premature parts cleanup
-        cleanup_thread_event.set();
+        cleanup_thread->schedule();
         return false;
     }

@@ -2209,7 +2202,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
     if (quorum)
         updateQuorum(part_name);

-    merge_selecting_event.set();
+    merge_selecting_handle->schedule();

     for (const auto & removed_part : removed_parts)
     {
@@ -2281,6 +2274,8 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
     {
         tryLogCurrentException(__PRETTY_FUNCTION__);
     }
+
+    context.getSchedulePool().removeTask(merge_selecting_handle);
 }


@@ -21,11 +21,14 @@
 #include <Common/randomSeed.h>
 #include <Common/ZooKeeper/ZooKeeper.h>
 #include <Common/ZooKeeper/LeaderElection.h>
+#include <Common/BackgroundSchedulePool.h>


 namespace DB
 {

+template <typename Key> struct CachedMergingPredicate;
+
 /** The engine that uses the merge tree (see MergeTreeData) and replicated through ZooKeeper.
  *
  * ZooKeeper is used for the following things:
@@ -267,22 +270,27 @@ private:

     /// Threads.

-    /// A thread that keeps track of the updates in the logs of all replicas and loads them into the queue.
-    std::thread queue_updating_thread;
-    zkutil::EventPtr queue_updating_event = std::make_shared<Poco::Event>();
+    /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
+    bool queue_update_in_progress = false;
+    BackgroundSchedulePool::TaskHandle queue_updating_task_handle;

     /// A task that performs actions from the queue.
     BackgroundProcessingPool::TaskHandle queue_task_handle;

-    /// A thread that selects parts to merge.
-    std::thread merge_selecting_thread;
-    Poco::Event merge_selecting_event;
+    /// A task that selects parts to merge.
+    BackgroundSchedulePool::TaskHandle merge_selecting_handle;
+    bool merge_sel_deduplicate;
+    bool merge_sel_need_pull;
+    std::function<bool(const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> merge_sel_uncached_merging_predicate;
+    std::function<std::pair<String, String>(const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> merge_sel_merging_predicate_args_to_key;
+    std::chrono::steady_clock::time_point merge_sel_now;
+    std::unique_ptr<CachedMergingPredicate<std::pair<std::string, std::string>>> merge_sel_cached_merging_predicate;
+    std::function<bool(const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> merge_sel_can_merge;

     std::mutex merge_selecting_mutex;    /// It is taken for each iteration of the selection of parts to merge.

     /// A thread that removes old parts, log entries, and blocks.
     std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
-    /// Is used to wake up cleanup_thread
-    Poco::Event cleanup_thread_event;

     /// A thread that processes reconnection to ZooKeeper when the session expires.
     std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
@@ -302,6 +310,8 @@ private:

     /// Initialization.

+    void initMergeSelectSession();
+
     /** Creates the minimum set of nodes in ZooKeeper.
      */
     void createTableIfNotExists();
@@ -349,7 +359,7 @@ private:
     /** Copies the new entries from the logs of all replicas to the queue of this replica.
      * If next_update_event != nullptr, calls this event when new entries appear in the log.
      */
-    void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr);
+    void pullLogsToQueue(BackgroundSchedulePool::TaskHandle next_update_event = nullptr);

     /** Execute the action from the queue. Throws an exception if something is wrong.
      * Returns whether or not it succeeds. If it did not work, write it to the end of the queue.