Adjust range reader for remote fs reads

This commit is contained in:
kssenii 2021-10-15 11:36:26 +03:00
parent 31d98b0992
commit 3995506d37
33 changed files with 318 additions and 155 deletions

View File

@ -262,6 +262,7 @@
M(RemoteFSNewReaders, "Number of created impl objects") \
M(RemoteFSAsyncBuffers, "Total number of AsycnhronousReadIndirectBufferFromREmoteFS buffers") \
M(RemoteFSSimpleBuffers, "Total number of ReadIndirectBufferFromREmoteFS buffers") \
M(RemoteFSRedundantlyReadBytes, "") \
\
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \
M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \

View File

@ -28,6 +28,12 @@ void CachedCompressedReadBuffer::initInput()
}
void CachedCompressedReadBuffer::prefetch()
{
file_in->prefetch();
}
bool CachedCompressedReadBuffer::nextImpl()
{
/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.

View File

@ -33,8 +33,11 @@ private:
UncompressedCache::MappedPtr owned_cell;
void initInput();
bool nextImpl() override;
void prefetch() override;
/// Passed into file_in.
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type {};

View File

@ -11,6 +11,7 @@
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Compression/CompressionInfo.h>
#include <IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
@ -107,6 +108,13 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
}
void CompressedReadBufferBase::setRightOffset(size_t offset)
{
if (auto * async_in = dynamic_cast<AsynchronousReadIndirectBufferFromRemoteFS *>(compressed_in))
async_in->setRightOffset(offset);
}
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy)

View File

@ -60,6 +60,12 @@ public:
disable_checksum = true;
}
/**
* For asynchronous range reading from remote fs need to update last offset for current task,
* when newer tasks read behind previous task last mark.
*/
void setRightOffset(size_t offset);
public:
CompressionCodecPtr codec;
};

View File

@ -44,12 +44,6 @@ bool CompressedReadBufferFromFile::nextImpl()
}
void CompressedReadBufferFromFile::prefetch()
{
file_in.prefetch();
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{
@ -72,6 +66,12 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(
}
void CompressedReadBufferFromFile::prefetch()
{
file_in.prefetch();
}
void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
/// Nothing to do if we already at required position

View File

@ -42,6 +42,7 @@ private:
/* size_t nextimpl_working_buffer_offset; */
bool nextImpl() override;
void prefetch() override;
public:
@ -61,6 +62,7 @@ public:
{
file_in.setProfileCallback(profile_callback_, clock_type_);
}
};
}

View File

