SeekableReadBuffer initial implementation. IDisk readFile should return SeekableReadBuffer.

This commit is contained in:
Pavel Kovalenko 2020-01-22 19:17:25 +03:00 committed by Pavel Kovalenko
parent c28e224e24
commit dce424fe11
15 changed files with 152 additions and 83 deletions

View File

@ -25,8 +25,8 @@
<logger>
<!-- Possible levels: https://github.com/pocoproject/poco/blob/develop/Foundation/include/Poco/Logger.h#L105 -->
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<log>/home/jokserfn/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/home/jokserfn/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<!-- <console>1</console> --> <!-- Default behavior is autodetection (log to console if not daemon mode and is tty) -->
@ -127,13 +127,13 @@
<!-- Path to data directory, with trailing slash. -->
<path>/var/lib/clickhouse/</path>
<path>/home/jokserfn/var/lib/clickhouse/</path>
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<tmp_path>/home/jokserfn/var/lib/clickhouse/tmp/</tmp_path>
<!-- Directory with user provided files that are accessible by 'file' table function. -->
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
<user_files_path>/home/jokserfn/var/lib/clickhouse/user_files/</user_files_path>
<!-- Path to configuration file with users, access rights, profiles of settings, quotas. -->
<users_config>users.xml</users_config>
@ -327,44 +327,44 @@
-->
<!-- Query log. Used only for queries with setting log_queries = 1. -->
<query_log>
<!-- What table to insert data. If table is not exist, it will be created.
<!-- <query_log>
&lt;!&ndash; What table to insert data. If table is not exist, it will be created.
When query log structure is changed after system update,
then old table will be renamed and new table will be created automatically.
-->
&ndash;&gt;
<database>system</database>
<table>query_log</table>
<!--
&lt;!&ndash;
PARTITION BY expr https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
Example:
event_date
toMonday(event_date)
toYYYYMM(event_date)
toStartOfHour(event_time)
-->
&ndash;&gt;
<partition_by>toYYYYMM(event_date)</partition_by>
<!-- Interval of flushing data. -->
&lt;!&ndash; Interval of flushing data. &ndash;&gt;
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_log>
</query_log>-->
<!-- Trace log. Stores stack traces collected by query profilers.
See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. -->
<trace_log>
<!-- <trace_log>
<database>system</database>
<table>trace_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</trace_log>
</trace_log>-->
<!-- Query thread log. Has information about all threads that participated in query execution.
Used only for queries with setting log_query_threads = 1. -->
<query_thread_log>
<!-- <query_thread_log>
<database>system</database>
<table>query_thread_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_thread_log>
</query_thread_log>-->
<!-- Uncomment to use the part log.
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).
@ -489,7 +489,7 @@
<!-- Directory in <clickhouse-path> containing schema files for various input formats.
The directory will be created if it doesn't exist.
-->
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<format_schema_path>/home/jokserfn/var/lib/clickhouse/format_schemas/</format_schema_path>
<!-- Uncomment to use query masking rules.

View File

@ -200,7 +200,7 @@ void DiskLocal::copyFile(const String & from_path, const String & to_path)
Poco::File(disk_path + from_path).copyTo(disk_path + to_path);
}
std::unique_ptr<ReadBuffer> DiskLocal::readFile(const String & path, size_t buf_size) const
std::unique_ptr<SeekableReadBuffer> DiskLocal::readFile(const String & path, size_t buf_size) const
{
return std::make_unique<ReadBufferFromFile>(disk_path + path, buf_size);
}

View File

@ -66,7 +66,7 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override;

View File

