Merge pull request #49426 from kssenii/remove-dependency-from-context

Remove dependency from DB::Context in remote/cache readers
This commit is contained in:
Kseniia Sumarokova 2023-05-08 23:25:12 +02:00 committed by GitHub
commit bfe9537012
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 240 additions and 160 deletions

View File

@ -114,7 +114,7 @@ if (BUILD_STANDALONE_KEEPER)
clickhouse_add_executable(clickhouse-keeper ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES})
# Remove some redundant dependencies
target_compile_definitions (clickhouse-keeper PRIVATE -DKEEPER_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PRIVATE -DCLICKHOUSE_PROGRAM_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PUBLIC -DWITHOUT_TEXT_LOG)
target_include_directories(clickhouse-keeper PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src") # uses includes from src directory

View File

@ -57,7 +57,7 @@ int mainEntryClickHouseKeeper(int argc, char ** argv)
}
}
#ifdef KEEPER_STANDALONE_BUILD
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
// Weak symbols don't work correctly on Darwin
// so we have a stub implementation to avoid linker errors

View File

@ -172,7 +172,7 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
@ -188,7 +188,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
registerCodecDoubleDelta(*this);

View File

@ -338,7 +338,7 @@ void SettingFieldString::readBinary(ReadBuffer & in)
/// that. The linker does not complain only because clickhouse-keeper does not call any of below
/// functions. A cleaner alternative would be more modular libraries, e.g. one for data types, which
/// could then be linked by the server and the linker.
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {}

View File

@ -18,7 +18,7 @@
#include "config.h"
#include "config_version.h"
#if USE_SENTRY && !defined(KEEPER_STANDALONE_BUILD)
#if USE_SENTRY && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
# include <sentry.h>
# include <cstdio>

View File

@ -47,14 +47,14 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek_)
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
@ -63,6 +63,8 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
@ -111,7 +113,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
@ -186,8 +188,8 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
.reader_id = current_reader_id,
};
if (auto prefetch_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog())
prefetch_log->add(elem);
if (prefetches_log)
prefetches_log->add(elem);
}
@ -335,7 +337,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + min_bytes_for_seek)
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
bytes_to_ignore = new_pos - file_offset_of_buffer_end;

View File

@ -12,6 +12,7 @@ namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
@ -34,7 +35,8 @@ public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE);
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
@ -83,8 +85,6 @@ private:
Memory<> prefetch_buffer;
size_t min_bytes_for_seek;
std::string query_id;
std::string current_reader_id;
@ -95,6 +95,9 @@ private:
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;

View File

@ -48,7 +48,8 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_)
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
#ifndef NDEBUG
, log(&Poco::Logger::get("CachedOnDiskReadBufferFromFile(" + source_file_path_ + ")"))
@ -62,12 +63,12 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
, read_until_position(read_until_position_ ? *read_until_position_ : file_size_)
, implementation_buffer_creator(implementation_buffer_creator_)
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
, allow_seeks_after_first_read(allow_seeks_after_first_read_)
, use_external_buffer(use_external_buffer_)
, query_context_holder(cache_->getQueryContextHolder(query_id, settings_))
, is_persistent(settings_.is_file_cache_persistent)
, cache_log(cache_log_)
{
}
@ -103,7 +104,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
break;
}
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
if (cache_log)
cache_log->add(elem);
}
@ -487,7 +488,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto * current_file_segment = &file_segments->front();
auto completed_range = current_file_segment->range();
if (enable_logging)
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -512,7 +513,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (enable_logging && file_segments && !file_segments->empty())
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}

View File

@ -32,7 +32,8 @@ public:
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_ = std::nullopt);
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~CachedOnDiskReadBufferFromFile() override;
@ -137,7 +138,6 @@ private:
String last_caller_id;
String query_id;
bool enable_logging = false;
String current_buffer_id;
bool allow_seeks_after_first_read;
@ -148,6 +148,8 @@ private:
FileCache::QueryContextHolderPtr query_context_holder;
bool is_persistent;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@ -153,8 +153,9 @@ FileSegment & FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSeg
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
{
if (cache_log)
{
if (!cache_log)
return;
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
@ -174,7 +175,6 @@ void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_s
cache_log->add(elem);
}
}
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
{

View File

@ -8,7 +8,6 @@
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Context.h>
namespace DB
@ -17,15 +16,18 @@ namespace DB
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
, settings(settings_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
@ -36,7 +38,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache && enable_cache_log)
if (current_buf != nullptr && !with_cache)
{
appendFilesystemCacheLog();
}
@ -61,7 +63,8 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt,
cache_log);
}
return current_read_buffer_creator();
@ -69,7 +72,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
if (current_object.remote_path.empty())
if (!cache_log || current_object.remote_path.empty())
return;
FilesystemCacheLogElement elem
@ -82,8 +85,6 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,
};
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
cache_log->add(elem);
}
@ -267,10 +268,8 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache && enable_cache_log)
{
if (!with_cache)
appendFilesystemCacheLog();
}
}
}

