Unify priorities: lower value means higher priority (#50205)

Sergei Trifonov 2023-05-26 15:55:30 +02:00 committed by GitHub
parent 9603f2b49c
commit 0d1f2e297b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 108 additions and 97 deletions

src/Common/Priority.h Normal file

@ -0,0 +1,11 @@
#pragma once
#include <base/types.h>
/// Common type for priority values.
/// Separate type (rather than `Int64`) is used just to avoid implicit conversion errors and to default-initialize
struct Priority
{
Int64 value = 0; /// Note that lower value means higher priority.
constexpr operator Int64() const { return value; } /// NOLINT
};
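
For orientation (an illustrative sketch, not part of the commit): because `Priority` converts implicitly to `Int64` and lower values win, sorting by ascending `value` puts the most urgent item first.

#include <algorithm>
#include <cassert>
#include <vector>
#include <Common/Priority.h>

void priorityOrderingExample()
{
    std::vector<Priority> priorities{Priority{10}, Priority{-1}, Priority{0}};
    std::sort(priorities.begin(), priorities.end(),
              [](Priority a, Priority b) { return a.value < b.value; });
    assert(priorities.front().value == -1); // -1 is the lowest value, i.e. the highest priority
}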


@ -123,7 +123,7 @@ void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, ssize_t priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
{
auto on_error = [&](const std::string & reason)
{
@ -231,19 +231,19 @@ void ThreadPoolImpl<Thread>::startNewThreadsNoLock()
}
template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, ssize_t priority)
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, Priority priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, ssize_t priority, uint64_t wait_microseconds) noexcept
bool ThreadPoolImpl<Thread>::trySchedule(Job job, Priority priority, uint64_t wait_microseconds) noexcept
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, ssize_t priority, uint64_t wait_microseconds, bool propagate_opentelemetry_tracing_context)
void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, Priority priority, uint64_t wait_microseconds, bool propagate_opentelemetry_tracing_context)
{
scheduleImpl<void>(std::move(job), priority, wait_microseconds, propagate_opentelemetry_tracing_context);
}


@ -18,6 +18,7 @@
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/Priority.h>
#include <base/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool.
@ -59,17 +60,17 @@ public:
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
/// and the exception will be cleared.
/// Also throws an exception if cannot create thread.
/// Priority: greater is higher.
/// Priority: lower is higher.
/// NOTE: Probably you should call wait() if an exception was thrown. If some previously scheduled jobs are using some objects
/// located on the stack of the current thread, the stack must not be unwound until all jobs are finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in its own destructor.
void scheduleOrThrowOnError(Job job, ssize_t priority = 0);
void scheduleOrThrowOnError(Job job, Priority priority = {});
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, ssize_t priority = 0, uint64_t wait_microseconds = 0) noexcept;
bool trySchedule(Job job, Priority priority = {}, uint64_t wait_microseconds = 0) noexcept;
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, ssize_t priority = 0, uint64_t wait_microseconds = 0, bool propagate_opentelemetry_tracing_context = true);
void scheduleOrThrow(Job job, Priority priority = {}, uint64_t wait_microseconds = 0, bool propagate_opentelemetry_tracing_context = true);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitrary order.
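
A hedged caller-side sketch of the signatures documented above (the `ThreadPool` instance and the lambdas are placeholders):

#include <Common/ThreadPool.h>

void scheduleExamples(ThreadPool & pool)
{
    pool.scheduleOrThrowOnError([] { /* ordinary background job */ });             // default Priority{} (value 0)
    pool.scheduleOrThrowOnError([] { /* latency-sensitive job */ }, Priority{-1}); // lower value, scheduled ahead of the default
    pool.wait();
}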
@ -123,15 +124,15 @@ private:
struct JobWithPriority
{
Job job;
ssize_t priority;
Priority priority;
DB::OpenTelemetry::TracingContextOnThread thread_trace_context;
JobWithPriority(Job job_, ssize_t priority_, const DB::OpenTelemetry::TracingContextOnThread& thread_trace_context_)
JobWithPriority(Job job_, Priority priority_, const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_)
: job(job_), priority(priority_), thread_trace_context(thread_trace_context_) {}
bool operator< (const JobWithPriority & rhs) const
bool operator<(const JobWithPriority & rhs) const
{
return priority < rhs.priority;
return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
}
};
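
For illustration only: `std::priority_queue` is a max-heap, so the element comparison must be inverted for the numerically smallest value (the highest priority) to surface first. A standalone sketch of the same trick, with `Item` standing in for `JobWithPriority`:

#include <cassert>
#include <queue>
#include <Common/Priority.h>

struct Item
{
    Priority priority;
    bool operator<(const Item & rhs) const { return priority.value > rhs.priority.value; } // reversed on purpose
};

void reversedComparatorExample()
{
    std::priority_queue<Item> queue;
    queue.push(Item{Priority{5}});
    queue.push(Item{Priority{-1}});
    queue.push(Item{Priority{0}});
    assert(queue.top().priority.value == -1); // the lowest value, i.e. the highest priority, comes out first
}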
@ -141,7 +142,7 @@ private:
std::stack<OnDestroyCallback> on_destroy_callbacks;
template <typename ReturnType>
ReturnType scheduleImpl(Job job, ssize_t priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context = true);
ReturnType scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context = true);
void worker(typename std::list<Thread>::iterator thread_it);
@ -227,7 +228,7 @@ public:
DB::ThreadStatus thread_status;
std::apply(function, arguments);
},
0, // default priority
{}, // default priority
0, // default wait_microseconds
propagate_opentelemetry_context
);


@ -28,7 +28,7 @@ void CachedCompressedReadBuffer::initInput()
}
void CachedCompressedReadBuffer::prefetch(int64_t priority)
void CachedCompressedReadBuffer::prefetch(Priority priority)
{
initInput();
file_in->prefetch(priority);


@ -36,7 +36,7 @@ private:
bool nextImpl() override;
void prefetch(int64_t priority) override;
void prefetch(Priority priority) override;
/// Passed into file_in.
ReadBufferFromFileBase::ProfileCallback profile_callback;


@ -51,7 +51,7 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadB
}
void CompressedReadBufferFromFile::prefetch(int64_t priority)
void CompressedReadBufferFromFile::prefetch(Priority priority)
{
file_in.prefetch(priority);
}


@ -43,7 +43,7 @@ private:
bool nextImpl() override;
void prefetch(int64_t priority) override;
void prefetch(Priority priority) override;
public:
explicit CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);


@ -83,19 +83,19 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
}
std::future<IAsynchronousReader::Result>
AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, int64_t priority)
AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, Priority priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = read_settings.priority + priority;
request.priority = Priority{read_settings.priority.value + priority.value};
request.ignore = bytes_to_ignore;
return reader.submit(request);
}
void AsynchronousBoundedReadBuffer::prefetch(int64_t priority)
void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
{
if (prefetch_future.valid())
return;
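
Note that `Priority` defines no arithmetic, only the conversion to `Int64`, so the base read priority and the per-call offset are combined on the raw values and re-wrapped explicitly, as in `asyncReadInto` above. A hypothetical helper capturing that pattern (not part of the commit):

#include <Common/Priority.h>

inline Priority addPriority(Priority base, Priority offset)
{
    return Priority{base.value + offset.value}; // the sum keeps the "lower value means higher priority" convention
}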


@ -39,7 +39,7 @@ public:
off_t seek(off_t offset_, int whence) override;
void prefetch(int64_t priority) override;
void prefetch(Priority priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
@ -72,7 +72,7 @@ private:
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
Priority priority;
};
LastPrefetchInfo last_prefetch_info;
@ -87,7 +87,7 @@ private:
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);
void resetPrefetch(FilesystemPrefetchState state);


@ -26,7 +26,7 @@ namespace ErrorCodes
AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
IAsynchronousReader & reader_,
Int32 priority_,
Priority priority_,
const std::string & file_name_,
size_t buf_size,
int flags,
@ -60,7 +60,7 @@ AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
IAsynchronousReader & reader_,
Int32 priority_,
Priority priority_,
int & fd_,
const std::string & original_file_name,
size_t buf_size,


@ -17,7 +17,7 @@ protected:
public:
explicit AsynchronousReadBufferFromFile(
IAsynchronousReader & reader_,
Int32 priority_,
Priority priority_,
const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,
@ -28,7 +28,7 @@ public:
/// Use pre-opened file descriptor.
explicit AsynchronousReadBufferFromFile(
IAsynchronousReader & reader_,
Int32 priority_,
Priority priority_,
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
@ -58,7 +58,7 @@ private:
public:
AsynchronousReadBufferFromFileWithDescriptorsCache(
IAsynchronousReader & reader_,
Int32 priority_,
Priority priority_,
const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,


@ -40,14 +40,14 @@ std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
}
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::asyncReadInto(char * data, size_t size, int64_t priority)
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::asyncReadInto(char * data, size_t size, Priority priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = base_priority + priority;
request.priority = Priority{base_priority.value + priority.value};
request.ignore = bytes_to_ignore;
bytes_to_ignore = 0;
@ -61,7 +61,7 @@ std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescripto
}
void AsynchronousReadBufferFromFileDescriptor::prefetch(int64_t priority)
void AsynchronousReadBufferFromFileDescriptor::prefetch(Priority priority)
{
if (prefetch_future.valid())
return;
@ -151,7 +151,7 @@ void AsynchronousReadBufferFromFileDescriptor::finalize()
AsynchronousReadBufferFromFileDescriptor::AsynchronousReadBufferFromFileDescriptor(
IAsynchronousReader & reader_,
Int32 priority_,
Priority priority_,
int fd_,
size_t buf_size,
char * existing_memory,


@ -4,6 +4,7 @@
#include <IO/AsynchronousReader.h>
#include <Interpreters/Context.h>
#include <Common/Throttler_fwd.h>
#include <Common/Priority.h>
#include <optional>
#include <unistd.h>
@ -18,7 +19,7 @@ class AsynchronousReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
IAsynchronousReader & reader;
int64_t base_priority;
Priority base_priority;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
@ -39,7 +40,7 @@ protected:
public:
AsynchronousReadBufferFromFileDescriptor(
IAsynchronousReader & reader_,
Int32 priority_,
Priority priority_,
int fd_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
@ -49,7 +50,7 @@ public:
~AsynchronousReadBufferFromFileDescriptor() override;
void prefetch(int64_t priority) override;
void prefetch(Priority priority) override;
int getFD() const
{
@ -70,7 +71,7 @@ public:
size_t getFileSize() override;
private:
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);
};
}


@ -6,6 +6,7 @@
#include <future>
#include <boost/noncopyable.hpp>
#include <Common/Stopwatch.h>
#include <Common/Priority.h>
namespace DB
@ -47,7 +48,7 @@ public:
size_t offset = 0;
size_t size = 0;
char * buf = nullptr;
int64_t priority = 0;
Priority priority;
size_t ignore = 0;
};


@ -19,7 +19,7 @@ public:
const ReadBuffer & getWrappedReadBuffer() const { return *in; }
ReadBuffer & getWrappedReadBuffer() { return *in; }
void prefetch(int64_t priority) override { in->prefetch(priority); }
void prefetch(Priority priority) override { in->prefetch(priority); }
protected:
std::unique_ptr<ReadBuffer> in;


@ -87,7 +87,7 @@ bool ParallelReadBuffer::addReaderToPool()
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader), range_start, size));
++active_working_reader;
schedule([this, my_worker = std::move(worker)]() mutable { readerThreadFunction(std::move(my_worker)); }, 0);
schedule([this, my_worker = std::move(worker)]() mutable { readerThreadFunction(std::move(my_worker)); }, Priority{});
return true;
}


@ -20,7 +20,7 @@ public:
~PeekableReadBuffer() override;
void prefetch(int64_t priority) override { sub_buf->prefetch(priority); }
void prefetch(Priority priority) override { sub_buf->prefetch(priority); }
/// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint()


@ -6,6 +6,7 @@
#include <memory>
#include <Common/Exception.h>
#include <Common/Priority.h>
#include <IO/BufferBase.h>
#include <IO/AsynchronousReader.h>
@ -20,7 +21,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
static constexpr auto DEFAULT_PREFETCH_PRIORITY = 0;
static constexpr auto DEFAULT_PREFETCH_PRIORITY = Priority{0};
/** A simple abstract class for buffered data reading (char sequences) from somewhere.
* Unlike std::istream, it provides access to the internal buffer,
@ -208,10 +209,10 @@ public:
/** Do something to allow faster subsequent call to 'nextImpl' if possible.
* It's used for asynchronous readers with double-buffering.
* `priority` is the Threadpool priority, with which the prefetch task will be schedules.
* Smaller is more priority.
* `priority` is the `ThreadPool` priority, with which the prefetch task will be scheduled.
* Lower value means higher priority.
*/
virtual void prefetch(int64_t /* priority */) {}
virtual void prefetch(Priority) {}
/**
* Set upper bound for read range [..., position).


@ -124,7 +124,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()
}
void ReadBufferFromFileDescriptor::prefetch(int64_t)
void ReadBufferFromFileDescriptor::prefetch(Priority)
{
#if defined(POSIX_FADV_WILLNEED)
/// For direct IO, loading data into page cache is pointless.


@ -25,7 +25,7 @@ protected:
ThrottlerPtr throttler;
bool nextImpl() override;
void prefetch(int64_t priority) override;
void prefetch(Priority priority) override;
/// Name or some description of file.
std::string getFileName() const override;


@ -5,6 +5,7 @@
#include <Core/Defines.h>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Common/Priority.h>
#include <IO/ResourceLink.h>
namespace DB
@ -84,8 +85,8 @@ struct ReadSettings
size_t mmap_threshold = 0;
MMappedFileCache * mmap_cache = nullptr;
/// For 'pread_threadpool'/'io_uring' method. Lower is more priority.
size_t priority = 0;
/// For 'pread_threadpool'/'io_uring' method. Lower value is higher priority.
Priority priority;
bool load_marks_asynchronously = true;


@ -361,7 +361,7 @@ namespace
task->exception = std::current_exception();
}
task_finish_notify();
}, 0);
}, Priority{});
}
catch (...)
{


@ -17,7 +17,7 @@ public:
off_t seek(off_t off, int whence) override;
void prefetch(int64_t priority) override { impl->prefetch(priority); }
void prefetch(Priority priority) override { impl->prefetch(priority); }
private:
UInt64 min_bytes_for_seek; /// Minimum positive seek offset which shall be executed using seek operation.


@ -113,7 +113,7 @@ void WriteBufferFromS3::TaskTracker::add(Callback && func)
{
LOG_TEST(log, "add, in queue {}", futures.size());
auto future = scheduler(std::move(func), 0);
auto future = scheduler(std::move(func), Priority{});
auto exit_scope = scope_guard(
[&future]()
{


@ -4269,7 +4269,7 @@ ReadSettings Context::getReadSettings() const
res.prefetch_buffer_size = settings.prefetch_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;
res.priority = Priority{settings.read_priority};
res.remote_throttler = getRemoteReadThrottler();
res.local_throttler = getLocalReadThrottler();


@ -19,7 +19,7 @@ NamesAndTypesList FilesystemReadPrefetchesLogElement::getNamesAndTypes()
{"offset", std::make_shared<DataTypeUInt64>()},
{"size", std::make_shared<DataTypeInt64>()},
{"prefetch_submit_time", std::make_shared<DataTypeDateTime64>(6)},
{"priority", std::make_shared<DataTypeUInt64>()},
{"priority", std::make_shared<DataTypeInt64>()},
{"prefetch_execution_start_time", std::make_shared<DataTypeDateTime64>(6)},
{"prefetch_execution_end_time", std::make_shared<DataTypeDateTime64>(6)},
{"prefetch_execution_time_us", std::make_shared<DataTypeUInt64>()},
@ -40,7 +40,7 @@ void FilesystemReadPrefetchesLogElement::appendToBlock(MutableColumns & columns)
columns[i++]->insert(offset);
columns[i++]->insert(size);
columns[i++]->insert(prefetch_submit_time);
columns[i++]->insert(priority);
columns[i++]->insert(priority.value);
if (execution_watch)
{
columns[i++]->insert(execution_watch->getStart());


@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/SystemLog.h>
#include <Common/Stopwatch.h>
#include <Common/Priority.h>
namespace DB
{
@ -25,7 +26,7 @@ struct FilesystemReadPrefetchesLogElement
Int64 size; /// -1 means unknown
Decimal64 prefetch_submit_time{};
std::optional<Stopwatch> execution_watch;
size_t priority;
Priority priority;
FilesystemPrefetchState state;
UInt64 thread_id;
String reader_id;


@ -11,13 +11,13 @@ namespace DB
/// High-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously.
template <typename Result, typename Callback = std::function<Result()>>
using ThreadPoolCallbackRunner = std::function<std::future<Result>(Callback &&, int64_t priority)>;
using ThreadPoolCallbackRunner = std::function<std::future<Result>(Callback &&, Priority)>;
/// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()'.
template <typename Result, typename Callback = std::function<Result()>>
ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
{
return [my_pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](Callback && callback, int64_t priority) mutable -> std::future<Result>
return [my_pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](Callback && callback, Priority priority) mutable -> std::future<Result>
{
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, my_callback = std::move(callback)]() mutable -> Result
{
@ -44,15 +44,14 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
auto future = task->get_future();
/// ThreadPool is using "bigger is higher priority" instead of "smaller is more priority".
my_pool->scheduleOrThrow([my_task = std::move(task)]{ (*my_task)(); }, -priority);
my_pool->scheduleOrThrow([my_task = std::move(task)]{ (*my_task)(); }, priority);
return future;
};
}
template <typename Result, typename T>
std::future<Result> scheduleFromThreadPool(T && task, ThreadPool & pool, const std::string & thread_name, int64_t priority = 0)
std::future<Result> scheduleFromThreadPool(T && task, ThreadPool & pool, const std::string & thread_name, Priority priority = {})
{
auto schedule = threadPoolCallbackRunner<Result, T>(pool, thread_name);
return schedule(std::move(task), priority);
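
With the sign flip removed, callback runners use the same `Priority` convention as the pool itself. A hedged usage sketch (`pool` is assumed to exist and "Example" is an arbitrary thread name):

// Assumes the ThreadPoolCallbackRunner declarations from the header changed above are in scope.
void callbackRunnerExample(ThreadPool & pool)
{
    auto runner = threadPoolCallbackRunner<void>(pool, "Example");
    auto urgent = runner([] { /* latency-sensitive callback */ }, Priority{-1}); // lower value, scheduled first
    auto regular = runner([] { /* ordinary callback */ }, Priority{});           // default priority
    urgent.wait();
    regular.wait();
}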


@ -64,19 +64,19 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
return true;
}
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority)
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, Priority priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, nullptr);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = base_priority + priority;
request.priority = Priority{base_priority.value + priority.value};
request.ignore = 0;
return reader.submit(request);
}
void AsynchronousReadBufferFromHDFS::prefetch(int64_t priority)
void AsynchronousReadBufferFromHDFS::prefetch(Priority priority)
{
interval_watch.restart();


@ -33,7 +33,7 @@ public:
off_t seek(off_t offset_, int whence) override;
void prefetch(int64_t priority) override;
void prefetch(Priority priority) override;
size_t getFileSize() override;
@ -50,10 +50,10 @@ private:
bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);
IAsynchronousReader & reader;
int64_t base_priority;
Priority base_priority;
std::shared_ptr<ReadBufferFromHDFS> impl;
std::future<IAsynchronousReader::Result> prefetch_future;
Memory<> prefetch_buffer;


@ -61,7 +61,7 @@ public:
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
virtual void prefetchBeginOfRange(int64_t /* priority */) {}
virtual void prefetchBeginOfRange(Priority) {}
protected:
/// Returns actual column name in part, which can differ from table metadata.


@ -142,7 +142,7 @@ MergeTreeReadTask::MergeTreeReadTask(
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
MergeTreeBlockSizePredictorPtr size_predictor_,
int64_t priority_,
Priority priority_,
std::future<MergeTreeReaderPtr> reader_,
std::vector<std::future<MergeTreeReaderPtr>> && pre_reader_for_step_)
: data_part{data_part_}


@ -71,11 +71,7 @@ struct MergeTreeReadTask
std::future<MergeTreeReaderPtr> reader;
std::vector<std::future<MergeTreeReaderPtr>> pre_reader_for_step;
int64_t priority = 0; /// Priority of the task. Bigger value, bigger priority.
bool operator <(const MergeTreeReadTask & rhs) const
{
return priority < rhs.priority;
}
Priority priority;
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
@ -86,7 +82,7 @@ struct MergeTreeReadTask
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
MergeTreeBlockSizePredictorPtr size_predictor_,
int64_t priority_ = 0,
Priority priority_ = {},
std::future<MergeTreeReaderPtr> reader_ = {},
std::vector<std::future<MergeTreeReaderPtr>> && pre_reader_for_step_ = {});


@ -1967,7 +1967,7 @@ try
res.part->remove();
else
preparePartForRemoval(res.part);
}, 0));
}, Priority{}));
}
/// Wait for every scheduled task


@ -90,7 +90,7 @@ std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedRea
const IMergeTreeDataPart & data_part,
const NamesAndTypesList & columns,
const MarkRanges & required_ranges,
int64_t priority) const
Priority priority) const
{
auto reader = data_part.getReader(
columns, storage_snapshot->metadata, required_ranges,
@ -142,7 +142,7 @@ bool MergeTreePrefetchedReadPool::TaskHolder::operator <(const TaskHolder & othe
{
chassert(task->priority >= 0);
chassert(other.task->priority >= 0);
return -task->priority < -other.task->priority; /// Less is better.
return task->priority > other.task->priority; /// Less is better.
/// With default std::priority_queue, top() returns largest element.
/// So closest to 0 will be on top with this comparator.
}
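
The rewrite does not change the ordering: for integers, -a < -b is equivalent to a > b, so the prefetch queue still yields the task whose value is closest to 0 first. A tiny standalone check of that equivalence (illustrative only):

#include <cassert>
#include <cstdint>

void comparatorEquivalence(int64_t a, int64_t b)
{
    // Holds whenever the negations do not overflow (i.e. neither argument is INT64_MIN).
    assert((-a < -b) == (a > b));
}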
@ -153,7 +153,7 @@ void MergeTreePrefetchedReadPool::startPrefetches() const
return;
[[maybe_unused]] TaskHolder prev(nullptr, 0);
[[maybe_unused]] const int64_t highest_priority = reader_settings.read_settings.priority + 1;
[[maybe_unused]] const Priority highest_priority{reader_settings.read_settings.priority.value + 1};
assert(prefetch_queue.top().task->priority == highest_priority);
while (!prefetch_queue.empty())
{
@ -495,11 +495,11 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
auto need_marks = min_marks_per_thread;
/// Priority is given according to the prefetch number for each thread,
/// e.g. the first task of each thread has the same priority and is bigger
/// than second task of each thread, and so on.
/// e.g. the first task of each thread has the same priority and is greater
/// than the second task of each thread, and so on.
/// Add 1 to query read priority because higher priority should be given to
/// reads from pool which are from reader.
int64_t priority = reader_settings.read_settings.priority + 1;
Priority priority{reader_settings.read_settings.priority.value + 1};
while (need_marks > 0 && part_idx < parts_infos.size())
{
@ -597,7 +597,7 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
{
prefetch_queue.emplace(TaskHolder(read_task.get(), i));
}
++priority;
++priority.value;
result_threads_tasks[i].push_back(std::move(read_task));
}
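
A hedged restatement of the scheme described in the comments above (every name below is illustrative, not pool code): each thread's N-th prefetch shares one value, that value starts one above the query's own read priority, and it grows by one per round, so earlier rounds always outrank later ones.

#include <cstddef>
#include <vector>
#include <Common/Priority.h>

std::vector<std::vector<Priority>> assignPrefetchPriorities(size_t threads, size_t rounds, Priority base_read_priority)
{
    std::vector<std::vector<Priority>> result(threads, std::vector<Priority>(rounds));
    Priority priority{base_read_priority.value + 1}; // prefetches must not outrank the query's own reads
    for (size_t round = 0; round < rounds; ++round)
    {
        for (size_t thread = 0; thread < threads; ++thread)
            result[thread][round] = priority;        // the same value for every thread within a round
        ++priority.value;                            // later rounds get larger values, i.e. lower priority
    }
    return result;
}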


@ -53,12 +53,11 @@ private:
using ThreadTasks = std::deque<MergeTreeReadTaskPtr>;
using ThreadsTasks = std::map<size_t, ThreadTasks>;
/// smaller `priority` means more priority
std::future<MergeTreeReaderPtr> createPrefetchedReader(
const IMergeTreeDataPart & data_part,
const NamesAndTypesList & columns,
const MarkRanges & required_ranges,
int64_t priority) const;
Priority priority) const;
void createPrefetchedReaderForTask(MergeTreeReadTask & task) const;


@ -314,7 +314,7 @@ void MergeTreeReaderCompact::readData(
last_read_granule.emplace(from_mark, column_position);
}
void MergeTreeReaderCompact::prefetchBeginOfRange(int64_t priority)
void MergeTreeReaderCompact::prefetchBeginOfRange(Priority priority)
{
if (!initialized)
{


@ -38,7 +38,7 @@ public:
bool canReadIncompleteGranules() const override { return false; }
void prefetchBeginOfRange(int64_t priority) override;
void prefetchBeginOfRange(Priority priority) override;
private:
bool isContinuousReading(size_t mark, size_t column_position);


@ -58,7 +58,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
}
}
void MergeTreeReaderWide::prefetchBeginOfRange(int64_t priority)
void MergeTreeReaderWide::prefetchBeginOfRange(Priority priority)
{
prefetched_streams.clear();
@ -90,7 +90,7 @@ void MergeTreeReaderWide::prefetchBeginOfRange(int64_t priority)
}
void MergeTreeReaderWide::prefetchForAllColumns(
int64_t priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading)
Priority priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading)
{
bool do_prefetch = data_part_info_for_read->getDataPartStorage()->isStoredOnRemoteDisk()
? settings.read_settings.remote_fs_prefetch
@ -137,7 +137,7 @@ size_t MergeTreeReaderWide::readRows(
if (num_columns == 0)
return max_rows_to_read;
prefetchForAllColumns(/* priority */0, num_columns, from_mark, current_task_last_mark, continue_reading);
prefetchForAllColumns(Priority{}, num_columns, from_mark, current_task_last_mark, continue_reading);
for (size_t pos = 0; pos < num_columns; ++pos)
{
@ -305,7 +305,7 @@ void MergeTreeReaderWide::deserializePrefix(
}
void MergeTreeReaderWide::prefetchForColumn(
int64_t priority,
Priority priority,
const NameAndTypePair & name_and_type,
const SerializationPtr & serialization,
size_t from_mark,


@ -33,14 +33,14 @@ public:
bool canReadIncompleteGranules() const override { return true; }
void prefetchBeginOfRange(int64_t priority) override;
void prefetchBeginOfRange(Priority priority) override;
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
private:
FileStreams streams;
void prefetchForAllColumns(int64_t priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading);
void prefetchForAllColumns(Priority priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading);
void addStreams(
const NameAndTypePair & name_and_type,
@ -55,7 +55,7 @@ private:
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams).
void prefetchForColumn(
int64_t priority,
Priority priority,
const NameAndTypePair & name_and_type,
const SerializationPtr & serialization,
size_t from_mark,


@ -84,7 +84,7 @@ struct MergeTreeSource::AsyncReadingState
{
try
{
callback_runner(std::move(job), 0);
callback_runner(std::move(job), Priority{});
}
catch (...)
{


@ -356,7 +356,7 @@ private:
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
return outcome;
}, 0);
}, Priority{});
}
std::mutex mutex;
@ -619,7 +619,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
{
return create_reader_scheduler([this] { return createReader(); }, 0);
return create_reader_scheduler([this] { return createReader(); }, Priority{});
}
StorageS3Source::ReadBufferOrFactory StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size)