diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index a3b788c230f..158f5cbcf65 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -434,6 +434,7 @@ namespace ErrorCodes extern const int BAD_QUERY_PARAMETER = 457; extern const int CANNOT_UNLINK = 458; extern const int CANNOT_SET_THREAD_PRIORITY = 459; + extern const int INVALID_TEMPLATE_FORMAT = 460; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Formats/BlockOutputStreamFromRowOutputStream.h b/dbms/src/Formats/BlockOutputStreamFromRowOutputStream.h index ada924bb5b4..5cf20955bb8 100644 --- a/dbms/src/Formats/BlockOutputStreamFromRowOutputStream.h +++ b/dbms/src/Formats/BlockOutputStreamFromRowOutputStream.h @@ -29,7 +29,7 @@ public: String getContentType() const override { return row_output->getContentType(); } -private: +protected: RowOutputStreamPtr row_output; Block header; bool first_row = true; diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 4fae140abee..d1364892811 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -197,6 +197,7 @@ void registerInputFormatParquet(FormatFactory & factory); void registerOutputFormatParquet(FormatFactory & factory); void registerInputFormatProtobuf(FormatFactory & factory); void registerOutputFormatProtobuf(FormatFactory & factory); +void registerOutputFormatTemplate(FormatFactory &factory); void registerInputFormatProcessorNative(FormatFactory & factory); void registerOutputFormatProcessorNative(FormatFactory & factory); @@ -270,6 +271,7 @@ FormatFactory::FormatFactory() registerInputFormatCapnProto(*this); registerInputFormatParquet(*this); registerOutputFormatParquet(*this); + registerOutputFormatTemplate(*this); registerOutputFormatMySQLWire(*this); diff --git a/dbms/src/Formats/TemplateRowOutputStream.cpp b/dbms/src/Formats/TemplateRowOutputStream.cpp new file mode 100644 
index 00000000000..b2bf730d6f4 --- /dev/null +++ b/dbms/src/Formats/TemplateRowOutputStream.cpp @@ -0,0 +1,180 @@ +#include +#include +#include +#include +/* NOTE(review): the #include targets are absent in this text, presumably lost in extraction - confirm against the original patch. */ + +namespace DB { + + +namespace ErrorCodes { + extern const int INVALID_TEMPLATE_FORMAT; +} +/* Keeps a reference to the output buffer, copies the format settings and parses format_template against the columns of sample. */ +TemplateRowOutputStream::TemplateRowOutputStream(WriteBuffer &ostr_, const Block &sample, + const FormatSettings &settings_, const String& format_template) + : ostr(ostr_), settings(settings_) +{ + parseFormatString(format_template, sample); +} + +/* Parses the template s into literal delimiters and placeholders. A placeholder is written as ${column} or ${column:Format}; $$ yields a literal dollar sign. Each column name is looked up with sample.getPositionByName and each explicit format name is converted with stringToFormat. Throws INVALID_TEMPLATE_FORMAT when $ is followed by anything other than { or $, when the template ends inside a placeholder, or when it contains no placeholder at all. */ +void TemplateRowOutputStream::parseFormatString(const String & s, const Block & sample) +{ + enum ParserState + { + Delimiter, + Column, + Format + }; + const char * pos = s.c_str(); + const char * token_begin = pos; + ParserState state = Delimiter; + /* delimiters always holds one more entry than the number of placeholders parsed so far */ delimiters.emplace_back(); + for (; *pos; ++pos) + { + switch (state) + { + case Delimiter: + if (*pos == '$') + { + delimiters.back().append(token_begin, pos - token_begin); + ++pos; + if (*pos == '{') + { + token_begin = pos + 1; + state = Column; + } + else if (*pos == '$') + { + /* $$ escape: restart the token at the second $ so that one literal $ ends up in the delimiter */ token_begin = pos; + } + else + { + throw Exception("invalid template: pos " + std::to_string(pos - s.c_str()) + + ": expected '{' or '$' after '$'", ErrorCodes::INVALID_TEMPLATE_FORMAT); + } + } + break; + + case Column: + if (*pos == ':') + { + size_t column_idx = sample.getPositionByName(String(token_begin, pos - token_begin)); + format_idx_to_column_idx.push_back(column_idx); + token_begin = pos + 1; + state = Format; + } + else if (*pos == '}') + { + /* placeholder without an explicit format name gets ColumnFormat::Default */ size_t column_idx = sample.getPositionByName(String(token_begin, pos - token_begin)); + format_idx_to_column_idx.push_back(column_idx); + formats.push_back(ColumnFormat::Default); + delimiters.emplace_back(); + token_begin = pos + 1; + state = Delimiter; + } + break; + + case Format: + if (*pos == '}') + { + formats.push_back(stringToFormat(String(token_begin, pos - token_begin))); + token_begin = pos + 1; + delimiters.emplace_back(); + state = Delimiter; + } + } + } + if (state != Delimiter) + 
/* reached when the template ends inside an unterminated placeholder. NOTE(review): the message says parentheses although placeholders use braces - consider rewording. */ throw Exception("invalid template: check parentheses balance", ErrorCodes::INVALID_TEMPLATE_FORMAT); + /* only the initial delimiter exists, so no placeholder was parsed */ if (delimiters.size() == 1) + throw Exception("invalid template: no columns specified", ErrorCodes::INVALID_TEMPLATE_FORMAT); + /* text after the last placeholder becomes the final delimiter */ delimiters.back().append(token_begin, pos - token_begin); +} + +/* Converts a format name taken from the template into a ColumnFormat. An empty name gives Default; Escaped, Quoted, JSON, XML and Raw map to their enumerators; any other name throws INVALID_TEMPLATE_FORMAT. */ +TemplateRowOutputStream::ColumnFormat TemplateRowOutputStream::stringToFormat(const String & format) +{ + if (format.empty()) + return ColumnFormat::Default; + else if (format == "Escaped") + return ColumnFormat::Escaped; + else if (format == "Quoted") + return ColumnFormat::Quoted; + else if (format == "JSON") + return ColumnFormat::Json; + else if (format == "XML") + return ColumnFormat::Xml; + else if (format == "Raw") + return ColumnFormat::Raw; + else + throw Exception("invalid template: unknown field format " + format, ErrorCodes::INVALID_TEMPLATE_FORMAT); + +} +/* Forwards to ostr.next(). */ +void TemplateRowOutputStream::flush() +{ + ostr.next(); +} +/* Serializes the value of col at row_num into ostr with the serializer chosen by format; Default shares the Escaped branch. */ +void TemplateRowOutputStream::serializeField(const ColumnWithTypeAndName & col, size_t row_num, ColumnFormat format) +{ + switch (format) + { + case ColumnFormat::Default: + case ColumnFormat::Escaped: + col.type->serializeAsTextEscaped(*col.column, row_num, ostr, settings); + break; + case ColumnFormat::Quoted: + col.type->serializeAsTextQuoted(*col.column, row_num, ostr, settings); + break; + case ColumnFormat::Json: + col.type->serializeAsTextJSON(*col.column, row_num, ostr, settings); + break; + case ColumnFormat::Xml: + col.type->serializeAsTextXML(*col.column, row_num, ostr, settings); + break; + case ColumnFormat::Raw: + col.type->serializeAsText(*col.column, row_num, ostr, settings); + break; + default: + __builtin_unreachable(); + } +} +/* Writes one row: for each placeholder i, delimiters[i] followed by the serialized value of the referenced column, then the trailing delimiters[columns]. */ +void TemplateRowOutputStream::write(const Block & block, size_t row_num) +{ + size_t columns = format_idx_to_column_idx.size(); + for (size_t i = 0; i < columns; ++i) + { + writeString(delimiters[i], ostr); + + size_t col_idx = format_idx_to_column_idx[i]; + const ColumnWithTypeAndName & col = 
block.getByPosition(col_idx); + serializeField(col, row_num, formats[i]); + } + writeString(delimiters[columns], ostr); +} +/* Passes every row index of block to row_output->write(block, i). */ +void TemplateBlockOutputStream::write(const Block & block) +{ + size_t rows = block.rows(); + for (size_t i = 0; i < rows; ++i) + row_output->write(block, i); + +} +/* Registers an output format named Template with the factory. The template text is read from the format_schema setting of the context. */ +void registerOutputFormatTemplate(FormatFactory &factory) +{ + factory.registerOutputFormat("Template", []( + WriteBuffer &buf, + const Block &sample, + const Context & context, + const FormatSettings &settings) { + auto format_template = context.getSettingsRef().format_schema.toString(); + return std::make_shared( + std::make_shared(buf, sample, settings, format_template), sample); + }); +} +} diff --git a/dbms/src/Formats/TemplateRowOutputStream.h b/dbms/src/Formats/TemplateRowOutputStream.h new file mode 100644 index 00000000000..ab7b60ee2ab --- /dev/null +++ b/dbms/src/Formats/TemplateRowOutputStream.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ +/* Row output stream that writes each row as the literal pieces of a template string interleaved with serialized column values. */ +class TemplateRowOutputStream : public IRowOutputStream +{ +public: + /* Serialization variants selectable per placeholder; see stringToFormat for the accepted names. */ enum class ColumnFormat + { + Default, + Escaped, + Quoted, + Json, + Xml, + Raw + }; + + TemplateRowOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_, const String & format_template); + + void write(const Block & block, size_t row_num) override; + /* empty body; write() serializes fields through serializeField instead */ void writeField(const IColumn &, const IDataType &, size_t) override {}; + void flush() override; + +private: + ColumnFormat stringToFormat(const String & format); + void parseFormatString(const String & s, const Block & sample); + void serializeField(const ColumnWithTypeAndName & col, size_t row_num, ColumnFormat format); + +private: + WriteBuffer & ostr; + const FormatSettings settings; + /* literal text pieces: delimiters[i] is written before placeholder i, the last entry after the final placeholder */ std::vector delimiters; + /* serialization format of each placeholder, in template order */ std::vector formats; + /* block column position of each placeholder, in template order */ std::vector format_idx_to_column_idx; +}; +/* Block output stream whose write(const Block &) hands each row of the block to row_output->write(block, row). */ +class TemplateBlockOutputStream : public BlockOutputStreamFromRowOutputStream +{ +public: + TemplateBlockOutputStream(RowOutputStreamPtr row_output_, 
const Block & header_) + : BlockOutputStreamFromRowOutputStream(row_output_, header_) {}; + void write(const Block & block) override; +}; + +}