WriteBufferFromFileBase interface refactoring.

Disk->writeFile() returns WriteBufferFromFileBase.
DiskMemory read/write buffers refactoring.
This commit is contained in:
Pavel Kovalenko 2020-02-20 19:39:32 +03:00 committed by Pavel Kovalenko
parent fc9fe91331
commit e5b81aefe3
19 changed files with 239 additions and 216 deletions

View File

@ -99,8 +99,8 @@ public:
res += delta; res += delta;
DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
wb.seek(0); wb.seek(0, SEEK_SET);
wb.truncate(); wb.truncate(0);
DB::writeIntText(res, wb); DB::writeIntText(res, wb);
DB::writeChar('\n', wb); DB::writeChar('\n', wb);
wb.sync(); wb.sync();
@ -169,8 +169,8 @@ public:
if (broken) if (broken)
{ {
DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
wb.seek(0); wb.seek(0, SEEK_SET);
wb.truncate(); wb.truncate(0);
DB::writeIntText(value, wb); DB::writeIntText(value, wb);
DB::writeChar('\n', wb); DB::writeChar('\n', wb);
wb.sync(); wb.sync();

View File

@ -5,6 +5,9 @@
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <IO/createReadBufferFromFileBase.h>
#include <IO/createWriteBufferFromFileBase.h>
namespace DB namespace DB
{ {
@ -46,8 +49,10 @@ private:
class DiskLocalDirectoryIterator : public IDiskDirectoryIterator class DiskLocalDirectoryIterator : public IDiskDirectoryIterator
{ {
public: public:
explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_) : explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_)
dir_path(dir_path_), iter(disk_path_ + dir_path_) {} : dir_path(dir_path_), iter(disk_path_ + dir_path_)
{
}
void next() override { ++iter; } void next() override { ++iter; }
@ -200,15 +205,17 @@ void DiskLocal::copyFile(const String & from_path, const String & to_path)
Poco::File(disk_path + from_path).copyTo(disk_path + to_path); Poco::File(disk_path + from_path).copyTo(disk_path + to_path);
} }
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, size_t buf_size) const std::unique_ptr<ReadBufferFromFileBase>
DiskLocal::readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const
{ {
return std::make_unique<ReadBufferFromFile>(disk_path + path, buf_size); return createReadBufferFromFileBase(disk_path + path, estimated_size, aio_threshold, mmap_threshold, buf_size);
} }
std::unique_ptr<WriteBuffer> DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode) std::unique_ptr<WriteBufferFromFileBase>
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold)
{ {
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
return std::make_unique<WriteBufferFromFile>(disk_path + path, buf_size, flags); return createWriteBufferFromFileBase(disk_path + path, estimated_size, aio_threshold, buf_size, flags);
} }
void DiskLocal::remove(const String & path) void DiskLocal::remove(const String & path)

View File

@ -1,8 +1,8 @@
#pragma once #pragma once
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <Poco/DirectoryIterator.h> #include <Poco/DirectoryIterator.h>
@ -67,9 +67,19 @@ public:
void copyFile(const String & from_path, const String & to_path) override; void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override; std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
size_t estimated_size = 0,
size_t aio_threshold = 0,
size_t mmap_threshold = 0) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override; std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite,
size_t estimated_size = 0,
size_t aio_threshold = 0) override;
void remove(const String & path) override; void remove(const String & path) override;

View File