@ -4,6 +4,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
@ -218,7 +219,7 @@ void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path
throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBuffer> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
std::unique_ptr<SeekableReadBuffer> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
{
std::lock_guard lock(mutex);

View File

@ -60,7 +60,8 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<SeekableReadBuffer> readFile(
const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<WriteBuffer> writeFile(
const String & path,

View File

@ -189,6 +189,7 @@ bool DiskS3::isDirectory(const String & path) const
size_t DiskS3::getFileSize(const String & path) const
{
// TODO: Consider storing actual file size in meta file.
Aws::S3::Model::GetObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
@ -264,13 +265,14 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
writeKeyToFile(s3_to_path, metadata_path + to_path);
}
std::unique_ptr<ReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
std::unique_ptr<SeekableReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
{
return std::make_unique<ReadBufferFromS3>(client, bucket, getS3Path(path), buf_size);
}
std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
// TODO: Optimize append mode. Consider storing several S3 references in one meta file.
if (!exists(path) || mode == WriteMode::Rewrite)
{
String new_s3_path = s3_root_path + getRandomName();

View File

@ -7,6 +7,7 @@
# include <aws/s3/S3Client.h>
# include <Poco/DirectoryIterator.h>
# include <IO/SeekableReadBuffer.h>
namespace DB
@ -61,7 +62,7 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

View File

@ -26,6 +26,7 @@ class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
class ReadBuffer;
class SeekableReadBuffer;
class WriteBuffer;
/**
@ -121,8 +122,8 @@ public:
/// Copy the file from `from_path` to `to_path`.
virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// Open the file for read and return ReadBuffer object.
virtual std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0;
/// Open the file for reading and return a SeekableReadBuffer object.
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0;
/// Open the file for writing and return a WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0;

View File

@ -3,13 +3,12 @@
namespace DB
{
ReadBufferFromFileBase::ReadBufferFromFileBase()
: BufferWithOwnMemory<ReadBuffer>(0)
ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
{
}
ReadBufferFromFileBase::ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size, existing_memory, alignment)
{
}
@ -17,9 +16,4 @@ ReadBufferFromFileBase::~ReadBufferFromFileBase()
{
}
off_t ReadBufferFromFileBase::seek(off_t off, int whence)
{
return doSeek(off, whence);
}
}

View File

@ -7,18 +7,18 @@
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <port/clock.h>
#include "SeekableReadBuffer.h"
namespace DB
{
class ReadBufferFromFileBase : public BufferWithOwnMemory<ReadBuffer>
class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
ReadBufferFromFileBase();
ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
ReadBufferFromFileBase(ReadBufferFromFileBase &&) = default;
~ReadBufferFromFileBase() override;
off_t seek(off_t off, int whence = SEEK_SET);
virtual off_t getPositionInFile() = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
@ -44,8 +44,6 @@ protected:
ProfileCallback profile_callback;
clockid_t clock_type{};
/// Children implementation should be able to seek backwards
virtual off_t doSeek(off_t off, int whence) = 0;
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <IO/ReadBuffer.h>
#include "SeekableReadBuffer.h"
namespace DB
@ -10,17 +11,20 @@ namespace DB
* In comparison with just ReadBuffer, it only adds convenient constructors, that do const_cast.
* In fact, ReadBuffer will not modify data in buffer, but it requires non-const pointer.
*/
class ReadBufferFromMemory : public ReadBuffer
class ReadBufferFromMemory : public SeekableReadBuffer
{
public:
ReadBufferFromMemory(const char * buf, size_t size)
: ReadBuffer(const_cast<char *>(buf), size, 0) {}
: SeekableReadBuffer(const_cast<char *>(buf), size, 0) {}
ReadBufferFromMemory(const unsigned char * buf, size_t size)
: ReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}
: SeekableReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}
ReadBufferFromMemory(const signed char * buf, size_t size)
: ReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}
: SeekableReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}
protected:
/// Placeholder seek: both arguments are ignored, the read position is left untouched
/// and 0 is always returned.
/// NOTE(review): callers of seek() on this buffer get no repositioning and no error — confirm this is intended.
off_t doSeek(off_t, int) override { return 0; }
};
}

View File

