This commit is contained in:
kssenii 2021-09-20 16:59:44 +03:00
parent 11ae94b8d3
commit f312760dca
10 changed files with 252 additions and 20 deletions

View File

@ -503,6 +503,7 @@ class IColumn;
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
\
M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
M(String, remote_filesystem_read_method, "read", "Method of reading data from remote filesystem, one of: read, read_threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \

View File

@ -0,0 +1,171 @@
#include "AsynchronousReadIndirectBufferFromRemoteFS.h"
#include <IO/ReadBufferFromS3.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/ReadIndirectBufferFromWebServer.h>
#include <Common/Stopwatch.h>
#include <IO/ThreadPoolRemoteFSReader.h>
namespace CurrentMetrics
{
extern const Metric AsynchronousReadWait;
}
namespace ProfileEvents
{
extern const Event AsynchronousReadWaitMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
}
AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS(
AsynchronousReaderPtr reader_, Int32 priority_, ReadBufferFromRemoteFSImpl impl_)
: reader(reader_), priority(priority_), impl(impl_)
{
}
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::read()
{
IAsynchronousReader::Request request;
auto remote_fd = std::make_shared<ThreadPoolRemoteFSReader::RemoteFSFileDescriptor>();
remote_fd->impl = impl;
swap(*impl);
request.descriptor = std::move(remote_fd);
request.offset = absolute_position;
request.priority = priority;
return reader->submit(request);
}
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
{
if (prefetch_future.valid())
{
std::cerr << "Prefetch, but not needed." << "\n";
return;
}
std::cerr << fmt::format("Prefetch. Internal buffer size: {}, "
"prefetch buffer size: {}, "
"impl interanl buffer size: unknown",
internal_buffer.size());
prefetch_future = read();
}
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{
size_t size = 0;
std::cerr << fmt::format("NextImpl. Offset: {}, absolute_pos: {}", offset(), absolute_position) << std::endl;
if (prefetch_future.valid())
{
std::cerr << "Future is VALID!\n";
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
size = prefetch_future.get();
watch.stop();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
swap(*impl);
prefetch_future = {};
}
else
{
std::cerr << "Future is NOT VALID!\n";
size = read().get();
swap(*impl);
prefetch_future = {};
}
if (size)
{
absolute_position += size;
return true;
}
return false;
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
{
if (whence == SEEK_CUR)
{
/// If position within current working buffer - shift pos.
if (!working_buffer.empty() && static_cast<size_t>(getPosition() + offset_) < absolute_position)
{
pos += offset_;
return getPosition();
}
else
{
absolute_position += offset_;
}
}
else if (whence == SEEK_SET)
{
/// If position is within current working buffer - shift pos.
if (!working_buffer.empty()
&& static_cast<size_t>(offset_) >= absolute_position - working_buffer.size()
&& size_t(offset_) < absolute_position)
{
pos = working_buffer.end() - (absolute_position - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
else
{
absolute_position = offset_;
}
}
else
throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (prefetch_future.valid())
{
std::cerr << "Ignoring prefetched data" << "\n";
prefetch_future.wait();
prefetch_future = {};
}
std::cerr << "Seek with new absolute_position: " << absolute_position << std::endl;
impl->seek(absolute_position, SEEK_SET);
pos = working_buffer.end();
return absolute_position;
}
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
if (prefetch_future.valid())
{
prefetch_future.wait();
prefetch_future = {};
}
}
AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromRemoteFS()
{
finalize();
}
}

View File

@ -0,0 +1,51 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadBufferFromRemoteFS.h>
#include <Disks/IDiskRemote.h>
#include <utility>
namespace DB
{
/// Reads data from S3/HDFS using stored paths in metadata.
class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
AsynchronousReaderPtr reader_, Int32 priority_, ReadBufferFromRemoteFSImpl impl_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
off_t seek(off_t offset_, int whence) override;
off_t getPosition() override { return absolute_position - available(); }
String getFileName() const override { return metadata_file_path; }
void prefetch() override;
private:
bool nextImpl() override;
void finalize();
std::future<IAsynchronousReader::Result> read();
String metadata_file_path;
size_t absolute_position = 0;
AsynchronousReaderPtr reader;
Int32 priority;
ReadBufferFromRemoteFSImpl impl;
std::future<IAsynchronousReader::Result> prefetch_future;
};
}

View File

@ -236,8 +236,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
settings->client, bucket, metadata,
settings->s3_max_single_read_retries, read_settings.remote_fs_buffer_size);
if (settings->async_read)
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
{
std::cerr << "\n\ncreating read buffer with thtread pool\n\n";
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(16, 1000000);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(s3_impl));
//return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
@ -1065,8 +1066,7 @@ DiskS3Settings::DiskS3Settings(
bool send_metadata_,
int thread_pool_size_,
int list_object_keys_size_,
int objects_chunk_size_to_delete_,
bool async_read_)
int objects_chunk_size_to_delete_)
: client(client_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
, s3_min_upload_part_size(s3_min_upload_part_size_)
@ -1076,7 +1076,6 @@ DiskS3Settings::DiskS3Settings(
, thread_pool_size(thread_pool_size_)
, list_object_keys_size(list_object_keys_size_)
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
, async_read(async_read_)
{
}

View File

@ -36,8 +36,7 @@ struct DiskS3Settings
bool send_metadata_,
int thread_pool_size_,
int list_object_keys_size_,
int objects_chunk_size_to_delete_,
bool async_read);
int objects_chunk_size_to_delete_);
std::shared_ptr<Aws::S3::S3Client> client;
size_t s3_max_single_read_retries;
@ -48,7 +47,6 @@ struct DiskS3Settings
int thread_pool_size;
int list_object_keys_size;
int objects_chunk_size_to_delete;
bool async_read;
};