@ -168,7 +168,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta, getContext(), threadpool_read, read_settings);
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(path, url, meta, getContext(), threadpool_read, read_settings);
if (threadpool_read)
{

View File

@ -73,7 +73,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
"Read from file by path: {}. Existing HDFS objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, remote_fs_root_path, metadata, read_settings.remote_fs_buffer_size);
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(path, config, remote_fs_root_path, metadata, read_settings.remote_fs_buffer_size);
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
{

View File

@ -12,6 +12,7 @@
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#endif
#include <base/logger_useful.h>
#include <filesystem>
#include <iostream>
@ -27,31 +28,32 @@ namespace ErrorCodes
#if USE_AWS_S3
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path) const
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t last_offset) const
{
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket,
fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read);
fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read, last_offset);
}
#endif
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) const
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t last_offset) const
{
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, threadpool_read);
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, threadpool_read, last_offset);
}
#if USE_HDFS
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path) const
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t last_offset) const
{
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size);
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size, last_offset);
}
#endif
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_)
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const String & path_)
: ReadBuffer(nullptr, 0)
, metadata(metadata_)
, path(path_)
{
}
@ -91,7 +93,7 @@ void ReadBufferFromRemoteFSGather::initialize()
/// Do not create a new buffer if we already have what we need.
if (!current_buf || buf_idx != i)
{
current_buf = createImplementationBuffer(file_path);
current_buf = createImplementationBuffer(file_path, last_offset);
buf_idx = i;
}
@ -126,8 +128,8 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
++current_buf_idx;
const auto & path = metadata.remote_fs_objects[current_buf_idx].first;
current_buf = createImplementationBuffer(path);
const auto & current_path = metadata.remote_fs_objects[current_buf_idx].first;
current_buf = createImplementationBuffer(current_path, last_offset);
return readImpl();
}
@ -145,6 +147,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
if (bytes_to_ignore)
current_buf->ignore(bytes_to_ignore);
LOG_DEBUG(&Poco::Logger::get("Gather"), "Reading from path: {}", path);
auto result = current_buf->next();
swap(*current_buf);
@ -158,8 +161,17 @@ bool ReadBufferFromRemoteFSGather::readImpl()
void ReadBufferFromRemoteFSGather::seek(off_t offset)
{
current_buf.reset();
absolute_position = offset;
initialize();
// initialize();
}
void ReadBufferFromRemoteFSGather::setRightOffset(size_t offset)
{
assert(last_offset < offset);
current_buf.reset();
last_offset = offset;
}
@ -168,4 +180,13 @@ void ReadBufferFromRemoteFSGather::reset()
current_buf.reset();
}
String ReadBufferFromRemoteFSGather::getFileName() const
{
return path;
// if (current_buf)
// return fs::path(metadata.metadata_file_path) / metadata.remote_fs_objects[buf_idx].first;
// return metadata.metadata_file_path;
}
}

View File

@ -13,28 +13,31 @@ namespace Aws
namespace S3
{
class S3Client;
}}
}
}
namespace DB
{
class ReadBufferFromRemoteFSGather : public ReadBuffer
{
friend class ThreadPoolRemoteFSReader;
friend class ReadIndirectBufferFromRemoteFS;
public:
explicit ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_);
explicit ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const String & path_);
String getFileName() const { return metadata.metadata_file_path; }
String getFileName() const;
void reset();
void seek(off_t offset); /// SEEK_SET only.
protected:
void setRightOffset(size_t offset);
size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) const = 0;
protected:
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const = 0;
RemoteMetadata metadata;
@ -54,6 +57,10 @@ private:
size_t buf_idx = 0;
size_t bytes_to_ignore = 0;
size_t last_offset = 0;
String path;
};
@ -63,13 +70,14 @@ class ReadBufferFromS3Gather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromS3Gather(
const String & path_,
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
IDiskRemote::Metadata metadata_,
size_t max_single_read_retries_,
const ReadSettings & settings_,
bool threadpool_read_ = false)
: ReadBufferFromRemoteFSGather(metadata_)
: ReadBufferFromRemoteFSGather(metadata_, path_)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, max_single_read_retries(max_single_read_retries_)
@ -78,7 +86,7 @@ public:
{
}
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override;
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override;
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
@ -94,12 +102,13 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromWebServerGather(
const String & path_,
const String & uri_,
RemoteMetadata metadata_,
ContextPtr context_,
size_t threadpool_read_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(metadata_)
: ReadBufferFromRemoteFSGather(metadata_, path_)
, uri(uri_)
, context(context_)
, threadpool_read(threadpool_read_)
@ -107,7 +116,7 @@ public:
{
}
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override;
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override;
private:
String uri;
@ -123,11 +132,12 @@ class ReadBufferFromHDFSGather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromHDFSGather(
const String & path_,
const Poco::Util::AbstractConfiguration & config_,
const String & hdfs_uri_,
IDiskRemote::Metadata metadata_,
size_t buf_size_)
: ReadBufferFromRemoteFSGather(metadata_)
: ReadBufferFromRemoteFSGather(metadata_, path_)
, config(config_)
, buf_size(buf_size_)
{
@ -136,7 +146,7 @@ public:
hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
}
SeekableReadBufferPtr createImplementationBuffer(const String & path) const override;
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override;
private:
const Poco::Util::AbstractConfiguration & config;

View File

@ -205,6 +205,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
path,
settings->client, bucket, metadata,
settings->s3_max_single_read_retries, read_settings, threadpool_read);