View File

@ -25,7 +25,8 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_);
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~ReadBufferFromRemoteFSGather() override;
@ -93,7 +94,7 @@ private:
size_t total_bytes_read_from_current_file = 0;
bool enable_cache_log = false;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@ -11,7 +11,6 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
#include <future>
@ -75,17 +74,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
std::optional<AsyncReadIncrement> increment;
if (CurrentThread::isInitialized())
{
auto query_context = CurrentThread::get().getQueryContext();
if (query_context)
increment.emplace(query_context->getAsyncReadCounters());
}
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto async_read_counters = remote_fs_fd->getReadCounters();
std::optional<AsyncReadIncrement> increment = async_read_counters ? std::optional<AsyncReadIncrement>(async_read_counters) : std::nullopt;
auto watch = std::make_unique<Stopwatch>(CLOCK_MONOTONIC);
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch->stop();

View File

@ -8,6 +8,8 @@
namespace DB
{
struct AsyncReadCounters;
class ThreadPoolRemoteFSReader : public IAsynchronousReader
{
public:
@ -24,12 +26,19 @@ private:
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
{
public:
explicit RemoteFSFileDescriptor(ReadBuffer & reader_) : reader(reader_) { }
explicit RemoteFSFileDescriptor(
ReadBuffer & reader_,
std::shared_ptr<AsyncReadCounters> async_read_counters_)
: reader(reader_)
, async_read_counters(async_read_counters_) {}
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
std::shared_ptr<AsyncReadCounters> getReadCounters() const { return async_read_counters; }
private:
ReadBuffer & reader;
std::shared_ptr<AsyncReadCounters> async_read_counters;
};
}

View File

@ -5,7 +5,9 @@
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <IO/AsynchronousReader.h>
#include <Common/ProfileEvents.h>
#include "config.h"
@ -27,7 +29,6 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
@ -119,11 +120,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,
@ -137,11 +134,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,

View File

@ -0,0 +1,76 @@
#include <Common/ErrorCodes.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/AsynchronousReader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <IO/SynchronousReader.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
#include <Interpreters/Context.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type)
{
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
const auto & config = Poco::Util::Application::instance().config();
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
static auto asynchronous_remote_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
static auto asynchronous_local_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
static auto synchronous_local_fs_reader = createThreadPoolReader(type, config);
return *synchronous_local_fs_reader;
}
}
#else
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(type);
#endif
}
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::make_unique<SynchronousReader>();
}
}
}
}

View File

@ -0,0 +1,23 @@
#pragma once
namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
class IAsynchronousReader;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type);
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type,
const Poco::Util::AbstractConfiguration & config);
}

View File

@ -10,6 +10,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
@ -86,6 +87,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
@ -104,12 +106,16 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
auto reader_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, disk_read_settings, std::move(reader_impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(reader_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -4,6 +4,7 @@
#include <IO/BoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Common/CurrentThread.h>

View File

@ -74,7 +74,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings);
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings, nullptr);
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl), read_settings);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}

View File

