Use ThreadPoolCallbackRunnerLocal in MergeTreePrefetchedReadPool

This commit is contained in:
Antonio Andelic 2024-05-08 09:25:38 +02:00
parent 090ef99339
commit 195ea4ec1f
2 changed files with 22 additions and 48 deletions

View File

@ -46,49 +46,30 @@ bool MergeTreePrefetchedReadPool::TaskHolder::operator<(const TaskHolder & other
}
MergeTreePrefetchedReadPool::PrefetchedReaders::~PrefetchedReaders()
{
for (auto & prefetch_future : prefetch_futures)
if (prefetch_future.valid())
prefetch_future.wait();
}
MergeTreePrefetchedReadPool::PrefetchedReaders::PrefetchedReaders(
ThreadPool & pool,
MergeTreeReadTask::Readers readers_,
Priority priority_,
MergeTreePrefetchedReadPool & pool_)
MergeTreePrefetchedReadPool & read_prefetch)
: is_valid(true)
, readers(std::move(readers_))
, prefetch_runner(pool, "Prefetch")
{
try
prefetch_runner(read_prefetch.createPrefetchedTask(readers.main.get(), priority_));
for (const auto & reader : readers.prewhere)
prefetch_runner(read_prefetch.createPrefetchedTask(reader.get(), priority_));
fiu_do_on(FailPoints::prefetched_reader_pool_failpoint,
{
prefetch_futures.reserve(1 + readers.prewhere.size());
prefetch_futures.push_back(pool_.createPrefetchedFuture(readers.main.get(), priority_));
for (const auto & reader : readers.prewhere)
prefetch_futures.push_back(pool_.createPrefetchedFuture(reader.get(), priority_));
fiu_do_on(FailPoints::prefetched_reader_pool_failpoint,
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failpoint for prefetched reader enabled");
});
}
catch (...) /// in case of memory exceptions we have to wait
{
for (auto & prefetch_future : prefetch_futures)
if (prefetch_future.valid())
prefetch_future.wait();
throw;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failpoint for prefetched reader enabled");
});
}
void MergeTreePrefetchedReadPool::PrefetchedReaders::wait()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
for (auto & prefetch_future : prefetch_futures)
prefetch_future.wait();
prefetch_runner.waitForAllToFinish();
}
MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get()
@ -96,13 +77,7 @@ MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get()
SCOPE_EXIT({ is_valid = false; });
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
/// First wait for completion of all futures.
for (auto & prefetch_future : prefetch_futures)
prefetch_future.wait();
/// Then rethrow first exception if any.
for (auto & prefetch_future : prefetch_futures)
prefetch_future.get();
prefetch_runner.waitForAllToFinishAndRethrowFirstError();
return std::move(readers);
}
@ -139,7 +114,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
fillPerThreadTasks(pool_settings.threads, pool_settings.sum_marks);
}
std::future<void> MergeTreePrefetchedReadPool::createPrefetchedFuture(IMergeTreeReader * reader, Priority priority)
std::function<void()> MergeTreePrefetchedReadPool::createPrefetchedTask(IMergeTreeReader * reader, Priority priority)
{
/// In order to make a prefetch we need to wait for marks to be loaded. But we just created
/// a reader (which starts loading marks in its constructor), then if we do prefetch right
@ -147,14 +122,12 @@ std::future<void> MergeTreePrefetchedReadPool::createPrefetchedFuture(IMergeTree
/// only inside this MergeTreePrefetchedReadPool, where read tasks are created and distributed,
/// and we cannot block either, therefore make prefetch inside the pool and put the future
/// into the thread task. When a thread calls getTask(), it will wait for it is not ready yet.
auto task = [=, context = getContext()]() mutable
return [=, context = getContext()]() mutable
{
/// For async read metrics in system.query_log.
PrefetchIncrement watch(context->getAsyncReadCounters());
reader->prefetchBeginOfRange(priority);
};
return scheduleFromThreadPoolUnsafe<void>(std::move(task), prefetch_threadpool, "ReadPrepare", priority);
}
void MergeTreePrefetchedReadPool::createPrefetchedReadersForTask(ThreadTask & task)
@ -164,7 +137,7 @@ void MergeTreePrefetchedReadPool::createPrefetchedReadersForTask(ThreadTask & ta
auto extras = getExtras();
auto readers = MergeTreeReadTask::createReaders(task.read_info, extras, task.ranges);
task.readers_future = std::make_unique<PrefetchedReaders>(std::move(readers), task.priority, *this);
task.readers_future = std::make_unique<PrefetchedReaders>(prefetch_threadpool, std::move(readers), task.priority, *this);
}
void MergeTreePrefetchedReadPool::startPrefetches()

View File

@ -1,5 +1,6 @@
#pragma once
#include <Storages/MergeTree/MergeTreeReadPoolBase.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/ThreadPool_fwd.h>
#include <IO/AsyncReadCounters.h>
#include <boost/heap/priority_queue.hpp>
@ -51,18 +52,18 @@ private:
class PrefetchedReaders
{
public:
PrefetchedReaders() = default;
PrefetchedReaders(MergeTreeReadTask::Readers readers_, Priority priority_, MergeTreePrefetchedReadPool & pool_);
PrefetchedReaders(
ThreadPool & pool, MergeTreeReadTask::Readers readers_, Priority priority_, MergeTreePrefetchedReadPool & read_prefetch);
void wait();
MergeTreeReadTask::Readers get();
bool valid() const { return is_valid; }
~PrefetchedReaders();
private:
bool is_valid = false;
MergeTreeReadTask::Readers readers;
std::vector<std::future<void>> prefetch_futures;
ThreadPoolCallbackRunnerLocal<void> prefetch_runner;
};
struct ThreadTask
@ -108,7 +109,7 @@ private:
void startPrefetches();
void createPrefetchedReadersForTask(ThreadTask & task);
std::future<void> createPrefetchedFuture(IMergeTreeReader * reader, Priority priority);
std::function<void()> createPrefetchedTask(IMergeTreeReader * reader, Priority priority);
MergeTreeReadTaskPtr stealTask(size_t thread, MergeTreeReadTask * previous_task);
MergeTreeReadTaskPtr createTask(ThreadTask & thread_task, MergeTreeReadTask * previous_task);