@ -9,6 +9,8 @@
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/S3Client.h>
#include <utility>
namespace DB
{
@ -18,28 +20,28 @@ namespace ErrorCodes
}
ReadBufferFromS3::ReadBufferFromS3(const std::shared_ptr<Aws::S3::S3Client> & client_ptr,
const String & bucket,
const String & key,
size_t buffer_size_): ReadBuffer(nullptr, 0)
ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t buffer_size_
)
: SeekableReadBuffer(nullptr, 0)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
, buffer_size(buffer_size_)
{
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
if (outcome.IsSuccess())
{
read_result = outcome.GetResultWithOwnership();
impl = std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size_);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
bool ReadBufferFromS3::nextImpl()
{
if (!initialized)
{
impl = initialize();
initialized = true;
}
if (!impl->next())
return false;
internal_buffer = impl->buffer();
@ -47,6 +49,31 @@ bool ReadBufferFromS3::nextImpl()
return true;
}
/// Remember the position from which the S3 object should be fetched.
///
/// The GET request is issued lazily on the first read (see nextImpl), so the
/// offset can only be changed while the buffer is still uninitialized.
///
/// @param offset_ Position to start reading from; it is used as the start of the
///                range requested in initialize().
/// @return The offset that is currently in effect.
///
/// NOTE: the `whence` argument is ignored; offset_ is always taken as-is.
/// NOTE: once the request has been issued, the call leaves the offset untouched
/// and just returns it — the caller gets no error in that case.
off_t ReadBufferFromS3::doSeek(off_t offset_, int /*whence*/)
{
    /// Store the offset unconditionally (zero included); otherwise a seek back to
    /// the beginning after an earlier non-zero seek would be silently dropped.
    if (!initialized)
        offset = offset_;

    return offset;
}
/// Issue the S3 GetObject request and wrap the response body into a ReadBuffer.
///
/// If a non-zero offset was set, only the part of the object starting at that
/// byte is requested.
///
/// @return Buffer reading from the response body stream, constructed with `buffer_size`.
/// @throws Exception with ErrorCodes::S3_ERROR if the request does not succeed.
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
    Aws::S3::Model::GetObjectRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);

    /// The Range header requires a range unit: "bytes=<first>-" means "from <first> to the end".
    /// A bare "<first>-" is not a valid range specifier.
    if (offset != 0)
        req.SetRange("bytes=" + std::to_string(offset) + "-");

    Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
    if (!outcome.IsSuccess())
        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);

    /// The result is kept in a member, presumably so that the body stream outlives this call
    /// while the returned buffer reads from it.
    read_result = outcome.GetResultWithOwnership();
    return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
}
}
#endif

View File

