Own PriorityQueue + prettifying the code

This commit is contained in:
Nikita Mikhaylov 2021-09-02 21:31:32 +00:00
parent 6624fa12ba
commit cc7c221fad
16 changed files with 167 additions and 101 deletions

View File

@ -0,0 +1,53 @@
#pragma once
#include <queue>
namespace DB
{
template <class T, class Comparator = std::less<T>>
class PriorityQueue
{
public:
T pop()
{
assert(!buffer.empty());
std::pop_heap(buffer.begin(), buffer.end(), comparator);
auto element = std::move(buffer.back());
buffer.pop_back();
return element;
}
void push(T element)
{
buffer.push_back(std::move(element));
std::push_heap(buffer.begin(), buffer.end(), comparator);
}
template< class... Args >
void emplace(Args &&... args)
{
buffer.emplace_back(std::forward<Args>(args)...);
std::push_heap(buffer.begin(), buffer.end(), comparator);
}
bool empty() { return buffer.empty(); }
size_t size() { return buffer.size(); }
void reserve(size_t count) { buffer.reserve(count); }
void resize(size_t count)
{
buffer.resize(count);
std::make_heap(buffer.begin(), buffer.end(), comparator);
}
private:
Comparator comparator;
std::vector<T> buffer;
};
}

View File

@ -74,6 +74,8 @@ void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
std::lock_guard lock(mutex);
queue_size = value;
/// Reserve memory to get rid of allocations
jobs.reserve(queue_size);
}
@ -246,17 +248,9 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
need_shutdown = shutdown;
if (!jobs.empty())
{
/// std::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
job = std::move(const_cast<Job &>(jobs.top().job));
jobs.pop();
}
job = std::move(jobs.pop().job);
else
{
/// shutdown is true, simply finish the thread.
return;
}
return; /// shutdown is true, simply finish the thread.
}
if (!need_shutdown)

View File

@ -11,6 +11,7 @@
#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
#include <Common/PriorityQueue.h>
#include <common/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool.
@ -103,7 +104,7 @@ private:
}
};
std::priority_queue<JobWithPriority> jobs;
DB::PriorityQueue<JobWithPriority> jobs;
std::list<Thread> threads;
std::exception_ptr first_exception;

View File

@ -0,0 +1,25 @@
#include <gtest/gtest.h>
#include <random>
#include <Common/PriorityQueue.h>
using namespace DB;
TEST(PriorityQueue, Simple)
{
PriorityQueue<int> my;
std::priority_queue<int> original;
for (int i = 0; i < 1000; ++i)
{
my.push(i);
original.emplace(i);
}
for (int i = 0; i < 1000; ++i)
{
ASSERT_EQ(my.pop(), original.top());
original.pop();
}
}

View File

@ -41,7 +41,7 @@ TEST(RingBuffer, Random)
std::random_device device;
std::mt19937 generator(device());
std::uniform_int_distribution<> distribution(0, 1);
std::uniform_int_distribution<> distribution(0, 3);
RingBuffer<int> buffer(10);

View File

