From 79015898cf1a046cfe64894a4496465c540a511b Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 8 Apr 2019 00:30:54 +0300 Subject: [PATCH] TemplateRowInputStream --- dbms/src/Formats/FormatFactory.cpp | 5 + .../src/Formats/TemplateBlockOutputStream.cpp | 198 +++++++++--------- dbms/src/Formats/TemplateBlockOutputStream.h | 33 +-- dbms/src/Formats/TemplateRowInputStream.cpp | 126 +++++++++++ dbms/src/Formats/TemplateRowInputStream.h | 45 ++++ 5 files changed, 295 insertions(+), 112 deletions(-) create mode 100644 dbms/src/Formats/TemplateRowInputStream.cpp create mode 100644 dbms/src/Formats/TemplateRowInputStream.h diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 4ed5a59a94c..fa78089b259 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -49,6 +49,9 @@ static FormatSettings getInputFormatSetting(const Settings & settings) format_settings.date_time_input_format = settings.date_time_input_format; format_settings.input_allow_errors_num = settings.input_format_allow_errors_num; format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio; + format_settings.template_settings.format = settings.format_schema; + format_settings.template_settings.row_format = settings.format_schema_rows; + format_settings.template_settings.row_between_delimiter = settings.format_schema_rows_between_delimiter; return format_settings; } @@ -200,6 +203,7 @@ void registerInputFormatParquet(FormatFactory & factory); void registerOutputFormatParquet(FormatFactory & factory); void registerInputFormatProtobuf(FormatFactory & factory); void registerOutputFormatProtobuf(FormatFactory & factory); +void registerInputFormatTemplate(FormatFactory & factory); void registerOutputFormatTemplate(FormatFactory &factory); void registerInputFormatProcessorNative(FormatFactory & factory); @@ -274,6 +278,7 @@ FormatFactory::FormatFactory() registerInputFormatCapnProto(*this); 
registerInputFormatParquet(*this); registerOutputFormatParquet(*this); + registerInputFormatTemplate(*this); registerOutputFormatTemplate(*this); registerOutputFormatMySQLWire(*this); diff --git a/dbms/src/Formats/TemplateBlockOutputStream.cpp b/dbms/src/Formats/TemplateBlockOutputStream.cpp index 1950655dce6..c43b880e349 100644 --- a/dbms/src/Formats/TemplateBlockOutputStream.cpp +++ b/dbms/src/Formats/TemplateBlockOutputStream.cpp @@ -13,12 +13,110 @@ namespace ErrorCodes extern const int INVALID_TEMPLATE_FORMAT; } +ParsedTemplateFormat::ParsedTemplateFormat(const String & format_string, const ColumnIdxGetter & idxByName) /* Parses a template string into literal delimiters and placeholders of the form ${name} or ${name:Format}. On success delimiters holds one more entry than formats: delimiters[i] is the literal text before placeholder i and delimiters.back() is the text after the last one. idxByName is called once per placeholder and its result is stored in format_idx_to_column_idx. Two consecutive dollar signs produce one literal dollar sign. Throws INVALID_TEMPLATE_FORMAT when a dollar sign is followed by anything other than an opening brace or another dollar sign, or when the string ends inside a placeholder; exceptions from idxByName and stringToFormat propagate. */ +{ + enum ParserState + { + Delimiter, /* inside literal text */ + Column, /* inside a placeholder, reading its name */ + Format /* inside a placeholder, reading the format specifier after the colon */ + }; + const char * pos = format_string.c_str(); + const char * token_begin = pos; /* start of the token currently being accumulated */ + ParserState state = Delimiter; + delimiters.emplace_back(); /* literal text before the first placeholder; stays empty if the template starts with a placeholder */ + for (; *pos; ++pos) + { + switch (state) + { + case Delimiter: + if (*pos == '$') + { + delimiters.back().append(token_begin, pos - token_begin); /* flush the literal text seen so far */ + ++pos; /* step to the character after the dollar sign; if the string ended there this is the terminating NUL of c_str() and the final else below throws */ + if (*pos == '{') + { + token_begin = pos + 1; + state = Column; + } + else if (*pos == '$') + { + token_begin = pos; /* escaped dollar sign: the second one starts the next literal run, so exactly one is kept */ + } + else + { + throw Exception("invalid template: pos " + std::to_string(pos - format_string.c_str()) + + ": expected '{' or '$' after '$'", ErrorCodes::INVALID_TEMPLATE_FORMAT); + } + } + break; + + case Column: + if (*pos == ':') /* end of the name, a format specifier follows */ + { + size_t column_idx = idxByName(String(token_begin, pos - token_begin)); + format_idx_to_column_idx.push_back(column_idx); + token_begin = pos + 1; + state = Format; + } + else if (*pos == '}') /* placeholder without a format specifier */ + { + size_t column_idx = idxByName(String(token_begin, pos - token_begin)); + format_idx_to_column_idx.push_back(column_idx); + formats.push_back(ColumnFormat::Default); + delimiters.emplace_back(); + token_begin = pos + 1; + state = Delimiter; + } + break; + + case Format: + if (*pos == '}') + { + formats.push_back(stringToFormat(String(token_begin, pos - token_begin))); + token_begin = pos + 1; + delimiters.emplace_back(); + state = Delimiter; + } + 
} + } + if (state != Delimiter) /* the string ended inside an unterminated placeholder */ + throw Exception("invalid template: check parentheses balance", ErrorCodes::INVALID_TEMPLATE_FORMAT); + delimiters.back().append(token_begin, pos - token_begin); /* literal text after the last placeholder */ +} + + +ParsedTemplateFormat::ColumnFormat ParsedTemplateFormat::stringToFormat(const String & col_format) /* Converts a format specifier (the text after the colon inside a placeholder) to ColumnFormat. An empty specifier means Default; the other accepted spellings are exactly Escaped, Quoted, JSON, XML and Raw, compared with operator==. Any other value throws INVALID_TEMPLATE_FORMAT. */ +{ + if (col_format.empty()) + return ColumnFormat::Default; + else if (col_format == "Escaped") + return ColumnFormat::Escaped; + else if (col_format == "Quoted") + return ColumnFormat::Quoted; + else if (col_format == "JSON") + return ColumnFormat::Json; + else if (col_format == "XML") + return ColumnFormat::Xml; + else if (col_format == "Raw") + return ColumnFormat::Raw; + else + throw Exception("invalid template: unknown field format " + col_format, ErrorCodes::INVALID_TEMPLATE_FORMAT); +} + +size_t ParsedTemplateFormat::columnsCount() const /* Number of placeholders parsed from the template (one index is recorded per placeholder). */ +{ + return format_idx_to_column_idx.size(); +} + + + TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_) : ostr(ostr_), header(sample), settings(settings_) { static const String default_format("${result}"); const String & format_str = settings.template_settings.format.empty() ? 
default_format : settings.template_settings.format; - format = parseFormatString(format_str, [&](const String & partName) + format = ParsedTemplateFormat(format_str, [&](const String & partName) { return static_cast(stringToOutputPart(partName)); }); @@ -36,7 +134,7 @@ TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const case OutputPart::ExtremesMax: if (format.formats[i] != ColumnFormat::Default) throw Exception("invalid template: wrong serialization type for result, totals, min or max", - ErrorCodes::INVALID_TEMPLATE_FORMAT); + ErrorCodes::INVALID_TEMPLATE_FORMAT); break; default: break; @@ -46,7 +144,7 @@ TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const if (resultIdx != 0) throw Exception("invalid template: ${result} must be the first output part", ErrorCodes::INVALID_TEMPLATE_FORMAT); - row_format = parseFormatString(settings.template_settings.row_format, [&](const String & colName) + row_format = ParsedTemplateFormat(settings.template_settings.row_format, [&](const String & colName) { return header.getPositionByName(colName); }); @@ -55,100 +153,6 @@ TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const throw Exception("invalid template: no columns specified", ErrorCodes::INVALID_TEMPLATE_FORMAT); } - -TemplateBlockOutputStream::ParsedFormat TemplateBlockOutputStream::parseFormatString(const String & s, const ColumnIdxGetter & idxByName) -{ - enum ParserState - { - Delimiter, - Column, - Format - }; - ParsedFormat parsed_format; - const char * pos = s.c_str(); - const char * token_begin = pos; - ParserState state = Delimiter; - parsed_format.delimiters.emplace_back(); - for (; *pos; ++pos) - { - switch (state) - { - case Delimiter: - if (*pos == '$') - { - parsed_format.delimiters.back().append(token_begin, pos - token_begin); - ++pos; - if (*pos == '{') - { - token_begin = pos + 1; - state = Column; - } - else if (*pos == '$') - { - token_begin = pos; - } - else - { - throw 
Exception("invalid template: pos " + std::to_string(pos - s.c_str()) + - ": expected '{' or '$' after '$'", ErrorCodes::INVALID_TEMPLATE_FORMAT); - } - } - break; - - case Column: - if (*pos == ':') - { - size_t column_idx = idxByName(String(token_begin, pos - token_begin)); - parsed_format.format_idx_to_column_idx.push_back(column_idx); - token_begin = pos + 1; - state = Format; - } - else if (*pos == '}') - { - size_t column_idx = idxByName(String(token_begin, pos - token_begin)); - parsed_format.format_idx_to_column_idx.push_back(column_idx); - parsed_format.formats.push_back(ColumnFormat::Default); - parsed_format.delimiters.emplace_back(); - token_begin = pos + 1; - state = Delimiter; - } - break; - - case Format: - if (*pos == '}') - { - parsed_format.formats.push_back(stringToFormat(String(token_begin, pos - token_begin))); - token_begin = pos + 1; - parsed_format.delimiters.emplace_back(); - state = Delimiter; - } - } - } - if (state != Delimiter) - throw Exception("invalid template: check parentheses balance", ErrorCodes::INVALID_TEMPLATE_FORMAT); - parsed_format.delimiters.back().append(token_begin, pos - token_begin); - return parsed_format; -} - - -TemplateBlockOutputStream::ColumnFormat TemplateBlockOutputStream::stringToFormat(const String & col_format) -{ - if (col_format.empty()) - return ColumnFormat::Default; - else if (col_format == "Escaped") - return ColumnFormat::Escaped; - else if (col_format == "Quoted") - return ColumnFormat::Quoted; - else if (col_format == "JSON") - return ColumnFormat::Json; - else if (col_format == "XML") - return ColumnFormat::Xml; - else if (col_format == "Raw") - return ColumnFormat::Raw; - else - throw Exception("invalid template: unknown field format " + col_format, ErrorCodes::INVALID_TEMPLATE_FORMAT); -} - TemplateBlockOutputStream::OutputPart TemplateBlockOutputStream::stringToOutputPart(const String & part) { if (part == "result") diff --git a/dbms/src/Formats/TemplateBlockOutputStream.h 
b/dbms/src/Formats/TemplateBlockOutputStream.h index e4a5f8a2a2a..29b79979ff1 100644 --- a/dbms/src/Formats/TemplateBlockOutputStream.h +++ b/dbms/src/Formats/TemplateBlockOutputStream.h @@ -10,9 +10,8 @@ namespace DB { -class TemplateBlockOutputStream : public IBlockOutputStream +struct ParsedTemplateFormat /* Parsed form of a template string: literal delimiters interleaved with placeholders. */ { -public: enum class ColumnFormat { Default, @@ -22,7 +21,22 @@ public: Xml, Raw }; + std::vector delimiters; /* literal text around the placeholders; after parsing it holds one more entry than formats */ + std::vector formats; /* serialization format of each placeholder, in template order */ + std::vector format_idx_to_column_idx; /* for each placeholder, the index the ColumnIdxGetter returned for its name */ + typedef std::function ColumnIdxGetter; /* callback that maps a placeholder name to an index */ + + ParsedTemplateFormat() = default; /* empty format: all three vectors are empty */ + ParsedTemplateFormat(const String & format_string, const ColumnIdxGetter & idxByName); /* parses format_string; idxByName is called once per placeholder */ + static ColumnFormat stringToFormat(const String & format); /* maps a format specifier name to ColumnFormat */ + size_t columnsCount() const; /* number of placeholders */ +}; + +class TemplateBlockOutputStream : public IBlockOutputStream +{ + using ColumnFormat = ParsedTemplateFormat::ColumnFormat; /* keeps the unqualified name usable inside this class */ +public: TemplateBlockOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_); Block getHeader() const override { return header; } @@ -51,18 +65,7 @@ private: BytesRead }; - struct ParsedFormat - { - std::vector delimiters; - std::vector formats; - std::vector format_idx_to_column_idx; - }; - - typedef std::function ColumnIdxGetter; - - ColumnFormat stringToFormat(const String & format); OutputPart stringToOutputPart(const String & part); - ParsedFormat parseFormatString(const String & s, const ColumnIdxGetter & idxByName); void writeRow(const Block & block, size_t row_num); void serializeField(const IColumn & column, const IDataType & type, size_t row_num, ColumnFormat format); template void writeValue(U value, ColumnFormat col_format); @@ -72,8 +75,8 @@ private: Block header; const FormatSettings settings; - ParsedFormat format; - ParsedFormat row_format; + ParsedTemplateFormat format; /* parsed from settings.template_settings.format */ + ParsedTemplateFormat row_format; /* parsed from settings.template_settings.row_format */ size_t rows_before_limit; Block totals; diff --git a/dbms/src/Formats/TemplateRowInputStream.cpp b/dbms/src/Formats/TemplateRowInputStream.cpp new file mode 
100644 index 00000000000..8e178cfb92b --- /dev/null +++ b/dbms/src/Formats/TemplateRowInputStream.cpp @@ -0,0 +1,126 @@ +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int INVALID_TEMPLATE_FORMAT; +} + + +TemplateRowInputStream::TemplateRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & settings_, bool ignore_spaces_) + : istr(istr_), header(header_), types(header.getDataTypes()), settings(settings_), ignore_spaces(ignore_spaces_) /* Parses and validates both templates. The outer template is settings.template_settings.format, or ${data} when that setting is empty; it must contain exactly one placeholder, named data, with the Default format. The row template is settings.template_settings.row_format; its placeholders are resolved to header columns by name, a column may appear at most once, and the Xml and Raw formats are rejected. Violations detected here throw INVALID_TEMPLATE_FORMAT. */ +{ + static const String default_format("${data}"); + const String & format_str = settings.template_settings.format.empty() ? default_format : settings.template_settings.format; + format = ParsedTemplateFormat(format_str, [&](const String & partName) { + if (partName == "data") /* the only part name accepted on input */ + return 0; + throw Exception("invalid template format: unknown input part " + partName, ErrorCodes::INVALID_TEMPLATE_FORMAT); + }); + + if (format.formats.size() != 1 || format.formats[0] != ColumnFormat::Default) + throw Exception("invalid template format: format_schema must be \"prefix ${data} suffix\"", ErrorCodes::INVALID_TEMPLATE_FORMAT); + + + row_format = ParsedTemplateFormat(settings.template_settings.row_format, [&](const String & colName) { + return header.getPositionByName(colName); + }); + + std::vector column_in_format(header.columns(), false); /* marks header columns already referenced by the row template */ + for (size_t i = 0; i < row_format.columnsCount(); ++i) + { + size_t col_idx = row_format.format_idx_to_column_idx[i]; + if (column_in_format[col_idx]) + throw Exception("invalid template format: duplicate column " + header.getColumnsWithTypeAndName()[col_idx].name, + ErrorCodes::INVALID_TEMPLATE_FORMAT); + column_in_format[col_idx] = true; + + if (row_format.formats[i] == ColumnFormat::Xml || row_format.formats[i] == ColumnFormat::Raw) + throw Exception("invalid template format: XML and Raw deserialization is not supported", ErrorCodes::INVALID_TEMPLATE_FORMAT); + } +} + +void TemplateRowInputStream::readPrefix() /* Matches the literal text that precedes the data placeholder in the outer template, after optional whitespace skipping. */ +{ + skipSpaces(); + 
assertString(format.delimiters.front(), istr); /* delimiters.front() is the text before the first placeholder */ +} + +bool TemplateRowInputStream::read(MutableColumns & columns, RowReadExtension & extra) /* Reads one row into columns. Returns false when istr is at end of input, true after a row has been read. Before every row except the first the row_between_delimiter setting is matched; then, for each placeholder of the row template, the preceding literal delimiter is matched and the field value is deserialized. Columns that the row template does not mention receive a default value and stay false in extra.read_columns. */ +{ + skipSpaces(); + + // TODO check for suffix, not for EOF + if (istr.eof()) + return false; + + if (row_count) /* not the first row */ + { + assertString(settings.template_settings.row_between_delimiter, istr); + } + + extra.read_columns.assign(columns.size(), false); + + for (size_t i = 0; i < row_format.columnsCount(); ++i) + { + skipSpaces(); + assertString(row_format.delimiters[i], istr); /* literal text before field i */ + size_t col_idx = row_format.format_idx_to_column_idx[i]; + skipSpaces(); + deserializeField(*types[col_idx], *columns[col_idx], row_format.formats[i]); + extra.read_columns[col_idx] = true; + } + + skipSpaces(); + assertString(row_format.delimiters.back(), istr); /* literal text after the last field */ + + for (size_t i = 0; i < columns.size(); ++i) + if (!extra.read_columns[i]) /* column not mentioned in the row template */ + header.getByPosition(i).type->insertDefaultInto(*columns[i]); + + ++row_count; + return true; +} + +void TemplateRowInputStream::deserializeField(const IDataType & type, IColumn & column, ColumnFormat col_format) /* Reads one value from istr into column with the text deserializer selected by col_format; Default is handled the same way as Escaped. */ +{ + switch (col_format) + { + case ColumnFormat::Default: + case ColumnFormat::Escaped: + type.deserializeAsTextEscaped(column, istr, settings); + break; + case ColumnFormat::Quoted: + type.deserializeAsTextQuoted(column, istr, settings); + break; + case ColumnFormat::Json: + type.deserializeAsTextJSON(column, istr, settings); + break; + default: /* Xml and Raw: nothing is read here; the constructor rejects them for row templates */ + break; + } +} + + +void registerInputFormatTemplate(FormatFactory & factory) /* Registers two input formats that differ only in the ignore_spaces flag passed to TemplateRowInputStream: Template (false) and TemplateIgnoreSpaces (true). */ +{ + for (bool ignore_spaces : {false, true}) + { + factory.registerInputFormat(ignore_spaces ? 
"TemplateIgnoreSpaces" : "Template", [=]( + ReadBuffer & buf, + const Block & sample, + const Context &, /* unused */ + UInt64 max_block_size, + const FormatSettings & settings) { + return std::make_shared( + std::make_shared(buf, sample, settings, ignore_spaces), + sample, max_block_size, settings); + }); + } +} + +} diff --git a/dbms/src/Formats/TemplateRowInputStream.h b/dbms/src/Formats/TemplateRowInputStream.h new file mode 100644 index 00000000000..bab67d29d7b --- /dev/null +++ b/dbms/src/Formats/TemplateRowInputStream.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include +#include +#include + + +namespace DB +{ + +class TemplateRowInputStream : public IRowInputStream /* Row input stream driven by two templates: an outer one with a single data part and a per-row one whose placeholders name header columns. */ +{ + using ColumnFormat = ParsedTemplateFormat::ColumnFormat; +public: + TemplateRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & settings_, bool ignore_spaces_); /* parses and validates both templates taken from settings_.template_settings */ + + bool read(MutableColumns & columns, RowReadExtension & extra) override; /* reads one row; returns false at end of input */ + + void readPrefix() override; /* matches the literal text before the data part of the outer template */ + + // TODO + //bool allowSyncAfterError() const override; + //void syncAfterError() override; + //String getDiagnosticInfo() override; + +private: + void deserializeField(const IDataType & type, IColumn & column, ColumnFormat col_format); /* reads one value using the deserializer chosen by col_format */ + inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(istr); } /* does nothing unless ignore_spaces is set */ + +private: + ReadBuffer & istr; + Block header; + DataTypes types; /* header.getDataTypes(), taken once at construction */ + + FormatSettings settings; + ParsedTemplateFormat format; /* outer template */ + ParsedTemplateFormat row_format; /* per-row template */ + const bool ignore_spaces; + + size_t row_count = 0; /* rows read so far; non-zero means the between-rows delimiter is expected before the next row */ +}; + +}