View File

@ -200,27 +200,27 @@ void registerDiskS3(DiskFactory & factory)
s3disk->startup();
bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
// bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
if (cache_enabled)
{
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
// if (cache_enabled)
// {
// String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
if (metadata_path == cache_path)
throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
// if (metadata_path == cache_path)
// throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
auto cache_disk = std::make_shared<DiskLocal>("s3-cache", cache_path, 0);
auto cache_file_predicate = [] (const String & path)
{
return path.ends_with("idx") // index files.
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files.
|| path.ends_with("txt") || path.ends_with("dat");
};
// auto cache_disk = std::make_shared<DiskLocal>("s3-cache", cache_path, 0);
// auto cache_file_predicate = [] (const String & path)
// {
// return path.ends_with("idx") // index files.
// || path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files.
// || path.ends_with("txt") || path.ends_with("dat");
// };
s3disk = std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
}
// s3disk = std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
// }
return std::make_shared<DiskRestartProxy>(s3disk);
return s3disk;
};
factory.registerDiskType("s3", creator);
}

View File

@ -2,6 +2,7 @@
#include <Common/Stopwatch.h>
#include <IO/ThreadPoolRemoteFSReader.h>
#include <base/logger_useful.h>
namespace CurrentMetrics
@ -35,14 +36,16 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
Int32 priority_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t buf_size_,
size_t min_bytes_for_seek_)
size_t /* min_bytes_for_seek_ */)
: ReadBufferFromFileBase(buf_size_, nullptr, 0)
, reader(reader_)
, priority(priority_)
, impl(impl_)
, prefetch_buffer(buf_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
// , min_bytes_for_seek(min_bytes_for_seek_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBuffers);
buffer_events += impl->getFileName() + " : ";
}
@ -66,15 +69,31 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
{
if (hasPendingData())
return;
if (hasPendingData())
return;
if (prefetch_future.valid())
return;
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
buffer_events += "-- Prefetch (" + toString(absolute_position) + ") --";
}
void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset)
{
buffer_events += "-- Set last offset " + toString(offset) + "--";
if (prefetch_future.valid())
return;
{
buffer_events += "-- Cancelling because of offset update --";
ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches);
prefetch_future.wait();
prefetch_future = {};
}
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
buffer_events += "-- Prefetch --";
last_offset = offset;
impl->setRightOffset(offset);
}
@ -86,7 +105,6 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
if (prefetch_future.valid())
{
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchReads);
buffer_events += "-- Read from prefetch --";
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
Stopwatch watch;
@ -100,13 +118,17 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
absolute_position += size;
}
}
buffer_events += fmt::format("-- Read from prefetch from offset: {}, upper bound: {}, actually read: {} --",
toString(absolute_position), toString(last_offset), toString(size));
watch.stop();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
}
else
{
buffer_events += "-- Read without prefetch --";
size = readInto(memory.data(), memory.size()).get();
buffer_events += fmt::format("-- Read without prefetch from offset: {}, upper bound: {}, actually read: {} --",
toString(absolute_position), toString(last_offset), toString(size));
if (size)
{
set(memory.data(), memory.size());
@ -115,7 +137,6 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
}
buffer_events += " + " + toString(size) + " + ";
prefetch_future = {};
return size;
}
@ -163,6 +184,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
if (prefetch_future.valid())
{
buffer_events += "-- cancelling prefetch because of seek --";
ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches);
prefetch_future.wait();
prefetch_future = {};
@ -170,16 +192,18 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
pos = working_buffer.end();
if (static_cast<off_t>(absolute_position) >= getPosition()
&& static_cast<off_t>(absolute_position) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
{
/**
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
*/
bytes_to_ignore = absolute_position - getPosition();
}
else
// if (static_cast<off_t>(absolute_position) >= getPosition()
// && static_cast<off_t>(absolute_position) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
// {
// /**
// * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
// */
// // bytes_to_ignore = absolute_position - getPosition();
// impl->seek(absolute_position); /// SEEK_SET.
// }
// else
{
buffer_events += "-- Impl seek --";
impl->seek(absolute_position); /// SEEK_SET.
}
@ -189,14 +213,14 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
std::cerr << "\n\n\nBuffer events: " << buffer_events << std::endl;
if (prefetch_future.valid())
{
buffer_events += "-- cancelling prefetch in finalize --";
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedCancelledPrefetches);
prefetch_future.wait();
prefetch_future = {};
}
std::cerr << "Buffer events: " << buffer_events << std::endl;
}

