// Review notes for this patch. The patch text below these notes is unchanged.
//
// What the patch does, per the hunks below:
//   * Settings.h: adds the String setting remote_filesystem_read_method with default
//     read; its description lists read and read_threadpool as the accepted values.
//   * ReadSettings.h: renames enum ReadMethod to LocalFSReadMethod, adds enum
//     RemoteFSReadMethod { read, read_threadpool } and the field
//     ReadSettings::remote_fs_method (default RemoteFSReadMethod::read).
//   * Context.cpp: getReadSettings() converts both method settings with
//     magic_enum::enum_cast and throws UNKNOWN_READ_METHOD when a value does not
//     convert; the message now says whether the local or remote setting was bad.
//   * DiskS3.cpp / DiskS3.h / registerDiskS3.cpp: removes DiskS3Settings::async_read,
//     its constructor parameter and the .async_read config lookup. readFile() now takes
//     the thread-pool branch when
//     read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool.
//   * createReadBufferFromFileBase.cpp: follows the enum rename only.
//   * ThreadPoolRemoteFSReader.cpp: removes a debug print from the submitted task.
//   * Adds AsynchronousReadIndirectBufferFromRemoteFS (.h and .cpp), a
//     ReadBufferFromFileBase subclass that holds reader, priority, impl and
//     prefetch_future:
//       - read() fills an IAsynchronousReader::Request (descriptor wrapping impl,
//         offset = absolute_position, priority) and returns reader->submit(request).
//       - prefetch() stores the future from read() unless a valid one is already held.
//       - nextImpl() takes the size from the held future, or from a fresh read() when
//         none is held, swaps buffers with impl, advances absolute_position by that
//         size, and returns false when the size is zero.
//       - seek() accepts SEEK_SET and SEEK_CUR and throws CANNOT_SEEK_THROUGH_FILE for
//         any other whence. A target outside the working buffer waits for and drops a
//         pending prefetch, then seeks impl to absolute_position.
//       - finalize(), called from the destructor, waits for a pending prefetch.
//
// NOTE(review): issues visible in the added lines
//   1. prefetch(): the fmt::format string has two {} placeholders but only one argument
//      (internal_buffer.size()) is passed. Supply the missing argument or drop the
//      placeholder; fmt treats a missing argument as an error.
//   2. Debug output to std::cerr is left in prefetch(), nextImpl(), seek() and
//      DiskS3::readFile(). Some of it uses std::endl, and the messages contain the typos
//      interanl and thtread. It presumably should be removed or routed through the
//      project logger before merge - confirm.
//   3. seek(), SEEK_CUR branch: the in-buffer shortcut tests only the upper bound
//      (getPosition() + offset_ < absolute_position). Unlike the SEEK_SET branch, no
//      lower bound against the start of the working buffer is visible. Confirm that a
//      negative offset_ cannot move pos before working_buffer.begin().
//   4. seek(), SEEK_CUR fallback: it executes absolute_position += offset_, while
//      getPosition() is absolute_position - available(). This looks like it offsets from
//      the end of the working buffer rather than from the current position - confirm.
//   5. metadata_file_path is returned by getFileName(), but the constructor initializer
//      list sets only reader, priority and impl, so the string stays default-constructed
//      as far as this patch shows.
//   6. nextImpl(): both branches end with the same two statements
//      (swap(*impl); prefetch_future = {};), which could be hoisted below the if/else.
//   7. DiskS3::readFile() and createReadBufferFromFileBase() each build a function-local
//      static reader from the literals 16 and 1000000. Their meaning is not shown in
//      this patch; presumably pool size and queue size - verify and consider named
//      constants or settings.
//
// NOTE(review): this copy of the patch looks damaged by extraction. Line breaks are
// collapsed, #include directives have no target, and template argument lists appear to
// be missing (std::future, std::make_shared(), static_cast(, magic_enum::enum_cast().
// Regenerate it from the repository before trying to apply it.
diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 0e29168f906..8c4c6327c52 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -503,6 +503,7 @@ class IColumn; M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \ \ M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \ + M(String, remote_filesystem_read_method, "read", "Method of reading data from remote filesystem, one of: read, read_threadpool.", 0) \ M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \ M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \ M(Int64, read_priority, 0, "Priority to read data from local filesystem. 
Only supported for 'pread_threadpool' method.", 0) \ diff --git a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp new file mode 100644 index 00000000000..beb69939e5f --- /dev/null +++ b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -0,0 +1,171 @@ +#include "AsynchronousReadIndirectBufferFromRemoteFS.h" + +#include +#include +#include +#include +#include + + +namespace CurrentMetrics +{ + extern const Metric AsynchronousReadWait; +} +namespace ProfileEvents +{ + extern const Event AsynchronousReadWaitMicroseconds; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_SEEK_THROUGH_FILE; +} + + +AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS( + AsynchronousReaderPtr reader_, Int32 priority_, ReadBufferFromRemoteFSImpl impl_) + : reader(reader_), priority(priority_), impl(impl_) +{ +} + + +std::future AsynchronousReadIndirectBufferFromRemoteFS::read() +{ + IAsynchronousReader::Request request; + + auto remote_fd = std::make_shared(); + remote_fd->impl = impl; + swap(*impl); + request.descriptor = std::move(remote_fd); + request.offset = absolute_position; + request.priority = priority; + + return reader->submit(request); +} + + +void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() +{ + if (prefetch_future.valid()) + { + std::cerr << "Prefetch, but not needed." << "\n"; + return; + } + + std::cerr << fmt::format("Prefetch. Internal buffer size: {}, " + "prefetch buffer size: {}, " + "impl interanl buffer size: unknown", + internal_buffer.size()); + + prefetch_future = read(); +} + + +bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() +{ + size_t size = 0; + std::cerr << fmt::format("NextImpl. 
Offset: {}, absolute_pos: {}", offset(), absolute_position) << std::endl; + + if (prefetch_future.valid()) + { + std::cerr << "Future is VALID!\n"; + + Stopwatch watch; + CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; + + size = prefetch_future.get(); + + watch.stop(); + ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds()); + + swap(*impl); + prefetch_future = {}; + } + else + { + std::cerr << "Future is NOT VALID!\n"; + + size = read().get(); + swap(*impl); + prefetch_future = {}; + } + + if (size) + { + absolute_position += size; + return true; + } + + return false; +} + + +off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) +{ + if (whence == SEEK_CUR) + { + /// If position within current working buffer - shift pos. + if (!working_buffer.empty() && static_cast(getPosition() + offset_) < absolute_position) + { + pos += offset_; + return getPosition(); + } + else + { + absolute_position += offset_; + } + } + else if (whence == SEEK_SET) + { + /// If position is within current working buffer - shift pos. 
+ if (!working_buffer.empty() + && static_cast(offset_) >= absolute_position - working_buffer.size() + && size_t(offset_) < absolute_position) + { + pos = working_buffer.end() - (absolute_position - offset_); + assert(pos >= working_buffer.begin()); + assert(pos <= working_buffer.end()); + return getPosition(); + } + else + { + absolute_position = offset_; + } + } + else + throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + + if (prefetch_future.valid()) + { + std::cerr << "Ignoring prefetched data" << "\n"; + prefetch_future.wait(); + prefetch_future = {}; + } + + std::cerr << "Seek with new absolute_position: " << absolute_position << std::endl; + impl->seek(absolute_position, SEEK_SET); + pos = working_buffer.end(); + + return absolute_position; +} + + +void AsynchronousReadIndirectBufferFromRemoteFS::finalize() +{ + if (prefetch_future.valid()) + { + prefetch_future.wait(); + prefetch_future = {}; + } +} + + +AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromRemoteFS() +{ + finalize(); +} + +} diff --git a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h new file mode 100644 index 00000000000..eb62300ebec --- /dev/null +++ b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -0,0 +1,51 @@ +#pragma once + +#if !defined(ARCADIA_BUILD) +#include +#endif + +#include +#include +#include +#include +#include + + +namespace DB +{ + +/// Reads data from S3/HDFS using stored paths in metadata. 
+class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase +{ +public: + explicit AsynchronousReadIndirectBufferFromRemoteFS( + AsynchronousReaderPtr reader_, Int32 priority_, ReadBufferFromRemoteFSImpl impl_); + + ~AsynchronousReadIndirectBufferFromRemoteFS() override; + + off_t seek(off_t offset_, int whence) override; + + off_t getPosition() override { return absolute_position - available(); } + + String getFileName() const override { return metadata_file_path; } + + void prefetch() override; + +private: + bool nextImpl() override; + + void finalize(); + + std::future read(); + + String metadata_file_path; + size_t absolute_position = 0; + + AsynchronousReaderPtr reader; + Int32 priority; + ReadBufferFromRemoteFSImpl impl; + + std::future prefetch_future; +}; + +} diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index aae5bb8fc26..93174d808c3 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -236,8 +236,9 @@ std::unique_ptr DiskS3::readFile(const String & path, co settings->client, bucket, metadata, settings->s3_max_single_read_retries, read_settings.remote_fs_buffer_size); - if (settings->async_read) + if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool) { + std::cerr << "\n\ncreating read buffer with thtread pool\n\n"; static AsynchronousReaderPtr reader = std::make_shared(16, 1000000); return std::make_unique(reader, read_settings.priority, std::move(s3_impl)); //return std::make_unique(std::move(buf), settings->min_bytes_for_seek); @@ -1065,8 +1066,7 @@ DiskS3Settings::DiskS3Settings( bool send_metadata_, int thread_pool_size_, int list_object_keys_size_, - int objects_chunk_size_to_delete_, - bool async_read_) + int objects_chunk_size_to_delete_) : client(client_) , s3_max_single_read_retries(s3_max_single_read_retries_) , s3_min_upload_part_size(s3_min_upload_part_size_) @@ -1076,7 +1076,6 @@ DiskS3Settings::DiskS3Settings( , thread_pool_size(thread_pool_size_) , 
list_object_keys_size(list_object_keys_size_) , objects_chunk_size_to_delete(objects_chunk_size_to_delete_) - , async_read(async_read_) { } diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h index fafd115946d..e2141b6196e 100644 --- a/src/Disks/S3/DiskS3.h +++ b/src/Disks/S3/DiskS3.h @@ -36,8 +36,7 @@ struct DiskS3Settings bool send_metadata_, int thread_pool_size_, int list_object_keys_size_, - int objects_chunk_size_to_delete_, - bool async_read); + int objects_chunk_size_to_delete_); std::shared_ptr client; size_t s3_max_single_read_retries; @@ -48,7 +47,6 @@ struct DiskS3Settings int thread_pool_size; int list_object_keys_size; int objects_chunk_size_to_delete; - bool async_read; }; diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index aab97d10760..decf454d257 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -161,8 +161,7 @@ std::unique_ptr getSettings(const Poco::Util::AbstractConfigurat config.getBool(config_prefix + ".send_metadata", false), config.getInt(config_prefix + ".thread_pool_size", 16), config.getInt(config_prefix + ".list_object_keys_size", 1000), - config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000), - config.getBool(config_prefix + ".async_read", false)); + config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000)); } } diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index 100041d3dec..642da1cff37 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -6,7 +6,7 @@ namespace DB { -enum class ReadMethod +enum class LocalFSReadMethod { /** * Simple synchronous reads with 'read'. @@ -43,12 +43,20 @@ enum class ReadMethod pread_fake_async }; +enum class RemoteFSReadMethod +{ + read, + read_threadpool, +}; + class MMappedFileCache; struct ReadSettings { /// Method to use reading from local filesystem. 
- ReadMethod local_fs_method = ReadMethod::pread; + LocalFSReadMethod local_fs_method = LocalFSReadMethod::pread; + /// Method to use reading from remote filesystem. + RemoteFSReadMethod remote_fs_method = RemoteFSReadMethod::read; size_t local_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE; size_t remote_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE; diff --git a/src/IO/ThreadPoolRemoteFSReader.cpp b/src/IO/ThreadPoolRemoteFSReader.cpp index c218ce160d0..706408e818d 100644 --- a/src/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/IO/ThreadPoolRemoteFSReader.cpp @@ -39,8 +39,6 @@ std::future ThreadPoolRemoteFSReader::submit(Reques { auto task = std::make_shared>([request] { - std::cerr << "\n\nTask is execited!!!\n\n"; - setThreadName("ThreadPoolRead"); CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; Stopwatch watch(CLOCK_MONOTONIC); diff --git a/src/IO/createReadBufferFromFileBase.cpp b/src/IO/createReadBufferFromFileBase.cpp index f828199a2b4..dd667e03205 100644 --- a/src/IO/createReadBufferFromFileBase.cpp +++ b/src/IO/createReadBufferFromFileBase.cpp @@ -34,7 +34,7 @@ std::unique_ptr createReadBufferFromFileBase( size_t alignment) { if (!existing_memory - && settings.local_fs_method == ReadMethod::mmap + && settings.local_fs_method == LocalFSReadMethod::mmap && settings.mmap_threshold && settings.mmap_cache && estimated_size >= settings.mmap_threshold) @@ -56,21 +56,21 @@ std::unique_ptr createReadBufferFromFileBase( { std::unique_ptr res; - if (settings.local_fs_method == ReadMethod::read) + if (settings.local_fs_method == LocalFSReadMethod::read) { res = std::make_unique(filename, buffer_size, actual_flags, existing_memory, alignment); } - else if (settings.local_fs_method == ReadMethod::pread || settings.local_fs_method == ReadMethod::mmap) + else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap) { res = std::make_unique(filename, buffer_size, actual_flags, existing_memory, alignment); } - 
else if (settings.local_fs_method == ReadMethod::pread_fake_async) + else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async) { static AsynchronousReaderPtr reader = std::make_shared(); res = std::make_unique( reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment); } - else if (settings.local_fs_method == ReadMethod::pread_threadpool) + else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool) { static AsynchronousReaderPtr reader = std::make_shared(16, 1000000); res = std::make_unique( diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index be1cb21bbc3..67f5d9e35ff 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2803,10 +2803,17 @@ ReadSettings Context::getReadSettings() const std::string_view read_method_str = settings.local_filesystem_read_method.value; - if (auto opt_method = magic_enum::enum_cast(read_method_str)) + if (auto opt_method = magic_enum::enum_cast(read_method_str)) res.local_fs_method = *opt_method; else - throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}'", read_method_str); + throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}' for local filesystem", read_method_str); + + read_method_str = settings.remote_filesystem_read_method.value; + + if (auto opt_method = magic_enum::enum_cast(read_method_str)) + res.remote_fs_method = *opt_method; + else + throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}' for remote filesystem", read_method_str); res.local_fs_prefetch = settings.local_filesystem_read_prefetch; res.remote_fs_prefetch = settings.remote_filesystem_read_prefetch;