diff --git a/dbms/include/DB/DataStreams/JSONRowOutputStream.h b/dbms/include/DB/DataStreams/JSONRowOutputStream.h new file mode 100644 index 00000000000..13319232471 --- /dev/null +++ b/dbms/include/DB/DataStreams/JSONRowOutputStream.h @@ -0,0 +1,39 @@ +#pragma once + +#include + +#include +#include +#include +#include + + +namespace DB +{ + +/** Поток для вывода данных в формате JSON. + */ +class JSONRowOutputStream : public IRowOutputStream +{ +public: + JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_); + + void writeField(const Field & field); + void writeFieldDelimiter(); + void writeRowStartDelimiter(); + void writeRowEndDelimiter(); + void writePrefix(); + void writeSuffix(); + +protected: + typedef std::vector NamesAndTypesVector; + + //WriteBufferValidUTF8 ostr; + WriteBuffer & ostr; + size_t field_number; + size_t row_count; + NamesAndTypesVector fields; +}; + +} + diff --git a/dbms/src/DataStreams/JSONRowOutputStream.cpp b/dbms/src/DataStreams/JSONRowOutputStream.cpp new file mode 100644 index 00000000000..8c39b337d9c --- /dev/null +++ b/dbms/src/DataStreams/JSONRowOutputStream.cpp @@ -0,0 +1,94 @@ +#include + +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_) + : ostr(ostr_), field_number(0), row_count(0) +{ + NamesAndTypesList columns(sample_.getColumnsList()); + fields.assign(columns.begin(), columns.end()); +} + + +void JSONRowOutputStream::writePrefix() +{ + writeString("{\n", ostr); + writeString("\t\"meta\":\n", ostr); + writeString("\t[\n", ostr); + + for (size_t i = 0; i < fields.size(); ++i) + { + writeString("\t\t{\n", ostr); + + writeString("\t\t\t\"name\": ", ostr); + writeDoubleQuotedString(fields[i].first, ostr); + writeString(",\n", ostr); + writeString("\t\t\t\"type\": ", ostr); + writeDoubleQuotedString(fields[i].second->getName(), ostr); + writeChar('\n', ostr); + + writeString("\t\t}", ostr); + if (i + 1 < fields.size()) + writeChar(',', ostr); + writeChar('\n', ostr); + } + + writeString("\t],\n", ostr); + writeChar('\n', ostr); + writeString("\t\"data\":\n", ostr); + writeString("\t[\n", ostr); +} + + +void JSONRowOutputStream::writeField(const Field & field) +{ + writeString("\t\t\t", ostr); + writeDoubleQuotedString(fields[field_number].first, ostr); + writeString(": ", ostr); + fields[field_number].second->serializeTextQuoted(field, ostr); + ++field_number; +} + + +void JSONRowOutputStream::writeFieldDelimiter() +{ + writeString(",\n", ostr); +} + + +void JSONRowOutputStream::writeRowStartDelimiter() +{ + if (row_count > 0) + writeString(",\n", ostr); + writeString("\t\t{\n", ostr); +} + + +void JSONRowOutputStream::writeRowEndDelimiter() +{ + writeChar('\n', ostr); + writeString("\t\t}", ostr); + field_number = 0; + ++row_count; +} + + +void JSONRowOutputStream::writeSuffix() +{ + writeChar('\n', ostr); + writeString("\t],\n", ostr); + writeChar('\n', ostr); + writeString("\t\"rows\": ", ostr); + writeIntText(row_count, ostr); + writeChar('\n', ostr); + writeString("}\n", ostr); +} + +} diff --git a/dbms/src/DataStreams/tests/json_streams.cpp b/dbms/src/DataStreams/tests/json_streams.cpp new file mode 100644 index 00000000000..a2035c49aa2 --- /dev/null +++ b/dbms/src/DataStreams/tests/json_streams.cpp @@ -0,0 +1,80 @@ +#include +#include +#include +#include + +#include + +#include + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + + +using Poco::SharedPtr; + + +int main(int argc, char ** argv) +{ + try + { + DB::NamesAndTypesListPtr names_and_types_list = new DB::NamesAndTypesList; + + boost::assign::push_back(*names_and_types_list) + ("WatchID", new DB::DataTypeUInt64) + ("ClientIP", new DB::DataTypeUInt32) + ("Referer", new DB::DataTypeString) + ("URL", new DB::DataTypeString) + ("IsLink", new DB::DataTypeUInt8) + ("OriginalUserAgent", new DB::DataTypeString) + ("EventTime", new DB::DataTypeDateTime) + ; + + SharedPtr data_types = new DB::DataTypes; + + for (DB::NamesAndTypesList::const_iterator it = names_and_types_list->begin(); it != names_and_types_list->end(); ++it) + data_types->push_back(it->second); + + DB::Block sample; + for (DB::NamesAndTypesList::const_iterator it = names_and_types_list->begin(); it != names_and_types_list->end(); ++it) + { + DB::ColumnWithNameAndType elem; + elem.name = it->first; + elem.type = it->second; + elem.column = elem.type->createColumn(); + sample.insert(elem); + } + + { + std::ifstream istr("json_test.in"); + std::ofstream ostr("json_test.out"); + + DB::ReadBufferFromIStream in_buf(istr); + DB::WriteBufferFromOStream out_buf(ostr); + + DB::TabSeparatedRowInputStream row_input(in_buf, sample, true, true); + DB::JSONRowOutputStream row_output(out_buf, sample); + + DB::copyData(row_input, row_output); + } + } + catch (const DB::Exception & e) + { + std::cerr << e.what() << ", " << e.displayText() << std::endl; + return 1; + } + + return 0; +}