dbms: added 'RowBinary' format [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-11-10 04:43:53 +00:00
parent 686878a5fb
commit 856108efe9
5 changed files with 136 additions and 0 deletions

View File

@ -0,0 +1,30 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Core/Block.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/DataStreams/IRowInputStream.h>
namespace DB
{
/** Поток для ввода данных в бинарном построчном формате.
*/
class BinaryRowInputStream : public IRowInputStream
{
public:
BinaryRowInputStream(ReadBuffer & istr_, const Block & sample_);
Row read();
RowInputStreamPtr clone() { return new BinaryRowInputStream(istr, sample); }
private:
ReadBuffer & istr;
const Block sample;
DataTypes data_types;
};
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Core/Block.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/DataStreams/IRowOutputStream.h>
namespace DB
{
/** Поток для вывода данных в бинарном построчном формате.
*/
class BinaryRowOutputStream : public IRowOutputStream
{
public:
BinaryRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
void writeField(const Field & field);
void writeRowEndDelimiter();
RowOutputStreamPtr clone() { return new BinaryRowOutputStream(ostr, sample); }
protected:
WriteBuffer & ostr;
const Block sample;
DataTypes data_types;
size_t field_number;
};
}

View File

@ -0,0 +1,37 @@
#include <DB/DataStreams/BinaryRowInputStream.h>
namespace DB
{
BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_, const Block & sample_)
: istr(istr_), sample(sample_)
{
size_t columns = sample.columns();
data_types.resize(columns);
for (size_t i = 0; i < columns; ++i)
data_types[i] = sample.getByPosition(i).type;
}
Row BinaryRowInputStream::read()
{
Row res;
size_t size = data_types.size();
res.resize(size);
for (size_t i = 0; i < size; ++i)
{
if (i == 0 && istr.eof())
{
res.clear();
return res;
}
data_types[i]->deserializeBinary(res[i], istr);
}
return res;
}
}

View File

@ -0,0 +1,30 @@
#include <DB/DataStreams/BinaryRowOutputStream.h>
namespace DB
{
BinaryRowOutputStream::BinaryRowOutputStream(WriteBuffer & ostr_, const Block & sample_)
: ostr(ostr_), sample(sample_), field_number(0)
{
size_t columns = sample.columns();
data_types.resize(columns);
for (size_t i = 0; i < columns; ++i)
data_types[i] = sample.getByPosition(i).type;
}
void BinaryRowOutputStream::writeField(const Field & field)
{
data_types[field_number]->serializeBinary(field, ostr);
++field_number;
}
void BinaryRowOutputStream::writeRowEndDelimiter()
{
field_number = 0;
}
}

View File

@ -3,6 +3,8 @@
#include <DB/DataStreams/TabSeparatedRowInputStream.h> #include <DB/DataStreams/TabSeparatedRowInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h> #include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/TabSeparatedRawRowOutputStream.h> #include <DB/DataStreams/TabSeparatedRawRowOutputStream.h>
#include <DB/DataStreams/BinaryRowInputStream.h>
#include <DB/DataStreams/BinaryRowOutputStream.h>
#include <DB/DataStreams/ValuesRowInputStream.h> #include <DB/DataStreams/ValuesRowInputStream.h>
#include <DB/DataStreams/ValuesRowOutputStream.h> #include <DB/DataStreams/ValuesRowOutputStream.h>
#include <DB/DataStreams/TabSeparatedBlockOutputStream.h> #include <DB/DataStreams/TabSeparatedBlockOutputStream.h>
@ -26,6 +28,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
return new NativeBlockInputStream(buf, data_type_factory); return new NativeBlockInputStream(buf, data_type_factory);
else if (name == "TabSeparated") else if (name == "TabSeparated")
return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample), sample, max_block_size); return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample), sample, max_block_size);
else if (name == "RowBinary")
return new BlockInputStreamFromRowInputStream(new BinaryRowInputStream(buf, sample), sample, max_block_size);
else if (name == "TabSeparatedWithNames") else if (name == "TabSeparatedWithNames")
return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample, true), sample, max_block_size); return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample, true), sample, max_block_size);
else if (name == "TabSeparatedWithNamesAndTypes") else if (name == "TabSeparatedWithNamesAndTypes")
@ -46,6 +50,8 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
return new NativeBlockOutputStream(buf); return new NativeBlockOutputStream(buf);
else if (name == "TabSeparated") else if (name == "TabSeparated")
return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample)); return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample));
else if (name == "RowBinary")
return new BlockOutputStreamFromRowOutputStream(new BinaryRowOutputStream(buf, sample));
else if (name == "TabSeparatedWithNames") else if (name == "TabSeparatedWithNames")
return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample, true)); return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample, true));
else if (name == "TabSeparatedWithNamesAndTypes") else if (name == "TabSeparatedWithNamesAndTypes")