This commit is contained in:
李扬 2024-11-21 14:14:32 +08:00 committed by GitHub
commit 71172ecbe5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 225 additions and 41 deletions

View File

@ -48,7 +48,8 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
const ReadSettings & settings_, const ReadSettings & settings_,
size_t buffer_size_, size_t buffer_size_,
AsyncReadCountersPtr async_read_counters_, AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_) FilesystemReadPrefetchesLogPtr prefetches_log_,
bool enable_read_at)
: ReadBufferFromFileBase(0, nullptr, 0) : ReadBufferFromFileBase(0, nullptr, 0)
, impl(std::move(impl_)) , impl(std::move(impl_))
, read_settings(settings_) , read_settings(settings_)
@ -59,6 +60,7 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
, log(getLogger("AsynchronousBoundedReadBuffer")) , log(getLogger("AsynchronousBoundedReadBuffer"))
, async_read_counters(async_read_counters_) , async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_) , prefetches_log(prefetches_log_)
, supports_read_at(enable_read_at && impl->supportsReadAt())
{ {
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
} }
@ -89,6 +91,7 @@ std::future<IAsynchronousReader::Result> AsynchronousBoundedReadBuffer::readAsyn
request.offset = file_offset_of_buffer_end; request.offset = file_offset_of_buffer_end;
request.priority = Priority{read_settings.priority.value + priority.value}; request.priority = Priority{read_settings.priority.value + priority.value};
request.ignore = bytes_to_ignore; request.ignore = bytes_to_ignore;
request.supports_read_at = supports_read_at;
return reader.submit(request); return reader.submit(request);
} }
@ -100,6 +103,7 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data,
request.size = size; request.size = size;
request.offset = file_offset_of_buffer_end; request.offset = file_offset_of_buffer_end;
request.ignore = bytes_to_ignore; request.ignore = bytes_to_ignore;
request.supports_read_at = supports_read_at;
return reader.execute(request); return reader.execute(request);
} }
@ -236,7 +240,10 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
pos = working_buffer.begin(); pos = working_buffer.begin();
} }
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); if (!impl->supportsReadAt())
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
else
file_offset_of_buffer_end += bytes_to_ignore + result.size - result.offset;
chassert(file_offset_of_buffer_end <= impl->getFileSize()); chassert(file_offset_of_buffer_end <= impl->getFileSize());

View File