@ -9,6 +9,7 @@
#include <IO/HTTPCommon.h>
#include <IO/ReadBuffer.h>
#include <aws/s3/model/GetObjectResult.h>
#include "SeekableReadBuffer.h"
namespace Aws::S3
{
@ -17,24 +18,36 @@ namespace Aws::S3
namespace DB
{
/** Perform S3 HTTP GET request and provide response to read.
*/
class ReadBufferFromS3 : public ReadBuffer
/**
* Perform S3 HTTP GET request and provide response to read.
*/
class ReadBufferFromS3 : public SeekableReadBuffer
{
private:
Logger * log = &Logger::get("ReadBufferFromS3");
std::shared_ptr<Aws::S3::S3Client> client_ptr;
String bucket;
String key;
size_t buffer_size;
bool initialized = false;
off_t offset = 0;
Aws::S3::Model::GetObjectResult read_result;
protected:
std::unique_ptr<ReadBuffer> impl;
Logger * log = &Logger::get("ReadBufferFromS3");
protected:
off_t doSeek(off_t off, int whence) override;
public:
explicit ReadBufferFromS3(const std::shared_ptr<Aws::S3::S3Client> & client_ptr,
const String & bucket,
const String & key,
explicit ReadBufferFromS3(std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;
private:
std::unique_ptr<ReadBuffer> initialize();
};
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <string>
#include <ctime>
#include <functional>
#include <fcntl.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <port/clock.h>
namespace DB {
/// A ReadBuffer that additionally exposes a seek() operation.
/// Derived classes supply the actual repositioning logic in doSeek().
class SeekableReadBuffer : public ReadBuffer
{
public:
    SeekableReadBuffer(Position ptr, size_t size)
        : ReadBuffer(ptr, size)
    {
    }

    SeekableReadBuffer(Position ptr, size_t size, size_t offset)
        : ReadBuffer(ptr, size, offset)
    {
    }

    /// Forwards both arguments to doSeek() and returns its result.
    /// `whence` defaults to SEEK_SET.
    off_t seek(off_t off, int whence = SEEK_SET)
    {
        const off_t result = doSeek(off, whence);
        return result;
    }

protected:
    /// Children implementation should be able to seek backwards
    virtual off_t doSeek(off_t off, int whence) = 0;
};
}

View File

@ -5,8 +5,6 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h>
@ -83,14 +81,14 @@ private:
struct Stream
{
Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t max_read_buffer_size_)
: plain(fullPath(disk, data_path), std::min(max_read_buffer_size_, disk->getFileSize(data_path))),
compressed(plain)
: plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))),
compressed(*plain)
{
if (offset)
plain.seek(offset);
plain->seek(offset);
}
ReadBufferFromFile plain;
std::unique_ptr<SeekableReadBuffer> plain;
CompressedReadBuffer compressed;
};
@ -111,7 +109,7 @@ public:
explicit LogBlockOutputStream(StorageLog & storage_)
: storage(storage_),
lock(storage.rwlock),
marks_stream(fullPath(storage.disk, storage.marks_file_path), 4096, O_APPEND | O_CREAT | O_WRONLY)
marks_stream(storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Append))
{
}
@ -139,13 +137,13 @@ private:
struct Stream
{
Stream(const DiskPtr & disk, const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(fullPath(disk, data_path), max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, std::move(codec), max_compress_block_size),
plain(disk->writeFile(data_path, max_compress_block_size, WriteMode::Append)),
compressed(*plain, std::move(codec), max_compress_block_size),
plain_offset(disk->getFileSize(data_path))
{
}
WriteBufferFromFile plain;
std::unique_ptr<WriteBuffer> plain;
CompressedWriteBuffer compressed;
size_t plain_offset; /// How many bytes were in the file at the time the LogBlockOutputStream was created.
@ -153,7 +151,7 @@ private:
void finalize()
{
compressed.next();
plain.next();
plain->next();
}
};
@ -165,7 +163,7 @@ private:
using WrittenStreams = std::set<String>;
WriteBufferFromFile marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
std::unique_ptr<WriteBuffer> marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
using SerializeStates = std::map<String, SerializeState>;
@ -302,7 +300,7 @@ void LogBlockOutputStream::writeSuffix()
}
/// Finish write.
marks_stream.next();
marks_stream->next();
for (auto & name_stream : streams)
name_stream.second.finalize();
@ -372,7 +370,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
Mark mark;
mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();
mark.offset = stream_it->second.plain_offset + stream_it->second.plain->count();
out_marks.emplace_back(file.column_index, mark);
}, settings.path);
@ -402,8 +400,8 @@ void LogBlockOutputStream::writeMarks(MarksForColumns && marks)
for (const auto & mark : marks)
{
writeIntBinary(mark.second.rows, marks_stream);
writeIntBinary(mark.second.offset, marks_stream);
writeIntBinary(mark.second.rows, *marks_stream);
writeIntBinary(mark.second.offset, *marks_stream);
size_t column_index = mark.first;
storage.files[storage.column_names_by_idx[column_index]].marks.push_back(mark.second);