Add support for exact right bounded reads for some other buffers

This commit is contained in:
kssenii 2022-04-26 12:55:27 +02:00
parent 1b008b0014
commit a395532d54
11 changed files with 71 additions and 5 deletions

View File

@ -619,6 +619,7 @@
M(648, WRONG_DDL_RENAMING_SETTINGS) \
M(649, INVALID_TRANSACTION) \
M(650, SERIALIZATION_ERROR) \
M(651, CANNOT_USE_CACHE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -205,7 +205,7 @@ public:
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
virtual bool removeSharedFileIfExists(const String & path, bool) { return removeFileIfExists(path); }
virtual const String & getCacheBasePath() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is not cache path"); }
virtual const String & getCacheBasePath() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no cache path"); }
virtual bool isCached() const { return false; }

View File

@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_USE_CACHE;
extern const int LOGICAL_ERROR;
}
@ -107,6 +108,12 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
return remote_fs_segment_reader;
remote_fs_segment_reader = remote_file_reader_creator();
if (!remote_fs_segment_reader->supportsRightBoundedReads())
throw Exception(
ErrorCodes::CANNOT_USE_CACHE,
"Cache cannot be used with a ReadBuffer which does not support right bounded reads");
file_segment->setRemoteFileReader(remote_fs_segment_reader);
return remote_fs_segment_reader;

View File

@ -88,7 +88,13 @@ bool ReadBufferFromEncryptedFile::nextImpl()
size_t bytes_read = 0;
while (bytes_read < encrypted_buffer.size() && !in->eof())
{
bytes_read += in->read(encrypted_buffer.data() + bytes_read, encrypted_buffer.size() - bytes_read);
size_t bytes_to_read = encrypted_buffer.size() - bytes_read;
if (read_until_position)
{
assert(*read_until_position > offset);
bytes_to_read = std::min<off_t>(bytes_to_read, *read_until_position - offset);
}
bytes_read += in->read(encrypted_buffer.data() + bytes_read, bytes_to_read);
}
/// The used cipher algorithms generate the same number of bytes in output as it were in input,
@ -97,9 +103,15 @@ bool ReadBufferFromEncryptedFile::nextImpl()
encryptor.decrypt(encrypted_buffer.data(), bytes_read, working_buffer.begin());
pos = working_buffer.begin();
offset += bytes_read;
return true;
}
/// Stores the right bound for reads. nextImpl() consults this value to cap
/// how many bytes it requests from the underlying buffer.
/// The bound is kept as an optional off_t, so `position` is converted from size_t.
void ReadBufferFromEncryptedFile::setReadUntilPosition(size_t position)
{
    read_until_position.emplace(position);
}
}
#endif

View File

@ -26,12 +26,18 @@ public:
std::string getFileName() const override { return in->getFileName(); }
bool supportsRightBoundedReads() const override { return true; }
void setReadUntilPosition(size_t position) override;
private:
bool nextImpl() override;
std::unique_ptr<ReadBufferFromFileBase> in;
off_t offset = 0;
std::optional<off_t> read_until_position;
bool need_seek = false;
Memory<> encrypted_buffer;

View File

@ -69,10 +69,17 @@ bool ReadBufferFromFileDescriptor::nextImpl()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
size_t bytes_to_read = internal_buffer.size();
if (read_until_position)
{
assert(*read_until_position > file_offset_of_buffer_end);
bytes_to_read = std::min(bytes_to_read, *read_until_position - file_offset_of_buffer_end);
}
if (use_pread)
res = ::pread(fd, internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end);
res = ::pread(fd, internal_buffer.begin(), bytes_to_read, file_offset_of_buffer_end);
else
res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
res = ::read(fd, internal_buffer.begin(), bytes_to_read);
}
if (!res)
break;
@ -272,4 +279,9 @@ void ReadBufferFromFileDescriptor::setProgressCallback(ContextPtr context)
});
}
/// Stores the right bound for reads. nextImpl() uses it to limit the number
/// of bytes passed to read/pread so that reading does not go past `position`.
void ReadBufferFromFileDescriptor::setReadUntilPosition(size_t position)
{
    read_until_position.emplace(position);
}
}

View File

@ -18,6 +18,8 @@ protected:
bool use_pread = false; /// To access one fd from multiple threads, use 'pread' syscall instead of 'read'.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
std::optional<size_t> read_until_position; /// For right bounded reads.
int fd;
bool nextImpl() override;
@ -59,6 +61,10 @@ public:
void setProgressCallback(ContextPtr context);
bool supportsRightBoundedReads() const override { return true; }
void setReadUntilPosition(size_t position) override;
private:
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool poll(size_t timeout_microseconds);

View File

@ -70,6 +70,8 @@ public:
size_t getFileOffsetOfBufferEnd() const override { return offset; }
bool supportsRightBoundedReads() const override { return true; }
private:
std::unique_ptr<ReadBuffer> initialize();

View File

@ -58,6 +58,8 @@ public:
virtual String getInfoForLog() { return ""; }
virtual size_t getFileOffsetOfBufferEnd() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getFileOffsetOfBufferEnd() not implemented"); }
/// Reports whether this buffer supports right bounded reads.
/// The base implementation returns false; buffers that do support them override this to return true.
virtual bool supportsRightBoundedReads() const { return false; }
};
using SeekableReadBufferPtr = std::shared_ptr<SeekableReadBuffer>;

View File

@ -1216,7 +1216,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (disk->isBroken())
if (disk->isBroken() || disk->isCached())
continue;
if (!defined_disk_names.contains(disk_name) && disk->exists(relative_data_path))

View File

@ -3,6 +3,7 @@
<disks>
<s3_disk>
<type>s3</type>
<path>s3_disk/</path>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
@ -14,6 +15,16 @@
<max_size>22548578304</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
</s3_cache>
<local_disk>
<type>local</type>
<path>local_disk/</path>
</local_disk>
<local_cache>
<type>cache</type>
<disk>local_disk</disk>
<max_size>22548578304</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
</local_cache>
</disks>
<policies>
<s3_cache>
@ -23,6 +34,13 @@
</main>
</volumes>
</s3_cache>
<local_cache>
<volumes>
<main>
<disk>local_cache</disk>
</main>
</volumes>
</local_cache>
</policies>
</storage_configuration>
</clickhouse>