@ -29,7 +29,8 @@ public:
const ReadSettings & settings_, const ReadSettings & settings_,
size_t buffer_size_, size_t buffer_size_,
AsyncReadCountersPtr async_read_counters_ = nullptr, AsyncReadCountersPtr async_read_counters_ = nullptr,
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr); FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr,
bool enable_read_at = false);
~AsynchronousBoundedReadBuffer() override; ~AsynchronousBoundedReadBuffer() override;
@ -74,6 +75,8 @@ private:
AsyncReadCountersPtr async_read_counters; AsyncReadCountersPtr async_read_counters;
FilesystemReadPrefetchesLogPtr prefetches_log; FilesystemReadPrefetchesLogPtr prefetches_log;
bool supports_read_at;
struct LastPrefetchInfo struct LastPrefetchInfo
{ {
std::chrono::system_clock::time_point submit_time; std::chrono::system_clock::time_point submit_time;

View File

@ -38,6 +38,11 @@ namespace CurrentMetrics
extern const Metric ThreadPoolRemoteFSReaderThreadsScheduled; extern const Metric ThreadPoolRemoteFSReaderThreadsScheduled;
} }
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
namespace DB namespace DB
{ {
@ -77,6 +82,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
auto * fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get()); auto * fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto & reader = fd->getReader(); auto & reader = fd->getReader();
if (!request.supports_read_at)
{ {
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
/// `seek` have to be done before checking `isContentCached`, and `set` have to be done prior to `seek` /// `seek` have to be done before checking `isContentCached`, and `set` have to be done prior to `seek`
@ -125,38 +131,75 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b
auto read_counters = fd->getReadCounters(); auto read_counters = fd->getReadCounters();
std::optional<AsyncReadIncrement> increment = read_counters ? std::optional<AsyncReadIncrement>(read_counters) : std::nullopt; std::optional<AsyncReadIncrement> increment = read_counters ? std::optional<AsyncReadIncrement>(read_counters) : std::nullopt;
auto watch = std::make_unique<Stopwatch>(CLOCK_REALTIME);
IAsynchronousReader::Result read_result;
if (!request.supports_read_at)
{ {
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
if (!seek_performed)
{ {
reader.set(request.buf, request.size); ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
reader.seek(request.offset, SEEK_SET); if (!seek_performed)
{
reader.set(request.buf, request.size);
reader.seek(request.offset, SEEK_SET);
}
if (request.ignore)
{
ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore);
reader.ignore(request.ignore);
}
} }
if (request.ignore) watch->start();
bool result = reader.available();
if (!result)
result = reader.next();
watch->stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
if (result)
{ {
ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore); chassert(reader.buffer().begin() == request.buf);
reader.ignore(request.ignore); chassert(reader.buffer().end() <= request.buf + request.size);
read_result.size = reader.buffer().size();
read_result.offset = reader.offset();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);
} }
} }
else
auto watch = std::make_unique<Stopwatch>(CLOCK_REALTIME);
bool result = reader.available();
if (!result)
result = reader.next();
watch->stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
IAsynchronousReader::Result read_result;
if (result)
{ {
chassert(reader.buffer().begin() == request.buf); size_t curr_offset = request.offset;
chassert(reader.buffer().end() <= request.buf + request.size); if (request.ignore)
read_result.size = reader.buffer().size(); {
read_result.offset = reader.offset(); ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); size_t bytes_to_ignore = request.ignore;
while (bytes_to_ignore)
{
size_t size = std::min(bytes_to_ignore, request.size);
size_t n = reader.readBigAt(request.buf, size, curr_offset, nullptr);
if (!n)
break;
bytes_to_ignore -= n;
curr_offset += n;
}
if (bytes_to_ignore)
throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after eof");
}
watch->start();
size_t n = reader.readBigAt(request.buf, request.size, curr_offset, nullptr);
watch->stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
if (n)
{
read_result.size = n;
read_result.offset = 0;
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);
}
} }
read_result.execution_watch = std::move(watch); read_result.execution_watch = std::move(watch);

View File

@ -50,6 +50,7 @@ public:
char * buf = nullptr; char * buf = nullptr;
Priority priority; Priority priority;
size_t ignore = 0; size_t ignore = 0;
bool supports_read_at = false;
}; };
struct Result struct Result

View File

