Review fixes

commit e4e157688d
parent 5f24eb10ec
@@ -80,8 +80,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
     if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
     {
         auto reader = getThreadPoolReader();
-        auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(hdfs_impl));
-        return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
+        return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(hdfs_impl));
     }
     else
     {
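A later hunk in this commit shows why the SeekAvoidingReadBuffer wrapper can be dropped: the asynchronous buffer already short-circuits small forward seeks itself via bytes_to_ignore and min_bytes_for_seek. A toy model of the resulting dispatch; every type here is a stand-in for illustration, not ClickHouse's API:

#include <memory>

/// Toy model of the dispatch above. The point is only that the threadpool
/// branch now returns its asynchronous buffer directly, without a wrapper.
struct ReadBuffer { virtual ~ReadBuffer() = default; };
struct AsyncBuffer : ReadBuffer {};
struct SyncBuffer : ReadBuffer {};

enum class ReadMethod { read, read_threadpool };

std::unique_ptr<ReadBuffer> makeBuffer(ReadMethod method)
{
    if (method == ReadMethod::read_threadpool)
        return std::make_unique<AsyncBuffer>(); /// no seek-avoiding wrapper needed
    return std::make_unique<SyncBuffer>();
}

int main()
{
    auto buf = makeBuffer(ReadMethod::read_threadpool);
}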
@@ -56,13 +56,41 @@ String AsynchronousReadIndirectBufferFromRemoteFS::getFileName() const
 }
 
 
+size_t AsynchronousReadIndirectBufferFromRemoteFS::getNumBytesToRead()
+{
+    size_t num_bytes_to_read;
+
+    /// Position is set only for MergeTree tables.
+    if (read_until_position)
+    {
+        /// Everything is already read.
+        if (file_offset_of_buffer_end == *read_until_position)
+            return 0;
+
+        if (file_offset_of_buffer_end > *read_until_position)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
+                            file_offset_of_buffer_end, *read_until_position);
+
+        /// Read range [file_offset_of_buffer_end, read_until_position).
+        num_bytes_to_read = *read_until_position - file_offset_of_buffer_end;
+        num_bytes_to_read = std::min(num_bytes_to_read, internal_buffer.size());
+    }
+    else
+    {
+        num_bytes_to_read = internal_buffer.size();
+    }
+
+    return num_bytes_to_read;
+}
+
+
 std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::readInto(char * data, size_t size)
 {
     IAsynchronousReader::Request request;
     request.descriptor = std::make_shared<ThreadPoolRemoteFSReader::RemoteFSFileDescriptor>(impl);
     request.buf = data;
     request.size = size;
-    request.offset = absolute_position;
+    request.offset = file_offset_of_buffer_end;
     request.priority = priority;
 
     if (bytes_to_ignore)
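Since the same clamp recurs in several readers below, a standalone restatement may help. Only the arithmetic mirrors the diff; the function and the values are illustrative:

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>

/// Standalone sketch of the clamp in getNumBytesToRead().
size_t bytesToRead(std::optional<size_t> read_until_position,
                   size_t file_offset_of_buffer_end,
                   size_t internal_buffer_size)
{
    if (read_until_position)
    {
        if (file_offset_of_buffer_end >= *read_until_position)
            return 0; /// nothing left (the real code throws when strictly beyond)
        /// Half-open range [file_offset_of_buffer_end, read_until_position),
        /// additionally clamped by the buffer capacity.
        return std::min(*read_until_position - file_offset_of_buffer_end, internal_buffer_size);
    }
    return internal_buffer_size; /// unbounded: fill the whole buffer
}

int main()
{
    assert(bytesToRead(1000, 900, 512) == 100); /// clamped by the right bound
    assert(bytesToRead(1000, 0, 512) == 512);   /// clamped by the buffer size
    assert(bytesToRead(std::nullopt, 0, 512) == 512);
}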
@@ -79,17 +107,12 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
     if (prefetch_future.valid())
         return;
 
-    /// Everything is already read.
-    if (absolute_position == last_offset)
+    auto num_bytes_to_read = getNumBytesToRead();
+    if (!num_bytes_to_read)
         return;
 
-    if (absolute_position > last_offset)
-    {
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
-                        absolute_position, last_offset);
-    }
-
     /// Prefetch even in case hasPendingData() == true.
+    prefetch_buffer.resize(num_bytes_to_read);
     prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
     ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
 }
@@ -98,27 +121,15 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
 void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
 {
     if (prefetch_future.valid())
-    {
-        /// TODO: Planning to put logical error here after more testing,
-        // because seems like future is never supposed to be valid at this point.
-        std::terminate();
-    }
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition");
 
-    last_offset = position;
+    read_until_position = position;
     impl->setReadUntilPosition(position);
 }
 
 
 bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
 {
-    /// Everything is already read.
-    if (absolute_position == last_offset)
-        return false;
-
-    if (absolute_position > last_offset)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
-                        absolute_position, last_offset);
-
     size_t size = 0;
 
     if (prefetch_future.valid())
@@ -134,7 +145,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
             memory.swap(prefetch_buffer);
             set(memory.data(), memory.size());
             working_buffer.resize(size);
-            absolute_position += size;
+            file_offset_of_buffer_end += size;
         }
     }
 
@@ -143,14 +154,18 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
     }
     else
     {
+        auto num_bytes_to_read = getNumBytesToRead();
+        if (!num_bytes_to_read) /// Nothing to read.
+            return false;
+
         ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
-        size = readInto(memory.data(), memory.size()).get();
+        size = readInto(memory.data(), num_bytes_to_read).get();
 
         if (size)
         {
             set(memory.data(), memory.size());
             working_buffer.resize(size);
-            absolute_position += size;
+            file_offset_of_buffer_end += size;
         }
     }
 
@@ -166,24 +181,24 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
     if (whence == SEEK_CUR)
     {
         /// If position within current working buffer - shift pos.
-        if (!working_buffer.empty() && static_cast<size_t>(getPosition() + offset_) < absolute_position)
+        if (!working_buffer.empty() && static_cast<size_t>(getPosition() + offset_) < file_offset_of_buffer_end)
         {
             pos += offset_;
             return getPosition();
         }
         else
         {
-            absolute_position += offset_;
+            file_offset_of_buffer_end += offset_;
         }
     }
     else if (whence == SEEK_SET)
     {
         /// If position is within current working buffer - shift pos.
         if (!working_buffer.empty()
-            && static_cast<size_t>(offset_) >= absolute_position - working_buffer.size()
-            && size_t(offset_) < absolute_position)
+            && static_cast<size_t>(offset_) >= file_offset_of_buffer_end - working_buffer.size()
+            && size_t(offset_) < file_offset_of_buffer_end)
         {
-            pos = working_buffer.end() - (absolute_position - offset_);
+            pos = working_buffer.end() - (file_offset_of_buffer_end - offset_);
 
             assert(pos >= working_buffer.begin());
             assert(pos <= working_buffer.end());
@@ -192,7 +207,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
         }
         else
         {
-            absolute_position = offset_;
+            file_offset_of_buffer_end = offset_;
         }
     }
     else
@@ -207,22 +222,22 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
 
     pos = working_buffer.end();
 
-    /// Note: we read in range [absolute_position, last_offset).
-    if (absolute_position < last_offset
-        && static_cast<off_t>(absolute_position) >= getPosition()
-        && static_cast<off_t>(absolute_position) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
+    /// Note: we read in range [file_offset_of_buffer_end, read_until_position).
+    if (file_offset_of_buffer_end < read_until_position
+        && static_cast<off_t>(file_offset_of_buffer_end) >= getPosition()
+        && static_cast<off_t>(file_offset_of_buffer_end) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
     {
         /**
          * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
         */
-        bytes_to_ignore = absolute_position - getPosition();
+        bytes_to_ignore = file_offset_of_buffer_end - getPosition();
     }
     else
     {
-        impl->seek(absolute_position); /// SEEK_SET.
+        impl->reset();
     }
 
-    return absolute_position;
+    return file_offset_of_buffer_end;
 }
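The renamed condition is easier to follow with numbers plugged in. A toy model of the lazy-ignore decision, not the class's real interface, with hypothetical values:

#include <cassert>
#include <cstddef>

/// Toy model of the "lazy ignore" branch above. For a short forward jump the
/// buffer records bytes_to_ignore instead of resetting the underlying remote
/// read; only long jumps pay for impl->reset().
struct LazySeek
{
    size_t bytes_to_ignore = 0;
    bool reset_needed = false;

    void seekTo(size_t current, size_t target, size_t min_bytes_for_seek)
    {
        if (target >= current && target < current + min_bytes_for_seek)
            bytes_to_ignore = target - current; /// cheap: skip bytes on the next read
        else
            reset_needed = true; /// expensive: drop state, reopen at the new offset
    }
};

int main()
{
    LazySeek s;
    s.seekTo(/*current=*/100, /*target=*/150, /*min_bytes_for_seek=*/4 * 1024 * 1024);
    assert(s.bytes_to_ignore == 50 && !s.reset_needed);
}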
@@ -41,7 +41,7 @@ public:
 
     off_t seek(off_t offset_, int whence) override;
 
-    off_t getPosition() override { return absolute_position - available(); }
+    off_t getPosition() override { return file_offset_of_buffer_end - available(); }
 
     String getFileName() const override;
 
@@ -54,6 +54,8 @@ private:
 
     void finalize();
 
+    size_t getNumBytesToRead();
+
     std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
 
     AsynchronousReaderPtr reader;
@@ -64,7 +66,7 @@ private:
 
     std::future<IAsynchronousReader::Result> prefetch_future;
 
-    size_t absolute_position = 0;
+    size_t file_offset_of_buffer_end = 0;
 
     Memory<> prefetch_buffer;
 
@@ -72,7 +74,7 @@ private:
 
     size_t bytes_to_ignore = 0;
 
-    size_t last_offset = 0;
+    std::optional<size_t> read_until_position = 0;
 };
 
 }
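One C++ subtlety in the new member: std::optional<size_t> read_until_position = 0 is an engaged optional holding zero, not an empty one, so a bare if (read_until_position) tests engagement and is true from construction; only a default-constructed optional is falsy. A two-assert demonstration:

#include <cassert>
#include <cstddef>
#include <optional>

int main()
{
    std::optional<size_t> engaged = 0; /// initialized from 0: engaged, holds the value 0
    std::optional<size_t> empty;       /// default-constructed: disengaged
    assert(engaged.has_value() && *engaged == 0);
    assert(!empty.has_value());        /// only this form makes `if (opt)` false
}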
@@ -86,7 +86,7 @@ void ReadBufferFromRemoteFSGather::initialize()
         /// Do not create a new buffer if we already have what we need.
         if (!current_buf || current_buf_idx != i)
         {
-            current_buf = createImplementationBuffer(file_path, last_offset);
+            current_buf = createImplementationBuffer(file_path, read_until_position);
             current_buf_idx = i;
         }
 
@@ -123,7 +123,7 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
     ++current_buf_idx;
 
     const auto & current_path = metadata.remote_fs_objects[current_buf_idx].first;
-    current_buf = createImplementationBuffer(current_path, last_offset);
+    current_buf = createImplementationBuffer(current_path, read_until_position);
 
     return readImpl();
 }
@@ -141,7 +141,6 @@ bool ReadBufferFromRemoteFSGather::readImpl()
     if (bytes_to_ignore)
         current_buf->ignore(bytes_to_ignore);
 
-    LOG_DEBUG(&Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Reading from path: {}", canonical_path);
     auto result = current_buf->next();
 
     swap(*current_buf);
@@ -153,18 +152,10 @@ bool ReadBufferFromRemoteFSGather::readImpl()
 }
 
 
-void ReadBufferFromRemoteFSGather::seek(off_t offset)
-{
-    current_buf.reset();
-    absolute_position = offset;
-}
-
-
 void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
 {
-    assert(last_offset < position);
     current_buf.reset();
-    last_offset = position;
+    read_until_position = position;
     reset();
 }
 
@@ -19,6 +19,10 @@ class S3Client;
 namespace DB
 {
 
+/**
+ * Remote disk might need to split one clickhouse file into multiple files in remote fs.
+ * This class works like a proxy to allow transition from one file into multiple.
+ */
 class ReadBufferFromRemoteFSGather : public ReadBuffer
 {
 friend class ReadIndirectBufferFromRemoteFS;
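The new class comment describes the gather pattern: one logical ClickHouse file backed by several remote objects, read one after another. A toy model of that idea; all names here are illustrative stand-ins, not the real interfaces:

#include <cstddef>
#include <string>
#include <vector>

/// Toy model of the gather proxy: the concatenation of several remote objects
/// is exposed to callers as a single logical stream.
struct RemoteObject { std::string path; size_t size; };

size_t totalLogicalSize(const std::vector<RemoteObject> & objects)
{
    size_t total = 0;
    for (const auto & object : objects)
        total += object.size;
    return total;
}

int main()
{
    std::vector<RemoteObject> objects{{"part-0", 32}, {"part-1", 32}, {"part-2", 8}};
    return totalLogicalSize(objects) == 72 ? 0 : 1;
}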
@@ -30,8 +34,6 @@ public:
 
     void reset();
 
-    void seek(off_t offset); /// SEEK_SET only.
-
     void setReadUntilPosition(size_t position) override;
 
     size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
@@ -56,7 +58,7 @@ private:
 
     size_t bytes_to_ignore = 0;
 
-    size_t last_offset = 0;
+    size_t read_until_position = 0;
 
     String canonical_path;
 };
@@ -84,7 +86,7 @@ public:
     {
     }
 
-    SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override;
+    SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
 
 private:
     std::shared_ptr<Aws::S3::S3Client> client_ptr;
@@ -114,7 +116,7 @@ public:
     {
     }
 
-    SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override;
+    SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
 
 private:
     String uri;
@@ -144,7 +146,7 @@ public:
         hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
     }
 
-    SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override;
+    SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const override;
 
 private:
     const Poco::Util::AbstractConfiguration & config;
@@ -49,31 +49,44 @@ std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescripto
 }
 
 
-void AsynchronousReadBufferFromFileDescriptor::prefetch()
+size_t AsynchronousReadBufferFromFileDescriptor::getNumBytesToRead()
 {
-    if (prefetch_future.valid())
-        return;
+    size_t num_bytes_to_read;
 
-    size_t read_size;
+    /// Position is set only for MergeTree tables.
     if (read_until_position)
     {
         /// Everything is already read.
         if (file_offset_of_buffer_end == *read_until_position)
-            return;
+            return 0;
 
         if (file_offset_of_buffer_end > *read_until_position)
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
                             file_offset_of_buffer_end, *read_until_position);
 
         /// Read range [file_offset_of_buffer_end, read_until_position).
-        read_size = *read_until_position - file_offset_of_buffer_end - 1;
+        num_bytes_to_read = *read_until_position - file_offset_of_buffer_end;
+        num_bytes_to_read = std::min(num_bytes_to_read, internal_buffer.size());
     }
     else
     {
-        read_size = internal_buffer.size();
+        num_bytes_to_read = internal_buffer.size();
    }
 
-    prefetch_buffer.resize(read_size);
+    return num_bytes_to_read;
+}
+
+
+void AsynchronousReadBufferFromFileDescriptor::prefetch()
+{
+    if (prefetch_future.valid())
+        return;
+
+    auto num_bytes_to_read = getNumBytesToRead();
+    if (!num_bytes_to_read)
+        return;
+
+    prefetch_buffer.resize(num_bytes_to_read);
     prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
 }
 
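The dropped "- 1" is the substantive fix in this hunk: for a half-open range [begin, end) the byte count is end - begin, and subtracting one more silently drops the last byte. A tiny check with illustrative values:

#include <cassert>
#include <cstddef>

int main()
{
    /// Half-open range [begin, end): the count is end - begin.
    size_t begin = 100, end = 110;
    assert(end - begin == 10);      /// bytes 100..109 inclusive
    assert(end - begin - 1 == 9);   /// the off-by-one the hunk removes
}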
@@ -89,17 +102,6 @@ void AsynchronousReadBufferFromFileDescriptor::setReadUntilPosition(size_t posit
 
 bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
 {
-    if (read_until_position)
-    {
-        /// Everything is already read.
-        if (file_offset_of_buffer_end == *read_until_position)
-            return false;
-
-        if (file_offset_of_buffer_end > *read_until_position)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
-                            file_offset_of_buffer_end, *read_until_position);
-    }
-
     if (prefetch_future.valid())
     {
         /// Read request already in flight. Wait for its completion.
@@ -127,9 +129,13 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
     }
     else
     {
+        auto num_bytes_to_read = getNumBytesToRead();
+        if (!num_bytes_to_read) /// Nothing to read.
+            return false;
+
         /// No pending request. Do synchronous read.
 
-        auto size = readInto(memory.data(), memory.size()).get();
+        auto size = readInto(memory.data(), num_bytes_to_read).get();
         file_offset_of_buffer_end += size;
 
         if (size)
@@ -67,6 +67,8 @@ public:
 
 private:
     std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
 
+    size_t getNumBytesToRead();
+
 };
 
 }
@@ -35,7 +35,7 @@ namespace ErrorCodes
 
 ReadBufferFromS3::ReadBufferFromS3(
     std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_,
-    UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_, size_t last_offset_)
+    UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_, size_t read_until_position_)
     : SeekableReadBuffer(nullptr, 0)
     , client_ptr(std::move(client_ptr_))
     , bucket(bucket_)
@@ -43,19 +43,19 @@ ReadBufferFromS3::ReadBufferFromS3(
     , max_single_read_retries(max_single_read_retries_)
     , read_settings(settings_)
     , use_external_buffer(use_external_buffer_)
-    , last_offset(last_offset_)
+    , read_until_position(read_until_position_)
 {
 }
 
 bool ReadBufferFromS3::nextImpl()
 {
-    if (last_offset)
+    if (read_until_position)
     {
-        if (last_offset == offset)
+        if (read_until_position == offset)
             return false;
 
-        if (last_offset < offset)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1);
+        if (read_until_position < offset)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
     }
 
     bool next_result = false;
@@ -171,13 +171,13 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
     req.SetBucket(bucket);
     req.SetKey(key);
 
-    if (last_offset)
+    if (read_until_position)
     {
-        if (offset >= last_offset)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1);
+        if (offset >= read_until_position)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
 
-        req.SetRange(fmt::format("bytes={}-{}", offset, last_offset - 1));
-        LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, last_offset - 1);
+        req.SetRange(fmt::format("bytes={}-{}", offset, read_until_position - 1));
+        LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, read_until_position - 1);
     }
     else
     {
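The "- 1" here is correct, unlike the prefetch off-by-one fixed earlier: the HTTP Range header is inclusive on both ends, so the half-open range [offset, read_until_position) maps to bytes=offset-(read_until_position - 1). A quick illustration of that formatting with hypothetical values (the diff itself uses fmt::format):

#include <cassert>
#include <cstddef>
#include <string>

/// Build an inclusive HTTP Range header value for the half-open byte range
/// [offset, read_until_position). Standalone sketch of the mapping above.
std::string rangeHeader(size_t offset, size_t read_until_position)
{
    return "bytes=" + std::to_string(offset) + "-" + std::to_string(read_until_position - 1);
}

int main()
{
    /// To read bytes 0..999 (1000 bytes), the header must say "bytes=0-999".
    assert(rangeHeader(0, 1000) == "bytes=0-999");
}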
@@ -46,7 +46,7 @@ public:
         UInt64 max_single_read_retries_,
         const ReadSettings & settings_,
         bool use_external_buffer = false,
-        size_t last_offset_ = 0);
+        size_t read_until_position_ = 0);
 
     size_t right = 0;
     bool nextImpl() override;
@@ -59,7 +59,7 @@ private:
 
     ReadSettings read_settings;
     bool use_external_buffer;
-    off_t last_offset;
+    off_t read_until_position = 0;
 };
 
 }
@@ -15,6 +15,7 @@ namespace ErrorCodes
     extern const int CANNOT_OPEN_FILE;
     extern const int CANNOT_SEEK_THROUGH_FILE;
     extern const int SEEK_POSITION_OUT_OF_BOUND;
+    extern const int LOGICAL_ERROR;
 }
 
 ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
@@ -33,16 +34,18 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
 
     off_t offset = 0;
     bool initialized = false;
+    off_t read_until_position = 0;
 
     explicit ReadBufferFromHDFSImpl(
         const std::string & hdfs_uri_,
         const std::string & hdfs_file_path_,
         const Poco::Util::AbstractConfiguration & config_,
-        size_t buf_size_)
+        size_t buf_size_, size_t read_until_position_)
         : BufferWithOwnMemory<SeekableReadBuffer>(buf_size_)
         , hdfs_uri(hdfs_uri_)
         , hdfs_file_path(hdfs_file_path_)
         , builder(createHDFSBuilder(hdfs_uri_, config_))
+        , read_until_position(read_until_position_)
     {
         std::lock_guard lock(hdfs_init_mutex);
 
@@ -79,7 +82,23 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
             initialized = true;
         }
 
-        int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), internal_buffer.size());
+        size_t num_bytes_to_read;
+        if (read_until_position)
+        {
+            if (read_until_position == offset)
+                return false;
+
+            if (read_until_position < offset)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
+
+            num_bytes_to_read = read_until_position - offset;
+        }
+        else
+        {
+            num_bytes_to_read = internal_buffer.size();
+        }
+
+        int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), num_bytes_to_read);
         if (bytes_read < 0)
             throw Exception(ErrorCodes::NETWORK_ERROR,
                 "Fail to read from HDFS: {}, file path: {}. Error: {}",
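A read call like hdfsRead may return fewer bytes than requested, so bounding the request size (rather than the result) is what keeps the stream inside [offset, read_until_position). A generic sketch of the same bounding; all names are hypothetical, and unlike the hunk above this sketch also clamps to the buffer size for safety:

#include <algorithm>
#include <cstddef>
#include <functional>

/// Bound a single read request by an optional right limit, as the HDFS hunk
/// does. `readFn` stands in for a call like hdfsRead and may return fewer
/// bytes than requested; callers must use its return value.
using ReadFn = std::function<int(char * buf, size_t size)>;

int boundedRead(ReadFn readFn, char * buf, size_t buf_size, size_t offset, size_t read_until_position)
{
    size_t num_bytes_to_read = buf_size;
    if (read_until_position) /// 0 means "no limit" in this sketch
        num_bytes_to_read = std::min(buf_size, read_until_position - offset);
    return readFn(buf, num_bytes_to_read);
}

int main()
{
    char buf[16];
    auto fake = [](char *, size_t size) { return static_cast<int>(size); };
    return boundedRead(fake, buf, sizeof(buf), /*offset=*/95, /*read_until_position=*/100) == 5 ? 0 : 1;
}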
@@ -125,9 +144,9 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
     const String & hdfs_uri_,
     const String & hdfs_file_path_,
     const Poco::Util::AbstractConfiguration & config_,
-    size_t buf_size_, size_t)
+    size_t buf_size_, size_t read_until_position_)
     : SeekableReadBuffer(nullptr, 0)
-    , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_, buf_size_))
+    , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_, buf_size_, read_until_position_))
 {
 }
 
@@ -29,7 +29,7 @@ public:
     ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_,
                        const Poco::Util::AbstractConfiguration & config_,
                        size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
-                       size_t last_offset = 0);
+                       size_t read_until_position_ = 0);
 
     ~ReadBufferFromHDFS() override;
 
@@ -30,7 +30,8 @@ public:
         const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{});
 
     /// Return the number of rows has been read or zero if there is no columns to read.
-    /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
+    /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark.
+    /// current_task_last_mark is needed for asynchronous reading (mainly from remote fs).
     virtual size_t readRows(size_t from_mark, size_t current_task_last_mark,
         bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0;
@@ -54,15 +54,9 @@ MergeTreeIndexReader::MergeTreeIndexReader(
         std::move(settings));
     version = index_format.version;
 
-    auto current_task_last_mark_range = std::max_element(all_mark_ranges_.begin(), all_mark_ranges_.end(),
-        [&](const MarkRange & range1, const MarkRange & range2)
-        {
-            return range1.end < range2.end;
-        });
-
     size_t current_task_last_mark = 0;
-    if (current_task_last_mark_range != all_mark_ranges_.end())
-        current_task_last_mark = current_task_last_mark_range->end;
+    for (const auto mark_range : all_mark_ranges_)
+        current_task_last_mark = std::max(current_task_last_mark, mark_range.end);
 
     stream->adjustForRange(0, current_task_last_mark);
     stream->seekToStart();
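Both forms compute the maximum range end; the loop simply avoids the iterator-and-end-check dance that std::max_element needs to stay safe on an empty container. A standalone comparison, with MarkRange reduced to a stub:

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

/// Stub of MarkRange with just the field the code uses.
struct MarkRange { size_t begin; size_t end; };

int main()
{
    std::vector<MarkRange> ranges{{0, 4}, {10, 17}, {5, 9}};

    /// Old shape: max_element plus an end-iterator check before dereferencing.
    size_t via_max_element = 0;
    auto it = std::max_element(ranges.begin(), ranges.end(),
        [](const MarkRange & a, const MarkRange & b) { return a.end < b.end; });
    if (it != ranges.end())
        via_max_element = it->end;

    /// New shape: a plain fold, trivially safe for empty input.
    size_t via_loop = 0;
    for (const auto & range : ranges)
        via_loop = std::max(via_loop, range.end);

    assert(via_max_element == 17 && via_loop == 17);
}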
@@ -754,15 +754,9 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
     ReadResult result;
     result.columns.resize(merge_tree_reader->getColumns().size());
 
-    auto current_task_last_mark_range = std::max_element(ranges.begin(), ranges.end(),
-        [&](const MarkRange & range1, const MarkRange & range2)
-        {
-            return range1.end < range2.end;
-        });
-
     size_t current_task_last_mark = 0;
-    if (current_task_last_mark_range != ranges.end())
-        current_task_last_mark = current_task_last_mark_range->end;
+    for (const auto mark_range : ranges)
+        current_task_last_mark = std::max(current_task_last_mark, mark_range.end);
 
     /// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
     /// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than
@@ -821,15 +815,9 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
     const auto & rows_per_granule = result.rowsPerGranule();
     const auto & started_ranges = result.startedRanges();
 
-    auto current_task_last_mark_range = std::max_element(started_ranges.begin(), started_ranges.end(),
-        [&](const ReadResult::RangeInfo & lhs, const ReadResult::RangeInfo & rhs)
-        {
-            return lhs.range.end < rhs.range.end;
-        });
-
     size_t current_task_last_mark = 0;
-    if (current_task_last_mark_range != started_ranges.end())
-        current_task_last_mark = current_task_last_mark_range->range.end;
+    for (const auto mark_range : started_ranges)
+        current_task_last_mark = std::max(current_task_last_mark, mark_range.range.end);
 
     size_t next_range_to_start = 0;
 
@@ -42,13 +42,7 @@ MergeTreeReadPool::MergeTreeReadPool(
 {
     /// parts don't contain duplicate MergeTreeDataPart's.
     const auto per_part_sum_marks = fillPerPartInfo(parts_ranges, check_columns_);
-    auto min_marks_for_concurrent_read = min_marks_for_concurrent_read_;
-    if (stored_on_remote_disk)
-    {
-        do_not_steal_tasks = true;
-        min_marks_for_concurrent_read = std::max(min_marks_for_concurrent_read, sum_marks_ / threads_);
-    }
-    fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read);
+    fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_);
 }
 
@@ -96,7 +90,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
     auto & marks_in_part = thread_tasks.sum_marks_in_parts.back();
 
     size_t need_marks;
-    if (stored_on_remote_disk) /// For better performance with remote disks
+    if (is_part_on_remote_disk[part_idx]) /// For better performance with remote disks
         need_marks = marks_in_part;
     else /// Get whole part to read if it is small enough.
         need_marks = std::min(marks_in_part, min_marks_to_read);
@@ -201,14 +195,12 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
 {
     std::vector<size_t> per_part_sum_marks;
     Block sample_block = metadata_snapshot->getSampleBlock();
+    is_part_on_remote_disk.resize(parts.size());
 
     for (const auto i : collections::range(0, parts.size()))
     {
         const auto & part = parts[i];
-
-        /// Turn off tasks stealing in case there is remote disk.
-        if (part.data_part->isStoredOnRemoteDisk())
-            stored_on_remote_disk = true;
+        is_part_on_remote_disk[i] = part.data_part->isStoredOnRemoteDisk();
 
         /// Read marks for every data part.
         size_t sum_marks = 0;
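The refactor replaces one pool-wide bool with a per-part vector, so a single remote part no longer changes scheduling for every local part. A toy version of the per-part decision seen in getTask above; names and values are illustrative:

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

/// Toy version of the per-part decision. With a pool-wide bool, one remote
/// part would force "read the whole part" for all parts; a per-part flag
/// keeps local parts on the usual min_marks_to_read path.
size_t needMarks(bool part_on_remote_disk, size_t marks_in_part, size_t min_marks_to_read)
{
    if (part_on_remote_disk) /// whole part: avoids extra remote seeks
        return marks_in_part;
    return std::min(marks_in_part, min_marks_to_read); /// local: small chunks are fine
}

int main()
{
    std::vector<bool> is_part_on_remote_disk{false, true};
    assert(needMarks(is_part_on_remote_disk[0], /*marks_in_part=*/100, /*min_marks_to_read=*/8) == 8);
    assert(needMarks(is_part_on_remote_disk[1], /*marks_in_part=*/100, /*min_marks_to_read=*/8) == 100);
}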
@@ -137,7 +137,7 @@ private:
 
     Poco::Logger * log = &Poco::Logger::get("MergeTreeReadPool");
 
-    bool stored_on_remote_disk = false;
+    std::vector<bool> is_part_on_remote_disk;
 };
 
 using MergeTreeReadPoolPtr = std::shared_ptr<MergeTreeReadPool>;