Merge pull request #8862 from Jokser/idisk-seekable-readbuffer

Log engine support for S3 and SeekableReadBuffer
This commit is contained in:
alexey-milovidov 2020-02-02 03:55:05 +03:00 committed by GitHub
commit 8fa7d9f04e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 865 additions and 263 deletions
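The change that recurs throughout this diff is the move from a position-only seek(pos) to the POSIX-style seek(off, whence) declared by the new SeekableReadBuffer interface. A minimal sketch of the new call shape (the helper function is hypothetical, not part of the diff):

#include <cstdio> /// SEEK_SET / SEEK_CUR.
#include <sys/types.h> /// off_t.
#include <IO/SeekableReadBuffer.h>

void skipHeader(DB::SeekableReadBuffer & in, size_t header_size)
{
    /// Absolute positioning; equivalent to the old position-only in.seek(header_size).
    in.seek(static_cast<off_t>(header_size), SEEK_SET);
}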

View File

@ -1070,7 +1070,7 @@ try
if (!silent)
std::cerr << "Generating data\n";
file_in.seek(0);
file_in.seek(0, SEEK_SET);
BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
BlockOutputStreamPtr output = context.getOutputFormat(output_format, file_out, header);

View File

@ -39,7 +39,7 @@ bool CachedCompressedReadBuffer::nextImpl()
{
/// If not, read it from the file.
initInput();
file_in->seek(file_pos);
file_in->seek(file_pos, SEEK_SET);
owned_cell = std::make_shared<UncompressedCacheCell>();

View File

@ -55,7 +55,7 @@ void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t
}
else
{
file_in.seek(offset_in_compressed_file);
file_in.seek(offset_in_compressed_file, SEEK_SET);
bytes += offset();
nextImpl();

View File

@ -200,7 +200,7 @@ void DiskLocal::copyFile(const String & from_path, const String & to_path)
Poco::File(disk_path + from_path).copyTo(disk_path + to_path);
}
std::unique_ptr<ReadBuffer> DiskLocal::readFile(const String & path, size_t buf_size) const
std::unique_ptr<SeekableReadBuffer> DiskLocal::readFile(const String & path, size_t buf_size) const
{
return std::make_unique<ReadBufferFromFile>(disk_path + path, buf_size);
}
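Since readFile now returns a SeekableReadBuffer, callers can position the stream without caring which backend the disk uses. A small usage sketch, assuming an already constructed DB::DiskPtr named disk:

std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("path/to/file");
in->seek(42, SEEK_SET); /// The same call works for DiskLocal, DiskMemory and DiskS3.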

View File

@ -66,7 +66,7 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;

View File

@ -2,6 +2,7 @@
#include "DiskFactory.h"
#include <IO/ReadBufferFromString.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
@ -36,6 +37,36 @@ private:
std::vector<String>::iterator iter;
};
void WriteIndirectBuffer::finalize()
{
next();
WriteBufferFromVector::finalize();
auto iter = disk->files.find(path);
if (iter == disk->files.end())
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
// Resize to the actual number of bytes written to the string.
value.resize(count());
if (mode == WriteMode::Rewrite)
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value});
else if (mode == WriteMode::Append)
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value});
}
WriteIndirectBuffer::~WriteIndirectBuffer()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
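The effect of finalize() is that writes accumulate in the buffer's own string and are merged into the stored FileData only once the buffer is finalized. A usage sketch, assuming an existing DiskPtr backed by DiskMemory:

auto out = disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, DB::WriteMode::Append); /// Creates the file entry if absent.
DB::writeString("more", *out); /// Accumulated in the WriteIndirectBuffer's string.
out->finalize(); /// Appends "more" to the FileData stored for "a.txt".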
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
{
@ -94,7 +125,7 @@ size_t DiskMemory::getFileSize(const String & path) const
if (iter == files.end())
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return iter->second.data.size();
return iter->second.data.length();
}
void DiskMemory::createDirectory(const String & path)
@ -218,7 +249,7 @@ void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path
throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBuffer> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
std::unique_ptr<SeekableReadBuffer> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
{
std::lock_guard lock(mutex);
@ -241,13 +272,10 @@ std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /
throw Exception(
"Failed to create file '" + path + "'. Directory " + parent_path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
iter = files.emplace(path, FileData{FileType::File}).first;
files.emplace(path, FileData{FileType::File});
}
if (mode == WriteMode::Append)
return std::make_unique<WriteBufferFromString>(iter->second.data, WriteBufferFromString::AppendModeTag{});
else
return std::make_unique<WriteBufferFromString>(iter->second.data);
return std::make_unique<WriteIndirectBuffer>(this, path, mode);
}
void DiskMemory::remove(const String & path)

View File

@ -1,17 +1,33 @@
#pragma once
#include <Disks/IDisk.h>
#include <mutex>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <Disks/IDisk.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
class DiskMemory;
class ReadBuffer;
class WriteBuffer;
// This class is responsible for updating file metadata after the buffer is finalized.
class WriteIndirectBuffer : public WriteBufferFromOwnString
{
public:
WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_) : disk(disk_), path(std::move(path_)), mode(mode_) {}
~WriteIndirectBuffer() override;
void finalize() override;
private:
DiskMemory * disk;
String path;
WriteMode mode;
};
/** Implementation of Disk intended only for testing purposes.
* All filesystem objects are stored in memory and lost on server restart.
@ -60,12 +76,10 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<WriteBuffer> writeFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite) override;
std::unique_ptr<WriteBuffer>
writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;
void remove(const String & path) override;
@ -76,6 +90,8 @@ private:
void replaceFileImpl(const String & from_path, const String & to_path);
private:
friend class WriteIndirectBuffer;
enum class FileType
{
File,
@ -87,7 +103,8 @@ private:
FileType type;
String data;
explicit FileData(FileType type_) : type(type_) { }
FileData(FileType type_, String data_) : type(type_), data(std::move(data_)) {}
explicit FileData(FileType type_) : type(type_), data("") {}
};
using Files = std::unordered_map<String, FileData>; /// file path -> file data

View File

@ -4,12 +4,13 @@
# include "DiskFactory.h"
# include <random>
# include <IO/S3Common.h>
# include <IO/ReadBufferFromS3.h>
# include <IO/WriteBufferFromS3.h>
# include <utility>
# include <IO/ReadBufferFromFile.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/ReadBufferFromS3.h>
# include <IO/ReadHelpers.h>
# include <IO/S3Common.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h>
# include <Poco/File.h>
# include <Common/checkStackSize.h>
@ -27,6 +28,9 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
extern const int FILE_DOESNT_EXIST;
extern const int PATH_ACCESS_DENIED;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
}
namespace
@ -41,34 +45,209 @@ namespace
}
}
String readKeyFromFile(const String & path)
/**
 * S3 metadata file layout:
 * Number of S3 objects and total size of all S3 objects.
 * Each S3 object entry holds the path of the object in S3 and its size.
 */
struct Metadata
{
String key;
ReadBufferFromFile buf(path, 1024); /* reasonable buffer size for small file */
readStringUntilEOF(key, buf);
return key;
// Metadata file version.
const UInt32 VERSION = 1;
using PathAndSize = std::pair<String, size_t>;
// Path to metadata file on local FS.
String metadata_file_path;
// Number of referenced S3 objects.
UInt32 s3_objects_count;
// Total size of all S3 objects.
size_t total_size;
// Paths of the S3 objects and their sizes.
std::vector<PathAndSize> s3_objects;
explicit Metadata(const Poco::File & file) : Metadata(file.path(), false) {}
// Load metadata from the given path, or create empty metadata if the `create` flag is set.
explicit Metadata(const String & file_path, bool create = false)
: metadata_file_path(file_path), s3_objects_count(0), total_size(0), s3_objects(0)
{
if (create)
return;
ReadBufferFromFile buf(file_path, 1024); /* reasonable buffer size for small file */
UInt32 version;
readIntText(version, buf);
if (version != VERSION)
throw Exception(
"Unknown metadata file version. Path: " + file_path + ", Version: " + std::to_string(version)
+ ", Expected version: " + std::to_string(VERSION),
ErrorCodes::UNKNOWN_FORMAT);
assertChar('\n', buf);
readIntText(s3_objects_count, buf);
assertChar('\t', buf);
readIntText(total_size, buf);
assertChar('\n', buf);
s3_objects.resize(s3_objects_count);
for (UInt32 i = 0; i < s3_objects_count; ++i)
{
String path;
size_t size;
readIntText(size, buf);
assertChar('\t', buf);
readEscapedString(path, buf);
assertChar('\n', buf);
s3_objects[i] = std::make_pair(path, size);
}
}
void writeKeyToFile(const String & key, const String & path)
void addObject(const String & path, size_t size)
{
WriteBufferFromFile buf(path, 1024);
writeString(key, buf);
buf.next();
++s3_objects_count;
total_size += size;
s3_objects.emplace_back(path, size);
}
/// Saves the metadata to the metadata file on the local filesystem.
void save()
{
WriteBufferFromFile buf(metadata_file_path, 1024);
writeIntText(VERSION, buf);
writeChar('\n', buf);
writeIntText(s3_objects_count, buf);
writeChar('\t', buf);
writeIntText(total_size, buf);
writeChar('\n', buf);
for (UInt32 i = 0; i < s3_objects_count; ++i)
{
auto path_and_size = s3_objects[i];
writeIntText(path_and_size.second, buf);
writeChar('\t', buf);
writeEscapedString(path_and_size.first, buf);
writeChar('\n', buf);
}
buf.finalize();
}
};
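Given save() above, a metadata file describing two S3 objects looks like this on disk (a constructed example; the object names stand in for the random keys DiskS3 generates):

1
2	1048576
524288	r0123456789abcdef
524288	rfedcba9876543210

The first line is the format version; the second holds the object count and total size separated by a tab; each remaining line is one object's size and its escaped S3 path.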
// Reads data from S3.
// Supports reading from the multiple S3 objects listed in Metadata.
class ReadIndirectBufferFromS3 : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
ReadIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, Metadata metadata_, size_t buf_size_)
: BufferWithOwnMemory(buf_size_)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, metadata(std::move(metadata_))
, buf_size(buf_size_)
, offset(0)
, initialized(false)
, current_buf_idx(0)
, current_buf(nullptr)
{
}
off_t seek(off_t offset_, int whence) override
{
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0 || metadata.total_size <= static_cast<UInt64>(offset_))
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset_) + ", Max: " + std::to_string(metadata.total_size),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
offset = offset_;
return offset;
}
private:
std::unique_ptr<ReadBufferFromS3> initialize()
{
for (UInt32 i = 0; i < metadata.s3_objects_count; ++i)
{
current_buf_idx = i;
auto path = metadata.s3_objects[i].first;
auto size = metadata.s3_objects[i].second;
if (size > offset)
{
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, path, buf_size);
buf->seek(offset, SEEK_SET);
return buf;
}
offset -= size;
}
return nullptr;
}
bool nextImpl() override
{
// Find the first buffer that covers the given offset.
if (!initialized)
{
current_buf = initialize();
initialized = true;
}
// If the current buffer has remaining data, use it.
if (current_buf && current_buf->next())
{
working_buffer = current_buf->buffer();
return true;
}
// If there are no buffers left, there is nothing to read.
if (current_buf_idx + 1 >= metadata.s3_objects_count)
return false;
++current_buf_idx;
auto path = metadata.s3_objects[current_buf_idx].first;
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, path, buf_size);
current_buf->next();
working_buffer = current_buf->buffer();
return true;
}
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String & bucket;
Metadata metadata;
size_t buf_size;
size_t offset;
bool initialized;
UInt32 current_buf_idx;
std::unique_ptr<ReadBufferFromS3> current_buf;
};
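The core of initialize() above is mapping one global offset onto a sequence of concatenated objects. A standalone sketch of that mapping (object sizes are illustrative):

#include <cstddef>
#include <utility>
#include <vector>

/// Returns {object index, offset inside that object}.
std::pair<size_t, size_t> locate(const std::vector<size_t> & object_sizes, size_t offset)
{
    for (size_t i = 0; i < object_sizes.size(); ++i)
    {
        if (offset < object_sizes[i])
            return {i, offset};
        offset -= object_sizes[i];
    }
    return {object_sizes.size(), 0}; /// Offset is past the end of all objects.
}

For example, locate({100, 50, 200}, 120) yields {1, 20}: reading starts 20 bytes into the second object, which ReadBufferFromS3 translates into an HTTP "Range: bytes=20-" request.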
/// Stores data in S3 and adds the object key (S3 path) and object size to the metadata file on the local FS.
class WriteIndirectBufferFromS3 : public WriteBufferFromS3
{
public:
WriteIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> & client_ptr_,
const String & bucket_,
const String & metadata_path_,
Metadata metadata_,
const String & s3_path_,
size_t min_upload_part_size,
size_t buf_size_)
: WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_)
, metadata_path(metadata_path_)
, metadata(std::move(metadata_))
, s3_path(s3_path_)
{
}
@ -76,7 +255,8 @@ namespace
void finalize() override
{
WriteBufferFromS3::finalize();
writeKeyToFile(s3_path, metadata_path);
metadata.addObject(s3_path, total_size);
metadata.save();
finalized = true;
}
@ -97,8 +277,8 @@ namespace
private:
bool finalized = false;
const String metadata_path;
const String s3_path;
Metadata metadata;
String s3_path;
};
}
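Taken together, a write through DiskS3 now proceeds as follows (a hypothetical end-to-end flow, assuming a configured DiskS3 instance and data/size to write):

auto out = disk->writeFile("table/data.bin"); /// Saves empty metadata and picks a random S3 key.
out->write(data, size); /// Buffered and shipped to S3 as multipart-upload parts.
out->finalize(); /// Appends (s3_path, total_size) to the metadata and saves it.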
@ -192,19 +372,8 @@ bool DiskS3::isDirectory(const String & path) const
size_t DiskS3::getFileSize(const String & path) const
{
Aws::S3::Model::GetObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
auto outcome = client->GetObject(request);
if (!outcome.IsSuccess())
{
auto & err = outcome.GetError();
throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
}
else
{
return outcome.GetResult().GetContentLength();
}
Metadata metadata(metadata_path + path);
return metadata.total_size;
}
void DiskS3::createDirectory(const String & path)
@ -232,7 +401,7 @@ void DiskS3::clearDirectory(const String & path)
void DiskS3::moveFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
throw Exception("File already exists " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path);
}
@ -256,53 +425,88 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
if (exists(to_path))
remove(to_path);
String s3_from_path = readKeyFromFile(metadata_path + from_path);
String s3_to_path = s3_root_path + getRandomName();
Metadata from(metadata_path + from_path);
Metadata to(metadata_path + to_path, true);
for (UInt32 i = 0; i < from.s3_objects_count; ++i)
{
auto path = from.s3_objects[i].first;
auto size = from.s3_objects[i].second;
auto new_path = s3_root_path + getRandomName();
Aws::S3::Model::CopyObjectRequest req;
req.SetBucket(bucket);
req.SetCopySource(s3_from_path);
req.SetKey(s3_to_path);
req.SetCopySource(path);
req.SetKey(new_path);
throwIfError(client->CopyObject(req));
writeKeyToFile(s3_to_path, metadata_path + to_path);
to.addObject(new_path, size);
}
std::unique_ptr<ReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
to.save();
}
std::unique_ptr<SeekableReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
{
return std::make_unique<ReadBufferFromS3>(client, bucket, getS3Path(path), buf_size);
Metadata metadata(metadata_path + path);
LOG_DEBUG(
&Logger::get("DiskS3"),
"Read from file by path: " << backQuote(metadata_path + path) << " Existing S3 objects: " << metadata.s3_objects_count);
return std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
}
std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
if (!exists(path) || mode == WriteMode::Rewrite)
bool exist = exists(path);
// S3 key under which the new object will be stored.
auto s3_path = s3_root_path + getRandomName();
if (!exist || mode == WriteMode::Rewrite)
{
String new_s3_path = s3_root_path + getRandomName();
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path, new_s3_path,
min_upload_part_size, buf_size);
// If the metadata file exists, remove it and create a new one.
if (exist)
remove(path);
Metadata metadata(metadata_path + path, true);
// Save empty metadata to disk so that the file size can be obtained while the buffer is not yet finalized.
metadata.save();
LOG_DEBUG(&Logger::get("DiskS3"), "Write to file by path: " << backQuote(metadata_path + path) << " New S3 path: " << s3_path);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, min_upload_part_size, buf_size);
}
else
{
auto old_s3_path = getS3Path(path);
ReadBufferFromS3 read_buffer(client, bucket, old_s3_path, buf_size);
auto writeBuffer = std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path,
old_s3_path, min_upload_part_size, buf_size);
std::vector<char> buffer(buf_size);
while (!read_buffer.eof())
writeBuffer->write(buffer.data(), read_buffer.read(buffer.data(), buf_size));
return writeBuffer;
Metadata metadata(metadata_path + path);
LOG_DEBUG(
&Logger::get("DiskS3"),
"Append to file by path: " << backQuote(metadata_path + path) << " New S3 path: " << s3_path
<< " Existing S3 objects: " << metadata.s3_objects_count);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, min_upload_part_size, buf_size);
}
}
void DiskS3::remove(const String & path)
{
LOG_DEBUG(&Logger::get("DiskS3"), "Remove file by path: " << backQuote(metadata_path + path));
Poco::File file(metadata_path + path);
if (file.isFile())
{
Metadata metadata(file);
for (UInt32 i = 0; i < metadata.s3_objects_count; ++i)
{
auto s3_path = metadata.s3_objects[i].first;
// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
request.SetKey(s3_path);
throwIfError(client->DeleteObject(request));
}
}
file.remove();
}
@ -313,25 +517,14 @@ void DiskS3::removeRecursive(const String & path)
Poco::File file(metadata_path + path);
if (file.isFile())
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
throwIfError(client->DeleteObject(request));
remove(metadata_path + path);
}
else
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
removeRecursive(it->path());
}
file.remove();
}
String DiskS3::getS3Path(const String & path) const
{
if (!exists(path))
throw Exception("File not found: " + path, ErrorCodes::FILE_DOESNT_EXIST);
return readKeyFromFile(metadata_path + path);
}
String DiskS3::getRandomName() const
@ -395,6 +588,26 @@ DiskS3Reservation::~DiskS3Reservation()
}
}
inline void checkWriteAccess(std::shared_ptr<DiskS3> & disk)
{
auto file = disk->writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}
inline void checkReadAccess(const String & disk_name, std::shared_ptr<DiskS3> & disk)
{
auto file = disk->readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}
inline void checkRemoveAccess(std::shared_ptr<DiskS3> & disk)
{
disk->remove("test_acl");
}
void registerDiskS3(DiskFactory & factory)
{
auto creator = [](const String & name,
@ -419,21 +632,9 @@ void registerDiskS3(DiskFactory & factory)
context.getSettingsRef().s3_min_upload_part_size);
/// This code is used only to check access to the corresponding disk.
{
auto file = s3disk->writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}
{
auto file = s3disk->readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read accecss to S3 bucket in disk " + name, ErrorCodes::PATH_ACCESS_DENIED);
}
{
s3disk->remove("test_acl");
}
checkWriteAccess(s3disk);
checkReadAccess(name, s3disk);
checkRemoveAccess(s3disk);
return s3disk;
};

View File

@ -62,7 +62,7 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
@ -71,8 +71,6 @@ public:
void removeRecursive(const String & path) override;
private:
String getS3Path(const String & path) const;
String getRandomName() const;
bool tryReserve(UInt64 bytes);

View File

@ -25,7 +25,7 @@ using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
class ReadBuffer;
class SeekableReadBuffer;
class WriteBuffer;
/**
@ -121,8 +121,8 @@ public:
/// Copy the file from `from_path` to `to_path`.
virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// Open the file for read and return ReadBuffer object.
virtual std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0;
/// Open the file for read and return SeekableReadBuffer object.
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0;
/// Open the file for write and return WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0;

View File

@ -1,9 +1,8 @@
#include <gtest/gtest.h>
#include <Disks/DiskLocal.h>
#include <Disks/DiskMemory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include "gtest_disk.h"
#if !__clang__
# pragma GCC diagnostic push
@ -33,6 +32,12 @@ void destroyDisk(DB::DiskPtr & disk)
disk.reset();
}
template <>
void destroyDisk<DB::DiskMemory>(DB::DiskPtr & disk)
{
disk.reset();
}
template <>
void destroyDisk<DB::DiskLocal>(DB::DiskPtr & disk)
{
@ -92,6 +97,41 @@ TYPED_TEST(DiskTest, writeFile)
}
TYPED_TEST(DiskTest, readFile)
{
const auto & disk = this->getDisk();
{
std::unique_ptr<DB::WriteBuffer> out = disk->writeFile("test_file");
writeString("test data", *out);
}
// Test SEEK_SET
{
DB::String data;
std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("test_file");
in->seek(5, SEEK_SET);
readString(data, *in);
EXPECT_EQ("data", data);
}
// Test SEEK_CUR
{
std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("test_file");
String buf(4, '0');
in->readStrict(buf.data(), 4);
EXPECT_EQ("test", buf);
// Skip whitespace
in->seek(1, SEEK_CUR);
in->readStrict(buf.data(), 4);
EXPECT_EQ("data", buf);
}
}
TYPED_TEST(DiskTest, iterateDirectory)
{
const auto & disk = this->getDisk();

View File

@ -0,0 +1,21 @@
#include <Disks/DiskLocal.h>
#include <Disks/DiskMemory.h>
#include <Disks/IDisk.h>
template <typename T>
DB::DiskPtr createDisk();
template <>
DB::DiskPtr createDisk<DB::DiskMemory>();
template <>
DB::DiskPtr createDisk<DB::DiskLocal>();
template <typename T>
void destroyDisk(DB::DiskPtr & disk);
template <>
void destroyDisk<DB::DiskLocal>(DB::DiskPtr & disk);
template <>
void destroyDisk<DB::DiskMemory>(DB::DiskPtr & disk);

View File

@ -105,7 +105,7 @@ off_t MMapReadBufferFromFileDescriptor::getPositionInFile()
return count();
}
off_t MMapReadBufferFromFileDescriptor::doSeek(off_t offset, int whence)
off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
off_t new_pos;
if (whence == SEEK_SET)

View File

@ -13,14 +13,14 @@ namespace DB
*/
class MMapReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
public:
off_t seek(off_t off, int whence) override;
protected:
MMapReadBufferFromFileDescriptor() {}
void init(int fd_, size_t offset, size_t length_);
void init(int fd_, size_t offset);
off_t doSeek(off_t off, int whence) override;
public:
MMapReadBufferFromFileDescriptor(int fd_, size_t offset_, size_t length_);

View File

@ -149,7 +149,7 @@ bool ReadBufferAIO::nextImpl()
return true;
}
off_t ReadBufferAIO::doSeek(off_t off, int whence)
off_t ReadBufferAIO::seek(off_t off, int whence)
{
off_t new_pos_in_file;

View File

@ -40,11 +40,11 @@ public:
std::string getFileName() const override { return filename; }
int getFD() const override { return fd; }
off_t seek(off_t off, int whence) override;
private:
///
bool nextImpl() override;
///
off_t doSeek(off_t off, int whence) override;
/// Synchronously read the data.
void synchronousRead();
/// Get data from an asynchronous request.

View File

@ -3,13 +3,12 @@
namespace DB
{
ReadBufferFromFileBase::ReadBufferFromFileBase()
: BufferWithOwnMemory<ReadBuffer>(0)
ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
{
}
ReadBufferFromFileBase::ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size, existing_memory, alignment)
{
}
@ -17,9 +16,4 @@ ReadBufferFromFileBase::~ReadBufferFromFileBase()
{
}
off_t ReadBufferFromFileBase::seek(off_t off, int whence)
{
return doSeek(off, whence);
}
}

View File

@ -7,18 +7,18 @@
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <port/clock.h>
#include "SeekableReadBuffer.h"
namespace DB
{
class ReadBufferFromFileBase : public BufferWithOwnMemory<ReadBuffer>
class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
ReadBufferFromFileBase();
ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
ReadBufferFromFileBase(ReadBufferFromFileBase &&) = default;
~ReadBufferFromFileBase() override;
off_t seek(off_t off, int whence = SEEK_SET);
virtual off_t getPositionInFile() = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
@ -44,8 +44,6 @@ protected:
ProfileCallback profile_callback;
clockid_t clock_type{};
/// Children implementation should be able to seek backwards
virtual off_t doSeek(off_t off, int whence) = 0;
};
}

View File

@ -99,7 +99,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t ReadBufferFromFileDescriptor::doSeek(off_t offset, int whence)
off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
off_t new_pos;
if (whence == SEEK_SET)

View File

@ -37,10 +37,10 @@ public:
return pos_in_file - (working_buffer.end() - pos);
}
private:
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t doSeek(off_t offset, int whence) override;
off_t seek(off_t off, int whence) override;
private:
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool poll(size_t timeout_microseconds);
};

View File

@ -0,0 +1,46 @@
#include "ReadBufferFromMemory.h"
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
off_t ReadBufferFromMemory::seek(off_t offset, int whence)
{
if (whence == SEEK_SET)
{
if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end())
{
pos = working_buffer.begin() + offset;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else if (whence == SEEK_CUR)
{
Position new_pos = pos + offset;
if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end())
{
pos = new_pos;
return size_t(pos - working_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
}

View File

@ -1,26 +1,30 @@
#pragma once
#include <IO/ReadBuffer.h>
#include "SeekableReadBuffer.h"
namespace DB
{
/** Allows reading from a memory range.
 * Compared to plain ReadBuffer, it only adds convenience constructors that do const_cast.
 * ReadBuffer does not actually modify the data in the buffer, but it requires a non-const pointer.
 */
class ReadBufferFromMemory : public ReadBuffer
class ReadBufferFromMemory : public SeekableReadBuffer
{
public:
ReadBufferFromMemory(const char * buf, size_t size)
: ReadBuffer(const_cast<char *>(buf), size, 0) {}
ReadBufferFromMemory(const char * buf, size_t size) : SeekableReadBuffer(const_cast<char *>(buf), size, 0) {}
ReadBufferFromMemory(const unsigned char * buf, size_t size)
: ReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}
: SeekableReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0)
{
}
ReadBufferFromMemory(const signed char * buf, size_t size)
: ReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}
: SeekableReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0)
{
}
off_t seek(off_t off, int whence) override;
};
}
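A short usage sketch of the two supported seek modes, mirroring the disk unit test added in this PR:

const char data[] = "test data";
DB::ReadBufferFromMemory in(data, sizeof(data) - 1);
in.seek(5, SEEK_SET); /// Positioned at "data".
in.seek(2, SEEK_CUR); /// Now at "ta".
/// in.seek(0, SEEK_END); would throw CANNOT_SEEK_THROUGH_FILE.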

View File

@ -2,44 +2,39 @@
#if USE_AWS_S3
#include <IO/ReadBufferFromS3.h>
# include <IO/ReadBufferFromIStream.h>
# include <IO/ReadBufferFromS3.h>
#include <common/logger_useful.h>
#include <aws/s3/model/GetObjectRequest.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/GetObjectRequest.h>
# include <common/logger_useful.h>
# include <utility>
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
ReadBufferFromS3::ReadBufferFromS3(const std::shared_ptr<Aws::S3::S3Client> & client_ptr,
const String & bucket,
const String & key,
size_t buffer_size_): ReadBuffer(nullptr, 0)
ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, size_t buffer_size_)
: SeekableReadBuffer(nullptr, 0), client_ptr(std::move(client_ptr_)), bucket(bucket_), key(key_), buffer_size(buffer_size_)
{
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
if (outcome.IsSuccess())
{
read_result = outcome.GetResultWithOwnership();
impl = std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size_);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
bool ReadBufferFromS3::nextImpl()
{
if (!initialized)
{
impl = initialize();
initialized = true;
}
if (!impl->next())
return false;
internal_buffer = impl->buffer();
@ -47,6 +42,43 @@ bool ReadBufferFromS3::nextImpl()
return true;
}
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
offset = offset_;
return offset;
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
LOG_TRACE(log, "Read S3 object. Bucket: " + bucket + ", Key: " + key + ", Offset: " + std::to_string(offset));
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (offset != 0)
req.SetRange("bytes=" + std::to_string(offset) + "-");
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
if (outcome.IsSuccess())
{
read_result = outcome.GetResultWithOwnership();
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
}
#endif
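Seek here is deliberately lazy: it only records the offset, and the first read issues the GET with a Range header. A hypothetical usage, assuming a configured Aws::S3::S3Client and ReadHelpers included:

auto buf = std::make_unique<DB::ReadBufferFromS3>(client, "my-bucket", "my-key");
buf->seek(1024, SEEK_SET); /// Allowed: nothing has been read yet.
char c;
DB::readChar(c, *buf); /// Triggers the GET with "Range: bytes=1024-".
/// A further buf->seek(...) would now throw CANNOT_SEEK_THROUGH_FILE.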

View File

@ -9,6 +9,7 @@
# include <IO/HTTPCommon.h>
# include <IO/ReadBuffer.h>
# include <aws/s3/model/GetObjectResult.h>
# include "SeekableReadBuffer.h"
namespace Aws::S3
{
@ -17,24 +18,36 @@ namespace Aws::S3
namespace DB
{
/** Perform S3 HTTP GET request and provide response to read.
/**
* Perform S3 HTTP GET request and provide response to read.
*/
class ReadBufferFromS3 : public ReadBuffer
class ReadBufferFromS3 : public SeekableReadBuffer
{
private:
Logger * log = &Logger::get("ReadBufferFromS3");
std::shared_ptr<Aws::S3::S3Client> client_ptr;
String bucket;
String key;
size_t buffer_size;
bool initialized = false;
off_t offset = 0;
Aws::S3::Model::GetObjectResult read_result;
protected:
std::unique_ptr<ReadBuffer> impl;
Logger * log = &Logger::get("ReadBufferFromS3");
public:
explicit ReadBufferFromS3(const std::shared_ptr<Aws::S3::S3Client> & client_ptr,
const String & bucket,
const String & key,
explicit ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
private:
std::unique_ptr<ReadBuffer> initialize();
};
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <IO/ReadBuffer.h>
namespace DB
{
class SeekableReadBuffer : public ReadBuffer
{
public:
SeekableReadBuffer(Position ptr, size_t size)
: ReadBuffer(ptr, size) {}
SeekableReadBuffer(Position ptr, size_t size, size_t offset)
: ReadBuffer(ptr, size, offset) {}
/**
* Shifts the buffer's current position to the given offset.
* @param off Offset.
* @param whence Seek mode (@see SEEK_SET, @see SEEK_CUR).
* @return New position from the beginning of the underlying buffer / file.
*/
virtual off_t seek(off_t off, int whence) = 0;
};
}
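The implementations in this PR accept different subsets of whence, but each begins by translating (off, whence) into an absolute position. A generic sketch of that first step (not a class from the diff):

#include <stdexcept>
#include <cstdio> /// SEEK_SET / SEEK_CUR.
#include <sys/types.h> /// off_t.

off_t toAbsoluteOffset(off_t current, off_t off, int whence)
{
    if (whence == SEEK_SET)
        return off;
    if (whence == SEEK_CUR)
        return current + off;
    throw std::invalid_argument("Only SEEK_SET and SEEK_CUR are supported here.");
}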

View File

@ -5,18 +5,17 @@
# include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/CompleteMultipartUploadRequest.h>
# include <aws/s3/model/CreateMultipartUploadRequest.h>
# include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
# include <common/logger_useful.h>
# include <utility>
namespace DB
{
// The S3 protocol does not allow a multipart upload to have more than 10000 parts.
// If the server does not return an error when that number is exceeded, we only print a warning,
// because a custom S3 implementation may relax this requirement.
@ -34,14 +33,13 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
size_t buffer_size_
)
size_t buffer_size_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, bucket(bucket_)
, key(key_)
, client_ptr(std::move(client_ptr_))
, minimum_upload_part_size{minimum_upload_part_size_}
, temporary_buffer {std::make_unique<WriteBufferFromString>(buffer_string)}
, temporary_buffer{std::make_unique<WriteBufferFromOwnString>()}
, last_part_size{0}
{
initiate();
@ -60,9 +58,9 @@ void WriteBufferFromS3::nextImpl()
if (last_part_size > minimum_upload_part_size)
{
temporary_buffer->finalize();
writePart(buffer_string);
writePart(temporary_buffer->str());
last_part_size = 0;
temporary_buffer = std::make_unique<WriteBufferFromString>(buffer_string);
temporary_buffer = std::make_unique<WriteBufferFromOwnString>();
}
}
@ -70,11 +68,9 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::finalize()
{
next();
temporary_buffer->finalize();
if (!buffer_string.empty())
{
writePart(buffer_string);
}
writePart(temporary_buffer->str());
complete();
}
@ -104,7 +100,7 @@ void WriteBufferFromS3::initiate()
if (outcome.IsSuccess())
{
upload_id = outcome.GetResult().GetUploadId();
LOG_DEBUG(log, "Multipart upload initiated. Upload id = " + upload_id);
LOG_DEBUG(log, "Multipart upload initiated. Upload id: " << upload_id);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
@ -113,6 +109,9 @@ void WriteBufferFromS3::initiate()
void WriteBufferFromS3::writePart(const String & data)
{
if (data.empty())
return;
if (part_tags.size() == S3_WARN_MAX_PARTS)
{
// Don't throw an exception here ourselves; leave the decision to the S3 server.
@ -130,11 +129,18 @@ void WriteBufferFromS3::writePart(const String & data)
auto outcome = client_ptr->UploadPart(req);
LOG_TRACE(
log, "Writing part. Bucket: " << bucket << ", Key: " << key << ", Upload_id: " << upload_id << ", Data size: " << data.size());
if (outcome.IsSuccess())
{
auto etag = outcome.GetResult().GetETag();
part_tags.push_back(etag);
LOG_DEBUG(log, "Write part " + std::to_string(part_tags.size()) + " finished. Upload id = " + upload_id + ". Etag = " + etag);
total_size += data.size();
LOG_DEBUG(
log,
"Writing part finished. "
<< "Total parts: " << part_tags.size() << ", Upload_id: " << upload_id << ", Etag: " << etag);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
@ -143,11 +149,15 @@ void WriteBufferFromS3::writePart(const String & data)
void WriteBufferFromS3::complete()
{
LOG_DEBUG(log, "Completing multipart upload. Bucket: " + bucket + ", Key: " + key + ", Upload_id: " + upload_id);
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(upload_id);
if (!part_tags.empty())
{
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
@ -156,11 +166,12 @@ void WriteBufferFromS3::complete()
}
req.SetMultipartUpload(multipart_upload);
}
auto outcome = client_ptr->CompleteMultipartUpload(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Multipart upload completed. Upload_id = " + upload_id);
LOG_DEBUG(log, "Multipart upload completed. Upload_id: " << upload_id);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
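The buffering rule spread across nextImpl() and finalize() condenses to: accumulate writes in an in-memory string; once it exceeds the minimum part size, upload it as one part and start a fresh buffer; finalize() flushes the remainder. A condensed sketch of that rule (the function name is hypothetical; the members are those of WriteBufferFromS3):

void flushIfPartIsFull()
{
    if (last_part_size > minimum_upload_part_size)
    {
        temporary_buffer->finalize();
        writePart(temporary_buffer->str()); /// One part of the multipart upload.
        last_part_size = 0;
        temporary_buffer = std::make_unique<WriteBufferFromOwnString>();
    }
}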

View File

@ -7,8 +7,8 @@
# include <memory>
# include <vector>
# include <Core/Types.h>
#include <IO/HTTPCommon.h>
# include <IO/BufferWithOwnMemory.h>
# include <IO/HTTPCommon.h>
# include <IO/WriteBuffer.h>
# include <IO/WriteBufferFromString.h>
@ -28,8 +28,7 @@ private:
String key;
std::shared_ptr<Aws::S3::S3Client> client_ptr;
size_t minimum_upload_part_size;
String buffer_string;
std::unique_ptr<WriteBufferFromString> temporary_buffer;
std::unique_ptr<WriteBufferFromOwnString> temporary_buffer;
size_t last_part_size;
/// Upload in S3 is made in parts.
@ -39,8 +38,13 @@ private:
Logger * log = &Logger::get("WriteBufferFromS3");
protected:
// Total size of all uploaded parts.
size_t total_size = 0;
public:
explicit WriteBufferFromS3(std::shared_ptr<Aws::S3::S3Client> client_ptr_,
explicit WriteBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,

View File

@ -50,7 +50,7 @@ TEST(ReadBufferAIOTest, TestReadAfterAIO)
EXPECT_TRUE(testbuf.eof());
testbuf.seek(data.length() - 100);
testbuf.seek(data.length() - 100, SEEK_SET);
std::string smalldata;
smalldata.resize(100);
@ -59,7 +59,7 @@ TEST(ReadBufferAIOTest, TestReadAfterAIO)
EXPECT_TRUE(testbuf.eof());
testbuf.seek(0);
testbuf.seek(0, SEEK_SET);
std::string repeatdata;
repeatdata.resize(data.length());
size_t read_after_eof_big = testbuf.read(repeatdata.data(), repeatdata.size());

View File

@ -5,8 +5,6 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h>
@ -83,14 +81,14 @@ private:
struct Stream
{
Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t max_read_buffer_size_)
: plain(fullPath(disk, data_path), std::min(max_read_buffer_size_, disk->getFileSize(data_path))),
compressed(plain)
: plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))),
compressed(*plain)
{
if (offset)
plain.seek(offset);
plain->seek(offset, SEEK_SET);
}
ReadBufferFromFile plain;
std::unique_ptr<SeekableReadBuffer> plain;
CompressedReadBuffer compressed;
};
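Since the plain stream now comes from IDisk, the same read path serves local disk and S3. A sketch of building such a stream by hand, assuming a DiskPtr and a known mark offset:

auto plain = disk->readFile(data_path, buf_size);
plain->seek(offset, SEEK_SET); /// Backend-agnostic: local lseek or S3 range request.
DB::CompressedReadBuffer compressed(*plain);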
@ -111,7 +109,7 @@ public:
explicit LogBlockOutputStream(StorageLog & storage_)
: storage(storage_),
lock(storage.rwlock),
marks_stream(fullPath(storage.disk, storage.marks_file_path), 4096, O_APPEND | O_CREAT | O_WRONLY)
marks_stream(storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
{
}
@ -139,13 +137,13 @@ private:
struct Stream
{
Stream(const DiskPtr & disk, const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(fullPath(disk, data_path), max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, std::move(codec), max_compress_block_size),
plain(disk->writeFile(data_path, max_compress_block_size, WriteMode::Append)),
compressed(*plain, std::move(codec), max_compress_block_size),
plain_offset(disk->getFileSize(data_path))
{
}
WriteBufferFromFile plain;
std::unique_ptr<WriteBuffer> plain;
CompressedWriteBuffer compressed;
size_t plain_offset; /// How many bytes were in the file at the time the LogBlockOutputStream was created.
@ -153,7 +151,7 @@ private:
void finalize()
{
compressed.next();
plain.next();
plain->next();
}
};
@ -165,7 +163,7 @@ private:
using WrittenStreams = std::set<String>;
WriteBufferFromFile marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
std::unique_ptr<WriteBuffer> marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
using SerializeStates = std::map<String, SerializeState>;
@ -302,7 +300,7 @@ void LogBlockOutputStream::writeSuffix()
}
/// Finish write.
marks_stream.next();
marks_stream->next();
for (auto & name_stream : streams)
name_stream.second.finalize();
@ -372,7 +370,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
Mark mark;
mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();
mark.offset = stream_it->second.plain_offset + stream_it->second.plain->count();
out_marks.emplace_back(file.column_index, mark);
}, settings.path);
@ -402,8 +400,8 @@ void LogBlockOutputStream::writeMarks(MarksForColumns && marks)
for (const auto & mark : marks)
{
writeIntBinary(mark.second.rows, marks_stream);
writeIntBinary(mark.second.offset, marks_stream);
writeIntBinary(mark.second.rows, *marks_stream);
writeIntBinary(mark.second.offset, *marks_stream);
size_t column_index = mark.first;
storage.files[storage.column_names_by_idx[column_index]].marks.push_back(mark.second);

View File

@ -0,0 +1,162 @@
#include <gtest/gtest.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/tests/gtest_disk.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromOStream.h>
#include <Interpreters/Context.h>
#include <Storages/StorageLog.h>
#include <Common/typeid_cast.h>
#include <memory>
#if !__clang__
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wsuggest-override"
#endif
DB::Context createContext()
{
auto context = DB::Context::createGlobal();
context.makeGlobalContext();
context.setPath("./");
return context;
}
DB::StoragePtr createStorage(DB::DiskPtr & disk)
{
using namespace DB;
NamesAndTypesList names_and_types;
names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
StoragePtr table = StorageLog::create(
disk, "table/", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
table->startup();
return table;
}
std::unique_ptr<DB::Context> context;
template <typename T>
class StorageLogTest : public testing::Test
{
public:
static void SetUpTestSuite()
{
// Create context only once.
if (!context)
context = std::make_unique<DB::Context>(createContext());
}
void SetUp() override
{
disk_ = createDisk<T>();
table_ = createStorage(disk_);
}
void TearDown() override
{
table_->shutdown();
destroyDisk<T>(disk_);
}
const DB::DiskPtr & getDisk() { return disk_; }
DB::StoragePtr & getTable() { return table_; }
private:
DB::DiskPtr disk_;
DB::StoragePtr table_;
};
typedef testing::Types<DB::DiskMemory, DB::DiskLocal> DiskImplementations;
TYPED_TEST_SUITE(StorageLogTest, DiskImplementations);
// Returns data written to table in Values format.
std::string writeData(int rows, DB::StoragePtr & table)
{
using namespace DB;
std::string data;
Block block;
{
ColumnWithTypeAndName column;
column.name = "a";
column.type = table->getColumn("a").type;
auto col = column.type->createColumn();
ColumnUInt64::Container & vec = typeid_cast<ColumnUInt64 &>(*col).getData();
vec.resize(rows);
for (size_t i = 0; i < rows; ++i)
{
vec[i] = i;
if (i > 0)
data += ",";
data += "(" + std::to_string(i) + ")";
}
column.column = std::move(col);
block.insert(column);
}
BlockOutputStreamPtr out = table->write({}, *context);
out->write(block);
return data;
}
// Returns all table data in Values format.
std::string readData(DB::StoragePtr & table)
{
using namespace DB;
Names column_names;
column_names.push_back("a");
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(*context);
BlockInputStreamPtr in = table->read(column_names, {}, *context, stage, 8192, 1)[0];
Block sample;
{
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeUInt64>();
sample.insert(std::move(col));
}
std::ostringstream ss;
WriteBufferFromOStream out_buf(ss);
BlockOutputStreamPtr output = FormatFactory::instance().getOutput("Values", out_buf, sample, *context);
copyData(*in, *output);
output->flush();
return ss.str();
}
TYPED_TEST(StorageLogTest, testReadWrite)
{
using namespace DB;
std::string data;
// Write several chunks of data.
data += writeData(10, this->getTable());
data += ",";
data += writeData(20, this->getTable());
data += ",";
data += writeData(10, this->getTable());
ASSERT_EQ(data, readData(this->getTable()));
}

View File

@ -30,7 +30,7 @@ try
context.setPath("./");
DiskPtr disk = std::make_unique<DiskLocal>("default", "./", 0);
StoragePtr table = StorageLog::create(disk, "./", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
StoragePtr table = StorageLog::create(disk, "table/", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
table->startup();

View File

@ -34,16 +34,26 @@ def cluster():
cluster.shutdown()
def test_tinylog_s3(cluster):
@pytest.mark.parametrize("log_engine,files_overhead", [("TinyLog", 1), ("Log", 2)])
def test_log_family_s3(cluster, log_engine, files_overhead):
node = cluster.instances["node"]
minio = cluster.minio_client
node.query("CREATE TABLE s3_test (id UInt64) Engine=TinyLog")
node.query("INSERT INTO s3_test SELECT number FROM numbers(3)")
assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2
node.query("INSERT INTO s3_test SELECT number + 3 FROM numbers(3)")
assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n5\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2
node.query("DROP TABLE s3_test")
node.query("CREATE TABLE s3_test (id UInt64) Engine={}".format(log_engine))
node.query("INSERT INTO s3_test SELECT number FROM numbers(5)")
assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 1 + files_overhead
node.query("INSERT INTO s3_test SELECT number + 5 FROM numbers(3)")
assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2 + files_overhead
node.query("INSERT INTO s3_test SELECT number + 8 FROM numbers(1)")
assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 3 + files_overhead
node.query("TRUNCATE TABLE s3_test")
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
node.query("DROP TABLE s3_test")

View File

@ -65,7 +65,7 @@ void checkCompressedHeaders(const std::string & mrk_path, const std::string & bi
out << "Mark " << mark_num << ", points to " << offset_in_compressed_file << ", " << offset_in_decompressed_block << ". ";
bin_in.seek(offset_in_compressed_file);
bin_in.seek(offset_in_compressed_file, SEEK_SET);
auto sizes = stat(bin_in, out);
out << "Block sizes: " << sizes.first << ", " << sizes.second << '\n' << DB::flush;