@ -1,26 +1,78 @@
#include <memory> #include <memory>
#include <string> #include <IO/ParallelReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h> #include <IO/copyData.h>
#include <Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h>
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h> #include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
#include <base/types.h> #include <base/types.h>
#include <Common/Config/ConfigProcessor.h>
#include <Poco/Util/MapConfiguration.h> #include <Poco/Util/MapConfiguration.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/threadPoolCallbackRunner.h>
using namespace DB; using namespace DB;
int main() int main(int /*argc*/, char ** argv)
{ {
setenv("LIBHDFS3_CONF", "/path/to/hdfs-site.xml", true); /// NOLINT
String hdfs_uri = "hdfs://cluster_name";
String hdfs_file_path = "/path/to/hdfs/file";
ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
auto shared_context = SharedContextHolder(Context::createShared());
auto global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setConfig(config);
getIOThreadPool().initialize(100, 0, 100);
setenv("LIBHDFS3_CONF", "/path/to/config", true); /// NOLINT
String hdfs_uri = "hdfs://clustername";
String hdfs_file_path = "/path/to/file";
ReadSettings read_settings; ReadSettings read_settings;
ReadBufferFromHDFS read_buffer(hdfs_uri, hdfs_file_path, *config, read_settings, 2097152UL, false);
auto get_read_buffer = [&](bool async, bool prefetch, bool read_at)
{
auto rb = std::make_shared<ReadBufferFromHDFS>(hdfs_uri, hdfs_file_path, *config, read_settings, 0, true);
ReadBufferPtr res;
if (async)
{
std::cout << "use async" << std::endl;
if (prefetch)
std::cout << "use prefetch" << std::endl;
if (read_at)
std::cout << "use read_at" << std::endl;
read_settings.remote_fs_prefetch = prefetch;
res = std::make_shared<AsynchronousReadBufferFromHDFS>(
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, rb, read_at);
}
else
res = rb;
return res;
};
auto wrap_parallel_if_needed = [&](ReadBufferPtr input, bool parallel) -> ReadBufferPtr
{
if (parallel)
{
std::cout << "use parallel" << std::endl;
return wrapInParallelReadBufferIfSupported(
*input, threadPoolCallbackRunnerUnsafe<void>(getIOThreadPool().get(), "ParallelRead"), 4, 10 * 1024 * 1024, 1529522144);
}
else
return input;
};
bool async = (std::stoi(std::string(argv[1])) != 0);
bool prefetch = (std::stoi(std::string(argv[2])) != 0);
bool read_at = (std::stoi(std::string(argv[3])) != 0);
bool parallel = (std::stoi(std::string(argv[4])) != 0);
std::cout << "async: " << async << ", prefetch: " << prefetch << ", read_at: " << read_at << ", parallel: " << parallel << std::endl;
auto holder = get_read_buffer(async, prefetch, read_at);
auto rb = wrap_parallel_if_needed(holder, parallel);
String download_path = "./download"; String download_path = "./download";
WriteBufferFromFile write_buffer(download_path); WriteBufferFromFile write_buffer(download_path);
copyData(read_buffer, write_buffer); copyData(*rb, write_buffer);
return 0; return 0;
} }

View File

@ -36,7 +36,7 @@ namespace ErrorCodes
} }
AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS( AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS(
IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr<ReadBufferFromHDFS> impl_) IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr<ReadBufferFromHDFS> impl_, bool enable_read_at)
: BufferWithOwnMemory<SeekableReadBuffer>(settings_.remote_fs_buffer_size) : BufferWithOwnMemory<SeekableReadBuffer>(settings_.remote_fs_buffer_size)
, reader(reader_) , reader(reader_)
, base_priority(settings_.priority) , base_priority(settings_.priority)
@ -44,6 +44,7 @@ AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS(
, prefetch_buffer(settings_.remote_fs_buffer_size) , prefetch_buffer(settings_.remote_fs_buffer_size)
, read_until_position(impl->getFileSize()) , read_until_position(impl->getFileSize())
, use_prefetch(settings_.remote_fs_prefetch) , use_prefetch(settings_.remote_fs_prefetch)
, supports_read_at(enable_read_at && impl->supportsReadAt())
, log(getLogger("AsynchronousReadBufferFromHDFS")) , log(getLogger("AsynchronousReadBufferFromHDFS"))
{ {
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
@ -73,9 +74,23 @@ std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncRe
request.offset = file_offset_of_buffer_end; request.offset = file_offset_of_buffer_end;
request.priority = Priority{base_priority.value + priority.value}; request.priority = Priority{base_priority.value + priority.value};
request.ignore = 0; request.ignore = 0;
request.supports_read_at = supports_read_at;
return reader.submit(request); return reader.submit(request);
} }
IAsynchronousReader::Result AsynchronousReadBufferFromHDFS::readInto(char * data, size_t size, Priority priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, nullptr);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = Priority{base_priority.value + priority.value};
request.ignore = 0;
request.supports_read_at = supports_read_at;
return reader.execute(request);
}
void AsynchronousReadBufferFromHDFS::prefetch(Priority priority) void AsynchronousReadBufferFromHDFS::prefetch(Priority priority)
{ {
interval_watch.restart(); interval_watch.restart();
@ -115,6 +130,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
size_t size = 0; size_t size = 0;
size_t bytes_read = 0; size_t bytes_read = 0;
IAsynchronousReader::Result result;
if (prefetch_future.valid()) if (prefetch_future.valid())
{ {
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
@ -123,7 +139,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
{ {
Stopwatch watch; Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
auto result = prefetch_future.get(); result = prefetch_future.get();
size = result.size; size = result.size;
offset = result.offset; offset = result.offset;
LOG_TEST(log, "Current size: {}, offset: {}", size, offset); LOG_TEST(log, "Current size: {}, offset: {}", size, offset);
@ -147,7 +163,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
{ {
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads); ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
auto result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get(); result = readInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY);
size = result.size; size = result.size;
auto offset = result.offset; auto offset = result.offset;
@ -164,7 +180,11 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
} }
} }
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); if (!supports_read_at)
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
else
file_offset_of_buffer_end += result.size;
prefetch_future = {}; prefetch_future = {};
if (use_prefetch && bytes_read) if (use_prefetch && bytes_read)
@ -270,6 +290,21 @@ size_t AsynchronousReadBufferFromHDFS::getFileOffsetOfBufferEnd() const
return file_offset_of_buffer_end; return file_offset_of_buffer_end;
} }
bool AsynchronousReadBufferFromHDFS::supportsRightBoundedReads() const
{
return true;
}
void AsynchronousReadBufferFromHDFS::setReadUntilPosition(size_t position)
{
read_until_position = position;
}
void AsynchronousReadBufferFromHDFS::setReadUntilEnd()
{
read_until_position = impl->getFileSize();
}
} }
#endif #endif

