Split implementations to different files. [#CLICKHOUSE-2070]

This commit is contained in:
Vitaliy Lyudvichenko 2017-02-22 18:29:43 +03:00 committed by alexey-milovidov
parent 15254f91d2
commit c73971d7ec
9 changed files with 273 additions and 250 deletions

View File

@ -0,0 +1,43 @@
#pragma once
#include <functional>
#include <DB/IO/WriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
}
class CascadeWriteBuffer : public WriteBuffer
{
public:
using WriteBufferPtrs = std::vector<WriteBufferPtr>;
using WriteBufferConstructor = std::function<WriteBufferPtr (const WriteBufferPtr & prev_buf)>;
using WriteBufferConstructors = std::vector<WriteBufferConstructor>;
CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_ = {});
void nextImpl() override;
/// Should be called once
void getResultBuffers(WriteBufferPtrs & res);
private:
WriteBuffer * getNextBuffer();
WriteBufferPtrs prepared_sources;
WriteBufferConstructors lazy_sources;
size_t first_lazy_source_num;
size_t num_sources;
WriteBuffer * curr_buffer;
size_t curr_buffer_num;
};
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <memory>
#include <DB/IO/ReadBuffer.h>
namespace DB
{
struct IReadableWriteBuffer
{
/// Creates read buffer from current write buffer.
/// Returned buffer points to the first byte of original buffer.
/// Original stream becomes invalid.
virtual std::shared_ptr<ReadBuffer> getReadBuffer() = 0;
virtual ~IReadableWriteBuffer() {}
};
}

View File

@ -1,72 +1,17 @@
#include <functional>
#pragma once
#include <forward_list>
#include <iostream>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/IReadableWriteBuffer.h>
#include <DB/Core/Defines.h>
#include <DB/Common/Allocator.h>
#include <DB/Common/Exception.h>
#include <Poco/File.h>
#include <Poco/Path.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
class CascadeWriteBuffer : public WriteBuffer
{
public:
using WriteBufferPtrs = std::vector<WriteBufferPtr>;
using WriteBufferConstructor = std::function<WriteBufferPtr (const WriteBufferPtr & prev_buf)>;
using WriteBufferConstructors = std::vector<WriteBufferConstructor>;
CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_ = {});
void nextImpl() override;
/// Should be called once
void getResultBuffers(WriteBufferPtrs & res);
private:
WriteBuffer * getNextBuffer();
WriteBufferPtrs prepared_sources;
WriteBufferConstructors lazy_sources;
size_t first_lazy_source_num;
size_t num_sources;
WriteBuffer * curr_buffer;
size_t curr_buffer_num;
};
class ReadBufferFromMemoryWriteBuffer;
class ReadBufferFromTemporaryWriteBuffer;
struct IReadableWriteBuffer
{
/// Creates read buffer from current write buffer
/// Returned buffer points to the first byte of original buffer and finds with current position.
/// Original stream becomes invalid.
virtual std::shared_ptr<ReadBuffer> getReadBuffer() = 0;
virtual ~IReadableWriteBuffer() {}
};
/// Allow to write large data into memory
/// Stores data in memory chunks, size of cunks are exponentially increasing duting write
/// Data could be reread after write
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer
{
public:
@ -100,26 +45,4 @@ protected:
};
/// Rereadable WriteBuffer, could be used as disk buffer
/// Creates unique temporary in directory (and directory itself)
class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadableWriteBuffer
{
public:
using Ptr = std::shared_ptr<WriteBufferFromTemporaryFile>;
/// path_template examle "/opt/clickhouse/tmp/data.XXXXXX"
static Ptr create(const std::string & path_template_);
std::shared_ptr<ReadBuffer> getReadBuffer() override;
~WriteBufferFromTemporaryFile() override;
protected:
WriteBufferFromTemporaryFile(int fd, const std::string & tmp_path)
: WriteBufferFromFile(fd, tmp_path)
{}
};
}

View File

@ -0,0 +1,29 @@
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/IReadableWriteBuffer.h>
#include <DB/IO/WriteBufferFromFile.h>
namespace DB
{
/// Rereadable WriteBuffer, could be used as disk buffer
/// Creates unique temporary in directory (and directory itself)
class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadableWriteBuffer
{
public:
using Ptr = std::shared_ptr<WriteBufferFromTemporaryFile>;
/// path_template examle "/opt/clickhouse/tmp/data.XXXXXX"
static Ptr create(const std::string & path_template_);
std::shared_ptr<ReadBuffer> getReadBuffer() override;
~WriteBufferFromTemporaryFile() override;
protected:
WriteBufferFromTemporaryFile(int fd, const std::string & tmp_path)
: WriteBufferFromFile(fd, tmp_path)
{}
};
}

