Preparations [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-07-09 23:36:58 +03:00
parent 68a2caed9a
commit 3c34487f43
4 changed files with 96 additions and 63 deletions

View File

@ -0,0 +1,36 @@
#include <IO/ReadBufferFromIStream.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_FROM_ISTREAM;
}
bool ReadBufferFromIStream::nextImpl()
{
istr.read(internal_buffer.begin(), internal_buffer.size());
size_t gcount = istr.gcount();
if (!gcount)
{
if (istr.eof())
return false;
else
throw Exception("Cannot read from istream", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
}
else
working_buffer.resize(gcount);
return true;
}
ReadBufferFromIStream::ReadBufferFromIStream(std::istream & istr_, size_t size)
: BufferWithOwnMemory<ReadBuffer>(size), istr(istr_)
{
}
}

View File

@ -2,8 +2,6 @@
#include <iostream> #include <iostream>
#include <Common/Exception.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
@ -11,38 +9,15 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int CANNOT_READ_FROM_ISTREAM;
}
class ReadBufferFromIStream : public BufferWithOwnMemory<ReadBuffer> class ReadBufferFromIStream : public BufferWithOwnMemory<ReadBuffer>
{ {
private: private:
std::istream & istr; std::istream & istr;
bool nextImpl() override bool nextImpl() override;
{
istr.read(internal_buffer.begin(), internal_buffer.size());
size_t gcount = istr.gcount();
if (!gcount)
{
if (istr.eof())
return false;
else
throw Exception("Cannot read from istream", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
}
else
working_buffer.resize(gcount);
return true;
}
public: public:
ReadBufferFromIStream(std::istream & istr_, size_t size = DBMS_DEFAULT_BUFFER_SIZE) ReadBufferFromIStream(std::istream & istr_, size_t size = DBMS_DEFAULT_BUFFER_SIZE);
: BufferWithOwnMemory<ReadBuffer>(size), istr(istr_) {}
}; };
} }

View File

@ -0,0 +1,54 @@
#include <IO/WriteBufferFromOStream.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_WRITE_TO_OSTREAM;
}
void WriteBufferFromOStream::nextImpl()
{
if (!offset())
return;
ostr->write(working_buffer.begin(), offset());
ostr->flush();
if (!ostr->good())
throw Exception("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM);
}
WriteBufferFromOStream::WriteBufferFromOStream(
size_t size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment)
{
}
WriteBufferFromOStream::WriteBufferFromOStream(
std::ostream & ostr_,
size_t size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment), ostr(&ostr_)
{
}
WriteBufferFromOStream::~WriteBufferFromOStream()
{
try
{
next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -2,8 +2,6 @@
#include <iostream> #include <iostream>
#include <Common/Exception.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
@ -11,53 +9,23 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int CANNOT_WRITE_TO_OSTREAM;
}
class WriteBufferFromOStream : public BufferWithOwnMemory<WriteBuffer> class WriteBufferFromOStream : public BufferWithOwnMemory<WriteBuffer>
{ {
protected: protected:
std::ostream * ostr; std::ostream * ostr;
void nextImpl() override void nextImpl() override;
{
if (!offset())
return;
ostr->write(working_buffer.begin(), offset());
ostr->flush();
if (!ostr->good())
throw Exception("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM);
}
WriteBufferFromOStream(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment)
{
}
WriteBufferFromOStream(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0);
public: public:
WriteBufferFromOStream( WriteBufferFromOStream(
std::ostream & ostr_, std::ostream & ostr_,
size_t size = DBMS_DEFAULT_BUFFER_SIZE, size_t size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0) size_t alignment = 0);
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment), ostr(&ostr_) {}
~WriteBufferFromOStream() override ~WriteBufferFromOStream() override;
{
try
{
next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}; };
} }