Minor changes

commit 4ce8950712 (parent 245899c0f6)
@@ -36,6 +36,7 @@
M(TemporaryFilesForJoin, "Number of temporary files created for JOIN") \
M(TemporaryFilesUnknown, "Number of temporary files created without known purpose") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in flight") \
M(RemoteRead, "Number of reads with a remote reader in flight") \
M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in flight") \
M(NetworkReceive, "Number of threads receiving data from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSend, "Number of threads sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \

@@ -356,7 +356,9 @@ The server successfully detected this situation and will download merged part fr
M(RemoteFSCancelledPrefetches, "Number of cancelled prefetches (because of seek)") \
M(RemoteFSUnusedPrefetches, "Number of prefetches pending at buffer destruction") \
M(RemoteFSPrefetchedReads, "Number of reads from prefetched buffer") \
M(RemoteFSPrefetchedBytes, "Number of bytes read from prefetched buffer") \
M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \
M(RemoteFSUnprefetchedBytes, "Number of bytes read from unprefetched buffer") \
M(RemoteFSLazySeeks, "Number of lazy seeks") \
M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \

@@ -386,6 +388,7 @@ The server successfully detected this situation and will download merged part fr
\
M(AsynchronousReadWaitMicroseconds, "Time spent waiting for asynchronous reads.") \
M(AsynchronousRemoteReadWaitMicroseconds, "Time spent waiting for asynchronous remote reads.") \
M(SynchronousRemoteReadWaitMicroseconds, "Time spent waiting for synchronous remote reads.") \
\
M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache")\
\
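The new *WaitMicroseconds events above are populated by a scoped timer (ProfileEventTimeIncrement<Microseconds>, visible in the nextImpl() hunk further below): the counter receives the elapsed time when the timer leaves scope. A minimal standalone sketch of that pattern, with a plain std::atomic counter standing in for the real ProfileEvents machinery (names here are illustrative, not the actual ClickHouse classes):

    #include <atomic>
    #include <chrono>
    #include <cstdint>
    #include <cstdio>
    #include <thread>

    // Stand-in for a ProfileEvents counter (the real ones are declared in ProfileEvents.cpp).
    std::atomic<uint64_t> synchronous_remote_read_wait_us{0};

    // Scoped timer: adds the elapsed microseconds to the given counter on destruction,
    // mirroring how ProfileEventTimeIncrement<Microseconds> is used around a blocking read.
    struct ScopedMicrosecondsIncrement
    {
        explicit ScopedMicrosecondsIncrement(std::atomic<uint64_t> & counter_)
            : counter(counter_), start(std::chrono::steady_clock::now()) {}

        ~ScopedMicrosecondsIncrement()
        {
            auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now() - start).count();
            counter.fetch_add(static_cast<uint64_t>(elapsed), std::memory_order_relaxed);
        }

        std::atomic<uint64_t> & counter;
        std::chrono::steady_clock::time_point start;
    };

    int main()
    {
        {
            ScopedMicrosecondsIncrement watch(synchronous_remote_read_wait_us);
            std::this_thread::sleep_for(std::chrono::milliseconds(5)); // stands in for the synchronous remote read
        }
        std::printf("SynchronousRemoteReadWaitMicroseconds ~= %llu\n",
                    static_cast<unsigned long long>(synchronous_remote_read_wait_us.load()));
    }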
@@ -15,12 +15,15 @@ namespace CurrentMetrics

namespace ProfileEvents
{
extern const Event AsynchronousRemoteReadWaitMicroseconds;
extern const Event SynchronousRemoteReadWaitMicroseconds;
extern const Event RemoteFSSeeks;
extern const Event RemoteFSPrefetches;
extern const Event RemoteFSCancelledPrefetches;
extern const Event RemoteFSUnusedPrefetches;
extern const Event RemoteFSPrefetchedReads;
extern const Event RemoteFSUnprefetchedReads;
extern const Event RemoteFSPrefetchedBytes;
extern const Event RemoteFSUnprefetchedBytes;
extern const Event RemoteFSLazySeeks;
extern const Event RemoteFSSeeksWithReset;
extern const Event RemoteFSBuffers;

@@ -131,7 +134,6 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}

void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
{
/// Do not reinitialize internal state in case the new end of range is already included.

@@ -141,19 +143,14 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
/// reading and ignoring some data.
if (!read_until_position || position > *read_until_position)
{
read_until_position = position;

/// We must wait on future and reset the prefetch here, because otherwise there might be
/// a race between reading the data in the threadpool and impl->setReadUntilPosition()
/// which reinitializes internal remote read buffer (because if we have a new read range
/// then we need a new range request) and in case of reading from cache we need to request
/// and hold more file segment ranges from cache.
if (prefetch_future.valid())
{
ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
prefetch_future.wait();
prefetch_future = {};
}

read_until_position = position;
resetPrefetch(FilesystemPrefetchState::CANCELLED_WITH_RANGE_CHANGE);
impl->setReadUntilPosition(*read_until_position);
}
}
@@ -184,16 +181,18 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
prefetch_buffer.swap(memory);

ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
}
else
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);

chassert(memory.size() == read_settings.remote_fs_buffer_size);
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
bytes_to_ignore = 0;

ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, size);
}

chassert(size >= offset);

@@ -257,13 +256,11 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else if (prefetch_future.valid())
{
/// Read from prefetch buffer and recheck if the new position is valid inside.
read_from_prefetch = true;

/// Read from prefetch buffer and recheck if the new position is valid inside.
if (nextImpl())
{
read_from_prefetch = true;
continue;
}
}

/// Prefetch is cancelled because of seek.

@@ -301,8 +298,11 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
if (impl->initialized())
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
}
file_offset_of_buffer_end = new_pos;
}

@@ -312,12 +312,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)

void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
if (prefetch_future.valid())
{
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedPrefetches);
prefetch_future.wait();
prefetch_future = {};
}
resetPrefetch(FilesystemPrefetchState::UNNEEDED);
}

@@ -326,4 +321,28 @@ AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromR
finalize();
}

void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetchState state)
{
if (!prefetch_future.valid())
return;

auto [size, _] = prefetch_future.get();
prefetch_future = {};

ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);

switch (state)
{
case FilesystemPrefetchState::UNNEEDED:
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedPrefetches);
break;
case FilesystemPrefetchState::CANCELLED_WITH_SEEK:
case FilesystemPrefetchState::CANCELLED_WITH_RANGE_CHANGE:
ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of prefetch: {}", magic_enum::enum_name(state));
}
}

}
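The hunks above replace the open-coded "bump a counter, wait on the future, drop it" blocks in setReadUntilPosition() and finalize() with a single resetPrefetch() helper that also credits RemoteFSPrefetchedBytes for data that was fetched but never consumed; the CANCELLED_WITH_SEEK state suggests the seek path funnels through the same helper. A compact standalone sketch of that pattern, using std::async and plain counters instead of the ClickHouse reader and ProfileEvents (all names below are illustrative):

    #include <cstddef>
    #include <cstdio>
    #include <future>

    enum class PrefetchState { USED, CANCELLED_WITH_SEEK, CANCELLED_WITH_RANGE_CHANGE, UNNEEDED };

    struct Counters { size_t prefetched_bytes = 0; size_t cancelled = 0; size_t unused = 0; };

    struct PrefetchingBuffer
    {
        std::future<size_t> prefetch_future;   // pending background read, if any
        Counters counters;

        void prefetch()
        {
            if (prefetch_future.valid())
                return;
            // In the real code this is submitted to a reader thread pool; a deferred task is enough here.
            prefetch_future = std::async(std::launch::deferred, [] { return size_t{4096}; });
        }

        // Single place that drains a pending prefetch and attributes it to the right counter,
        // mirroring AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch().
        void resetPrefetch(PrefetchState state)
        {
            if (!prefetch_future.valid())
                return;

            size_t size = prefetch_future.get();
            prefetch_future = {};
            counters.prefetched_bytes += size;

            switch (state)
            {
                case PrefetchState::UNNEEDED:
                    ++counters.unused;
                    break;
                case PrefetchState::CANCELLED_WITH_SEEK:
                case PrefetchState::CANCELLED_WITH_RANGE_CHANGE:
                    ++counters.cancelled;
                    break;
                default:
                    // The real code throws LOGICAL_ERROR here (e.g. USED must not reach resetPrefetch()).
                    std::fprintf(stderr, "unexpected prefetch state\n");
            }
        }
    };

    int main()
    {
        PrefetchingBuffer buf;
        buf.prefetch();
        buf.resetPrefetch(PrefetchState::CANCELLED_WITH_RANGE_CHANGE);   // e.g. setReadUntilPosition()
        buf.prefetch();
        buf.resetPrefetch(PrefetchState::UNNEEDED);                      // e.g. finalize()
        std::printf("prefetched_bytes=%zu cancelled=%zu unused=%zu\n",
                    buf.counters.prefetched_bytes, buf.counters.cancelled, buf.counters.unused);
    }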
@@ -64,6 +64,15 @@ private:

std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size);

enum class FilesystemPrefetchState
{
USED,
CANCELLED_WITH_SEEK,
CANCELLED_WITH_RANGE_CHANGE,
UNNEEDED,
};
void resetPrefetch(FilesystemPrefetchState state);

ReadSettings read_settings;

IAsynchronousReader & reader;

@@ -38,31 +38,31 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
&& (!FileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
}

SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const String & path, size_t file_size)
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (!current_file_path.empty() && !with_cache && enable_cache_log)
if (current_object && !with_cache && enable_cache_log)
{
appendFilesystemCacheLog();
}

current_file_path = path;
current_file_size = file_size;
current_object = object;
total_bytes_read_from_current_file = 0;
const auto & object_path = object.absolute_path;

size_t current_read_until_position = read_until_position ? read_until_position : file_size;
auto current_read_buffer_creator = [path, current_read_until_position, this]() { return read_buffer_creator(path, current_read_until_position); };
size_t current_read_until_position = read_until_position ? read_until_position : object.bytes_size;
auto current_read_buffer_creator = [=, this]() { return read_buffer_creator(object_path, current_read_until_position); };

if (with_cache)
{
auto cache_key = settings.remote_fs_cache->hash(path);
auto cache_key = settings.remote_fs_cache->hash(object_path);
return std::make_shared<CachedOnDiskReadBufferFromFile>(
path,
object_path,
cache_key,
settings.remote_fs_cache,
std::move(current_read_buffer_creator),
settings,
query_id,
file_size,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);

@@ -73,12 +73,15 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c

void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
if (!current_object)
return;

FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = current_file_path,
.file_segment_range = { 0, current_file_size },
.source_file_path = current_object->absolute_path,
.file_segment_range = { 0, current_object->bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,

@@ -123,7 +126,7 @@ void ReadBufferFromRemoteFSGather::initialize()
if (!current_buf || current_buf_idx != i)
{
current_buf_idx = i;
current_buf = createImplementationBuffer(object.absolute_path, object.bytes_size);
current_buf = createImplementationBuffer(object);
}

current_buf->seek(current_buf_offset, SEEK_SET);

@@ -170,7 +173,7 @@ bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
++current_buf_idx;

const auto & object = blobs_to_read[current_buf_idx];
current_buf = createImplementationBuffer(object.absolute_path, object.bytes_size);
current_buf = createImplementationBuffer(object);

return true;
}

@@ -242,7 +245,9 @@ void ReadBufferFromRemoteFSGather::reset()

String ReadBufferFromRemoteFSGather::getFileName() const
{
return current_file_path;
if (current_object)
return current_object->absolute_path;
return blobs_to_read[0].absolute_path;
}

size_t ReadBufferFromRemoteFSGather::getFileSize() const
@@ -48,7 +48,7 @@ public:
size_t getImplementationBufferOffset() const;

private:
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size);
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);

bool nextImpl() override;

@@ -71,6 +71,8 @@ private:
String current_file_path;
size_t current_file_size = 0;

std::optional<StoredObject> current_object;

bool with_cache;

String query_id;

@@ -22,7 +22,7 @@ namespace ProfileEvents

namespace CurrentMetrics
{
extern const Metric Read;
extern const Metric RemoteRead;
}

namespace DB

@@ -42,14 +42,11 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu
std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Request request)
{
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderSubmit);

auto schedule = threadPoolCallbackRunner<Result>(pool, "VFSRead");

return schedule([request]() -> Result
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
Stopwatch watch(CLOCK_MONOTONIC);

CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());

Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);

@@ -57,10 +54,10 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
watch.stop();

ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.offset ? result.size - result.offset : result.size);
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.size);

return Result{ .size = result.size, .offset = result.offset };
}, request.priority);
}, pool, "VFSRead", request.priority);
}

}
@@ -105,7 +105,7 @@ StoredObjects FakeMetadataStorageFromDisk::getStorageObjects(const std::string &
std::string object_path = fs::path(object_storage_root_path) / blob_name;
size_t object_size = getFileSize(path);

auto object = StoredObject::create(*object_storage, object_path, object_size, /* exists */true);
auto object = StoredObject::create(*object_storage, object_path, object_size, path, /* exists */true);
return {std::move(object)};
}

@@ -145,7 +145,7 @@ StoredObjects MetadataStorageFromDisk::getStorageObjects(const std::string & pat
for (auto & [object_relative_path, size] : object_storage_relative_paths)
{
auto object_path = fs::path(metadata->getBlobsCommonPrefix()) / object_relative_path;
StoredObject object{ object_path, size, [](const String & path_){ return path_; }};
StoredObject object{ object_path, size, path, [](const String & path_){ return path_; }};
object_storage_paths.push_back(object);
}

@@ -108,7 +108,7 @@ StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std
{
std::string blob_name = object_storage->generateBlobNameForPath(path);
size_t object_size = getFileSize(blob_name);
auto object = StoredObject::create(*object_storage, getAbsolutePath(blob_name), object_size, /* exists */true);
auto object = StoredObject::create(*object_storage, getAbsolutePath(blob_name), object_size, path, /* exists */true);
return {std::move(object)};
}

@@ -11,8 +11,10 @@ namespace DB
StoredObject::StoredObject(
const std::string & absolute_path_,
uint64_t bytes_size_,
const std::string & mapped_path_,
PathKeyForCacheCreator && path_key_for_cache_creator_)
: absolute_path(absolute_path_)
, mapped_path(mapped_path_)
, bytes_size(bytes_size_)
, path_key_for_cache_creator(std::move(path_key_for_cache_creator_))
{

@@ -26,8 +28,18 @@ std::string StoredObject::getPathKeyForCache() const
return path_key_for_cache_creator(absolute_path);
}

const std::string & StoredObject::getMappedPath() const
{
return mapped_path;
}

StoredObject StoredObject::create(
const IObjectStorage & object_storage, const std::string & object_path, size_t object_size, bool exists, bool object_bypasses_cache)
const IObjectStorage & object_storage,
const std::string & object_path,
size_t object_size,
const std::string & mapped_path,
bool exists,
bool object_bypasses_cache)
{
if (object_bypasses_cache)
return StoredObject(object_path, object_size, {});

@@ -54,7 +66,7 @@ StoredObject StoredObject::create(
path_key_for_cache_creator = [path = path_key_for_cache_creator(object_path)](const std::string &) { return path; };
}

return StoredObject(object_path, object_size, std::move(path_key_for_cache_creator));
return StoredObject(object_path, object_size, mapped_path, std::move(path_key_for_cache_creator));
}

}

@@ -10,17 +10,23 @@ namespace DB
/// Object metadata: path, size, path_key_for_cache.
struct StoredObject
{
/// Absolute path of the blob in object storage.
std::string absolute_path;
/// A path that is mapped to the current blob (for example, the corresponding local path as ClickHouse sees it).
std::string mapped_path;

uint64_t bytes_size;
uint64_t bytes_size = 0;

std::string getPathKeyForCache() const;

const std::string & getMappedPath() const;

/// Create `StoredObject` based on metadata storage and blob name of the object.
static StoredObject create(
const IObjectStorage & object_storage,
const std::string & object_path,
size_t object_size = 0,
const std::string & mapped_path = "",
bool exists = false,
bool object_bypasses_cache = false);

@@ -32,6 +38,7 @@ struct StoredObject
explicit StoredObject(
const std::string & absolute_path_,
uint64_t bytes_size_ = 0,
const std::string & mapped_path_ = "",
PathKeyForCacheCreator && path_key_for_cache_creator_ = {});
};
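With the new mapped_path field, a StoredObject carries both the blob's absolute path in object storage and a mapped path (per the header comment, for example the corresponding local path as ClickHouse sees it), which the metadata storages above now pass through in getStorageObjects(). A minimal standalone sketch of the struct's new shape and how a caller might fill it; everything beyond the fields shown in the hunks above is illustrative:

    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <string>

    // Simplified stand-in for DB::StoredObject as declared in the header hunk above.
    struct StoredObject
    {
        std::string absolute_path;   // blob path in object storage
        std::string mapped_path;     // local path the blob corresponds to
        uint64_t bytes_size = 0;
        std::function<std::string(const std::string &)> path_key_for_cache_creator;

        const std::string & getMappedPath() const { return mapped_path; }
        std::string getPathKeyForCache() const
        {
            return path_key_for_cache_creator ? path_key_for_cache_creator(absolute_path) : absolute_path;
        }
    };

    // Roughly what the getStorageObjects() call sites now do: the local `path` is passed
    // along so the object remembers what it is mapped to.
    StoredObject makeStorageObject(const std::string & blob_path, uint64_t size, const std::string & local_path)
    {
        return StoredObject{blob_path, local_path, size, {}};
    }

    int main()
    {
        auto object = makeStorageObject("s3://bucket/abc/xyz", 1024,
                                        "/var/lib/clickhouse/store/db/table/all_1_1_0/data.bin");
        std::printf("blob=%s mapped=%s size=%llu\n",
                    object.absolute_path.c_str(),
                    object.getMappedPath().c_str(),
                    static_cast<unsigned long long>(object.bytes_size));
    }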
@@ -108,7 +108,7 @@ StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const s
auto fs_path = fs::path(object_storage.url) / path;
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.substr(object_storage.url.size());
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, true)};
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, path, true)};
}

std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const

@@ -1975,11 +1975,13 @@ void Context::dropMarkCache() const

ThreadPool & Context::getLoadMarksThreadpool() const
{
const auto & config = getConfigRef();

auto lock = getLock();
if (!shared->load_marks_threadpool)
{
constexpr size_t pool_size = 50;
constexpr size_t queue_size = 1000000;
auto pool_size = config.getUInt(".load_marks_threadpool_pool_size", 50);
auto queue_size = config.getUInt(".load_marks_threadpool_queue_size", 1000000);
shared->load_marks_threadpool = std::make_unique<ThreadPool>(pool_size, pool_size, queue_size);
}
return *shared->load_marks_threadpool;
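The Context hunk above swaps the hard-coded pool and queue sizes for values read from the server configuration, keeping the old constants (50 and 1000000) as defaults and keeping the lazy, lock-protected construction. A small standalone sketch of that pattern, with a std::map standing in for the real configuration object and a dummy ThreadPool (all types here are simplified stand-ins):

    #include <cstdio>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    // Trivial stand-in for the server config: returns a default when a key is absent.
    struct Config
    {
        std::map<std::string, unsigned> values;
        unsigned getUInt(const std::string & key, unsigned def) const
        {
            auto it = values.find(key);
            return it == values.end() ? def : it->second;
        }
    };

    // Stand-in for ThreadPool: only the constructor signature matters for this sketch.
    struct ThreadPool
    {
        ThreadPool(unsigned max_threads, unsigned max_free_threads, unsigned queue)
            : size(max_threads), free_threads(max_free_threads), queue_size(queue) {}
        unsigned size, free_threads, queue_size;
    };

    struct Context
    {
        Config config;
        std::mutex mutex;
        std::unique_ptr<ThreadPool> load_marks_threadpool;

        // Lazily create the pool on first use, sized from config with the previous constants as defaults.
        ThreadPool & getLoadMarksThreadpool()
        {
            std::lock_guard lock(mutex);
            if (!load_marks_threadpool)
            {
                auto pool_size = config.getUInt("load_marks_threadpool_pool_size", 50);
                auto queue_size = config.getUInt("load_marks_threadpool_queue_size", 1000000);
                load_marks_threadpool = std::make_unique<ThreadPool>(pool_size, pool_size, queue_size);
            }
            return *load_marks_threadpool;
        }
    };

    int main()
    {
        Context ctx;
        ctx.config.values["load_marks_threadpool_pool_size"] = 16;
        auto & pool = ctx.getLoadMarksThreadpool();
        std::printf("pool_size=%u queue_size=%u\n", pool.size, pool.queue_size);
    }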
@@ -10,16 +10,16 @@ namespace DB
{

/// Higher-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously.
template <typename Result>
using ThreadPoolCallbackRunner = std::function<std::future<Result>(std::function<Result()> &&, size_t priority)>;
template <typename Result, typename Callback = std::function<Result()>>
using ThreadPoolCallbackRunner = std::function<std::future<Result>(Callback &&, int64_t priority)>;

/// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()'.
template <typename Result>
ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
template <typename Result, typename Callback = std::function<Result()>>
ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
{
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](std::function<Result()> && callback, size_t priority) mutable -> std::future<Result>
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](Callback && callback, int64_t priority) mutable -> std::future<Result>
{
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() -> Result
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() mutable -> Result
{
if (thread_group)
CurrentThread::attachTo(thread_group);

@@ -43,4 +43,11 @@ ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, con
};
}

template <typename Result, typename T>
std::future<Result> scheduleFromThreadPool(T && task, ThreadPool & pool, const std::string & thread_name, int64_t priority = 0)
{
auto schedule = threadPoolCallbackRunner<Result, T>(pool, thread_name);
return schedule(std::move(task), priority);
}

}
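The new scheduleFromThreadPool() is a thin convenience wrapper: it builds a callback runner for the given pool and immediately schedules one task on it, returning that task's future. That is what lets ThreadPoolRemoteFSReader::submit() (above) and MergeTreeMarksLoader::loadMarksAsync() (below) drop their hand-rolled packaged_task boilerplate. A simplified, self-contained model of the mechanism, using a detached std::thread in place of ClickHouse's ThreadPool and omitting the thread-group and thread-name handling (all names below are illustrative, not the real API):

    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <future>
    #include <memory>
    #include <string>
    #include <thread>

    template <typename Result, typename Callback = std::function<Result()>>
    using CallbackRunner = std::function<std::future<Result>(Callback &&, int64_t)>;

    // Simplified runner: wraps the callback in a packaged_task and runs it on a new thread.
    // The real threadPoolCallbackRunner() additionally attaches the task to the caller's
    // thread group, sets the thread name and goes through pool.scheduleOrThrow() with the priority.
    template <typename Result, typename Callback = std::function<Result()>>
    CallbackRunner<Result, Callback> makeCallbackRunner(const std::string & thread_name)
    {
        return [thread_name](Callback && callback, int64_t /*priority*/) -> std::future<Result>
        {
            auto task = std::make_shared<std::packaged_task<Result()>>(std::move(callback));
            auto future = task->get_future();
            std::thread([task] { (*task)(); }).detach();
            return future;
        };
    }

    // Counterpart of scheduleFromThreadPool(): build a runner and schedule a single task on it.
    template <typename Result, typename T>
    std::future<Result> scheduleOnRunner(T && task, const std::string & thread_name, int64_t priority = 0)
    {
        auto schedule = makeCallbackRunner<Result, T>(thread_name);
        return schedule(std::move(task), priority);
    }

    int main()
    {
        auto future = scheduleOnRunner<int>([] { return 40 + 2; }, "VFSRead");
        std::printf("result=%d\n", future.get());
    }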
@@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>

@@ -178,29 +179,11 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()

std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
{
ThreadGroupStatusPtr thread_group;
if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup())
thread_group = CurrentThread::get().getThreadGroup();

auto task = std::make_shared<std::packaged_task<MarkCache::MappedPtr()>>([thread_group, this]
{
setThreadName("loadMarksThread");

if (thread_group)
CurrentThread::attachTo(thread_group);

SCOPE_EXIT_SAFE({
if (thread_group)
CurrentThread::detachQuery();
});

ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarks();
});

auto task_future = task->get_future();
load_marks_threadpool->scheduleOrThrow([task]{ (*task)(); });
return task_future;
return scheduleFromThreadPool<MarkCache::MappedPtr>([this]() -> MarkCache::MappedPtr
{
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarksImpl();
}, *load_marks_threadpool, "LoadMarksThread");
}

}