View File

@ -47,6 +47,8 @@ public:
void prefetch() override;
void setRightOffset(size_t offset);
private:
bool nextImpl() override;
@ -68,8 +70,10 @@ private:
String buffer_events;
size_t min_bytes_for_seek;
// size_t min_bytes_for_seek;
size_t bytes_to_ignore = 0;
Int64 last_offset = 0;
};
}

View File

@ -32,7 +32,7 @@ namespace ErrorCodes
ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_,
UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_)
UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_, size_t last_offset_)
: SeekableReadBuffer(nullptr, 0)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
@ -40,11 +40,22 @@ ReadBufferFromS3::ReadBufferFromS3(
, max_single_read_retries(max_single_read_retries_)
, read_settings(settings_)
, use_external_buffer(use_external_buffer_)
, last_offset(last_offset_)
{
}
bool ReadBufferFromS3::nextImpl()
{
if (last_offset)
{
if (static_cast<off_t>(last_offset) == offset)
{
impl.reset();
working_buffer.resize(0);
return false;
}
}
bool next_result = false;
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
@ -162,16 +173,17 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
req.SetBucket(bucket);
req.SetKey(key);
auto right_offset = read_settings.remote_read_right_offset;
if (right_offset)
// auto right_offset = read_settings.remote_read_right_offset;
if (last_offset)
{
req.SetRange(fmt::format("bytes={}-{}", offset, right_offset));
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, right_offset);
req.SetRange(fmt::format("bytes={}-{}", offset, last_offset - 1));
LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, last_offset - 1);
}
else
{
req.SetRange(fmt::format("bytes={}-", offset));
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);
LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);
}
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);

View File

@ -45,8 +45,10 @@ public:
const String & key_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer = false);
bool use_external_buffer = false,
size_t last_offset_ = 0);
size_t right = 0;
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
@ -57,6 +59,7 @@ private:
ReadSettings read_settings;
bool use_external_buffer;
size_t last_offset;
};
}

View File

@ -23,7 +23,7 @@ namespace ErrorCodes
static constexpr size_t HTTP_MAX_TRIES = 10;
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_)
const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_, size_t)
: SeekableReadBuffer(nullptr, 0)
, log(&Poco::Logger::get("ReadBufferFromWebServer"))
, context(context_)
@ -108,7 +108,7 @@ void ReadBufferFromWebServer::initializeWithRetry()
if (i == num_tries - 1)
throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), e.what());
LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), "Error: {}, code: {}", e.what(), e.code());
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
}

View File

@ -20,7 +20,8 @@ public:
explicit ReadBufferFromWebServer(
const String & url_, ContextPtr context_,
const ReadSettings & settings_ = {},
bool use_external_buffer_ = false);
bool use_external_buffer_ = false,
size_t last_offset = 0);
bool nextImpl() override;

View File

@ -125,7 +125,7 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
const String & hdfs_uri_,
const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
size_t buf_size_, size_t)
: SeekableReadBuffer(nullptr, 0)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_, buf_size_))
{

View File

@ -27,7 +27,9 @@ struct ReadBufferFromHDFSImpl;
public:
ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
size_t last_offset = 0);
~ReadBufferFromHDFS() override;

