Compare commits

...

4 Commits

Author SHA1 Message Date
李扬
7813d59e54
Merge ea6a66a2c0 into 0fd196771e 2024-11-20 10:43:11 +08:00
taiyang-li
ea6a66a2c0 fix bulding 2024-11-18 15:27:22 +08:00
taiyang-li
7ca3a7b103 utilize pread in AsynchronousReadBufferxxx 2024-11-12 13:07:28 +08:00
taiyang-li
42c7383a12 support supportsRightBoundedReads for ReadBufferFromHDFS and AsynchronousReadBufferFromHDFS 2024-11-11 16:19:52 +08:00
9 changed files with 225 additions and 41 deletions

View File

@ -48,7 +48,8 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
const ReadSettings & settings_,
size_t buffer_size_,
AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_)
FilesystemReadPrefetchesLogPtr prefetches_log_,
bool enable_read_at)
: ReadBufferFromFileBase(0, nullptr, 0)
, impl(std::move(impl_))
, read_settings(settings_)
@ -59,6 +60,7 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
, log(getLogger("AsynchronousBoundedReadBuffer"))
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
, supports_read_at(enable_read_at && impl->supportsReadAt())
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
@ -89,6 +91,7 @@ std::future<IAsynchronousReader::Result> AsynchronousBoundedReadBuffer::readAsyn
request.offset = file_offset_of_buffer_end;
request.priority = Priority{read_settings.priority.value + priority.value};
request.ignore = bytes_to_ignore;
request.supports_read_at = supports_read_at;
return reader.submit(request);
}
@ -100,6 +103,7 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data,
request.size = size;
request.offset = file_offset_of_buffer_end;
request.ignore = bytes_to_ignore;
request.supports_read_at = supports_read_at;
return reader.execute(request);
}
@ -236,7 +240,10 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
pos = working_buffer.begin();
}
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
if (!impl->supportsReadAt())
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
else
file_offset_of_buffer_end += bytes_to_ignore + result.size - result.offset;
chassert(file_offset_of_buffer_end <= impl->getFileSize());

View File

@ -29,7 +29,8 @@ public:
const ReadSettings & settings_,
size_t buffer_size_,
AsyncReadCountersPtr async_read_counters_ = nullptr,
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr,
bool enable_read_at = false);
~AsynchronousBoundedReadBuffer() override;
@ -74,6 +75,8 @@ private:
AsyncReadCountersPtr async_read_counters;
FilesystemReadPrefetchesLogPtr prefetches_log;
bool supports_read_at;
struct LastPrefetchInfo
{
std::chrono::system_clock::time_point submit_time;

View File

@ -38,6 +38,11 @@ namespace CurrentMetrics
extern const Metric ThreadPoolRemoteFSReaderThreadsScheduled;
}
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
namespace DB
{
@ -77,6 +82,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
auto * fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto & reader = fd->getReader();
if (!request.supports_read_at)
{
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
/// `seek` have to be done before checking `isContentCached`, and `set` have to be done prior to `seek`
@ -125,38 +131,75 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b
auto read_counters = fd->getReadCounters();
std::optional<AsyncReadIncrement> increment = read_counters ? std::optional<AsyncReadIncrement>(read_counters) : std::nullopt;
auto watch = std::make_unique<Stopwatch>(CLOCK_REALTIME);
IAsynchronousReader::Result read_result;
if (!request.supports_read_at)
{
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
if (!seek_performed)
{
reader.set(request.buf, request.size);
reader.seek(request.offset, SEEK_SET);
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
if (!seek_performed)
{
reader.set(request.buf, request.size);
reader.seek(request.offset, SEEK_SET);
}
if (request.ignore)
{
ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore);
reader.ignore(request.ignore);
}
}
if (request.ignore)
watch->start();
bool result = reader.available();
if (!result)
result = reader.next();
watch->stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
if (result)
{
ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore);
reader.ignore(request.ignore);
chassert(reader.buffer().begin() == request.buf);
chassert(reader.buffer().end() <= request.buf + request.size);
read_result.size = reader.buffer().size();
read_result.offset = reader.offset();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);
}
}
auto watch = std::make_unique<Stopwatch>(CLOCK_REALTIME);
bool result = reader.available();
if (!result)
result = reader.next();
watch->stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
IAsynchronousReader::Result read_result;
if (result)
else
{
chassert(reader.buffer().begin() == request.buf);
chassert(reader.buffer().end() <= request.buf + request.size);
read_result.size = reader.buffer().size();
read_result.offset = reader.offset();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);
size_t curr_offset = request.offset;
if (request.ignore)
{
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds);
size_t bytes_to_ignore = request.ignore;
while (bytes_to_ignore)
{
size_t size = std::min(bytes_to_ignore, request.size);
size_t n = reader.readBigAt(request.buf, size, curr_offset, nullptr);
if (!n)
break;
bytes_to_ignore -= n;
curr_offset += n;
}
if (bytes_to_ignore)
throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after eof");
}
watch->start();
size_t n = reader.readBigAt(request.buf, request.size, curr_offset, nullptr);
watch->stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
if (n)
{
read_result.size = n;
read_result.offset = 0;
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);
}
}
read_result.execution_watch = std::move(watch);