@ -1,8 +1,9 @@
#include "DiskMemory.h" #include "DiskMemory.h"
#include "DiskFactory.h" #include "DiskFactory.h"
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/SeekableReadBuffer.h> #include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -39,77 +40,45 @@ private:
std::vector<String>::iterator iter; std::vector<String>::iterator iter;
}; };
ReadIndirectBuffer::ReadIndirectBuffer(String path_, const String & data_) /// Adapter with actual behaviour as ReadBufferFromString.
: ReadBufferFromFileBase(), buf(ReadBufferFromString(data_)), path(std::move(path_)) class ReadIndirectBuffer : public ReadBufferFromFileBase
{ {
internal_buffer = buf.buffer(); public:
ReadIndirectBuffer(String path_, const String & data_)
: ReadBufferFromFileBase(), impl(ReadBufferFromString(data_)), path(std::move(path_))
{
internal_buffer = impl.buffer();
working_buffer = internal_buffer; working_buffer = internal_buffer;
pos = working_buffer.begin(); pos = working_buffer.begin();
} }
off_t ReadIndirectBuffer::seek(off_t offset, int whence) std::string getFileName() const override { return path; }
off_t seek(off_t off, int whence) override
{ {
if (whence == SEEK_SET) impl.swap(*this);
{ off_t result = impl.seek(off, whence);
if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end()) impl.swap(*this);
{ return result;
pos = working_buffer.begin() + offset;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else if (whence == SEEK_CUR)
{
Position new_pos = pos + offset;
if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end())
{
pos = new_pos;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
} }
off_t ReadIndirectBuffer::getPosition() off_t getPosition() override { return pos - working_buffer.begin(); }
private:
ReadBufferFromString impl;
const String path;
};
/// This class is responsible for updating the file's data on the disk after the buffer is finalized.
class WriteIndirectBuffer : public WriteBufferFromFileBase
{
public:
WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_, size_t buf_size)
: WriteBufferFromFileBase(buf_size, nullptr, 0), impl(), disk(disk_), path(std::move(path_)), mode(mode_)
{ {
return pos - working_buffer.begin();
} }
void WriteIndirectBuffer::finalize() ~WriteIndirectBuffer() override
{
if (isFinished())
return;
next();
WriteBufferFromVector::finalize();
auto iter = disk->files.find(path);
if (iter == disk->files.end())
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
// Resize to the actual number of bytes written to string.
value.resize(count());
if (mode == WriteMode::Rewrite)
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value});
else if (mode == WriteMode::Append)
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value});
}
WriteIndirectBuffer::~WriteIndirectBuffer()
{ {
try try
{ {
@ -121,6 +90,51 @@ WriteIndirectBuffer::~WriteIndirectBuffer()
} }
} }
/// Flushes the buffered bytes and publishes the written data into the owning disk's file map.
/// Does nothing if the underlying string buffer has already been finished.
/// In Rewrite mode the stored data is replaced; in Append mode the new bytes are appended to it.
/// Throws FILE_DOESNT_EXIST if the entry for `path` is not present in the disk.
void finalize() override
{
    if (impl.isFinished())
        return;

    next();

    /// str() finalizes buffer.
    String value = impl.str();

    /// Resize to the actual number of bytes written to string.
    value.resize(count());

    /// DiskMemory::readFile() and DiskMemory::writeFile() access `files` under the disk mutex,
    /// so the lookup and the update below must hold the same mutex to avoid a data race.
    std::lock_guard lock(disk->mutex);

    auto iter = disk->files.find(path);

    if (iter == disk->files.end())
        throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);

    if (mode == WriteMode::Rewrite)
        disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value});
    else if (mode == WriteMode::Append)
        disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value});
}
/// Returns the path this buffer was created for (the key used to look the file up in disk->files).
std::string getFileName() const override { return path; }
/// Intentionally a no-op.
/// NOTE(review): presumably there is nothing to fsync because the data is kept in memory - confirm.
void sync() override {}
private:
/// Appends the bytes accumulated in the working buffer to the in-memory string buffer.
/// An empty working buffer is skipped without touching `impl`.
void nextImpl() override
{
    const auto pending_bytes = offset();

    if (pending_bytes != 0)
        impl.write(working_buffer.begin(), pending_bytes);
}
private:
WriteBufferFromOwnString impl;
DiskMemory * disk;
const String path;
const WriteMode mode;
};
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/) ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
{ {
throw Exception("Method reserve is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Method reserve is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
@ -302,7 +316,7 @@ void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path
throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
} }
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/, size_t, size_t, size_t) const
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -313,7 +327,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path
return std::make_unique<ReadIndirectBuffer>(path, iter->second.data); return std::make_unique<ReadIndirectBuffer>(path, iter->second.data);
} }
std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /*buf_size*/, WriteMode mode) std::unique_ptr<WriteBufferFromFileBase> DiskMemory::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -328,7 +342,7 @@ std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /
files.emplace(path, FileData{FileType::File}); files.emplace(path, FileData{FileType::File});
} }
return std::make_unique<WriteIndirectBuffer>(this, path, mode); return std::make_unique<WriteIndirectBuffer>(this, path, mode, buf_size);
} }
void DiskMemory::remove(const String & path) void DiskMemory::remove(const String & path)

