mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Accept real file size in createReadBufferFromFileBase()
Right now streams rely on the correct file size, not the number of bytes that will be read from the stream, to overcome a bug in the Linux kernel that may return EINVAL for pread() with an offset past the EOF. v2: Swap read_hint and file_size (since it is easy to miss something). Before, the first argument to readFile()/createReadBufferFromFileBase() was read_hint, not file_size, so let's preserve the order, since it is easy to miss something. This will also fix the 02051_read_settings test automatically, because now MergeTreeReaderStream will pass estimated_sum_mark_range_bytes as read_hint, not file_size; previously it caused one of the following errors: - Attempt to read after EOF w/ O_DIRECT - and LOGICAL_ERROR while adjusting granules w/o O_DIRECT. This will also improve the zero-length reads guard (via ReadBufferFromEmptyFile) that was added in #30190. v3: fix for other storages that weren't enabled in fast-test. v4: ignore ENOENT/ENOTSUP in readFile
This commit is contained in:
parent
e548dae8d9
commit
71a99ab27c
@ -62,7 +62,8 @@ DiskAzureBlobStorage::DiskAzureBlobStorage(
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
|
||||
const String & path,
|
||||
const ReadSettings & read_settings,
|
||||
std::optional<size_t> /*estimated_size*/) const
|
||||
std::optional<size_t>,
|
||||
std::optional<size_t>) const
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
auto metadata = readMeta(path);
|
||||
|
@ -50,7 +50,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> estimated_size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
|
@ -86,15 +86,16 @@ std::unique_ptr<ReadBufferFromFileBase>
|
||||
DiskCacheWrapper::readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const
|
||||
{
|
||||
if (!cache_file_predicate(path))
|
||||
return DiskDecorator::readFile(path, settings, size);
|
||||
return DiskDecorator::readFile(path, settings, read_hint, file_size);
|
||||
|
||||
LOG_TEST(log, "Read file {} from cache", backQuote(path));
|
||||
|
||||
if (cache_disk->exists(path))
|
||||
return cache_disk->readFile(path, settings, size);
|
||||
return cache_disk->readFile(path, settings, read_hint, file_size);
|
||||
|
||||
auto metadata = acquireDownloadMetadata(path);
|
||||
|
||||
@ -128,7 +129,7 @@ DiskCacheWrapper::readFile(
|
||||
|
||||
auto tmp_path = path + ".tmp";
|
||||
{
|
||||
auto src_buffer = DiskDecorator::readFile(path, settings, size);
|
||||
auto src_buffer = DiskDecorator::readFile(path, settings, read_hint, file_size);
|
||||
auto dst_buffer = cache_disk->writeFile(tmp_path, settings.local_fs_buffer_size, WriteMode::Rewrite);
|
||||
copyData(*src_buffer, *dst_buffer);
|
||||
}
|
||||
@ -152,9 +153,9 @@ DiskCacheWrapper::readFile(
|
||||
}
|
||||
|
||||
if (metadata->status == DOWNLOADED)
|
||||
return cache_disk->readFile(path, settings, size);
|
||||
return cache_disk->readFile(path, settings, read_hint, file_size);
|
||||
|
||||
return DiskDecorator::readFile(path, settings, size);
|
||||
return DiskDecorator::readFile(path, settings, read_hint, file_size);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
@ -174,7 +175,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
|
||||
[this, path, buf_size, mode]()
|
||||
{
|
||||
/// Copy file from cache to actual disk when cached buffer is finalized.
|
||||
auto src_buffer = cache_disk->readFile(path, ReadSettings(), /* size= */ {});
|
||||
auto src_buffer = cache_disk->readFile(path, ReadSettings(), /* read_hint= */ {}, /* file_size= */ {});
|
||||
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
|
||||
copyData(*src_buffer, *dst_buffer);
|
||||
dst_buffer->finalize();
|
||||
|
@ -37,7 +37,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
|
||||
|
@ -115,9 +115,9 @@ void DiskDecorator::listFiles(const String & path, std::vector<String> & file_na
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase>
|
||||
DiskDecorator::readFile(
|
||||
const String & path, const ReadSettings & settings, std::optional<size_t> size) const
|
||||
const String & path, const ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size) const
|
||||
{
|
||||
return delegate->readFile(path, settings, size);
|
||||
return delegate->readFile(path, settings, read_hint, file_size);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
|
@ -38,7 +38,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
|
@ -252,10 +252,11 @@ void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk>
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
auto buffer = delegate->readFile(wrapped_path, settings, size);
|
||||
auto buffer = delegate->readFile(wrapped_path, settings, read_hint, file_size);
|
||||
if (buffer->eof())
|
||||
{
|
||||
/// File is empty, that's a normal case, see DiskEncrypted::truncateFile().
|
||||
|
@ -120,7 +120,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
|
@ -86,6 +86,22 @@ static void loadDiskLocalConfig(const String & name,
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<size_t> fileSizeSafe(const fs::path & path)
|
||||
{
|
||||
std::error_code ec;
|
||||
|
||||
size_t size = fs::file_size(path, ec);
|
||||
if (!ec)
|
||||
return size;
|
||||
|
||||
if (ec == std::errc::no_such_file_or_directory)
|
||||
return std::nullopt;
|
||||
if (ec == std::errc::operation_not_supported)
|
||||
return std::nullopt;
|
||||
|
||||
throw fs::filesystem_error("DiskLocal", path, ec);
|
||||
}
|
||||
|
||||
class DiskLocalReservation : public IReservation
|
||||
{
|
||||
public:
|
||||
@ -269,9 +285,11 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path)
|
||||
fs::rename(from_file, to_file);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, const ReadSettings & settings, std::optional<size_t> size) const
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, const ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size) const
|
||||
{
|
||||
return createReadBufferFromFileBase(fs::path(disk_path) / path, settings, size);
|
||||
if (!file_size.has_value())
|
||||
file_size = fileSizeSafe(fs::path(disk_path) / path);
|
||||
return createReadBufferFromFileBase(fs::path(disk_path) / path, settings, read_hint, file_size);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
|
@ -74,7 +74,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
|
@ -315,7 +315,7 @@ void DiskMemory::replaceFileImpl(const String & from_path, const String & to_pat
|
||||
files.insert(std::move(node));
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, const ReadSettings &, std::optional<size_t>) const
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, const ReadSettings &, std::optional<size_t>, std::optional<size_t>) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
|
@ -65,7 +65,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
|
@ -190,10 +190,10 @@ void DiskRestartProxy::listFiles(const String & path, std::vector<String> & file
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskRestartProxy::readFile(
|
||||
const String & path, const ReadSettings & settings, std::optional<size_t> size) const
|
||||
const String & path, const ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size) const
|
||||
{
|
||||
ReadLock lock (mutex);
|
||||
auto impl = DiskDecorator::readFile(path, settings, size);
|
||||
auto impl = DiskDecorator::readFile(path, settings, read_hint, file_size);
|
||||
return std::make_unique<RestartAwareReadBuffer>(*this, std::move(impl));
|
||||
}
|
||||
|
||||
|
@ -46,7 +46,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
void removeFile(const String & path) override;
|
||||
void removeFileIfExists(const String & path) override;
|
||||
|
@ -154,7 +154,7 @@ bool DiskWebServer::exists(const String & path) const
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>) const
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>, std::optional<size_t>) const
|
||||
{
|
||||
LOG_TRACE(log, "Read from path: {}", path);
|
||||
auto iter = files.find(path);
|
||||
|
@ -63,7 +63,8 @@ public:
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
/// Disk info
|
||||
|
||||
|
@ -71,7 +71,7 @@ DiskHDFS::DiskHDFS(
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>) const
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>, std::optional<size_t>) const
|
||||
{
|
||||
auto metadata = readMeta(path);
|
||||
|
||||
|
@ -53,7 +53,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
|
||||
|
@ -161,7 +161,8 @@ public:
|
||||
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings = ReadSettings{},
|
||||
std::optional<size_t> size = {}) const = 0;
|
||||
std::optional<size_t> read_hint = {},
|
||||
std::optional<size_t> file_size = {}) const = 0;
|
||||
|
||||
/// Open the file for write and return WriteBufferFromFileBase object.
|
||||
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
|
@ -214,7 +214,7 @@ void DiskS3::moveFile(const String & from_path, const String & to_path, bool sen
|
||||
metadata_disk->moveFile(from_path, to_path);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>) const
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>, std::optional<size_t>) const
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
auto metadata = readMeta(path);
|
||||
|
@ -76,7 +76,8 @@ public:
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size) const override;
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
|
@ -57,7 +57,7 @@ protected:
|
||||
|
||||
String getFileContents(const String & file_name)
|
||||
{
|
||||
auto buf = encrypted_disk->readFile(file_name, /* settings= */ {}, /* size= */ {});
|
||||
auto buf = encrypted_disk->readFile(file_name, /* settings= */ {}, /* read_hint= */ {}, /* file_size= */ {});
|
||||
String str;
|
||||
readStringUntilEOF(str, *buf);
|
||||
return str;
|
||||
|
@ -53,7 +53,7 @@ TEST(DiskTestHDFS, WriteReadHDFS)
|
||||
|
||||
{
|
||||
DB::String result;
|
||||
auto in = disk.readFile(file_name, {}, 1024);
|
||||
auto in = disk.readFile(file_name, {}, 1024, 1024);
|
||||
readString(result, *in);
|
||||
EXPECT_EQ("Test write to file", result);
|
||||
}
|
||||
@ -76,7 +76,7 @@ TEST(DiskTestHDFS, RewriteFileHDFS)
|
||||
|
||||
{
|
||||
String result;
|
||||
auto in = disk.readFile(file_name, {}, 1024);
|
||||
auto in = disk.readFile(file_name, {}, 1024, 1024);
|
||||
readString(result, *in);
|
||||
EXPECT_EQ("Text10", result);
|
||||
readString(result, *in);
|
||||
@ -104,7 +104,7 @@ TEST(DiskTestHDFS, AppendFileHDFS)
|
||||
|
||||
{
|
||||
String result, expected;
|
||||
auto in = disk.readFile(file_name, {}, 1024);
|
||||
auto in = disk.readFile(file_name, {}, 1024, 1024);
|
||||
|
||||
readString(result, *in);
|
||||
EXPECT_EQ("Text0123456789", result);
|
||||
@ -131,7 +131,7 @@ TEST(DiskTestHDFS, SeekHDFS)
|
||||
/// Test SEEK_SET
|
||||
{
|
||||
String buf(4, '0');
|
||||
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024);
|
||||
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024, 1024);
|
||||
|
||||
in->seek(5, SEEK_SET);
|
||||
|
||||
@ -141,7 +141,7 @@ TEST(DiskTestHDFS, SeekHDFS)
|
||||
|
||||
/// Test SEEK_CUR
|
||||
{
|
||||
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024);
|
||||
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024, 1024);
|
||||
String buf(4, '0');
|
||||
|
||||
in->readStrict(buf.data(), 4);
|
||||
|
@ -72,7 +72,7 @@ public:
|
||||
void rewind();
|
||||
|
||||
private:
|
||||
std::future<IAsynchronousReader::Result> readInto(char * data, size_t file_size_);
|
||||
std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -29,14 +29,20 @@ namespace ErrorCodes
|
||||
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
||||
const std::string & filename,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size,
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size,
|
||||
int flags,
|
||||
char * existing_memory,
|
||||
size_t alignment)
|
||||
{
|
||||
if (size.has_value() && !*size)
|
||||
if (file_size.has_value() && !*file_size)
|
||||
return std::make_unique<ReadBufferFromEmptyFile>();
|
||||
size_t estimated_size = size.has_value() ? *size : 0;
|
||||
|
||||
size_t estimated_size = 0;
|
||||
if (read_hint.has_value())
|
||||
estimated_size = *read_hint;
|
||||
else if (file_size.has_value())
|
||||
estimated_size = file_size.has_value() ? *file_size : 0;
|
||||
|
||||
if (!existing_memory
|
||||
&& settings.local_fs_method == LocalFSReadMethod::mmap
|
||||
@ -63,23 +69,23 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
||||
|
||||
if (settings.local_fs_method == LocalFSReadMethod::read)
|
||||
{
|
||||
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, alignment, size);
|
||||
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
|
||||
}
|
||||
else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap)
|
||||
{
|
||||
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment, size);
|
||||
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
|
||||
}
|
||||
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
|
||||
{
|
||||
static AsynchronousReaderPtr reader = std::make_shared<SynchronousReader>();
|
||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, size);
|
||||
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
|
||||
}
|
||||
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
|
||||
{
|
||||
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolReader>(16, 1000000);
|
||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, size);
|
||||
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
|
||||
}
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method");
|
||||
|
@ -11,12 +11,14 @@ namespace DB
|
||||
|
||||
/** Create an object to read data from a file.
|
||||
*
|
||||
* @param size - the number of bytes to read
|
||||
* @param read_hint - the number of bytes to read hint
|
||||
* @param file_size - size of file
|
||||
*/
|
||||
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
||||
const std::string & filename,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> size = {},
|
||||
std::optional<size_t> read_hint = {},
|
||||
std::optional<size_t> file_size = {},
|
||||
int flags_ = -1,
|
||||
char * existing_memory = nullptr,
|
||||
size_t alignment = 0);
|
||||
|
Loading…
Reference in New Issue
Block a user