View File

@ -27,10 +27,17 @@ public:
AsynchronousReadBufferFromHDFS( AsynchronousReadBufferFromHDFS(
IAsynchronousReader & reader_, IAsynchronousReader & reader_,
const ReadSettings & settings_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromHDFS> impl_); std::shared_ptr<ReadBufferFromHDFS> impl_,
bool enable_read_at = false);
~AsynchronousReadBufferFromHDFS() override; ~AsynchronousReadBufferFromHDFS() override;
bool supportsRightBoundedReads() const override;
void setReadUntilPosition(size_t position) override;
void setReadUntilEnd() override;
off_t seek(off_t offset_, int whence) override; off_t seek(off_t offset_, int whence) override;
void prefetch(Priority priority) override; void prefetch(Priority priority) override;
@ -51,6 +58,7 @@ private:
bool hasPendingDataToRead(); bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority); std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);
IAsynchronousReader::Result readInto(char * data, size_t size, Priority priority);
IAsynchronousReader & reader; IAsynchronousReader & reader;
Priority base_priority; Priority base_priority;
@ -61,6 +69,7 @@ private:
size_t file_offset_of_buffer_end = 0; size_t file_offset_of_buffer_end = 0;
std::optional<size_t> read_until_position; std::optional<size_t> read_until_position;
bool use_prefetch; bool use_prefetch;
bool supports_read_at;
LoggerPtr log; LoggerPtr log;

View File

@ -165,6 +165,19 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
return file_offset; return file_offset;
} }
bool supportsRightBoundedReads() const override { return true; }
void setReadUntilPosition(size_t position) override
{
read_until_position = position;
}
void setReadUntilEnd() override
{
read_until_position = 0;
}
size_t pread(char * buffer, size_t size, size_t offset) size_t pread(char * buffer, size_t size, size_t offset)
{ {
ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, size); ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, size);
@ -284,6 +297,21 @@ bool ReadBufferFromHDFS::supportsReadAt()
return true; return true;
} }
bool ReadBufferFromHDFS::supportsRightBoundedReads() const
{
return impl->supportsRightBoundedReads();
}
void ReadBufferFromHDFS::setReadUntilPosition(size_t position)
{
impl->setReadUntilPosition(position);
}
void ReadBufferFromHDFS::setReadUntilEnd()
{
impl->setReadUntilEnd();
}
} }
#endif #endif

View File

@ -34,6 +34,12 @@ public:
~ReadBufferFromHDFS() override; ~ReadBufferFromHDFS() override;
bool supportsRightBoundedReads() const override;
void setReadUntilPosition(size_t position) override;
void setReadUntilEnd() override;
bool nextImpl() override; bool nextImpl() override;
off_t seek(off_t offset_, int whence) override; off_t seek(off_t offset_, int whence) override;