TemplateRowInputStream

This commit is contained in:
Alexander Tokmakov 2019-04-08 00:30:54 +03:00 committed by Alexander Tokmakov
parent 05d6e23373
commit 79015898cf
5 changed files with 295 additions and 112 deletions

View File

@ -49,6 +49,9 @@ static FormatSettings getInputFormatSetting(const Settings & settings)
format_settings.date_time_input_format = settings.date_time_input_format;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
format_settings.template_settings.format = settings.format_schema;
format_settings.template_settings.row_format = settings.format_schema_rows;
format_settings.template_settings.row_between_delimiter = settings.format_schema_rows_between_delimiter;
return format_settings;
}
@ -200,6 +203,7 @@ void registerInputFormatParquet(FormatFactory & factory);
void registerOutputFormatParquet(FormatFactory & factory);
void registerInputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatProtobuf(FormatFactory & factory);
void registerInputFormatTemplate(FormatFactory & factory);
void registerOutputFormatTemplate(FormatFactory &factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
@ -274,6 +278,7 @@ FormatFactory::FormatFactory()
registerInputFormatCapnProto(*this);
registerInputFormatParquet(*this);
registerOutputFormatParquet(*this);
registerInputFormatTemplate(*this);
registerOutputFormatTemplate(*this);
registerOutputFormatMySQLWire(*this);

View File

@ -13,12 +13,110 @@ namespace ErrorCodes
extern const int INVALID_TEMPLATE_FORMAT;
}
ParsedTemplateFormat::ParsedTemplateFormat(const String & format_string, const ColumnIdxGetter & idxByName)
{
enum ParserState
{
Delimiter,
Column,
Format
};
const char * pos = format_string.c_str();
const char * token_begin = pos;
ParserState state = Delimiter;
delimiters.emplace_back();
for (; *pos; ++pos)
{
switch (state)
{
case Delimiter:
if (*pos == '$')
{
delimiters.back().append(token_begin, pos - token_begin);
++pos;
if (*pos == '{')
{
token_begin = pos + 1;
state = Column;
}
else if (*pos == '$')
{
token_begin = pos;
}
else
{
throw Exception("invalid template: pos " + std::to_string(pos - format_string.c_str()) +
": expected '{' or '$' after '$'", ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
}
break;
case Column:
if (*pos == ':')
{
size_t column_idx = idxByName(String(token_begin, pos - token_begin));
format_idx_to_column_idx.push_back(column_idx);
token_begin = pos + 1;
state = Format;
}
else if (*pos == '}')
{
size_t column_idx = idxByName(String(token_begin, pos - token_begin));
format_idx_to_column_idx.push_back(column_idx);
formats.push_back(ColumnFormat::Default);
delimiters.emplace_back();
token_begin = pos + 1;
state = Delimiter;
}
break;
case Format:
if (*pos == '}')
{
formats.push_back(stringToFormat(String(token_begin, pos - token_begin)));
token_begin = pos + 1;
delimiters.emplace_back();
state = Delimiter;
}
}
}
if (state != Delimiter)
throw Exception("invalid template: check parentheses balance", ErrorCodes::INVALID_TEMPLATE_FORMAT);
delimiters.back().append(token_begin, pos - token_begin);
}
ParsedTemplateFormat::ColumnFormat ParsedTemplateFormat::stringToFormat(const String & col_format)
{
if (col_format.empty())
return ColumnFormat::Default;
else if (col_format == "Escaped")
return ColumnFormat::Escaped;
else if (col_format == "Quoted")
return ColumnFormat::Quoted;
else if (col_format == "JSON")
return ColumnFormat::Json;
else if (col_format == "XML")
return ColumnFormat::Xml;
else if (col_format == "Raw")
return ColumnFormat::Raw;
else
throw Exception("invalid template: unknown field format " + col_format, ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
size_t ParsedTemplateFormat::columnsCount() const
{
return format_idx_to_column_idx.size();
}
TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_)
: ostr(ostr_), header(sample), settings(settings_)
{
static const String default_format("${result}");
const String & format_str = settings.template_settings.format.empty() ? default_format : settings.template_settings.format;
format = parseFormatString(format_str, [&](const String & partName)
format = ParsedTemplateFormat(format_str, [&](const String & partName)
{
return static_cast<size_t>(stringToOutputPart(partName));
});
@ -36,7 +134,7 @@ TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const
case OutputPart::ExtremesMax:
if (format.formats[i] != ColumnFormat::Default)
throw Exception("invalid template: wrong serialization type for result, totals, min or max",
ErrorCodes::INVALID_TEMPLATE_FORMAT);
ErrorCodes::INVALID_TEMPLATE_FORMAT);
break;
default:
break;
@ -46,7 +144,7 @@ TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const
if (resultIdx != 0)
throw Exception("invalid template: ${result} must be the first output part", ErrorCodes::INVALID_TEMPLATE_FORMAT);
row_format = parseFormatString(settings.template_settings.row_format, [&](const String & colName)
row_format = ParsedTemplateFormat(settings.template_settings.row_format, [&](const String & colName)
{
return header.getPositionByName(colName);
});
@ -55,100 +153,6 @@ TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const
throw Exception("invalid template: no columns specified", ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
TemplateBlockOutputStream::ParsedFormat TemplateBlockOutputStream::parseFormatString(const String & s, const ColumnIdxGetter & idxByName)
{
enum ParserState
{
Delimiter,
Column,
Format
};
ParsedFormat parsed_format;
const char * pos = s.c_str();
const char * token_begin = pos;
ParserState state = Delimiter;
parsed_format.delimiters.emplace_back();
for (; *pos; ++pos)
{
switch (state)
{
case Delimiter:
if (*pos == '$')
{
parsed_format.delimiters.back().append(token_begin, pos - token_begin);
++pos;
if (*pos == '{')
{
token_begin = pos + 1;
state = Column;
}
else if (*pos == '$')
{
token_begin = pos;
}
else
{
throw Exception("invalid template: pos " + std::to_string(pos - s.c_str()) +
": expected '{' or '$' after '$'", ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
}
break;
case Column:
if (*pos == ':')
{
size_t column_idx = idxByName(String(token_begin, pos - token_begin));
parsed_format.format_idx_to_column_idx.push_back(column_idx);
token_begin = pos + 1;
state = Format;
}
else if (*pos == '}')
{
size_t column_idx = idxByName(String(token_begin, pos - token_begin));
parsed_format.format_idx_to_column_idx.push_back(column_idx);
parsed_format.formats.push_back(ColumnFormat::Default);
parsed_format.delimiters.emplace_back();
token_begin = pos + 1;
state = Delimiter;
}
break;
case Format:
if (*pos == '}')
{
parsed_format.formats.push_back(stringToFormat(String(token_begin, pos - token_begin)));
token_begin = pos + 1;
parsed_format.delimiters.emplace_back();
state = Delimiter;
}
}
}
if (state != Delimiter)
throw Exception("invalid template: check parentheses balance", ErrorCodes::INVALID_TEMPLATE_FORMAT);
parsed_format.delimiters.back().append(token_begin, pos - token_begin);
return parsed_format;
}
TemplateBlockOutputStream::ColumnFormat TemplateBlockOutputStream::stringToFormat(const String & col_format)
{
if (col_format.empty())
return ColumnFormat::Default;
else if (col_format == "Escaped")
return ColumnFormat::Escaped;
else if (col_format == "Quoted")
return ColumnFormat::Quoted;
else if (col_format == "JSON")
return ColumnFormat::Json;
else if (col_format == "XML")
return ColumnFormat::Xml;
else if (col_format == "Raw")
return ColumnFormat::Raw;
else
throw Exception("invalid template: unknown field format " + col_format, ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
TemplateBlockOutputStream::OutputPart TemplateBlockOutputStream::stringToOutputPart(const String & part)
{
if (part == "result")

View File

@ -10,9 +10,8 @@
namespace DB
{
class TemplateBlockOutputStream : public IBlockOutputStream
struct ParsedTemplateFormat
{
public:
enum class ColumnFormat
{
Default,
@ -22,7 +21,22 @@ public:
Xml,
Raw
};
std::vector<String> delimiters;
std::vector<ColumnFormat> formats;
std::vector<size_t> format_idx_to_column_idx;
typedef std::function<size_t(const String &)> ColumnIdxGetter;
ParsedTemplateFormat() = default;
ParsedTemplateFormat(const String & format_string, const ColumnIdxGetter & idxByName);
static ColumnFormat stringToFormat(const String & format);
size_t columnsCount() const;
};
class TemplateBlockOutputStream : public IBlockOutputStream
{
using ColumnFormat = ParsedTemplateFormat::ColumnFormat;
public:
TemplateBlockOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_);
Block getHeader() const override { return header; }
@ -51,18 +65,7 @@ private:
BytesRead
};
struct ParsedFormat
{
std::vector<String> delimiters;
std::vector<ColumnFormat> formats;
std::vector<size_t> format_idx_to_column_idx;
};
typedef std::function<size_t(const String &)> ColumnIdxGetter;
ColumnFormat stringToFormat(const String & format);
OutputPart stringToOutputPart(const String & part);
ParsedFormat parseFormatString(const String & s, const ColumnIdxGetter & idxByName);
void writeRow(const Block & block, size_t row_num);
void serializeField(const IColumn & column, const IDataType & type, size_t row_num, ColumnFormat format);
template <typename U, typename V> void writeValue(U value, ColumnFormat col_format);
@ -72,8 +75,8 @@ private:
Block header;
const FormatSettings settings;
ParsedFormat format;
ParsedFormat row_format;
ParsedTemplateFormat format;
ParsedTemplateFormat row_format;
size_t rows_before_limit;
Block totals;

View File

@ -0,0 +1,126 @@
#include <Formats/TemplateRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_TEMPLATE_FORMAT;
}
TemplateRowInputStream::TemplateRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & settings_, bool ignore_spaces_)
: istr(istr_), header(header_), types(header.getDataTypes()), settings(settings_), ignore_spaces(ignore_spaces_)
{
static const String default_format("${data}");
const String & format_str = settings.template_settings.format.empty() ? default_format : settings.template_settings.format;
format = ParsedTemplateFormat(format_str, [&](const String & partName) {
if (partName == "data")
return 0;
throw Exception("invalid template format: unknown input part " + partName, ErrorCodes::INVALID_TEMPLATE_FORMAT);
});
if (format.formats.size() != 1 || format.formats[0] != ColumnFormat::Default)
throw Exception("invalid template format: format_schema must be \"prefix ${data} suffix\"", ErrorCodes::INVALID_TEMPLATE_FORMAT);
row_format = ParsedTemplateFormat(settings.template_settings.row_format, [&](const String & colName) {
return header.getPositionByName(colName);
});
std::vector<UInt8> column_in_format(header.columns(), false);
for (size_t i = 0; i < row_format.columnsCount(); ++i)
{
size_t col_idx = row_format.format_idx_to_column_idx[i];
if (column_in_format[col_idx])
throw Exception("invalid template format: duplicate column " + header.getColumnsWithTypeAndName()[col_idx].name,
ErrorCodes::INVALID_TEMPLATE_FORMAT);
column_in_format[col_idx] = true;
if (row_format.formats[i] == ColumnFormat::Xml || row_format.formats[i] == ColumnFormat::Raw)
throw Exception("invalid template format: XML and Raw deserialization is not supported", ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
}
void TemplateRowInputStream::readPrefix()
{
skipSpaces();
assertString(format.delimiters.front(), istr);
}
bool TemplateRowInputStream::read(MutableColumns & columns, RowReadExtension & extra)
{
skipSpaces();
// TODO check for suffix, not for EOF
if (istr.eof())
return false;
if (row_count)
{
assertString(settings.template_settings.row_between_delimiter, istr);
}
extra.read_columns.assign(columns.size(), false);
for (size_t i = 0; i < row_format.columnsCount(); ++i)
{
skipSpaces();
assertString(row_format.delimiters[i], istr);
size_t col_idx = row_format.format_idx_to_column_idx[i];
skipSpaces();
deserializeField(*types[col_idx], *columns[col_idx], row_format.formats[i]);
extra.read_columns[col_idx] = true;
}
skipSpaces();
assertString(row_format.delimiters.back(), istr);
for (size_t i = 0; i < columns.size(); ++i)
if (!extra.read_columns[i])
header.getByPosition(i).type->insertDefaultInto(*columns[i]);
++row_count;
return true;
}
void TemplateRowInputStream::deserializeField(const IDataType & type, IColumn & column, ColumnFormat col_format)
{
switch (col_format)
{
case ColumnFormat::Default:
case ColumnFormat::Escaped:
type.deserializeAsTextEscaped(column, istr, settings);
break;
case ColumnFormat::Quoted:
type.deserializeAsTextQuoted(column, istr, settings);
break;
case ColumnFormat::Json:
type.deserializeAsTextJSON(column, istr, settings);
break;
default:
break;
}
}
void registerInputFormatTemplate(FormatFactory & factory)
{
for (bool ignore_spaces : {false, true})
{
factory.registerInputFormat(ignore_spaces ? "TemplateIgnoreSpaces" : "Template", [=](
ReadBuffer & buf,
const Block & sample,
const Context &,
UInt64 max_block_size,
const FormatSettings & settings) {
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TemplateRowInputStream>(buf, sample, settings, ignore_spaces),
sample, max_block_size, settings);
});
}
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <Core/Block.h>
#include <Formats/IRowInputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/TemplateBlockOutputStream.h>
#include <IO/ReadHelpers.h>
namespace DB
{
class TemplateRowInputStream : public IRowInputStream
{
using ColumnFormat = ParsedTemplateFormat::ColumnFormat;
public:
TemplateRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & settings_, bool ignore_spaces_);
bool read(MutableColumns & columns, RowReadExtension & extra) override;
void readPrefix() override;
// TODO
//bool allowSyncAfterError() const override;
//void syncAfterError() override;
//String getDiagnosticInfo() override;
private:
void deserializeField(const IDataType & type, IColumn & column, ColumnFormat col_format);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(istr); }
private:
ReadBuffer & istr;
Block header;
DataTypes types;
FormatSettings settings;
ParsedTemplateFormat format;
ParsedTemplateFormat row_format;
const bool ignore_spaces;
size_t row_count = 0;
};
}