JSONRowOutputStream [#CONV-7445]

This commit is contained in:
Vyacheslav Alipov 2013-05-15 11:18:58 +00:00
parent e03e85184f
commit 8685024166
3 changed files with 213 additions and 0 deletions

View File

@ -0,0 +1,39 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Core/Block.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteBufferValidUTF8.h>
#include <DB/DataStreams/IRowOutputStream.h>
namespace DB
{
/** Поток для вывода данных в формате JSON.
*/
class JSONRowOutputStream : public IRowOutputStream
{
public:
JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
void writeField(const Field & field);
void writeFieldDelimiter();
void writeRowStartDelimiter();
void writeRowEndDelimiter();
void writePrefix();
void writeSuffix();
protected:
typedef std::vector<NameAndTypePair> NamesAndTypesVector;
//WriteBufferValidUTF8 ostr;
WriteBuffer & ostr;
size_t field_number;
size_t row_count;
NamesAndTypesVector fields;
};
}

View File

@ -0,0 +1,94 @@
#include <DB/DataStreams/JSONRowOutputStream.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
using Poco::SharedPtr;
JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_)
: ostr(ostr_), field_number(0), row_count(0)
{
NamesAndTypesList columns(sample_.getColumnsList());
fields.assign(columns.begin(), columns.end());
}
void JSONRowOutputStream::writePrefix()
{
writeString("{\n", ostr);
writeString("\t\"meta\":\n", ostr);
writeString("\t[\n", ostr);
for (size_t i = 0; i < fields.size(); ++i)
{
writeString("\t\t{\n", ostr);
writeString("\t\t\t\"name\": ", ostr);
writeDoubleQuotedString(fields[i].first, ostr);
writeString(",\n", ostr);
writeString("\t\t\t\"type\": ", ostr);
writeDoubleQuotedString(fields[i].second->getName(), ostr);
writeChar('\n', ostr);
writeString("\t\t}", ostr);
if (i + 1 < fields.size())
writeChar(',', ostr);
writeChar('\n', ostr);
}
writeString("\t],\n", ostr);
writeChar('\n', ostr);
writeString("\t\"data\":\n", ostr);
writeString("\t[\n", ostr);
}
void JSONRowOutputStream::writeField(const Field & field)
{
writeString("\t\t\t", ostr);
writeDoubleQuotedString(fields[field_number].first, ostr);
writeString(": ", ostr);
fields[field_number].second->serializeTextQuoted(field, ostr);
++field_number;
}
void JSONRowOutputStream::writeFieldDelimiter()
{
writeString(",\n", ostr);
}
void JSONRowOutputStream::writeRowStartDelimiter()
{
if (row_count > 0)
writeString(",\n", ostr);
writeString("\t\t{\n", ostr);
}
void JSONRowOutputStream::writeRowEndDelimiter()
{
writeChar('\n', ostr);
writeString("\t\t}", ostr);
field_number = 0;
++row_count;
}
void JSONRowOutputStream::writeSuffix()
{
writeChar('\n', ostr);
writeString("\t],\n", ostr);
writeChar('\n', ostr);
writeString("\t\"rows\": ", ostr);
writeIntText(row_count, ostr);
writeChar('\n', ostr);
writeString("}\n", ostr);
}
}

View File

@ -0,0 +1,80 @@
#include <map>
#include <list>
#include <iostream>
#include <fstream>
#include <boost/assign/list_inserter.hpp>
#include <Poco/SharedPtr.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataStreams/TabSeparatedRowInputStream.h>
#include <DB/DataStreams/TabSeparatedBlockOutputStream.h>
#include <DB/DataStreams/JSONRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Storages/StorageLog.h>
using Poco::SharedPtr;
int main(int argc, char ** argv)
{
try
{
DB::NamesAndTypesListPtr names_and_types_list = new DB::NamesAndTypesList;
boost::assign::push_back(*names_and_types_list)
("WatchID", new DB::DataTypeUInt64)
("ClientIP", new DB::DataTypeUInt32)
("Referer", new DB::DataTypeString)
("URL", new DB::DataTypeString)
("IsLink", new DB::DataTypeUInt8)
("OriginalUserAgent", new DB::DataTypeString)
("EventTime", new DB::DataTypeDateTime)
;
SharedPtr<DB::DataTypes> data_types = new DB::DataTypes;
for (DB::NamesAndTypesList::const_iterator it = names_and_types_list->begin(); it != names_and_types_list->end(); ++it)
data_types->push_back(it->second);
DB::Block sample;
for (DB::NamesAndTypesList::const_iterator it = names_and_types_list->begin(); it != names_and_types_list->end(); ++it)
{
DB::ColumnWithNameAndType elem;
elem.name = it->first;
elem.type = it->second;
elem.column = elem.type->createColumn();
sample.insert(elem);
}
{
std::ifstream istr("json_test.in");
std::ofstream ostr("json_test.out");
DB::ReadBufferFromIStream in_buf(istr);
DB::WriteBufferFromOStream out_buf(ostr);
DB::TabSeparatedRowInputStream row_input(in_buf, sample, true, true);
DB::JSONRowOutputStream row_output(out_buf, sample);
DB::copyData(row_input, row_output);
}
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
return 0;
}