View File

@ -50,6 +50,7 @@ public:
char * buf = nullptr;
Priority priority;
size_t ignore = 0;
bool supports_read_at = false;
};
struct Result

View File

@ -1,26 +1,78 @@
#include <memory>
#include <string>
#include <IO/ParallelReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h>
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
#include <base/types.h>
#include <Common/Config/ConfigProcessor.h>
#include <Poco/Util/MapConfiguration.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/threadPoolCallbackRunner.h>
using namespace DB;
int main()
int main(int /*argc*/, char ** argv)
{
setenv("LIBHDFS3_CONF", "/path/to/hdfs-site.xml", true); /// NOLINT
String hdfs_uri = "hdfs://cluster_name";
String hdfs_file_path = "/path/to/hdfs/file";
ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
auto shared_context = SharedContextHolder(Context::createShared());
auto global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setConfig(config);
getIOThreadPool().initialize(100, 0, 100);
setenv("LIBHDFS3_CONF", "/path/to/config", true); /// NOLINT
String hdfs_uri = "hdfs://clustername";
String hdfs_file_path = "/path/to/file";
ReadSettings read_settings;
ReadBufferFromHDFS read_buffer(hdfs_uri, hdfs_file_path, *config, read_settings, 2097152UL, false);
auto get_read_buffer = [&](bool async, bool prefetch, bool read_at)
{
auto rb = std::make_shared<ReadBufferFromHDFS>(hdfs_uri, hdfs_file_path, *config, read_settings, 0, true);
ReadBufferPtr res;
if (async)
{
std::cout << "use async" << std::endl;
if (prefetch)
std::cout << "use prefetch" << std::endl;
if (read_at)
std::cout << "use read_at" << std::endl;
read_settings.remote_fs_prefetch = prefetch;
res = std::make_shared<AsynchronousReadBufferFromHDFS>(
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, rb, read_at);
}
else
res = rb;
return res;
};
auto wrap_parallel_if_needed = [&](ReadBufferPtr input, bool parallel) -> ReadBufferPtr
{
if (parallel)
{
std::cout << "use parallel" << std::endl;
return wrapInParallelReadBufferIfSupported(
*input, threadPoolCallbackRunnerUnsafe<void>(getIOThreadPool().get(), "ParallelRead"), 4, 10 * 1024 * 1024, 1529522144);
}
else
return input;
};
bool async = (std::stoi(std::string(argv[1])) != 0);
bool prefetch = (std::stoi(std::string(argv[2])) != 0);
bool read_at = (std::stoi(std::string(argv[3])) != 0);
bool parallel = (std::stoi(std::string(argv[4])) != 0);
std::cout << "async: " << async << ", prefetch: " << prefetch << ", read_at: " << read_at << ", parallel: " << parallel << std::endl;
auto holder = get_read_buffer(async, prefetch, read_at);
auto rb = wrap_parallel_if_needed(holder, parallel);
String download_path = "./download";
WriteBufferFromFile write_buffer(download_path);
copyData(read_buffer, write_buffer);
copyData(*rb, write_buffer);
return 0;
}

View File