@ -111,6 +111,8 @@ void BackgroundJobAssignee::finish()
getContext()->getFetchesExecutor()->removeTasksCorrespondingToStorage(storage_id);
getContext()->getMergeMutateExecutor()->removeTasksCorrespondingToStorage(storage_id);
}
finished = true;
}
@ -139,7 +141,8 @@ catch (...) /// Catch any exception to avoid thread termination.
BackgroundJobAssignee::~BackgroundJobAssignee()
{
finish();
if (!finished)
throw Exception(ErrorCodes::LOGICAL_ERROR, "finish() method must be called before destructor")
}
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeMutateExecutor.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
@ -28,6 +27,7 @@ struct ExecutableTaskSchedulingSettings
double task_sleep_seconds_when_no_work_min = 10;
};
class MergeTreeData;
class BackgroundJobAssignee : protected WithContext
{
@ -48,6 +48,8 @@ private:
/// Mutex for thread safety
std::mutex holder_mutex;
bool finished{false};
public:
enum class Type
{

View File

@ -1,7 +1,9 @@
#pragma once
#include <memory>
#include <functional>
#include <common/shared_ptr_helper.h>
#include <Interpreters/StorageID.h>
namespace DB
@ -18,4 +20,31 @@ public:
using ExecutableTaskPtr = std::shared_ptr<ExecutableTask>;
class LambdaAdapter : public shared_ptr_helper<LambdaAdapter>, public ExecutableTask
{
public:
template <typename InnerJob, typename Callback>
explicit LambdaAdapter(InnerJob && inner_, Callback && callback_, StorageID id_)
: inner(inner_), callback(callback_), id(id_) {}
bool execute() override
{
res = inner();
return false;
}
void onCompleted() override { callback(!res); }
StorageID getStorageID() override { return id; }
private:
bool res = false;
std::function<bool()> inner;
std::function<void(bool)> callback;
StorageID id;
};
}

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeMutateExecutor.h>
#include <Common/setThreadName.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
@ -53,6 +54,10 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
DENY_ALLOCATIONS_IN_SCOPE;
bool status;
while (true)
{
std::unique_lock lock(mutex);
@ -66,7 +71,8 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
if (!pending.tryPop(&item))
continue;
active.tryPush(item);
status = active.tryPush(item);
assert(status);
try
{
@ -137,7 +143,8 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
if (!res)
{
active.eraseAll([&] (auto x) { return x == item; });
pending.tryPush(item);
status = pending.tryPush(item);
assert(status);
}
}

View File

@ -4,51 +4,20 @@
#include <functional>
#include <atomic>
#include <mutex>
#include <future>
#include <condition_variable>
#include <unordered_set>
#include <set>
#include <common/shared_ptr_helper.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <Common/RingBuffer.h>
#include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
class LambdaAdapter : public shared_ptr_helper<LambdaAdapter>, public ExecutableTask
{
public:
template <typename T>
explicit LambdaAdapter(T && inner_, MergeTreeData & data_) : inner(inner_), data(data_) {}
bool execute() override
{
res = inner();
inner = {};
return false;
}
void onCompleted() override
{
data.triggerBackgroundOperationTask(!res);
}
StorageID getStorageID() override
{
return data.getStorageID();
}
private:
bool res = false;
std::function<bool()> inner;
MergeTreeData & data;
};
class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackgroundExecutor>
{
public:

View File

@ -199,6 +199,8 @@ MergeTreeData::MergeTreeData(
, data_parts_by_info(data_parts_indexes.get<TagByInfo>())
, data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
, parts_mover(this)
, background_executor(*this, BackgroundJobAssignee::Type::DataProcessing, getContext())
, background_moves_executor(*this, BackgroundJobAssignee::Type::Moving, getContext())
{
const auto settings = getSettings();
allow_nullable_key = attach || settings->allow_nullable_key;
@ -304,6 +306,22 @@ MergeTreeData::MergeTreeData(
if (!canUsePolymorphicParts(*settings, &reason) && !reason.empty())
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part', 'min_bytes_for_wide_part', "
"'min_rows_for_compact_part' and 'min_bytes_for_compact_part' will be ignored.", reason);
common_assignee_trigger = [this] (bool delay) noexcept
{
if (delay)
background_executor.postpone();
else
background_executor.trigger();
};
moves_assignee_trigger = [this] (bool delay) noexcept
{
if (delay)
background_moves_executor.postpone();
else
background_moves_executor.trigger();
};
}
StoragePolicyPtr MergeTreeData::getStoragePolicy() const
@ -4906,7 +4924,7 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobAssignee & executor)
[this, moving_tagger] () mutable
{
return moveParts(moving_tagger);
}, *this));
}, moves_assignee_trigger, getStorageID()));
return true;
}

View File