View File

@ -5,46 +5,13 @@
#include <unordered_map> #include <unordered_map>
#include <utility> #include <utility>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
namespace DB namespace DB
{ {
class DiskMemory; class DiskMemory;
class ReadBuffer; class ReadBufferFromFileBase;
class WriteBuffer; class WriteBufferFromFileBase;
/// Adapter with actual behaviour as ReadBufferFromString.
class ReadIndirectBuffer : public ReadBufferFromFileBase
{
public:
ReadIndirectBuffer(String path_, const String & data_);
std::string getFileName() const override { return path; }
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
private:
ReadBufferFromString buf;
String path;
};
/// This class is responsible to update files metadata after buffer is finalized.
class WriteIndirectBuffer : public WriteBufferFromOwnString
{
public:
WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_) : disk(disk_), path(std::move(path_)), mode(mode_) {}
~WriteIndirectBuffer() override;
void finalize() override;
private:
DiskMemory * disk;
String path;
WriteMode mode;
};
/** Implementation of Disk intended only for testing purposes. /** Implementation of Disk intended only for testing purposes.
* All filesystem objects are stored in memory and lost on server restart. * All filesystem objects are stored in memory and lost on server restart.
@ -93,10 +60,19 @@ public:
void copyFile(const String & from_path, const String & to_path) override; void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override; std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
size_t estimated_size = 0,
size_t aio_threshold = 0,
size_t mmap_threshold = 0) const override;
std::unique_ptr<WriteBuffer> std::unique_ptr<WriteBufferFromFileBase> writeFile(
writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override; const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite,
size_t estimated_size = 0,
size_t aio_threshold = 0) override;
void remove(const String & path) override; void remove(const String & path) override;

View File

@ -52,25 +52,28 @@ namespace
*/ */
struct Metadata struct Metadata
{ {
// Metadata file version. /// Metadata file version.
const UInt32 VERSION = 1; const UInt32 VERSION = 1;
using PathAndSize = std::pair<String, size_t>; using PathAndSize = std::pair<String, size_t>;
// Path to metadata file on local FS. /// Path to metadata file on local FS.
String metadata_file_path; String metadata_file_path;
// S3 object references count. /// S3 object references count.
UInt32 s3_objects_count; UInt32 s3_objects_count;
// Total size of all S3 objects. /// Total size of all S3 objects.
size_t total_size; size_t total_size;
// S3 objects paths and their sizes. /// S3 objects paths and their sizes.
std::vector<PathAndSize> s3_objects; std::vector<PathAndSize> s3_objects;
explicit Metadata(const Poco::File & file) : Metadata(file.path(), false) {} explicit Metadata(const Poco::File & file) : Metadata(file.path(), false) {}
// Load metadata by path or create empty if `create` flag is set. /// Load metadata by path or create empty if `create` flag is set.
explicit Metadata(const String & file_path, bool create = false) explicit Metadata(const String & file_path, bool create = false)
: metadata_file_path(file_path), s3_objects_count(0), total_size(0), s3_objects(0) : metadata_file_path(file_path)
, s3_objects_count(0)
, total_size(0)
, s3_objects(0)
{ {
if (create) if (create)
return; return;
@ -112,7 +115,8 @@ namespace
s3_objects.emplace_back(path, size); s3_objects.emplace_back(path, size);
} }
void save() /// Fsync metadata file if 'sync' flag is set.
void save(bool sync = false)
{ {
WriteBufferFromFile buf(metadata_file_path, 1024); WriteBufferFromFile buf(metadata_file_path, 1024);
@ -132,11 +136,12 @@ namespace
writeChar('\n', buf); writeChar('\n', buf);
} }
buf.finalize(); buf.finalize();
if (sync)
buf.sync();
} }
}; };
// Reads data from S3. /// Reads data from S3 using stored paths in metadata.
// It supports reading from multiple S3 paths that resides in Metadata.
class ReadIndirectBufferFromS3 : public ReadBufferFromFileBase class ReadIndirectBufferFromS3 : public ReadBufferFromFileBase
{ {
public: public:
@ -148,7 +153,6 @@ namespace
, metadata(std::move(metadata_)) , metadata(std::move(metadata_))
, buf_size(buf_size_) , buf_size(buf_size_)
, absolute_position(0) , absolute_position(0)
, initialized(false)
, current_buf_idx(0) , current_buf_idx(0)
, current_buf(nullptr) , current_buf(nullptr)
{ {
@ -196,19 +200,16 @@ namespace
} }
offset -= size; offset -= size;
} }
initialized = true;
return nullptr; return nullptr;
} }
bool nextImpl() override bool nextImpl() override
{ {
// Find first available buffer that fits to given offset. /// Find first available buffer that fits to given offset.
if (!initialized) if (!current_buf)
{
current_buf = initialize(); current_buf = initialize();
}
// If current buffer has remaining data - use it. /// If current buffer has remaining data - use it.
if (current_buf && current_buf->next()) if (current_buf && current_buf->next())
{ {
working_buffer = current_buf->buffer(); working_buffer = current_buf->buffer();
@ -216,7 +217,7 @@ namespace
return true; return true;
} }
// If there is no available buffers - nothing to read. /// If there is no available buffers - nothing to read.
if (current_buf_idx + 1 >= metadata.s3_objects_count) if (current_buf_idx + 1 >= metadata.s3_objects_count)
return false; return false;
@ -237,13 +238,12 @@ namespace
size_t buf_size; size_t buf_size;
size_t absolute_position = 0; size_t absolute_position = 0;
bool initialized;
UInt32 current_buf_idx; UInt32 current_buf_idx;
std::unique_ptr<ReadBufferFromS3> current_buf; std::unique_ptr<ReadBufferFromS3> current_buf;
}; };
/// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS. /// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS.
class WriteIndirectBufferFromS3 : public WriteBufferFromS3 class WriteIndirectBufferFromS3 : public WriteBufferFromFileBase
{ {
public: public:
WriteIndirectBufferFromS3( WriteIndirectBufferFromS3(
@ -253,25 +253,15 @@ namespace
const String & s3_path_, const String & s3_path_,
size_t min_upload_part_size, size_t min_upload_part_size,
size_t buf_size_) size_t buf_size_)
: WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_) : WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_))
, metadata(std::move(metadata_)) , metadata(std::move(metadata_))
, s3_path(s3_path_) , s3_path(s3_path_)
{ {
} }
void finalize() override
{
WriteBufferFromS3::finalize();
metadata.addObject(s3_path, total_size);
metadata.save();
finalized = true;
}
~WriteIndirectBufferFromS3() override ~WriteIndirectBufferFromS3() override
{ {
if (finalized)
return;
try try
{ {
finalize(); finalize();
@ -282,7 +272,38 @@ namespace
} }
} }
/// Flushes pending data, finalizes the wrapped WriteBufferFromS3, then registers the new
/// S3 object (path and number of bytes written through this buffer) in the metadata file.
/// Once finalization has completed, further calls do nothing.
void finalize() override
{
    if (!finalized)
    {
        next();
        impl.finalize();

        metadata.addObject(s3_path, count());
        metadata.save();

        finalized = true;
    }
}
/// Saves the metadata file with the 'sync' flag set, which asks Metadata::save() to fsync that file.
/// NOTE(review): only the metadata is saved here; buffered data is not flushed by this call.
void sync() override { metadata.save(true); }
/// Returns the path of the metadata file on the local FS (not the S3 object path).
std::string getFileName() const override { return metadata.metadata_file_path; }
private: private:
/// Pushes the data accumulated in this buffer through the wrapped WriteBufferFromS3.
/// The order of the three statements matters: swap in, flush, swap back.
void nextImpl() override
{
/// Transfer current working buffer to WriteBufferFromS3.
impl.swap(*this);
/// Write actual data to S3.
impl.next();
/// Return back working buffer.
/// NOTE(review): swap() presumably exchanges the whole buffer state, not only the memory - confirm which fields are swapped.
impl.swap(*this);
}
private:
WriteBufferFromS3 impl;
bool finalized = false; bool finalized = false;
Metadata metadata; Metadata metadata;
String s3_path; String s3_path;
@ -457,7 +478,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
to.save(); to.save();
} }
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size) const std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const
{ {
Metadata metadata(metadata_path + path); Metadata metadata(metadata_path + path);
@ -468,19 +489,19 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
return std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size); return std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
} }
std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode) std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
{ {
bool exist = exists(path); bool exist = exists(path);
// Reference to store new S3 object. /// Path to store new S3 object.
auto s3_path = s3_root_path + getRandomName(); auto s3_path = s3_root_path + getRandomName();
if (!exist || mode == WriteMode::Rewrite) if (!exist || mode == WriteMode::Rewrite)
{ {
// If metadata file exists - remove and create new. /// If metadata file exists - remove and create new.
if (exist) if (exist)
remove(path); remove(path);
Metadata metadata(metadata_path + path, true); Metadata metadata(metadata_path + path, true);
// Save empty metadata to disk to have ability to get file size while buffer is not finalized. /// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save(); metadata.save();
LOG_DEBUG(&Logger::get("DiskS3"), "Write to file by path: " << backQuote(metadata_path + path) << " New S3 path: " << s3_path); LOG_DEBUG(&Logger::get("DiskS3"), "Write to file by path: " << backQuote(metadata_path + path) << " New S3 path: " << s3_path);
@ -512,7 +533,7 @@ void DiskS3::remove(const String & path)
{ {
auto s3_path = metadata.s3_objects[i].first; auto s3_path = metadata.s3_objects[i].first;
// TODO: Make operation idempotent. Do not throw exception if key is already deleted. /// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
Aws::S3::Model::DeleteObjectRequest request; Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket); request.SetBucket(bucket);
request.SetKey(s3_path); request.SetKey(s3_path);

View File

@ -21,8 +21,13 @@ class DiskS3 : public IDisk
public: public:
friend class DiskS3Reservation; friend class DiskS3Reservation;
DiskS3(String name_, std::shared_ptr<Aws::S3::S3Client> client_, String bucket_, String s3_root_path_, DiskS3(
String metadata_path_, size_t min_upload_part_size_); String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
String bucket_,
String s3_root_path_,
String metadata_path_,
size_t min_upload_part_size_);
const String & getName() const override { return name; } const String & getName() const override { return name; }
@ -62,9 +67,19 @@ public:
void copyFile(const String & from_path, const String & to_path) override; void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size) const override; std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
size_t estimated_size = 0,
size_t aio_threshold = 0,
size_t mmap_threshold = 0) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override; std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite,
size_t estimated_size = 0,
size_t aio_threshold = 0) override;
void remove(const String & path) override; void remove(const String & path) override;

View File

@ -26,7 +26,7 @@ class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>; using ReservationPtr = std::unique_ptr<IReservation>;
class ReadBufferFromFileBase; class ReadBufferFromFileBase;
class WriteBuffer; class WriteBufferFromFileBase;
/** /**
* Mode of opening a file for write. * Mode of opening a file for write.
@ -121,11 +121,21 @@ public:
/// Copy the file from `from_path` to `to_path`. /// Copy the file from `from_path` to `to_path`.
virtual void copyFile(const String & from_path, const String & to_path) = 0; virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// Open the file for read and return SeekableReadBuffer object. /// Open the file for read and return ReadBufferFromFileBase object.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0; virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
size_t estimated_size = 0,
size_t aio_threshold = 0,
size_t mmap_threshold = 0) const = 0;
/// Open the file for write and return WriteBuffer object. /// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0; virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite,
size_t estimated_size = 0,
size_t aio_threshold = 0) = 0;
/// Remove file or directory. Throws exception if file doesn't exists or if directory is not empty. /// Remove file or directory. Throws exception if file doesn't exists or if directory is not empty.
virtual void remove(const String & path) = 0; virtual void remove(const String & path) = 0;

View File

@ -139,7 +139,7 @@ void WriteBufferAIO::nextImpl()
is_pending_write = true; is_pending_write = true;
} }
off_t WriteBufferAIO::doSeek(off_t off, int whence) off_t WriteBufferAIO::seek(off_t off, int whence)
{ {
flush(); flush();
@ -169,7 +169,7 @@ off_t WriteBufferAIO::doSeek(off_t off, int whence)
return pos_in_file; return pos_in_file;
} }
void WriteBufferAIO::doTruncate(off_t length) void WriteBufferAIO::truncate(off_t length)
{ {
flush(); flush();

View File

@ -34,15 +34,15 @@ public:
WriteBufferAIO(const WriteBufferAIO &) = delete; WriteBufferAIO(const WriteBufferAIO &) = delete;
WriteBufferAIO & operator=(const WriteBufferAIO &) = delete; WriteBufferAIO & operator=(const WriteBufferAIO &) = delete;
off_t getPositionInFile() override; off_t getPositionInFile();
off_t seek(off_t off, int whence);
void truncate(off_t length);
void sync() override; void sync() override;
std::string getFileName() const override { return filename; } std::string getFileName() const override { return filename; }
int getFD() const override { return fd; } int getFD() const { return fd; }
private: private:
void nextImpl() override; void nextImpl() override;
off_t doSeek(off_t off, int whence) override;
void doTruncate(off_t length) override;
/// If there's still data in the buffer, we'll write them. /// If there's still data in the buffer, we'll write them.
void flush(); void flush();

View File

@ -8,14 +8,4 @@ WriteBufferFromFileBase::WriteBufferFromFileBase(size_t buf_size, char * existin
{ {
} }
off_t WriteBufferFromFileBase::seek(off_t off, int whence)
{
return doSeek(off, whence);
}
void WriteBufferFromFileBase::truncate(off_t length)
{
return doTruncate(length);
}
} }

View File

@ -15,16 +15,8 @@ public:
WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment); WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
~WriteBufferFromFileBase() override = default; ~WriteBufferFromFileBase() override = default;
off_t seek(off_t off, int whence = SEEK_SET);
void truncate(off_t length = 0);
virtual off_t getPositionInFile() = 0;
void sync() override = 0; void sync() override = 0;
virtual std::string getFileName() const = 0; virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
protected:
virtual off_t doSeek(off_t off, int whence) = 0;
virtual void doTruncate(off_t length) = 0;
}; };
} }

View File

@ -98,12 +98,6 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
} }
off_t WriteBufferFromFileDescriptor::getPositionInFile()
{
return seek(0, SEEK_CUR);
}
void WriteBufferFromFileDescriptor::sync() void WriteBufferFromFileDescriptor::sync()
{ {
/// If buffer has pending data - write it. /// If buffer has pending data - write it.
@ -116,7 +110,7 @@ void WriteBufferFromFileDescriptor::sync()
} }
off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence) off_t WriteBufferFromFileDescriptor::seek(off_t offset, int whence)
{ {
off_t res = lseek(fd, offset, whence); off_t res = lseek(fd, offset, whence);
if (-1 == res) if (-1 == res)
@ -126,7 +120,7 @@ off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence)
} }
void WriteBufferFromFileDescriptor::doTruncate(off_t length) void WriteBufferFromFileDescriptor::truncate(off_t length)
{ {
int res = ftruncate(fd, length); int res = ftruncate(fd, length);
if (-1 == res) if (-1 == res)

View File

@ -35,19 +35,15 @@ public:
~WriteBufferFromFileDescriptor() override; ~WriteBufferFromFileDescriptor() override;
int getFD() const override int getFD() const
{ {
return fd; return fd;
} }
off_t getPositionInFile() override;
void sync() override; void sync() override;
private: off_t seek(off_t offset, int whence);
off_t doSeek(off_t offset, int whence) override; void truncate(off_t length);
void doTruncate(off_t length) override;
}; };
} }

View File

@ -136,7 +136,6 @@ void WriteBufferFromS3::writePart(const String & data)
{ {
auto etag = outcome.GetResult().GetETag(); auto etag = outcome.GetResult().GetETag();
part_tags.push_back(etag); part_tags.push_back(etag);
total_size += data.size();
LOG_DEBUG( LOG_DEBUG(
log, log,
"Writing part finished. " "Writing part finished. "

View File

@ -38,10 +38,6 @@ private:
Logger * log = &Logger::get("WriteBufferFromS3"); Logger * log = &Logger::get("WriteBufferFromS3");
protected:
// Total size of all uploaded parts.
size_t total_size = 0;
public: public:
explicit WriteBufferFromS3( explicit WriteBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, std::shared_ptr<Aws::S3::S3Client> client_ptr_,

View File

@ -8,6 +8,7 @@
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>

View File

@ -8,6 +8,7 @@
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedReadBufferFromFile.h> #include <Compression/CompressedReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>

View File

@ -11,6 +11,7 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>