View File

@ -31,7 +31,8 @@ public:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
virtual size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0;
virtual size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0;
virtual bool canReadIncompleteGranules() const = 0;

View File

@ -55,8 +55,11 @@ static void filterColumns(Columns & columns, const ColumnPtr & filter)
MergeTreeRangeReader::DelayedStream::DelayedStream(
size_t from_mark, IMergeTreeReader * merge_tree_reader_)
size_t from_mark,
size_t current_task_last_mark_,
IMergeTreeReader * merge_tree_reader_)
: current_mark(from_mark), current_offset(0), num_delayed_rows(0)
, current_task_last_mark(current_task_last_mark_)
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, continue_reading(false), is_finished(false)
@ -73,7 +76,8 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Columns & columns, size_t n
{
if (num_rows)
{
size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, columns);
size_t rows_read = merge_tree_reader->readRows(
current_mark, current_task_last_mark, continue_reading, num_rows, columns);
continue_reading = true;
/// Zero rows_read maybe either because reading has finished
@ -151,13 +155,13 @@ size_t MergeTreeRangeReader::DelayedStream::finalize(Columns & columns)
MergeTreeRangeReader::Stream::Stream(
size_t from_mark, size_t to_mark, IMergeTreeReader * merge_tree_reader_)
size_t from_mark, size_t to_mark, size_t current_task_last_mark, IMergeTreeReader * merge_tree_reader_)
: current_mark(from_mark), offset_after_current_mark(0)
, last_mark(to_mark)
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, current_mark_index_granularity(index_granularity->getMarkRows(from_mark))
, stream(from_mark, merge_tree_reader)
, stream(from_mark, current_task_last_mark, merge_tree_reader)
{
size_t marks_count = index_granularity->getMarksCount();
if (from_mark >= marks_count)
@ -280,9 +284,9 @@ void MergeTreeRangeReader::ReadResult::adjustLastGranule()
throw Exception("Can't adjust last granule because no granules were added.", ErrorCodes::LOGICAL_ERROR);
if (num_rows_to_subtract > rows_per_granule.back())
throw Exception("Can't adjust last granule because it has " + toString(rows_per_granule.back())
+ " rows, but try to subtract " + toString(num_rows_to_subtract) + " rows.",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Can't adjust last granule because it has {} rows, but try to subtract {} rows.",
toString(rows_per_granule.back()), toString(num_rows_to_subtract));
rows_per_granule.back() -= num_rows_to_subtract;
total_rows_per_granule -= num_rows_to_subtract;
@ -750,6 +754,16 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
ReadResult result;
result.columns.resize(merge_tree_reader->getColumns().size());
auto current_task_last_mark_range = std::max_element(ranges.begin(), ranges.end(),
[&](const MarkRange & range1, const MarkRange & range2)
{
return range1.end < range2.end;
});
size_t current_task_last_mark = 0;
if (current_task_last_mark_range != ranges.end())
current_task_last_mark = current_task_last_mark_range->end;
/// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
/// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than
/// result.num_rows_read if the last granule in range also the last in part (so we have to adjust last granule).
@ -760,7 +774,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
if (stream.isFinished())
{
result.addRows(stream.finalize(result.columns));
stream = Stream(ranges.front().begin, ranges.front().end, merge_tree_reader);
stream = Stream(ranges.front().begin, ranges.front().end, current_task_last_mark, merge_tree_reader);
result.addRange(ranges.front());
ranges.pop_front();
}
@ -818,7 +832,7 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
num_rows += stream.finalize(columns);
const auto & range = started_ranges[next_range_to_start].range;
++next_range_to_start;
stream = Stream(range.begin, range.end, merge_tree_reader);
stream = Stream(range.begin, range.end, 0, merge_tree_reader);
}
bool last = i + 1 == size;