@ -3,6 +3,7 @@
#include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@ -57,7 +58,6 @@ class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
class MergeTreeDeduplicationLog;
class BackgroundJobAssignee;
namespace ErrorCodes
{
@ -849,9 +849,6 @@ public:
/// Mutex for currently_submerging_parts and currently_emerging_parts
mutable std::mutex currently_submerging_emerging_mutex;
/// Trigger merge scheduling task
virtual void triggerBackgroundOperationTask(bool delay) = 0;
protected:
friend class IMergeTreeDataPart;
@ -923,6 +920,15 @@ protected:
MergeTreePartsMover parts_mover;
/// Executors are common for both ReplicatedMergeTree and plain MergeTree
/// but they are being started and finished in derived classes, so let them be protected.
BackgroundJobAssignee background_executor;
BackgroundJobAssignee background_moves_executor;
/// Every task that is finished will ask to assign a new one into an executor.
std::function<void(bool)> common_assignee_trigger;
std::function<void(bool)> moves_assignee_trigger;
using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;

View File

@ -82,9 +82,6 @@ StorageMergeTree::StorageMergeTree(
, reader(*this)
, writer(*this)
, merger_mutator(*this, getContext()->getSettingsRef().background_pool_size)
, background_executor(*this, BackgroundJobAssignee::Type::DataProcessing, getContext())
, background_moves_executor(*this, BackgroundJobAssignee::Type::Moving, getContext())
{
loadDataParts(has_force_restore_data_flag);
@ -1080,7 +1077,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executo
[this, metadata_snapshot, merge_entry, share_lock] () mutable
{
return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
}, *this));
}, common_assignee_trigger, getStorageID()));
return true;
}
if (mutate_entry)
@ -1089,7 +1086,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executo
[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
}, *this));
}, common_assignee_trigger, getStorageID()));
return true;
}
bool executed = false;
@ -1100,7 +1097,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executo
{
clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
return true;
}, *this));
}, common_assignee_trigger, getStorageID()));
executed = true;
}
if (time_after_previous_cleanup_parts.compareAndRestartDeferred(getContext()->getSettingsRef().merge_tree_clear_old_parts_interval_seconds))
@ -1115,7 +1112,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executo
clearOldMutations();
clearEmptyParts();
return true;
}, *this));
}, common_assignee_trigger, getStorageID()));
executed = true;
}

View File

@ -16,7 +16,6 @@
#include <Disks/StoragePolicy.h>
#include <Common/SimpleIncrement.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
namespace DB
@ -100,19 +99,6 @@ public:
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
void triggerBackgroundOperationTask(bool delay) override
{
if (delay)
background_executor.postpone();
else
background_executor.trigger();
if (delay)
background_moves_executor.postpone();
else
background_moves_executor.trigger();
}
private:
/// Mutex and condvar for synchronous mutations wait
@ -148,10 +134,6 @@ private:
std::atomic<bool> shutdown_called {false};
/// Must be the last to be destroyed first
BackgroundJobAssignee background_executor;
BackgroundJobAssignee background_moves_executor;
void loadMutations();
/// Load and initialize deduplication logs. Even if deduplication setting

View File

@ -293,8 +293,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size)
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
, background_executor(*this, BackgroundJobAssignee::Type::DataProcessing, getContext())
, background_moves_executor(*this, BackgroundJobAssignee::Type::Moving, getContext())
{
queue_updating_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
@ -3230,7 +3228,7 @@ bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee
[this, selected_entry] () mutable
{
return processQueueEntry(selected_entry);
}, *this));
}, common_assignee_trigger, getStorageID()));
return true;
}
else
@ -3239,7 +3237,7 @@ bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee
[this, selected_entry] () mutable
{
return processQueueEntry(selected_entry);
}, *this));
}, common_assignee_trigger, getStorageID()));
return true;
}
}

View File

@ -262,20 +262,6 @@ public:
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
void triggerBackgroundOperationTask(bool delay) override
{
if (delay)
background_executor.postpone();
else
background_executor.trigger();
if (delay)
background_moves_executor.postpone();
else
background_moves_executor.trigger();
}
private:
std::atomic_bool are_restoring_replica {false};
@ -408,10 +394,6 @@ private:
ThrottlerPtr replicated_fetches_throttler;
ThrottlerPtr replicated_sends_throttler;
/// Must be the last to be destroyed first
BackgroundJobAssignee background_executor;
BackgroundJobAssignee background_moves_executor;
template <class Func>
void foreachCommittedParts(Func && func, bool select_sequential_consistency) const;