View File

@ -161,8 +161,7 @@ std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfigurat
config.getBool(config_prefix + ".send_metadata", false),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
config.getBool(config_prefix + ".async_read", false));
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
}
}

View File

@ -6,7 +6,7 @@
namespace DB
{
enum class ReadMethod
enum class LocalFSReadMethod
{
/**
* Simple synchronous reads with 'read'.
@ -43,12 +43,20 @@ enum class ReadMethod
pread_fake_async
};
enum class RemoteFSReadMethod
{
read,
read_threadpool,
};
class MMappedFileCache;
struct ReadSettings
{
/// Method to use reading from local filesystem.
ReadMethod local_fs_method = ReadMethod::pread;
LocalFSReadMethod local_fs_method = LocalFSReadMethod::pread;
/// Method to use reading from remote filesystem.
RemoteFSReadMethod remote_fs_method = RemoteFSReadMethod::read;
size_t local_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t remote_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;

View File

@ -39,8 +39,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
{
auto task = std::make_shared<std::packaged_task<Result()>>([request]
{
std::cerr << "\n\nTask is execited!!!\n\n";
setThreadName("ThreadPoolRead");
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
Stopwatch watch(CLOCK_MONOTONIC);

View File

@ -34,7 +34,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
size_t alignment)
{
if (!existing_memory
&& settings.local_fs_method == ReadMethod::mmap
&& settings.local_fs_method == LocalFSReadMethod::mmap
&& settings.mmap_threshold
&& settings.mmap_cache
&& estimated_size >= settings.mmap_threshold)
@ -56,21 +56,21 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
{
std::unique_ptr<ReadBufferFromFileBase> res;
if (settings.local_fs_method == ReadMethod::read)
if (settings.local_fs_method == LocalFSReadMethod::read)
{
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, alignment);
}
else if (settings.local_fs_method == ReadMethod::pread || settings.local_fs_method == ReadMethod::mmap)
else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap)
{
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment);
}
else if (settings.local_fs_method == ReadMethod::pread_fake_async)
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
static AsynchronousReaderPtr reader = std::make_shared<SynchronousReader>();
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment);
}
else if (settings.local_fs_method == ReadMethod::pread_threadpool)
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolReader>(16, 1000000);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(

View File

@ -2803,10 +2803,17 @@ ReadSettings Context::getReadSettings() const
std::string_view read_method_str = settings.local_filesystem_read_method.value;
if (auto opt_method = magic_enum::enum_cast<ReadMethod>(read_method_str))
if (auto opt_method = magic_enum::enum_cast<LocalFSReadMethod>(read_method_str))
res.local_fs_method = *opt_method;
else
throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}'", read_method_str);
throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}' for local filesystem", read_method_str);
read_method_str = settings.remote_filesystem_read_method.value;
if (auto opt_method = magic_enum::enum_cast<RemoteFSReadMethod>(read_method_str))
res.remote_fs_method = *opt_method;
else
throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}' for remote filesystem", read_method_str);
res.local_fs_prefetch = settings.local_filesystem_read_prefetch;
res.remote_fs_prefetch = settings.remote_filesystem_read_prefetch;