@ -26,15 +26,6 @@ void IObjectStorage::getDirectoryContents(const std::string &,
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getDirectoryContents() is not supported");
}
IAsynchronousReader & IObjectStorage::getThreadPoolReader()
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
}
ThreadPool & IObjectStorage::getThreadPoolWriter()
{
auto context = Context::getGlobalContextInstance();

View File

@ -157,8 +157,6 @@ public:
virtual const std::string & getCacheName() const;
static IAsynchronousReader & getThreadPoolReader();
static ThreadPool & getThreadPoolWriter();
virtual void shutdown() = 0;

View File

@ -51,6 +51,7 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
std::optional<size_t> file_size) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (const std::string & file_path, size_t /* read_until_position */)
-> std::unique_ptr<ReadBufferFromFileBase>
@ -59,14 +60,18 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
};
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, modified_settings);
std::move(read_buffer_creator), objects, modified_settings,
global_context->getFilesystemCacheLog());
/// We use `remove_fs_method` (not `local_fs_method`) because we are about to use
/// AsynchronousReadIndirectBufferFromRemoteFS which works by the remote_fs_* settings.
if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, modified_settings, std::move(impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, modified_settings, std::move(impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -98,6 +98,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto settings_ptr = s3_settings.get();
@ -121,13 +122,16 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(s3_impl), disk_read_settings.remote_read_min_bytes_for_seek);
reader, disk_read_settings, std::move(s3_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -13,6 +13,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -179,12 +180,20 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
read_until_position);
};
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
auto global_context = Context::getGlobalContextInstance();
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = IObjectStorage::getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, read_settings, std::move(web_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -4169,35 +4169,8 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
return shared->common_executor;
}
static size_t getThreadPoolReaderSizeFromConfig(Context::FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
return config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
}
case Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
return config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
}
case Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::numeric_limits<std::size_t>::max();
}
}
}
size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
return getThreadPoolReaderSizeFromConfig(type, config);
}
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
auto lock = getLock();
switch (type)
@ -4205,31 +4178,20 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
if (!shared->asynchronous_remote_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
shared->asynchronous_remote_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->asynchronous_local_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
shared->asynchronous_local_fs_reader = std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
shared->asynchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->synchronous_local_fs_reader)
{
shared->synchronous_local_fs_reader = std::make_unique<SynchronousReader>();
}
shared->synchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->synchronous_local_fs_reader;
}

View File

@ -11,6 +11,7 @@
#include <Core/Settings.h>
#include <Core/UUID.h>
#include <IO/AsyncReadCounters.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h>
@ -1096,17 +1097,8 @@ public:
OrdinaryBackgroundExecutorPtr getFetchesExecutor() const;
OrdinaryBackgroundExecutorPtr getCommonExecutor() const;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
size_t getThreadPoolReaderSize(FilesystemReaderType type) const;
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;

View File

@ -1,7 +1,7 @@
#include <Server/ProtocolServerAdapter.h>
#include <Server/TCPServer.h>
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
#include <Server/GRPCServer.h>
#endif
@ -37,7 +37,7 @@ ProtocolServerAdapter::ProtocolServerAdapter(
{
}
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl
{
public:

View File

@ -23,7 +23,7 @@ public:
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<TCPServer> tcp_server_);
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<GRPCServer> grpc_server_);
#endif

View File

@ -67,7 +67,7 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, nullptr);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;

View File

@ -21,6 +21,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <IO/ReadBufferFromString.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTCreateQuery.h>
@ -232,7 +233,7 @@ public:
if (thread_pool_read)
{
return std::make_unique<AsynchronousReadBufferFromHDFS>(
IObjectStorage::getThreadPoolReader(), read_settings, std::move(buf));
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(buf));
}
else
{

View File

@ -646,6 +646,7 @@ StorageS3Source::ReadBufferOrFactory StorageS3Source::createS3ReadBuffer(const S
std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
const String & key, const ReadSettings & read_settings, size_t object_size)
{
auto context = getContext();
auto read_buffer_creator =
[this, read_settings, object_size]
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
@ -667,10 +668,17 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{StoredObject{key, object_size}},
read_settings);
read_settings,
/* cache_log */nullptr);
auto & pool_reader = getContext()->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(pool_reader, read_settings, std::move(s3_impl));
auto modified_settings{read_settings};
/// FIXME: Changing this setting to default value breaks something around parquet reading
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
pool_reader, modified_settings, std::move(s3_impl),
context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog());
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)

View File

@ -6,6 +6,7 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromString.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Interpreters/Context.h>
#include <Common/Config/ConfigProcessor.h>
#include <Storages/HDFS/AsynchronousReadBufferFromHDFS.h>
@ -25,7 +26,7 @@ int main()
String path = "/path/to/hdfs/file";
ReadSettings settings = {};
auto in = std::make_unique<ReadBufferFromHDFS>(hdfs_namenode_url, path, *config, settings);
auto & reader = IObjectStorage::getThreadPoolReader();
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
AsynchronousReadBufferFromHDFS buf(reader, {}, std::move(in));
String output;