View File

@ -0,0 +1,84 @@
#include <DB/IO/CascadeWriteBuffer.h>
#include <DB/Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
extern const int CANNOT_CREATE_IO_BUFFER;
}
CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_)
: WriteBuffer(nullptr, 0), prepared_sources(std::move(prepared_sources_)), lazy_sources(std::move(lazy_sources_))
{
first_lazy_source_num = prepared_sources.size();
num_sources = first_lazy_source_num + lazy_sources.size();
/// fill lazy sources by nullptr
prepared_sources.resize(num_sources);
curr_buffer_num = 0;
curr_buffer = getNextBuffer();
set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
}
void CascadeWriteBuffer::nextImpl()
{
try
{
curr_buffer->position() = position();
curr_buffer->next();
}
catch (const Exception & e)
{
if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED)
{
/// actualize position of old buffer (it was reset by WriteBuffer::next)
curr_buffer->position() = position();
/// good situation, fetch next WriteBuffer
++curr_buffer_num;
curr_buffer = getNextBuffer();
}
else
throw;
}
set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
}
void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res)
{
curr_buffer->position() = position();
res = std::move(prepared_sources);
curr_buffer = nullptr;
curr_buffer_num = num_sources = 0;
}
WriteBuffer * CascadeWriteBuffer::getNextBuffer()
{
if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources)
{
if (!prepared_sources[curr_buffer_num])
{
WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr;
prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf);
}
}
else if (curr_buffer_num >= num_sources)
throw Exception("There are no WriteBuffers to write result", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
WriteBuffer * res = prepared_sources[curr_buffer_num].get();
if (!res)
throw Exception("Required WriteBuffer is not created", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
return res;
}
}

View File

@ -1,9 +1,6 @@
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/Common/Exception.h>
#include <DB/Common/Allocator.h>
#include <common/likely.h>
#include <iostream>
namespace DB
{
@ -11,96 +8,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int LOGICAL_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
extern const int CANNOT_CREATE_IO_BUFFER;
}
inline std::ostream & operator << (std::ostream & stream, BufferBase & buffer)
{
stream
<< " begin=" << reinterpret_cast<void*>(buffer.buffer().begin())
<< " pos=" << buffer.position() - buffer.buffer().begin()
<< " size=" << buffer.buffer().size()
<< " int_size=" << buffer.internalBuffer().size() << "\n";
return stream;
}
CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_)
: WriteBuffer(nullptr, 0), prepared_sources(std::move(prepared_sources_)), lazy_sources(std::move(lazy_sources_))
{
first_lazy_source_num = prepared_sources.size();
num_sources = first_lazy_source_num + lazy_sources.size();
/// fill lazy sources by nullptr
prepared_sources.resize(num_sources);
curr_buffer_num = 0;
curr_buffer = getNextBuffer();
set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
}
void CascadeWriteBuffer::nextImpl()
{
try
{
curr_buffer->position() = position();
curr_buffer->next();
}
catch (const Exception & e)
{
if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED)
{
/// actualize position of old buffer (it was reset by WriteBuffer::next)
curr_buffer->position() = position();
/// good situation, fetch next WriteBuffer
++curr_buffer_num;
curr_buffer = getNextBuffer();
}
else
throw;
}
set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
}
void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res)
{
curr_buffer->position() = position();
res = std::move(prepared_sources);
curr_buffer = nullptr;
curr_buffer_num = num_sources = 0;
}
WriteBuffer * CascadeWriteBuffer::getNextBuffer()
{
if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources)
{
if (!prepared_sources[curr_buffer_num])
{
WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr;
prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf);
}
}
else if (curr_buffer_num >= num_sources)
throw Exception("There are no WriteBuffers to write result", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
WriteBuffer * res = prepared_sources[curr_buffer_num].get();
if (!res)
throw Exception("Required WriteBuffer is not created", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
return res;
}
@ -169,8 +76,6 @@ private:
Position end_pos;
};
MemoryWriteBuffer::MemoryWriteBuffer(size_t max_total_size_, size_t initial_chunk_size_, double growth_rate_)
: WriteBuffer(nullptr, 0), max_total_size(max_total_size_), initial_chunk_size(initial_chunk_size_), growth_rate(growth_rate_)
{
@ -235,74 +140,4 @@ MemoryWriteBuffer::~MemoryWriteBuffer()
Allocator<false>().free(range.begin(), range.size());
}
WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & path_template_)
{
std::string path_template = path_template_;
if (path_template.empty() || path_template.back() != 'X')
path_template += "XXXXXX";
Poco::File(Poco::Path(path_template).makeParent()).createDirectories();
int fd = mkstemp(const_cast<char *>(path_template.c_str()));
if (fd < 0)
throw Exception("Cannot create temporary file " + path_template, ErrorCodes::CANNOT_OPEN_FILE);
return Ptr(new WriteBufferFromTemporaryFile(fd, path_template));
}
WriteBufferFromTemporaryFile::~WriteBufferFromTemporaryFile()
{
/// remove temporary file if it was not passed to ReadBuffer
if (getFD() >= 0 && !getFileName().empty())
{
Poco::File(getFileName()).remove();
}
}
class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
{
public:
static ReadBufferPtr createFrom(WriteBufferFromTemporaryFile * origin)
{
int fd = origin->getFD();
std::string file_name = origin->getFileName();
off_t res = lseek(fd, 0, SEEK_SET);
if (-1 == res)
throwFromErrno("Cannot reread temporary file " + file_name, ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name);
}
ReadBufferFromTemporaryWriteBuffer(int fd, const std::string & file_name)
: ReadBufferFromFile(fd, file_name)
{}
~ReadBufferFromTemporaryWriteBuffer() override
{
/// remove temporary file
Poco::File(file_name).remove();
}
};
ReadBufferPtr WriteBufferFromTemporaryFile::getReadBuffer()
{
/// ignore buffer, write all data to file and reread it from disk
sync();
auto res = ReadBufferFromTemporaryWriteBuffer::createFrom(this);
/// invalidate FD to avoid close(fd) in destructor
setFD(-1);
file_name = {};
return res;
}
}

