mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Merge pull request #9258 from Jokser/disk-s3-read-write-buffers-refactoring
Disk->writeFile() returns WriteBufferFromFileBase.
This commit is contained in:
commit
abe8315861
@ -5,19 +5,20 @@
|
||||
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/Exception.h>
|
||||
#include <mutex>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||
#include <IO/WriteBufferFromFileDescriptor.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <common/Types.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
@ -99,8 +100,8 @@ public:
|
||||
res += delta;
|
||||
|
||||
DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
|
||||
wb.seek(0);
|
||||
wb.truncate();
|
||||
wb.seek(0, SEEK_SET);
|
||||
wb.truncate(0);
|
||||
DB::writeIntText(res, wb);
|
||||
DB::writeChar('\n', wb);
|
||||
wb.sync();
|
||||
@ -169,8 +170,8 @@ public:
|
||||
if (broken)
|
||||
{
|
||||
DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
|
||||
wb.seek(0);
|
||||
wb.truncate();
|
||||
wb.seek(0, SEEK_SET);
|
||||
wb.truncate(0);
|
||||
DB::writeIntText(value, wb);
|
||||
DB::writeChar('\n', wb);
|
||||
wb.sync();
|
||||
|
@ -5,6 +5,9 @@
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/quoteString.h>
|
||||
|
||||
#include <IO/createReadBufferFromFileBase.h>
|
||||
#include <IO/createWriteBufferFromFileBase.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -46,8 +49,10 @@ private:
|
||||
class DiskLocalDirectoryIterator : public IDiskDirectoryIterator
|
||||
{
|
||||
public:
|
||||
explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_) :
|
||||
dir_path(dir_path_), iter(disk_path_ + dir_path_) {}
|
||||
explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_)
|
||||
: dir_path(dir_path_), iter(disk_path_ + dir_path_)
|
||||
{
|
||||
}
|
||||
|
||||
void next() override { ++iter; }
|
||||
|
||||
@ -200,15 +205,17 @@ void DiskLocal::copyFile(const String & from_path, const String & to_path)
|
||||
Poco::File(disk_path + from_path).copyTo(disk_path + to_path);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, size_t buf_size) const
|
||||
std::unique_ptr<ReadBufferFromFileBase>
|
||||
DiskLocal::readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const
|
||||
{
|
||||
return std::make_unique<ReadBufferFromFile>(disk_path + path, buf_size);
|
||||
return createReadBufferFromFileBase(disk_path + path, estimated_size, aio_threshold, mmap_threshold, buf_size);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBuffer> DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold)
|
||||
{
|
||||
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
|
||||
return std::make_unique<WriteBufferFromFile>(disk_path + path, buf_size, flags);
|
||||
return createWriteBufferFromFileBase(disk_path + path, estimated_size, aio_threshold, buf_size, flags);
|
||||
}
|
||||
|
||||
void DiskLocal::remove(const String & path)
|
||||
|
@ -1,8 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <Disks/IDisk.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
@ -67,9 +67,19 @@ public:
|
||||
|
||||
void copyFile(const String & from_path, const String & to_path) override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0,
|
||||
size_t mmap_threshold = 0) const override;
|
||||
|
||||
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
WriteMode mode = WriteMode::Rewrite,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0) override;
|
||||
|
||||
void remove(const String & path) override;
|
||||
|
||||
|
@ -1,8 +1,9 @@
|
||||
#include "DiskMemory.h"
|
||||
#include "DiskFactory.h"
|
||||
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
@ -39,87 +40,100 @@ private:
|
||||
std::vector<String>::iterator iter;
|
||||
};
|
||||
|
||||
ReadIndirectBuffer::ReadIndirectBuffer(String path_, const String & data_)
|
||||
: ReadBufferFromFileBase(), buf(ReadBufferFromString(data_)), path(std::move(path_))
|
||||
/// Adapter with actual behaviour as ReadBufferFromString.
|
||||
class ReadIndirectBuffer : public ReadBufferFromFileBase
|
||||
{
|
||||
internal_buffer = buf.buffer();
|
||||
working_buffer = internal_buffer;
|
||||
pos = working_buffer.begin();
|
||||
}
|
||||
|
||||
off_t ReadIndirectBuffer::seek(off_t offset, int whence)
|
||||
{
|
||||
if (whence == SEEK_SET)
|
||||
public:
|
||||
ReadIndirectBuffer(String path_, const String & data_)
|
||||
: ReadBufferFromFileBase(), impl(ReadBufferFromString(data_)), path(std::move(path_))
|
||||
{
|
||||
if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end())
|
||||
internal_buffer = impl.buffer();
|
||||
working_buffer = internal_buffer;
|
||||
pos = working_buffer.begin();
|
||||
}
|
||||
|
||||
std::string getFileName() const override { return path; }
|
||||
|
||||
off_t seek(off_t off, int whence) override
|
||||
{
|
||||
impl.swap(*this);
|
||||
off_t result = impl.seek(off, whence);
|
||||
impl.swap(*this);
|
||||
return result;
|
||||
}
|
||||
|
||||
off_t getPosition() override { return pos - working_buffer.begin(); }
|
||||
|
||||
private:
|
||||
ReadBufferFromString impl;
|
||||
const String path;
|
||||
};
|
||||
|
||||
/// This class is responsible to update files metadata after buffer is finalized.
|
||||
class WriteIndirectBuffer : public WriteBufferFromFileBase
|
||||
{
|
||||
public:
|
||||
WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_, size_t buf_size)
|
||||
: WriteBufferFromFileBase(buf_size, nullptr, 0), impl(), disk(disk_), path(std::move(path_)), mode(mode_)
|
||||
{
|
||||
}
|
||||
|
||||
~WriteIndirectBuffer() override
|
||||
{
|
||||
try
|
||||
{
|
||||
pos = working_buffer.begin() + offset;
|
||||
return size_t(pos - working_buffer.begin());
|
||||
finalize();
|
||||
}
|
||||
else
|
||||
throw Exception(
|
||||
"Seek position is out of bounds. "
|
||||
"Offset: "
|
||||
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
|
||||
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
||||
}
|
||||
else if (whence == SEEK_CUR)
|
||||
{
|
||||
Position new_pos = pos + offset;
|
||||
if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end())
|
||||
catch (...)
|
||||
{
|
||||
pos = new_pos;
|
||||
return size_t(pos - working_buffer.begin());
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
else
|
||||
throw Exception(
|
||||
"Seek position is out of bounds. "
|
||||
"Offset: "
|
||||
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
|
||||
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
||||
}
|
||||
else
|
||||
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
||||
}
|
||||
|
||||
off_t ReadIndirectBuffer::getPosition()
|
||||
{
|
||||
return pos - working_buffer.begin();
|
||||
}
|
||||
|
||||
void WriteIndirectBuffer::finalize()
|
||||
{
|
||||
if (isFinished())
|
||||
return;
|
||||
|
||||
next();
|
||||
WriteBufferFromVector::finalize();
|
||||
|
||||
auto iter = disk->files.find(path);
|
||||
|
||||
if (iter == disk->files.end())
|
||||
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
|
||||
|
||||
// Resize to the actual number of bytes written to string.
|
||||
value.resize(count());
|
||||
|
||||
if (mode == WriteMode::Rewrite)
|
||||
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value});
|
||||
else if (mode == WriteMode::Append)
|
||||
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value});
|
||||
}
|
||||
|
||||
WriteIndirectBuffer::~WriteIndirectBuffer()
|
||||
{
|
||||
try
|
||||
void finalize() override
|
||||
{
|
||||
finalize();
|
||||
if (impl.isFinished())
|
||||
return;
|
||||
|
||||
next();
|
||||
|
||||
/// str() finalizes buffer.
|
||||
String value = impl.str();
|
||||
|
||||
auto iter = disk->files.find(path);
|
||||
|
||||
if (iter == disk->files.end())
|
||||
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
|
||||
|
||||
/// Resize to the actual number of bytes written to string.
|
||||
value.resize(count());
|
||||
|
||||
if (mode == WriteMode::Rewrite)
|
||||
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value});
|
||||
else if (mode == WriteMode::Append)
|
||||
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value});
|
||||
}
|
||||
catch (...)
|
||||
|
||||
std::string getFileName() const override { return path; }
|
||||
|
||||
void sync() override {}
|
||||
|
||||
private:
|
||||
void nextImpl() override
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
if (!offset())
|
||||
return;
|
||||
|
||||
impl.write(working_buffer.begin(), offset());
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
WriteBufferFromOwnString impl;
|
||||
DiskMemory * disk;
|
||||
const String path;
|
||||
const WriteMode mode;
|
||||
};
|
||||
|
||||
|
||||
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
|
||||
{
|
||||
@ -302,7 +316,7 @@ void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path
|
||||
throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/, size_t, size_t, size_t) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
@ -313,7 +327,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path
|
||||
return std::make_unique<ReadIndirectBuffer>(path, iter->second.data);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /*buf_size*/, WriteMode mode)
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskMemory::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
@ -328,7 +342,7 @@ std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /
|
||||
files.emplace(path, FileData{FileType::File});
|
||||
}
|
||||
|
||||
return std::make_unique<WriteIndirectBuffer>(this, path, mode);
|
||||
return std::make_unique<WriteIndirectBuffer>(this, path, mode, buf_size);
|
||||
}
|
||||
|
||||
void DiskMemory::remove(const String & path)
|
||||
|
@ -5,46 +5,13 @@
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class DiskMemory;
|
||||
class ReadBuffer;
|
||||
class WriteBuffer;
|
||||
class ReadBufferFromFileBase;
|
||||
class WriteBufferFromFileBase;
|
||||
|
||||
/// Adapter with actual behaviour as ReadBufferFromString.
|
||||
class ReadIndirectBuffer : public ReadBufferFromFileBase
|
||||
{
|
||||
public:
|
||||
ReadIndirectBuffer(String path_, const String & data_);
|
||||
|
||||
std::string getFileName() const override { return path; }
|
||||
off_t seek(off_t off, int whence) override;
|
||||
off_t getPosition() override;
|
||||
|
||||
private:
|
||||
ReadBufferFromString buf;
|
||||
String path;
|
||||
};
|
||||
|
||||
/// This class is responsible to update files metadata after buffer is finalized.
|
||||
class WriteIndirectBuffer : public WriteBufferFromOwnString
|
||||
{
|
||||
public:
|
||||
WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_) : disk(disk_), path(std::move(path_)), mode(mode_) {}
|
||||
|
||||
~WriteIndirectBuffer() override;
|
||||
|
||||
void finalize() override;
|
||||
|
||||
private:
|
||||
DiskMemory * disk;
|
||||
String path;
|
||||
WriteMode mode;
|
||||
};
|
||||
|
||||
/** Implementation of Disk intended only for testing purposes.
|
||||
* All filesystem objects are stored in memory and lost on server restart.
|
||||
@ -93,10 +60,19 @@ public:
|
||||
|
||||
void copyFile(const String & from_path, const String & to_path) override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0,
|
||||
size_t mmap_threshold = 0) const override;
|
||||
|
||||
std::unique_ptr<WriteBuffer>
|
||||
writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
WriteMode mode = WriteMode::Rewrite,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0) override;
|
||||
|
||||
void remove(const String & path) override;
|
||||
|
||||
|
@ -52,23 +52,23 @@ namespace
|
||||
*/
|
||||
struct Metadata
|
||||
{
|
||||
// Metadata file version.
|
||||
/// Metadata file version.
|
||||
const UInt32 VERSION = 1;
|
||||
|
||||
using PathAndSize = std::pair<String, size_t>;
|
||||
|
||||
// Path to metadata file on local FS.
|
||||
/// Path to metadata file on local FS.
|
||||
String metadata_file_path;
|
||||
// S3 object references count.
|
||||
/// S3 object references count.
|
||||
UInt32 s3_objects_count;
|
||||
// Total size of all S3 objects.
|
||||
/// Total size of all S3 objects.
|
||||
size_t total_size;
|
||||
// S3 objects paths and their sizes.
|
||||
/// S3 objects paths and their sizes.
|
||||
std::vector<PathAndSize> s3_objects;
|
||||
|
||||
explicit Metadata(const Poco::File & file) : Metadata(file.path(), false) {}
|
||||
|
||||
// Load metadata by path or create empty if `create` flag is set.
|
||||
/// Load metadata by path or create empty if `create` flag is set.
|
||||
explicit Metadata(const String & file_path, bool create = false)
|
||||
: metadata_file_path(file_path), s3_objects_count(0), total_size(0), s3_objects(0)
|
||||
{
|
||||
@ -112,7 +112,8 @@ namespace
|
||||
s3_objects.emplace_back(path, size);
|
||||
}
|
||||
|
||||
void save()
|
||||
/// Fsync metadata file if 'sync' flag is set.
|
||||
void save(bool sync = false)
|
||||
{
|
||||
WriteBufferFromFile buf(metadata_file_path, 1024);
|
||||
|
||||
@ -132,11 +133,12 @@ namespace
|
||||
writeChar('\n', buf);
|
||||
}
|
||||
buf.finalize();
|
||||
if (sync)
|
||||
buf.sync();
|
||||
}
|
||||
};
|
||||
|
||||
// Reads data from S3.
|
||||
// It supports reading from multiple S3 paths that resides in Metadata.
|
||||
/// Reads data from S3 using stored paths in metadata.
|
||||
class ReadIndirectBufferFromS3 : public ReadBufferFromFileBase
|
||||
{
|
||||
public:
|
||||
@ -148,7 +150,6 @@ namespace
|
||||
, metadata(std::move(metadata_))
|
||||
, buf_size(buf_size_)
|
||||
, absolute_position(0)
|
||||
, initialized(false)
|
||||
, current_buf_idx(0)
|
||||
, current_buf(nullptr)
|
||||
{
|
||||
@ -196,19 +197,16 @@ namespace
|
||||
}
|
||||
offset -= size;
|
||||
}
|
||||
initialized = true;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
bool nextImpl() override
|
||||
{
|
||||
// Find first available buffer that fits to given offset.
|
||||
if (!initialized)
|
||||
{
|
||||
/// Find first available buffer that fits to given offset.
|
||||
if (!current_buf)
|
||||
current_buf = initialize();
|
||||
}
|
||||
|
||||
// If current buffer has remaining data - use it.
|
||||
/// If current buffer has remaining data - use it.
|
||||
if (current_buf && current_buf->next())
|
||||
{
|
||||
working_buffer = current_buf->buffer();
|
||||
@ -216,7 +214,7 @@ namespace
|
||||
return true;
|
||||
}
|
||||
|
||||
// If there is no available buffers - nothing to read.
|
||||
/// If there is no available buffers - nothing to read.
|
||||
if (current_buf_idx + 1 >= metadata.s3_objects_count)
|
||||
return false;
|
||||
|
||||
@ -237,13 +235,12 @@ namespace
|
||||
size_t buf_size;
|
||||
|
||||
size_t absolute_position = 0;
|
||||
bool initialized;
|
||||
UInt32 current_buf_idx;
|
||||
std::unique_ptr<ReadBufferFromS3> current_buf;
|
||||
};
|
||||
|
||||
/// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS.
|
||||
class WriteIndirectBufferFromS3 : public WriteBufferFromS3
|
||||
class WriteIndirectBufferFromS3 : public WriteBufferFromFileBase
|
||||
{
|
||||
public:
|
||||
WriteIndirectBufferFromS3(
|
||||
@ -253,25 +250,15 @@ namespace
|
||||
const String & s3_path_,
|
||||
size_t min_upload_part_size,
|
||||
size_t buf_size_)
|
||||
: WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_)
|
||||
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
|
||||
, impl(WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_))
|
||||
, metadata(std::move(metadata_))
|
||||
, s3_path(s3_path_)
|
||||
{
|
||||
}
|
||||
|
||||
void finalize() override
|
||||
{
|
||||
WriteBufferFromS3::finalize();
|
||||
metadata.addObject(s3_path, total_size);
|
||||
metadata.save();
|
||||
finalized = true;
|
||||
}
|
||||
|
||||
~WriteIndirectBufferFromS3() override
|
||||
{
|
||||
if (finalized)
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
finalize();
|
||||
@ -282,7 +269,38 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
void finalize() override
|
||||
{
|
||||
if (finalized)
|
||||
return;
|
||||
|
||||
next();
|
||||
impl.finalize();
|
||||
|
||||
metadata.addObject(s3_path, count());
|
||||
metadata.save();
|
||||
|
||||
finalized = true;
|
||||
}
|
||||
|
||||
void sync() override { metadata.save(true); }
|
||||
std::string getFileName() const override { return metadata.metadata_file_path; }
|
||||
|
||||
private:
|
||||
void nextImpl() override
|
||||
{
|
||||
/// Transfer current working buffer to WriteBufferFromS3.
|
||||
impl.swap(*this);
|
||||
|
||||
/// Write actual data to S3.
|
||||
impl.next();
|
||||
|
||||
/// Return back working buffer.
|
||||
impl.swap(*this);
|
||||
}
|
||||
|
||||
private:
|
||||
WriteBufferFromS3 impl;
|
||||
bool finalized = false;
|
||||
Metadata metadata;
|
||||
String s3_path;
|
||||
@ -457,7 +475,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
|
||||
to.save();
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size) const
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const
|
||||
{
|
||||
Metadata metadata(metadata_path + path);
|
||||
|
||||
@ -468,19 +486,19 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
|
||||
return std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
|
||||
{
|
||||
bool exist = exists(path);
|
||||
// Reference to store new S3 object.
|
||||
/// Path to store new S3 object.
|
||||
auto s3_path = s3_root_path + getRandomName();
|
||||
if (!exist || mode == WriteMode::Rewrite)
|
||||
{
|
||||
// If metadata file exists - remove and create new.
|
||||
/// If metadata file exists - remove and create new.
|
||||
if (exist)
|
||||
remove(path);
|
||||
|
||||
Metadata metadata(metadata_path + path, true);
|
||||
// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
|
||||
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
|
||||
metadata.save();
|
||||
|
||||
LOG_DEBUG(&Logger::get("DiskS3"), "Write to file by path: " << backQuote(metadata_path + path) << " New S3 path: " << s3_path);
|
||||
@ -512,7 +530,7 @@ void DiskS3::remove(const String & path)
|
||||
{
|
||||
auto s3_path = metadata.s3_objects[i].first;
|
||||
|
||||
// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
|
||||
/// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
|
||||
Aws::S3::Model::DeleteObjectRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(s3_path);
|
||||
|
@ -21,8 +21,13 @@ class DiskS3 : public IDisk
|
||||
public:
|
||||
friend class DiskS3Reservation;
|
||||
|
||||
DiskS3(String name_, std::shared_ptr<Aws::S3::S3Client> client_, String bucket_, String s3_root_path_,
|
||||
String metadata_path_, size_t min_upload_part_size_);
|
||||
DiskS3(
|
||||
String name_,
|
||||
std::shared_ptr<Aws::S3::S3Client> client_,
|
||||
String bucket_,
|
||||
String s3_root_path_,
|
||||
String metadata_path_,
|
||||
size_t min_upload_part_size_);
|
||||
|
||||
const String & getName() const override { return name; }
|
||||
|
||||
@ -62,9 +67,19 @@ public:
|
||||
|
||||
void copyFile(const String & from_path, const String & to_path) override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size) const override;
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0,
|
||||
size_t mmap_threshold = 0) const override;
|
||||
|
||||
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
WriteMode mode = WriteMode::Rewrite,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0) override;
|
||||
|
||||
void remove(const String & path) override;
|
||||
|
||||
|
@ -26,7 +26,7 @@ class IReservation;
|
||||
using ReservationPtr = std::unique_ptr<IReservation>;
|
||||
|
||||
class ReadBufferFromFileBase;
|
||||
class WriteBuffer;
|
||||
class WriteBufferFromFileBase;
|
||||
|
||||
/**
|
||||
* Mode of opening a file for write.
|
||||
@ -121,11 +121,21 @@ public:
|
||||
/// Copy the file from `from_path` to `to_path`.
|
||||
virtual void copyFile(const String & from_path, const String & to_path) = 0;
|
||||
|
||||
/// Open the file for read and return SeekableReadBuffer object.
|
||||
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0;
|
||||
/// Open the file for read and return ReadBufferFromFileBase object.
|
||||
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0,
|
||||
size_t mmap_threshold = 0) const = 0;
|
||||
|
||||
/// Open the file for write and return WriteBuffer object.
|
||||
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0;
|
||||
/// Open the file for write and return WriteBufferFromFileBase object.
|
||||
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
|
||||
const String & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
WriteMode mode = WriteMode::Rewrite,
|
||||
size_t estimated_size = 0,
|
||||
size_t aio_threshold = 0) = 0;
|
||||
|
||||
/// Remove file or directory. Throws exception if file doesn't exists or if directory is not empty.
|
||||
virtual void remove(const String & path) = 0;
|
||||
|
@ -139,7 +139,7 @@ void WriteBufferAIO::nextImpl()
|
||||
is_pending_write = true;
|
||||
}
|
||||
|
||||
off_t WriteBufferAIO::doSeek(off_t off, int whence)
|
||||
off_t WriteBufferAIO::seek(off_t off, int whence)
|
||||
{
|
||||
flush();
|
||||
|
||||
@ -169,7 +169,7 @@ off_t WriteBufferAIO::doSeek(off_t off, int whence)
|
||||
return pos_in_file;
|
||||
}
|
||||
|
||||
void WriteBufferAIO::doTruncate(off_t length)
|
||||
void WriteBufferAIO::truncate(off_t length)
|
||||
{
|
||||
flush();
|
||||
|
||||
|
@ -34,15 +34,15 @@ public:
|
||||
WriteBufferAIO(const WriteBufferAIO &) = delete;
|
||||
WriteBufferAIO & operator=(const WriteBufferAIO &) = delete;
|
||||
|
||||
off_t getPositionInFile() override;
|
||||
off_t getPositionInFile();
|
||||
off_t seek(off_t off, int whence);
|
||||
void truncate(off_t length);
|
||||
void sync() override;
|
||||
std::string getFileName() const override { return filename; }
|
||||
int getFD() const override { return fd; }
|
||||
int getFD() const { return fd; }
|
||||
|
||||
private:
|
||||
void nextImpl() override;
|
||||
off_t doSeek(off_t off, int whence) override;
|
||||
void doTruncate(off_t length) override;
|
||||
|
||||
/// If there's still data in the buffer, we'll write them.
|
||||
void flush();
|
||||
|
@ -8,14 +8,4 @@ WriteBufferFromFileBase::WriteBufferFromFileBase(size_t buf_size, char * existin
|
||||
{
|
||||
}
|
||||
|
||||
off_t WriteBufferFromFileBase::seek(off_t off, int whence)
|
||||
{
|
||||
return doSeek(off, whence);
|
||||
}
|
||||
|
||||
void WriteBufferFromFileBase::truncate(off_t length)
|
||||
{
|
||||
return doTruncate(length);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -15,16 +15,8 @@ public:
|
||||
WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
|
||||
~WriteBufferFromFileBase() override = default;
|
||||
|
||||
off_t seek(off_t off, int whence = SEEK_SET);
|
||||
void truncate(off_t length = 0);
|
||||
virtual off_t getPositionInFile() = 0;
|
||||
void sync() override = 0;
|
||||
virtual std::string getFileName() const = 0;
|
||||
virtual int getFD() const = 0;
|
||||
|
||||
protected:
|
||||
virtual off_t doSeek(off_t off, int whence) = 0;
|
||||
virtual void doTruncate(off_t length) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -98,12 +98,6 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
|
||||
}
|
||||
|
||||
|
||||
off_t WriteBufferFromFileDescriptor::getPositionInFile()
|
||||
{
|
||||
return seek(0, SEEK_CUR);
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromFileDescriptor::sync()
|
||||
{
|
||||
/// If buffer has pending data - write it.
|
||||
@ -116,7 +110,7 @@ void WriteBufferFromFileDescriptor::sync()
|
||||
}
|
||||
|
||||
|
||||
off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence)
|
||||
off_t WriteBufferFromFileDescriptor::seek(off_t offset, int whence)
|
||||
{
|
||||
off_t res = lseek(fd, offset, whence);
|
||||
if (-1 == res)
|
||||
@ -126,7 +120,7 @@ off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence)
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromFileDescriptor::doTruncate(off_t length)
|
||||
void WriteBufferFromFileDescriptor::truncate(off_t length)
|
||||
{
|
||||
int res = ftruncate(fd, length);
|
||||
if (-1 == res)
|
||||
|
@ -35,19 +35,15 @@ public:
|
||||
|
||||
~WriteBufferFromFileDescriptor() override;
|
||||
|
||||
int getFD() const override
|
||||
int getFD() const
|
||||
{
|
||||
return fd;
|
||||
}
|
||||
|
||||
off_t getPositionInFile() override;
|
||||
|
||||
void sync() override;
|
||||
|
||||
private:
|
||||
off_t doSeek(off_t offset, int whence) override;
|
||||
|
||||
void doTruncate(off_t length) override;
|
||||
off_t seek(off_t offset, int whence);
|
||||
void truncate(off_t length);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -136,7 +136,6 @@ void WriteBufferFromS3::writePart(const String & data)
|
||||
{
|
||||
auto etag = outcome.GetResult().GetETag();
|
||||
part_tags.push_back(etag);
|
||||
total_size += data.size();
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"Writing part finished. "
|
||||
|
@ -38,10 +38,6 @@ private:
|
||||
|
||||
Logger * log = &Logger::get("WriteBufferFromS3");
|
||||
|
||||
protected:
|
||||
// Total size of all uploaded parts.
|
||||
size_t total_size = 0;
|
||||
|
||||
public:
|
||||
explicit WriteBufferFromS3(
|
||||
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <Compression/CompressedReadBufferFromFile.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
|
Loading…
Reference in New Issue
Block a user