@ -36,7 +36,7 @@ namespace ErrorCodes
}
AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS(
IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr<ReadBufferFromHDFS> impl_)
IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr<ReadBufferFromHDFS> impl_, bool enable_read_at)
: BufferWithOwnMemory<SeekableReadBuffer>(settings_.remote_fs_buffer_size)
, reader(reader_)
, base_priority(settings_.priority)
@ -44,6 +44,7 @@ AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS(
, prefetch_buffer(settings_.remote_fs_buffer_size)
, read_until_position(impl->getFileSize())
, use_prefetch(settings_.remote_fs_prefetch)
, supports_read_at(enable_read_at && impl->supportsReadAt())
, log(getLogger("AsynchronousReadBufferFromHDFS"))
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
@ -73,9 +74,23 @@ std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncRe
request.offset = file_offset_of_buffer_end;
request.priority = Priority{base_priority.value + priority.value};
request.ignore = 0;
request.supports_read_at = supports_read_at;
return reader.submit(request);
}
IAsynchronousReader::Result AsynchronousReadBufferFromHDFS::readInto(char * data, size_t size, Priority priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, nullptr);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = Priority{base_priority.value + priority.value};
request.ignore = 0;
request.supports_read_at = supports_read_at;
return reader.execute(request);
}
void AsynchronousReadBufferFromHDFS::prefetch(Priority priority)
{
interval_watch.restart();
@ -115,6 +130,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
size_t size = 0;
size_t bytes_read = 0;
IAsynchronousReader::Result result;
if (prefetch_future.valid())
{
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
@ -123,7 +139,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
auto result = prefetch_future.get();
result = prefetch_future.get();
size = result.size;
offset = result.offset;
LOG_TEST(log, "Current size: {}, offset: {}", size, offset);
@ -147,7 +163,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
{
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
auto result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
result = readInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY);
size = result.size;
auto offset = result.offset;
@ -164,7 +180,11 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
}
}
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
if (!supports_read_at)
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
else
file_offset_of_buffer_end += result.size;
prefetch_future = {};
if (use_prefetch && bytes_read)
@ -270,6 +290,21 @@ size_t AsynchronousReadBufferFromHDFS::getFileOffsetOfBufferEnd() const
return file_offset_of_buffer_end;
}
bool AsynchronousReadBufferFromHDFS::supportsRightBoundedReads() const
{
return true;
}
void AsynchronousReadBufferFromHDFS::setReadUntilPosition(size_t position)
{
read_until_position = position;
}
void AsynchronousReadBufferFromHDFS::setReadUntilEnd()
{
read_until_position = impl->getFileSize();
}
}
#endif

View File

@ -27,10 +27,17 @@ public:
AsynchronousReadBufferFromHDFS(
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromHDFS> impl_);
std::shared_ptr<ReadBufferFromHDFS> impl_,
bool enable_read_at = false);
~AsynchronousReadBufferFromHDFS() override;
bool supportsRightBoundedReads() const override;
void setReadUntilPosition(size_t position) override;
void setReadUntilEnd() override;
off_t seek(off_t offset_, int whence) override;
void prefetch(Priority priority) override;
@ -51,6 +58,7 @@ private:
bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);
IAsynchronousReader::Result readInto(char * data, size_t size, Priority priority);
IAsynchronousReader & reader;
Priority base_priority;
@ -61,6 +69,7 @@ private:
size_t file_offset_of_buffer_end = 0;
std::optional<size_t> read_until_position;
bool use_prefetch;
bool supports_read_at;
LoggerPtr log;

View File

@ -165,6 +165,19 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
return file_offset;
}
bool supportsRightBoundedReads() const override { return true; }
void setReadUntilPosition(size_t position) override
{
read_until_position = position;
}
void setReadUntilEnd() override
{
read_until_position = 0;
}
size_t pread(char * buffer, size_t size, size_t offset)
{
ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, size);
@ -284,6 +297,21 @@ bool ReadBufferFromHDFS::supportsReadAt()
return true;
}
bool ReadBufferFromHDFS::supportsRightBoundedReads() const
{
return impl->supportsRightBoundedReads();
}
void ReadBufferFromHDFS::setReadUntilPosition(size_t position)
{
impl->setReadUntilPosition(position);
}
void ReadBufferFromHDFS::setReadUntilEnd()
{
impl->setReadUntilEnd();
}
}
#endif

View File

@ -34,6 +34,12 @@ public:
~ReadBufferFromHDFS() override;
bool supportsRightBoundedReads() const override;
void setReadUntilPosition(size_t position) override;
void setReadUntilEnd() override;
bool nextImpl() override;
off_t seek(off_t offset_, int whence) override;