View File

@ -0,0 +1,88 @@
#include <DB/IO/WriteBufferFromTemporaryFile.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <DB/IO/ReadBufferFromFile.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_SEEK_THROUGH_FILE;
}
WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & path_template_)
{
std::string path_template = path_template_;
if (path_template.empty() || path_template.back() != 'X')
path_template += "XXXXXX";
Poco::File(Poco::Path(path_template).makeParent()).createDirectories();
int fd = mkstemp(const_cast<char *>(path_template.c_str()));
if (fd < 0)
throw Exception("Cannot create temporary file " + path_template, ErrorCodes::CANNOT_OPEN_FILE);
return Ptr(new WriteBufferFromTemporaryFile(fd, path_template));
}
WriteBufferFromTemporaryFile::~WriteBufferFromTemporaryFile()
{
/// remove temporary file if it was not passed to ReadBuffer
if (getFD() >= 0 && !getFileName().empty())
{
Poco::File(getFileName()).remove();
}
}
class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
{
public:
static ReadBufferPtr createFrom(WriteBufferFromTemporaryFile * origin)
{
int fd = origin->getFD();
std::string file_name = origin->getFileName();
off_t res = lseek(fd, 0, SEEK_SET);
if (-1 == res)
throwFromErrno("Cannot reread temporary file " + file_name, ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name);
}
ReadBufferFromTemporaryWriteBuffer(int fd, const std::string & file_name)
: ReadBufferFromFile(fd, file_name)
{}
~ReadBufferFromTemporaryWriteBuffer() override
{
/// remove temporary file
Poco::File(file_name).remove();
}
};
ReadBufferPtr WriteBufferFromTemporaryFile::getReadBuffer()
{
/// ignore buffer, write all data to file and reread it from disk
sync();
auto res = ReadBufferFromTemporaryWriteBuffer::createFrom(this);
/// invalidate FD to avoid close(fd) in destructor
setFD(-1);
file_name = {};
return res;
}
}

View File

@ -1,10 +1,11 @@
#include <gtest/gtest.h>
#include "/home/vludv/ClickHouse/contrib/googletest/include/gtest/gtest.h"
#include <memory.h>
#include <Poco/File.h>
#include <DB/IO/CascadeWriteBuffer.h>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/WriteBufferFromTemporaryFile.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/copyData.h>

View File

@ -15,11 +15,13 @@
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/copyData.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/IO/CascadeWriteBuffer.h>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/WriteBufferFromTemporaryFile.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>