View File

@ -62,7 +62,7 @@ public:
{
public:
DelayedStream() = default;
DelayedStream(size_t from_mark, IMergeTreeReader * merge_tree_reader);
DelayedStream(size_t from_mark, size_t current_task_last_mark_, IMergeTreeReader * merge_tree_reader);
/// Read @num_rows rows from @from_mark starting from @offset row
/// Returns the number of rows added to block.
@ -81,6 +81,8 @@ public:
size_t current_offset = 0;
/// Num of rows we have to read
size_t num_delayed_rows = 0;
/// Last mark from all ranges of current task.
size_t current_task_last_mark = 0;
/// Actual reader of data from disk
IMergeTreeReader * merge_tree_reader = nullptr;
@ -99,7 +101,8 @@ public:
{
public:
Stream() = default;
Stream(size_t from_mark, size_t to_mark, IMergeTreeReader * merge_tree_reader);
Stream(size_t from_mark, size_t to_mark,
size_t current_task_last_mark, IMergeTreeReader * merge_tree_reader);
/// Returns the number of rows added to block.
size_t read(Columns & columns, size_t num_rows, bool skip_remaining_rows_in_current_granule);
@ -122,6 +125,7 @@ public:
/// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity
size_t offset_after_current_mark = 0;
/// Last mark in current range.
size_t last_mark = 0;
IMergeTreeReader * merge_tree_reader = nullptr;

View File

@ -121,7 +121,8 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
}
size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
size_t MergeTreeReaderCompact::readRows(
size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
if (continue_reading)
from_mark = next_mark;

View File

@ -32,7 +32,8 @@ public:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
bool canReadIncompleteGranules() const override { return false; }

View File

@ -37,7 +37,8 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
}
}
size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
size_t MergeTreeReaderInMemory::readRows(
size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
if (!continue_reading)
total_rows_read = 0;

View File

@ -23,7 +23,8 @@ public:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
size_t readRows(size_t from_mark, size_t current_tasl_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
bool canReadIncompleteGranules() const override { return true; }

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <base/getThreadId.h>
#include <utility>
@ -12,68 +13,38 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
}
MergeTreeReaderStream::MergeTreeReaderStream(
DiskPtr disk_,
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
const MergeTreeReaderSettings & settings,
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache, size_t file_size,
UncompressedCache * uncompressed_cache, size_t file_size_,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
: disk(std::move(disk_)), path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
, mark_cache(mark_cache_), save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(disk, mark_cache, index_granularity_info->getMarksFilePath(path_prefix),
marks_count, *index_granularity_info, save_marks_in_cache)
: disk(std::move(disk_))
, path_prefix(path_prefix_)
, data_file_extension(data_file_extension_)
, marks_count(marks_count_)
, file_size(file_size_)
, mark_cache(mark_cache_)
, save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(disk, mark_cache, index_granularity_info->getMarksFilePath(path_prefix),
marks_count, *index_granularity_info, save_marks_in_cache)
{
/// Compute the size of the buffer.
size_t max_mark_range_bytes = 0;
size_t sum_mark_range_bytes = 0;
/// Rightmost bound to read.
size_t right_bound = 0;
for (const auto & mark_range : all_mark_ranges)
{
size_t left_mark = mark_range.begin;
size_t right_mark = mark_range.end;
/// NOTE: if we are reading the whole file, then right_mark == marks_count
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
/// If the end of range is inside the block, we will need to read it too.
if (right_mark < marks_count && marks_loader.getMark(right_mark).offset_in_decompressed_block > 0)
{
auto indices = collections::range(right_mark, marks_count);
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, [this](size_t i, size_t j)
{
return marks_loader.getMark(i).offset_in_compressed_file < marks_loader.getMark(j).offset_in_compressed_file;
});
right_mark = (it == indices.end() ? marks_count : *it);
}
size_t mark_range_bytes;
size_t current_right_offset;
/// If there are no marks after the end of range, just use file size
if (right_mark >= marks_count
|| (right_mark + 1 == marks_count
&& marks_loader.getMark(right_mark).offset_in_compressed_file == marks_loader.getMark(mark_range.end).offset_in_compressed_file))
{
mark_range_bytes = file_size - (left_mark < marks_count ? marks_loader.getMark(left_mark).offset_in_compressed_file : 0);
current_right_offset = file_size;
}
else
{
mark_range_bytes = marks_loader.getMark(right_mark).offset_in_compressed_file - marks_loader.getMark(left_mark).offset_in_compressed_file;
current_right_offset = marks_loader.getMark(right_mark).offset_in_compressed_file;
}
auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(left_mark, right_mark);
max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
sum_mark_range_bytes += mark_range_bytes;
right_bound = std::max(right_bound, current_right_offset);
}
/// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
@ -82,9 +53,6 @@ MergeTreeReaderStream::MergeTreeReaderStream(
if (max_mark_range_bytes != 0)
read_settings = read_settings.adjustBufferSize(max_mark_range_bytes);
/// Set bound for reading from remote disk.
read_settings.remote_read_right_offset = right_bound;
/// Initialize the objects that shall be used to perform read operations.
if (uncompressed_cache)
{
@ -128,6 +96,45 @@ MergeTreeReaderStream::MergeTreeReaderStream(
}
std::pair<size_t, size_t> MergeTreeReaderStream::getRightOffsetAndBytesRange(size_t left_mark, size_t right_mark)
{
/// NOTE: if we are reading the whole file, then right_mark == marks_count
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
/// If the end of range is inside the block, we will need to read it too.
size_t result_right_mark = right_mark;
if (right_mark < marks_count && marks_loader.getMark(right_mark).offset_in_decompressed_block > 0)
{
auto indices = collections::range(right_mark, marks_count);
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, [this](size_t i, size_t j)
{
return marks_loader.getMark(i).offset_in_compressed_file < marks_loader.getMark(j).offset_in_compressed_file;
});
result_right_mark = (it == indices.end() ? marks_count : *it);
}
size_t right_offset;
size_t mark_range_bytes;
/// If there are no marks after the end of range, just use file size
if (result_right_mark >= marks_count
|| (result_right_mark + 1 == marks_count
&& marks_loader.getMark(result_right_mark).offset_in_compressed_file == marks_loader.getMark(right_mark).offset_in_compressed_file))
{
right_offset = file_size;
mark_range_bytes = right_offset - (left_mark < marks_count ? marks_loader.getMark(left_mark).offset_in_compressed_file : 0);
}
else
{
right_offset = marks_loader.getMark(result_right_mark).offset_in_compressed_file;
mark_range_bytes = right_offset - marks_loader.getMark(left_mark).offset_in_compressed_file;
}
return std::make_pair(right_offset, mark_range_bytes);
}
void MergeTreeReaderStream::seekToMark(size_t index)
{
MarkInCompressedFile mark = marks_loader.getMark(index);
@ -172,4 +179,18 @@ void MergeTreeReaderStream::seekToStart()
}
}
void MergeTreeReaderStream::adjustForRange(size_t left_mark, size_t right_mark)
{
auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(left_mark, right_mark);
if (right_offset > last_right_offset)
{
last_right_offset = right_offset;
if (cached_buffer)
cached_buffer->setRightOffset(last_right_offset);
if (non_cached_buffer)
non_cached_buffer->setRightOffset(last_right_offset);
}
}
}

View File

@ -23,25 +23,32 @@ public:
const MarkRanges & all_mark_ranges,
const MergeTreeReaderSettings & settings_,
MarkCache * mark_cache, UncompressedCache * uncompressed_cache,
size_t file_size, const MergeTreeIndexGranularityInfo * index_granularity_info_,
size_t file_size_, const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void seekToMark(size_t index);
void seekToStart();
void adjustForRange(size_t left_mark, size_t right_mark);
ReadBuffer * data_buffer;
private:
std::pair<size_t, size_t> getRightOffsetAndBytesRange(size_t left_mark, size_t right_mark);
DiskPtr disk;
std::string path_prefix;
std::string data_file_extension;
size_t marks_count;
size_t file_size;
MarkCache * mark_cache;
bool save_marks_in_cache;
size_t last_right_offset = 0;
const MergeTreeIndexGranularityInfo * index_granularity_info;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;

View File

@ -61,7 +61,8 @@ MergeTreeReaderWide::MergeTreeReaderWide(
}
size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
size_t MergeTreeReaderWide::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
size_t read_rows = 0;
try
@ -87,7 +88,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
try
{
auto & cache = caches[column_from_part.getNameInStorage()];
prefetch(column_from_part, from_mark, continue_reading, cache, prefetched_streams);
prefetch(column_from_part, from_mark, continue_reading, current_task_last_mark, cache, prefetched_streams);
}
catch (Exception & e)
{
@ -117,7 +118,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
auto & cache = caches[column_from_part.getNameInStorage()];
readData(
column_from_part, column, from_mark, continue_reading,
column_from_part, column, from_mark, continue_reading, current_task_last_mark,
max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty());
/// For elements of Nested, column_size_before_reading may be greater than column size
@ -199,6 +200,7 @@ static ReadBuffer * getStream(
MergeTreeReaderWide::FileStreams & streams,
const NameAndTypePair & name_and_type,
size_t from_mark, bool seek_to_mark,
size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache)
{
/// If substream have already been read.
@ -212,6 +214,7 @@ static ReadBuffer * getStream(
return nullptr;
MergeTreeReaderStream & stream = *it->second;
stream.adjustForRange(seek_to_start ? 0 : from_mark, current_task_last_mark);
if (seek_to_start)
stream.seekToStart();
@ -226,6 +229,7 @@ void MergeTreeReaderWide::prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams)
{
@ -239,7 +243,7 @@ void MergeTreeReaderWide::prefetch(
if (!prefetched_streams.count(stream_name))
{
bool seek_to_mark = !continue_reading;
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache))
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
buf->prefetch();
prefetched_streams.insert(stream_name);
@ -250,8 +254,8 @@ void MergeTreeReaderWide::prefetch(
void MergeTreeReaderWide::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache, bool was_prefetched)
size_t from_mark, bool continue_reading, size_t current_task_last_mark,
size_t max_rows_to_read, ISerialization::SubstreamsCache & cache, bool was_prefetched)
{
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
@ -264,7 +268,7 @@ void MergeTreeReaderWide::readData(
{
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, cache);
return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, current_task_last_mark, cache);
};
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
}
@ -275,7 +279,7 @@ void MergeTreeReaderWide::readData(
return getStream(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, cache);
seek_to_mark, current_task_last_mark, cache);
};
deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name];

View File

@ -28,7 +28,8 @@ public:
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
bool canReadIncompleteGranules() const override { return true; }
@ -39,13 +40,14 @@ private:
FileStreams streams;
Serializations serializations;
DiskPtr disk;
std::map<String, std::set<size_t>> marks;
void addStreams(const NameAndTypePair & name_and_type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
size_t from_mark, bool continue_reading, size_t current_task_last_mark, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache, bool was_prefetched);
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams).
@ -53,6 +55,7 @@ private:
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams); /// if stream was already prefetched do nothing
};

View File

@ -78,7 +78,8 @@ try
const auto & sample = reader->getColumns();
Columns columns(sample.size());
size_t rows_read = reader->readRows(current_mark, continue_reading, rows_to_read, columns);
/// TODO: pass stream size instead of zero?
size_t rows_read = reader->readRows(current_mark, 0, continue_reading, rows_to_read, columns);
if (rows_read)
{