SeekableReadBuffer refactoring.
Store size and multiple references for S3 metadata file. Log engine support for S3.
parent dce424fe11 · commit abfacdaadc
@@ -1070,7 +1070,7 @@ try
     if (!silent)
         std::cerr << "Generating data\n";

-    file_in.seek(0);
+    file_in.seek(0, SEEK_SET);

     BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
     BlockOutputStreamPtr output = context.getOutputFormat(output_format, file_out, header);

@@ -39,7 +39,7 @@ bool CachedCompressedReadBuffer::nextImpl()
     {
         /// If not, read it from the file.
         initInput();
-        file_in->seek(file_pos);
+        file_in->seek(file_pos, SEEK_SET);

         owned_cell = std::make_shared<UncompressedCacheCell>();

@@ -55,7 +55,7 @@ void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t
     }
     else
     {
-        file_in.seek(offset_in_compressed_file);
+        file_in.seek(offset_in_compressed_file, SEEK_SET);

        bytes += offset();
        nextImpl();

@@ -4,6 +4,7 @@
 # include "DiskFactory.h"

 # include <random>
+# include <utility>
 # include <IO/S3Common.h>
 # include <IO/ReadBufferFromS3.h>
 # include <IO/WriteBufferFromS3.h>

@@ -27,6 +28,7 @@ namespace ErrorCodes
     extern const int FILE_ALREADY_EXISTS;
     extern const int FILE_DOESNT_EXIST;
     extern const int PATH_ACCESS_DENIED;
+    extern const int SEEK_POSITION_OUT_OF_BOUND;
 }

 namespace

@@ -41,41 +43,192 @@ namespace
     }
 }

-String readKeyFromFile(const String & path)
-{
-    String key;
-    ReadBufferFromFile buf(path, 1024); /* reasonable buffer size for small file */
-    readStringUntilEOF(key, buf);
-    return key;
-}
-
-void writeKeyToFile(const String & key, const String & path)
-{
-    WriteBufferFromFile buf(path, 1024);
-    writeString(key, buf);
-    buf.next();
-}
+/**
+ * S3 metadata file layout:
+ * Number of references to S3 objects, Total size of all S3 objects.
+ * Each reference to S3 object and size.
+ */
+struct Metadata
+{
+    // Path to metadata file on local FS.
+    String local_path;
+    // S3 object references count.
+    UInt32 ref_count;
+    // Total size of all S3 objects.
+    size_t total_size;
+    // References to S3 objects and their sizes.
+    std::vector<std::pair<String, size_t>> references;
+
+    explicit Metadata(const Poco::File & file) : Metadata(file.path(), false) { }
+
+    // Load metadata by path or create empty if `create` flag is set.
+    explicit Metadata(const String & path, bool create = false) :
+        local_path(path), ref_count(0), total_size(0), references(0)
+    {
+        if (create)
+            return;
+
+        char x; // To skip separators.
+        ReadBufferFromFile buf(path, 1024); /* reasonable buffer size for small file */
+        readIntText(ref_count, buf);
+        readChar(x, buf);
+        readIntText(total_size, buf);
+        readChar(x, buf);
+        references = std::vector<std::pair<String, size_t>> (ref_count);
+        for (UInt32 i = 0; i < ref_count; ++i)
+        {
+            String ref;
+            size_t size;
+            readIntText(size, buf);
+            readChar(x, buf);
+            readEscapedString(ref, buf);
+            readChar(x, buf);
+            references[i] = std::make_pair(ref, size);
+        }
+    }
+
+    void addReference(const String & ref, size_t size)
+    {
+        ref_count++;
+        total_size += size;
+        references.emplace_back(ref, size);
+    }
+
+    void save()
+    {
+        WriteBufferFromFile buf(local_path, 1024);
+        writeIntText(ref_count, buf);
+        writeChar('\t', buf);
+        writeIntText(total_size, buf);
+        writeChar('\n', buf);
+        for (UInt32 i = 0; i < ref_count; ++i)
+        {
+            auto ref_and_size = references[i];
+            writeIntText(ref_and_size.second, buf);
+            writeChar('\t', buf);
+            writeEscapedString(ref_and_size.first, buf);
+            writeChar('\n', buf);
+        }
+        buf.finalize();
+    }
+};
+
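Note: given the layout written by save() above (tab-separated reference count and total size on the first line, then one size / reference pair per line), a metadata file describing two S3 objects of 512 KiB each would look roughly as follows; the object keys are hypothetical:

    2	1048576
    524288	data/rmmhsxhhmtxvjyfnqzoxolijznmcdrtf
    524288	data/rqerkllzwmxrfrjcxygtmxyzdjnwsiao
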
+// Reads data from S3.
+// It supports multiple S3 references and reads them one by one.
+class ReadIndirectBufferFromS3 : public BufferWithOwnMemory<SeekableReadBuffer>
+{
+public:
+    ReadIndirectBufferFromS3(
+        std::shared_ptr<Aws::S3::S3Client> client_ptr_,
+        const String & bucket_,
+        Metadata metadata_,
+        size_t buf_size_
+    ) : BufferWithOwnMemory(buf_size_)
+        , client_ptr(std::move(client_ptr_))
+        , bucket(bucket_)
+        , metadata(std::move(metadata_))
+        , buf_size(buf_size_)
+        , offset(0)
+        , initialized(false)
+        , current_buf_idx(0)
+        , current_buf(nullptr)
+    {
+    }
+
+    off_t seek(off_t off, int) override
+    {
+        if (!initialized)
+        {
+            if (off < 0 || metadata.total_size <= static_cast<UInt64>(off))
+                throw Exception("Seek position is out of bounds. "
+                    "Offset: " + std::to_string(off) + ", Max: " + std::to_string(metadata.total_size),
+                    ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
+
+            offset = off;
+        }
+        return offset;
+    }
+
+private:
+    std::unique_ptr<ReadBufferFromS3> initialize()
+    {
+        for (UInt32 i = 0; i < metadata.ref_count; ++i)
+        {
+            current_buf_idx = i;
+            auto ref = metadata.references[i].first;
+            auto size = metadata.references[i].second;
+            if (size > offset)
+            {
+                auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, ref, buf_size);
+                buf->seek(offset, SEEK_SET);
+                return buf;
+            }
+            offset -= size;
+        }
+        return nullptr;
+    }
+
+    bool nextImpl() override
+    {
+        // Find first available buffer according to offset.
+        if (!initialized)
+        {
+            current_buf = initialize();
+
+            initialized = true;
+        }
+
+        // If current buffer has remaining data - use it.
+        if (current_buf && current_buf->next())
+        {
+            working_buffer = current_buf->buffer();
+            return true;
+        }
+
+        // If there is no available buffers - nothing to read.
+        if (current_buf_idx + 1 >= metadata.ref_count)
+            return false;
+
+        current_buf_idx++;
+        auto ref = metadata.references[current_buf_idx].first;
+        current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, ref, buf_size);
+        current_buf->next();
+        working_buffer = current_buf->buffer();
+
+        return true;
+    }
+
+private:
+    std::shared_ptr<Aws::S3::S3Client> client_ptr;
+    const String & bucket;
+    Metadata metadata;
+    size_t buf_size;
+
+    size_t offset;
+    bool initialized;
+    UInt32 current_buf_idx;
+    std::unique_ptr<ReadBufferFromS3> current_buf;
+};
+
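Note: seek() on this buffer only records the target position before the first read; initialize() then walks the reference list, subtracting each object's size from the requested offset until it reaches the object that covers it. For example, with two references of 100 and 200 bytes, seek(250, SEEK_SET) opens the second object and continues from local offset 150; once that object is exhausted, nextImpl() moves on to the next reference.
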
-/// Stores data in S3 and the object key in file in local filesystem.
+/// Stores data in S3 and appends the object key (reference) to metadata file on local FS.
 class WriteIndirectBufferFromS3 : public WriteBufferFromS3
 {
 public:
     WriteIndirectBufferFromS3(
         std::shared_ptr<Aws::S3::S3Client> & client_ptr_,
         const String & bucket_,
-        const String & metadata_path_,
-        const String & s3_path_,
+        Metadata metadata_,
+        const String & s3_ref_,
         size_t buf_size_)
-        : WriteBufferFromS3(client_ptr_, bucket_, s3_path_, DEFAULT_BLOCK_SIZE, buf_size_)
-        , metadata_path(metadata_path_)
-        , s3_path(s3_path_)
+        : WriteBufferFromS3(client_ptr_, bucket_, s3_ref_, DEFAULT_BLOCK_SIZE, buf_size_)
+        , metadata(std::move(metadata_))
+        , s3_ref(s3_ref_)
     {
     }

     void finalize() override
     {
         WriteBufferFromS3::finalize();
-        writeKeyToFile(s3_path, metadata_path);
+        metadata.addReference(s3_ref, total_size);
+        metadata.save();
         finalized = true;
     }

@@ -96,8 +249,8 @@ namespace

 private:
     bool finalized = false;
-    const String metadata_path;
-    const String s3_path;
+    Metadata metadata;
+    String s3_ref;
 };
 }

@@ -179,7 +332,7 @@ bool DiskS3::exists(const String & path) const

 bool DiskS3::isFile(const String & path) const
 {
-    return Poco::File(metadata_path + path).isFile();
+    return Poco::File(metadata_path + path).isFile();
 }

 bool DiskS3::isDirectory(const String & path) const

@@ -189,20 +342,8 @@ bool DiskS3::isDirectory(const String & path) const

 size_t DiskS3::getFileSize(const String & path) const
 {
-    // TODO: Consider storing actual file size in meta file.
-    Aws::S3::Model::GetObjectRequest request;
-    request.SetBucket(bucket);
-    request.SetKey(getS3Path(path));
-    auto outcome = client->GetObject(request);
-    if (!outcome.IsSuccess())
-    {
-        auto & err = outcome.GetError();
-        throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
-    }
-    else
-    {
-        return outcome.GetResult().GetContentLength();
-    }
+    Metadata metadata(metadata_path + path);
+    return metadata.total_size;
 }

 void DiskS3::createDirectory(const String & path)

@@ -230,7 +371,7 @@ void DiskS3::clearDirectory(const String & path)
 void DiskS3::moveFile(const String & from_path, const String & to_path)
 {
     if (exists(to_path))
-        throw Exception("File already exists " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
+        throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
     Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path);
 }

@@ -254,51 +395,90 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
     if (exists(to_path))
         remove(to_path);

-    String s3_from_path = readKeyFromFile(metadata_path + from_path);
-    String s3_to_path = s3_root_path + getRandomName();
+    Metadata from(metadata_path + from_path);
+    Metadata to(metadata_path + to_path, true);

-    Aws::S3::Model::CopyObjectRequest req;
-    req.SetBucket(bucket);
-    req.SetCopySource(s3_from_path);
-    req.SetKey(s3_to_path);
-    throwIfError(client->CopyObject(req));
-    writeKeyToFile(s3_to_path, metadata_path + to_path);
+    for (UInt32 i = 0; i < from.ref_count; ++i)
+    {
+        auto ref = from.references[i].first;
+        auto size = from.references[i].second;
+        auto new_ref = s3_root_path + getRandomName();
+        Aws::S3::Model::CopyObjectRequest req;
+        req.SetBucket(bucket);
+        req.SetCopySource(ref);
+        req.SetKey(new_ref);
+        throwIfError(client->CopyObject(req));
+
+        to.addReference(new_ref, size);
+    }
+
+    to.save();
 }

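Note: copyFile duplicates every underlying S3 object under a fresh random key and records the copies in the destination's own metadata file, so the copy shares no object references with the original.
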
 std::unique_ptr<SeekableReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
 {
-    return std::make_unique<ReadBufferFromS3>(client, bucket, getS3Path(path), buf_size);
+    Metadata metadata(metadata_path + path);
+
+    LOG_DEBUG(
+        &Logger::get("DiskS3"),
+        "Read from file by path: " << backQuote(metadata_path + path)
+            << " Existing S3 references: " << metadata.ref_count);
+
+    return std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
 }

 std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
 {
-    // TODO: Optimize append mode. Consider storing several S3 references in one meta file.
-    if (!exists(path) || mode == WriteMode::Rewrite)
+    bool exist = exists(path);
+    // Reference to store new S3 object.
+    auto s3_ref = s3_root_path + getRandomName();
+    if (!exist || mode == WriteMode::Rewrite)
     {
-        String new_s3_path = s3_root_path + getRandomName();
-        return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path, new_s3_path, buf_size);
+        // If metadata file exists - remove and create new.
+        if (exist)
+            remove(path);
+
+        Metadata metadata(metadata_path + path, true);
+        // Save empty metadata to disk to have ability to get file size while buffer is not finalized.
+        metadata.save();
+
+        LOG_DEBUG(
+            &Logger::get("DiskS3"),
+            "Write to file by path: " << backQuote(metadata_path + path) << " New S3 reference: " << s3_ref);
+
+        return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_ref, buf_size);
     }
     else
     {
-        auto old_s3_path = getS3Path(path);
-        ReadBufferFromS3 read_buffer(client, bucket, old_s3_path, buf_size);
-        auto writeBuffer = std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path, old_s3_path, buf_size);
-        std::vector<char> buffer(buf_size);
-        while (!read_buffer.eof())
-            writeBuffer->write(buffer.data(), read_buffer.read(buffer.data(), buf_size));
-        return writeBuffer;
+        Metadata metadata(metadata_path + path);
+
+        LOG_DEBUG(
+            &Logger::get("DiskS3"),
+            "Append to file by path: " << backQuote(metadata_path + path) << " New S3 reference: " << s3_ref
+                << " Existing S3 references: " << metadata.ref_count);
+
+        return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_ref, buf_size);
     }
 }

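Note: in append mode the new data goes into a fresh S3 object that WriteIndirectBufferFromS3::finalize() registers as one more reference in the existing metadata file, instead of the old behaviour of re-reading the previous object and rewriting everything into a single new one. Saving empty metadata up front in the rewrite branch keeps getFileSize() working while the buffer is still open; the recorded size only grows once the buffer is finalized. As a consequence, a file that is appended to N times ends up referencing N + 1 S3 objects, which the Log-family integration test at the end of this commit relies on when counting objects in MinIO.
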
 void DiskS3::remove(const String & path)
 {
+    LOG_DEBUG(&Logger::get("DiskS3"), "Remove file by path: " << backQuote(metadata_path + path));
+
     Poco::File file(metadata_path + path);
     if (file.isFile())
     {
-        Aws::S3::Model::DeleteObjectRequest request;
-        request.SetBucket(bucket);
-        request.SetKey(getS3Path(path));
-        throwIfError(client->DeleteObject(request));
+        Metadata metadata(file);
+        for (UInt32 i = 0; i < metadata.ref_count; ++i)
+        {
+            auto ref = metadata.references[i].first;
+
+            // TODO: Make operation idempotent. Do not throw exception if key is already deleted.
+            Aws::S3::Model::DeleteObjectRequest request;
+            request.SetBucket(bucket);
+            request.SetKey(ref);
+            throwIfError(client->DeleteObject(request));
+        }
     }
     file.remove();
 }

@@ -310,25 +490,14 @@ void DiskS3::removeRecursive(const String & path)
     Poco::File file(metadata_path + path);
     if (file.isFile())
     {
-        Aws::S3::Model::DeleteObjectRequest request;
-        request.SetBucket(bucket);
-        request.SetKey(getS3Path(path));
-        throwIfError(client->DeleteObject(request));
+        remove(metadata_path + path);
     }
     else
     {
         for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
             removeRecursive(it->path());
-        file.remove();
     }
+    file.remove();
 }

-String DiskS3::getS3Path(const String & path) const
-{
-    if (!exists(path))
-        throw Exception("File not found: " + path, ErrorCodes::FILE_DOESNT_EXIST);
-
-    return readKeyFromFile(metadata_path + path);
-}
-
 String DiskS3::getRandomName() const

@@ -71,8 +71,6 @@ public:
     void removeRecursive(const String & path) override;

 private:
-    String getS3Path(const String & path) const;
-
     String getRandomName() const;

     bool tryReserve(UInt64 bytes);

@@ -105,7 +105,7 @@ off_t MMapReadBufferFromFileDescriptor::getPositionInFile()
     return count();
 }

-off_t MMapReadBufferFromFileDescriptor::doSeek(off_t offset, int whence)
+off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence)
 {
     off_t new_pos;
     if (whence == SEEK_SET)

@@ -13,14 +13,14 @@ namespace DB
  */
 class MMapReadBufferFromFileDescriptor : public ReadBufferFromFileBase
 {
+public:
+    off_t seek(off_t off, int whence) override;
+
 protected:
     MMapReadBufferFromFileDescriptor() {}

     void init(int fd_, size_t offset, size_t length_);
     void init(int fd_, size_t offset);

-    off_t doSeek(off_t off, int whence) override;
-
 public:
     MMapReadBufferFromFileDescriptor(int fd_, size_t offset_, size_t length_);

@@ -149,7 +149,7 @@ bool ReadBufferAIO::nextImpl()
     return true;
 }

-off_t ReadBufferAIO::doSeek(off_t off, int whence)
+off_t ReadBufferAIO::seek(off_t off, int whence)
 {
     off_t new_pos_in_file;

@@ -40,11 +40,11 @@ public:
     std::string getFileName() const override { return filename; }
     int getFD() const override { return fd; }

+    off_t seek(off_t off, int whence) override;
+
 private:
     ///
     bool nextImpl() override;
-    ///
-    off_t doSeek(off_t off, int whence) override;
     /// Synchronously read the data.
     void synchronousRead();
     /// Get data from an asynchronous request.

@@ -99,7 +99,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()


 /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
-off_t ReadBufferFromFileDescriptor::doSeek(off_t offset, int whence)
+off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
 {
     off_t new_pos;
     if (whence == SEEK_SET)

@@ -37,10 +37,10 @@ public:
         return pos_in_file - (working_buffer.end() - pos);
     }

-private:
     /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
-    off_t doSeek(off_t offset, int whence) override;
+    off_t seek(off_t off, int whence) override;

+private:
     /// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
     bool poll(size_t timeout_microseconds);
 };

@@ -23,8 +23,7 @@ public:
     ReadBufferFromMemory(const signed char * buf, size_t size)
         : SeekableReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}

-protected:
-    off_t doSeek(off_t, int) override { return 0; }
+    off_t seek(off_t, int) override { return 0; }
 };

 }

@@ -49,7 +49,7 @@ bool ReadBufferFromS3::nextImpl()
     return true;
 }

-off_t ReadBufferFromS3::doSeek(off_t offset_, int) {
+off_t ReadBufferFromS3::seek(off_t offset_, int) {
     if (!initialized && offset_)
         offset = offset_;

@@ -57,11 +57,14 @@ off_t ReadBufferFromS3::doSeek(off_t offset_, int) {
 }

 std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize() {
+    LOG_DEBUG(log, "Read S3 object. "
+        "Bucket: " + bucket + ", Key: " + key + ", Offset: " + std::to_string(offset));
+
     Aws::S3::Model::GetObjectRequest req;
     req.SetBucket(bucket);
     req.SetKey(key);
     if (offset != 0)
-        req.SetRange(std::to_string(offset) + "-");
+        req.SetRange("bytes=" + std::to_string(offset) + "-");

     Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);

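Note: the SetRange change fixes the HTTP Range header sent for seeks. RFC 7233 requires the unit prefix, so a read starting at byte 1024 now sends

    Range: bytes=1024-

(an open-ended range to the end of the object) instead of the previously malformed value 1024-.
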
@@ -35,9 +35,6 @@ private:

     Logger * log = &Logger::get("ReadBufferFromS3");

-protected:
-    off_t doSeek(off_t off, int whence) override;
-
 public:
     explicit ReadBufferFromS3(std::shared_ptr<Aws::S3::S3Client> client_ptr_,
         const String & bucket_,

@@ -46,6 +43,8 @@ public:

     bool nextImpl() override;

+    off_t seek(off_t off, int whence) override;
+
 private:
     std::unique_ptr<ReadBuffer> initialize();
 };

@@ -17,13 +17,7 @@ public:
     SeekableReadBuffer(Position ptr, size_t size, size_t offset)
         : ReadBuffer(ptr, size, offset) {}

-    off_t seek(off_t off, int whence = SEEK_SET) {
-        return doSeek(off, whence);
-    };
-
-protected:
     /// Children implementation should be able to seek backwards
-    virtual off_t doSeek(off_t off, int whence) = 0;
+    virtual off_t seek(off_t off, int whence) = 0;
 };

 }

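Note: this is the core of the SeekableReadBuffer refactoring: seek(off, whence) is now the pure virtual member that implementations override directly, replacing the former non-virtual seek() wrapper around doSeek(). Since the wrapper's default whence = SEEK_SET argument disappears with it, every call site must now pass the flag explicitly, which is what the seek(x) -> seek(x, SEEK_SET) changes earlier in this commit are about. A minimal caller sketch, assuming the ClickHouse IO headers (not part of this commit):

    #include <cstdio>
    #include <IO/SeekableReadBuffer.h>

    /// Skip a fixed-size prefix in any seekable buffer; dispatches to the concrete seek() override.
    static void skipPrefix(DB::SeekableReadBuffer & in, off_t prefix_size)
    {
        in.seek(prefix_size, SEEK_SET);
    }
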
@@ -41,7 +41,7 @@ WriteBufferFromS3::WriteBufferFromS3(
     , key(key_)
     , client_ptr(std::move(client_ptr_))
     , minimum_upload_part_size {minimum_upload_part_size_}
-    , temporary_buffer {std::make_unique<WriteBufferFromString>(buffer_string)}
+    , temporary_buffer {std::make_unique<WriteBufferFromOwnString>()}
     , last_part_size {0}
 {
     initiate();

@@ -60,9 +60,9 @@ void WriteBufferFromS3::nextImpl()
     if (last_part_size > minimum_upload_part_size)
     {
         temporary_buffer->finalize();
-        writePart(buffer_string);
+        writePart(temporary_buffer->str());
         last_part_size = 0;
-        temporary_buffer = std::make_unique<WriteBufferFromString>(buffer_string);
+        temporary_buffer = std::make_unique<WriteBufferFromOwnString>();
     }
 }

@@ -70,11 +70,9 @@ void WriteBufferFromS3::nextImpl()
 void WriteBufferFromS3::finalize()
 {
     next();

     temporary_buffer->finalize();
-    if (!buffer_string.empty())
-    {
-        writePart(buffer_string);
-    }
+    writePart(temporary_buffer->str());

     complete();
 }

@@ -104,7 +102,7 @@ void WriteBufferFromS3::initiate()
     if (outcome.IsSuccess())
     {
         upload_id = outcome.GetResult().GetUploadId();
-        LOG_DEBUG(log, "Multipart upload initiated. Upload id = " + upload_id);
+        LOG_DEBUG(log, "Multipart upload initiated. Upload id: " << upload_id);
     }
     else
         throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);

@@ -113,6 +111,9 @@ void WriteBufferFromS3::initiate()

 void WriteBufferFromS3::writePart(const String & data)
 {
+    if (data.empty())
+        return;
+
     if (part_tags.size() == S3_WARN_MAX_PARTS)
     {
         // Don't throw exception here by ourselves but leave the decision to take by S3 server.

@@ -130,11 +131,16 @@ void WriteBufferFromS3::writePart(const String & data)

     auto outcome = client_ptr->UploadPart(req);

+    LOG_TRACE(log, "Writing part. "
+        "Bucket: " << bucket << ", Key: " << key << ", Upload_id: " << upload_id << ", Data size: " << data.size());
+
     if (outcome.IsSuccess())
     {
         auto etag = outcome.GetResult().GetETag();
         part_tags.push_back(etag);
-        LOG_DEBUG(log, "Write part " + std::to_string(part_tags.size()) + " finished. Upload id = " + upload_id + ". Etag = " + etag);
+        total_size += data.size();
+        LOG_DEBUG(log, "Writing part finished. "
+            "Total parts: " << part_tags.size() << ", Upload_id: " << upload_id << ", Etag: " << etag);
     }
     else
         throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);

@@ -143,24 +149,30 @@ void WriteBufferFromS3::writePart(const String & data)

 void WriteBufferFromS3::complete()
 {
+    LOG_DEBUG(log, "Completing multipart upload. "
+        "Bucket: " + bucket + ", Key: " + key + ", Upload_id: " + upload_id);
+
     Aws::S3::Model::CompleteMultipartUploadRequest req;
     req.SetBucket(bucket);
     req.SetKey(key);
     req.SetUploadId(upload_id);

-    Aws::S3::Model::CompletedMultipartUpload multipart_upload;
-    for (size_t i = 0; i < part_tags.size(); ++i)
+    if (!part_tags.empty())
     {
-        Aws::S3::Model::CompletedPart part;
-        multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
-    }
+        Aws::S3::Model::CompletedMultipartUpload multipart_upload;
+        for (size_t i = 0; i < part_tags.size(); ++i)
+        {
+            Aws::S3::Model::CompletedPart part;
+            multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
+        }

-    req.SetMultipartUpload(multipart_upload);
+        req.SetMultipartUpload(multipart_upload);
+    }

     auto outcome = client_ptr->CompleteMultipartUpload(req);

     if (outcome.IsSuccess())
-        LOG_DEBUG(log, "Multipart upload completed. Upload_id = " + upload_id);
+        LOG_DEBUG(log, "Multipart upload completed. Upload_id: " << upload_id);
     else
         throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
 }

@@ -28,8 +28,7 @@ private:
     String key;
     std::shared_ptr<Aws::S3::S3Client> client_ptr;
     size_t minimum_upload_part_size;
-    String buffer_string;
-    std::unique_ptr<WriteBufferFromString> temporary_buffer;
+    std::unique_ptr<WriteBufferFromOwnString> temporary_buffer;
     size_t last_part_size;

     /// Upload in S3 is made in parts.

@@ -39,6 +38,10 @@ private:

     Logger * log = &Logger::get("WriteBufferFromS3");

+protected:
+    // Total size of all uploaded parts.
+    size_t total_size = 0;
+
 public:
     explicit WriteBufferFromS3(std::shared_ptr<Aws::S3::S3Client> client_ptr_,
         const String & bucket_,

@@ -85,7 +85,7 @@ private:
         compressed(*plain)
     {
         if (offset)
-            plain->seek(offset);
+            plain->seek(offset, SEEK_SET);
     }

     std::unique_ptr<SeekableReadBuffer> plain;

@@ -109,7 +109,7 @@ public:
     explicit LogBlockOutputStream(StorageLog & storage_)
         : storage(storage_),
         lock(storage.rwlock),
-        marks_stream(storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Append))
+        marks_stream(storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
     {
     }

@@ -34,16 +34,26 @@ def cluster():
     cluster.shutdown()


-def test_tinylog_s3(cluster):
+@pytest.mark.parametrize("log_engine,files_overhead", [("TinyLog", 1), ("Log", 2)])
+def test_log_family_s3(cluster, log_engine, files_overhead):
     node = cluster.instances["node"]
     minio = cluster.minio_client

-    node.query("CREATE TABLE s3_test (id UInt64) Engine=TinyLog")
-    node.query("INSERT INTO s3_test SELECT number FROM numbers(3)")
-    assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n"
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2
-    node.query("INSERT INTO s3_test SELECT number + 3 FROM numbers(3)")
-    assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n5\n"
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2
-    node.query("DROP TABLE s3_test")
+    node.query("CREATE TABLE s3_test (id UInt64) Engine={}".format(log_engine))
+
+    node.query("INSERT INTO s3_test SELECT number FROM numbers(5)")
+    assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 1 + files_overhead
+
+    node.query("INSERT INTO s3_test SELECT number + 5 FROM numbers(3)")
+    assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2 + files_overhead
+
+    node.query("INSERT INTO s3_test SELECT number + 8 FROM numbers(1)")
+    assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n"
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 3 + files_overhead
+
+    node.query("TRUNCATE TABLE s3_test")
+    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
+
+    node.query("DROP TABLE s3_test")