From 856108efe989b79513c326127a8950418374f6e4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 10 Nov 2012 04:43:53 +0000 Subject: [PATCH] dbms: added 'RowBinary' format [#CONV-2944]. --- .../DB/DataStreams/BinaryRowInputStream.h | 30 +++++++++++++++ .../DB/DataStreams/BinaryRowOutputStream.h | 33 +++++++++++++++++ dbms/src/DataStreams/BinaryRowInputStream.cpp | 37 +++++++++++++++++++ .../src/DataStreams/BinaryRowOutputStream.cpp | 30 +++++++++++++++ dbms/src/DataStreams/FormatFactory.cpp | 6 +++ 5 files changed, 136 insertions(+) create mode 100644 dbms/include/DB/DataStreams/BinaryRowInputStream.h create mode 100644 dbms/include/DB/DataStreams/BinaryRowOutputStream.h create mode 100644 dbms/src/DataStreams/BinaryRowInputStream.cpp create mode 100644 dbms/src/DataStreams/BinaryRowOutputStream.cpp diff --git a/dbms/include/DB/DataStreams/BinaryRowInputStream.h b/dbms/include/DB/DataStreams/BinaryRowInputStream.h new file mode 100644 index 00000000000..12d203c2030 --- /dev/null +++ b/dbms/include/DB/DataStreams/BinaryRowInputStream.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +#include +#include +#include + + +namespace DB +{ + +/** Поток для ввода данных в бинарном построчном формате. + */ +class BinaryRowInputStream : public IRowInputStream +{ +public: + BinaryRowInputStream(ReadBuffer & istr_, const Block & sample_); + + Row read(); + + RowInputStreamPtr clone() { return new BinaryRowInputStream(istr, sample); } + +private: + ReadBuffer & istr; + const Block sample; + DataTypes data_types; +}; + +} diff --git a/dbms/include/DB/DataStreams/BinaryRowOutputStream.h b/dbms/include/DB/DataStreams/BinaryRowOutputStream.h new file mode 100644 index 00000000000..6c0e64ffe90 --- /dev/null +++ b/dbms/include/DB/DataStreams/BinaryRowOutputStream.h @@ -0,0 +1,33 @@ +#pragma once + +#include + +#include +#include +#include + + +namespace DB +{ + +/** Поток для вывода данных в бинарном построчном формате. + */ +class BinaryRowOutputStream : public IRowOutputStream +{ +public: + BinaryRowOutputStream(WriteBuffer & ostr_, const Block & sample_); + + void writeField(const Field & field); + void writeRowEndDelimiter(); + + RowOutputStreamPtr clone() { return new BinaryRowOutputStream(ostr, sample); } + +protected: + WriteBuffer & ostr; + const Block sample; + DataTypes data_types; + size_t field_number; +}; + +} + diff --git a/dbms/src/DataStreams/BinaryRowInputStream.cpp b/dbms/src/DataStreams/BinaryRowInputStream.cpp new file mode 100644 index 00000000000..bed8c83fded --- /dev/null +++ b/dbms/src/DataStreams/BinaryRowInputStream.cpp @@ -0,0 +1,37 @@ +#include + + +namespace DB +{ + +BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_, const Block & sample_) + : istr(istr_), sample(sample_) +{ + size_t columns = sample.columns(); + data_types.resize(columns); + for (size_t i = 0; i < columns; ++i) + data_types[i] = sample.getByPosition(i).type; +} + + +Row BinaryRowInputStream::read() +{ + Row res; + size_t size = data_types.size(); + res.resize(size); + + for (size_t i = 0; i < size; ++i) + { + if (i == 0 && istr.eof()) + { + res.clear(); + return res; + } + + data_types[i]->deserializeBinary(res[i], istr); + } + + return res; +} + +} diff --git a/dbms/src/DataStreams/BinaryRowOutputStream.cpp b/dbms/src/DataStreams/BinaryRowOutputStream.cpp new file mode 100644 index 00000000000..7888e06c277 --- /dev/null +++ b/dbms/src/DataStreams/BinaryRowOutputStream.cpp @@ -0,0 +1,30 @@ +#include + + +namespace DB +{ + + +BinaryRowOutputStream::BinaryRowOutputStream(WriteBuffer & ostr_, const Block & sample_) + : ostr(ostr_), sample(sample_), field_number(0) +{ + size_t columns = sample.columns(); + data_types.resize(columns); + for (size_t i = 0; i < columns; ++i) + data_types[i] = sample.getByPosition(i).type; +} + + +void BinaryRowOutputStream::writeField(const Field & field) +{ + data_types[field_number]->serializeBinary(field, ostr); + ++field_number; +} + + +void BinaryRowOutputStream::writeRowEndDelimiter() +{ + field_number = 0; +} + +} diff --git a/dbms/src/DataStreams/FormatFactory.cpp b/dbms/src/DataStreams/FormatFactory.cpp index 2ab936fc13e..faada8dcebb 100644 --- a/dbms/src/DataStreams/FormatFactory.cpp +++ b/dbms/src/DataStreams/FormatFactory.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include #include @@ -26,6 +28,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu return new NativeBlockInputStream(buf, data_type_factory); else if (name == "TabSeparated") return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample), sample, max_block_size); + else if (name == "RowBinary") + return new BlockInputStreamFromRowInputStream(new BinaryRowInputStream(buf, sample), sample, max_block_size); else if (name == "TabSeparatedWithNames") return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample, true), sample, max_block_size); else if (name == "TabSeparatedWithNamesAndTypes") @@ -46,6 +50,8 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & return new NativeBlockOutputStream(buf); else if (name == "TabSeparated") return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample)); + else if (name == "RowBinary") + return new BlockOutputStreamFromRowOutputStream(new BinaryRowOutputStream(buf, sample)); else if (name == "TabSeparatedWithNames") return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample, true)); else if (name == "TabSeparatedWithNamesAndTypes")