Template format implementation

This commit is contained in:
Alexander Tokmakov 2019-02-10 18:42:56 +03:00 committed by Alexander Tokmakov
parent ea9b36e531
commit 12e08417d6
5 changed files with 236 additions and 1 deletions

View File

@ -434,6 +434,7 @@ namespace ErrorCodes
extern const int BAD_QUERY_PARAMETER = 457; extern const int BAD_QUERY_PARAMETER = 457;
extern const int CANNOT_UNLINK = 458; extern const int CANNOT_UNLINK = 458;
extern const int CANNOT_SET_THREAD_PRIORITY = 459; extern const int CANNOT_SET_THREAD_PRIORITY = 459;
extern const int INVALID_TEMPLATE_FORMAT = 460;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;

View File

@ -29,7 +29,7 @@ public:
String getContentType() const override { return row_output->getContentType(); } String getContentType() const override { return row_output->getContentType(); }
private: protected:
RowOutputStreamPtr row_output; RowOutputStreamPtr row_output;
Block header; Block header;
bool first_row = true; bool first_row = true;

View File

@ -197,6 +197,7 @@ void registerInputFormatParquet(FormatFactory & factory);
void registerOutputFormatParquet(FormatFactory & factory); void registerOutputFormatParquet(FormatFactory & factory);
void registerInputFormatProtobuf(FormatFactory & factory); void registerInputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatProtobuf(FormatFactory & factory); void registerOutputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatTemplate(FormatFactory &factory);
void registerInputFormatProcessorNative(FormatFactory & factory); void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory); void registerOutputFormatProcessorNative(FormatFactory & factory);
@ -270,6 +271,7 @@ FormatFactory::FormatFactory()
registerInputFormatCapnProto(*this); registerInputFormatCapnProto(*this);
registerInputFormatParquet(*this); registerInputFormatParquet(*this);
registerOutputFormatParquet(*this); registerOutputFormatParquet(*this);
registerOutputFormatTemplate(*this);
registerOutputFormatMySQLWire(*this); registerOutputFormatMySQLWire(*this);

View File

@ -0,0 +1,180 @@
#include <Formats/TemplateRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
namespace DB {
namespace ErrorCodes {
extern const int INVALID_TEMPLATE_FORMAT;
}
TemplateRowOutputStream::TemplateRowOutputStream(WriteBuffer &ostr_, const Block &sample,
const FormatSettings &settings_, const String& format_template)
: ostr(ostr_), settings(settings_)
{
parseFormatString(format_template, sample);
}
void TemplateRowOutputStream::parseFormatString(const String & s, const Block & sample)
{
enum ParserState
{
Delimiter,
Column,
Format
};
const char * pos = s.c_str();
const char * token_begin = pos;
ParserState state = Delimiter;
delimiters.emplace_back();
for (; *pos; ++pos)
{
switch (state)
{
case Delimiter:
if (*pos == '$')
{
delimiters.back().append(token_begin, pos - token_begin);
++pos;
if (*pos == '{')
{
token_begin = pos + 1;
state = Column;
}
else if (*pos == '$')
{
token_begin = pos;
}
else
{
throw Exception("invalid template: pos " + std::to_string(pos - s.c_str()) +
": expected '{' or '$' after '$'", ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
}
break;
case Column:
if (*pos == ':')
{
size_t column_idx = sample.getPositionByName(String(token_begin, pos - token_begin));
format_idx_to_column_idx.push_back(column_idx);
token_begin = pos + 1;
state = Format;
}
else if (*pos == '}')
{
size_t column_idx = sample.getPositionByName(String(token_begin, pos - token_begin));
format_idx_to_column_idx.push_back(column_idx);
formats.push_back(ColumnFormat::Default);
delimiters.emplace_back();
token_begin = pos + 1;
state = Delimiter;
}
break;
case Format:
if (*pos == '}')
{
formats.push_back(stringToFormat(String(token_begin, pos - token_begin)));
token_begin = pos + 1;
delimiters.emplace_back();
state = Delimiter;
}
}
}
if (state != Delimiter)
throw Exception("invalid template: check parentheses balance", ErrorCodes::INVALID_TEMPLATE_FORMAT);
if (delimiters.size() == 1)
throw Exception("invalid template: no columns specified", ErrorCodes::INVALID_TEMPLATE_FORMAT);
delimiters.back().append(token_begin, pos - token_begin);
}
TemplateRowOutputStream::ColumnFormat TemplateRowOutputStream::stringToFormat(const String & format)
{
if (format.empty())
return ColumnFormat::Default;
else if (format == "Escaped")
return ColumnFormat::Escaped;
else if (format == "Quoted")
return ColumnFormat::Quoted;
else if (format == "JSON")
return ColumnFormat::Json;
else if (format == "XML")
return ColumnFormat::Xml;
else if (format == "Raw")
return ColumnFormat::Raw;
else
throw Exception("invalid template: unknown field format " + format, ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
void TemplateRowOutputStream::flush()
{
ostr.next();
}
void TemplateRowOutputStream::serializeField(const ColumnWithTypeAndName & col, size_t row_num, ColumnFormat format)
{
switch (format)
{
case ColumnFormat::Default:
case ColumnFormat::Escaped:
col.type->serializeAsTextEscaped(*col.column, row_num, ostr, settings);
break;
case ColumnFormat::Quoted:
col.type->serializeAsTextQuoted(*col.column, row_num, ostr, settings);
break;
case ColumnFormat::Json:
col.type->serializeAsTextJSON(*col.column, row_num, ostr, settings);
break;
case ColumnFormat::Xml:
col.type->serializeAsTextXML(*col.column, row_num, ostr, settings);
break;
case ColumnFormat::Raw:
col.type->serializeAsText(*col.column, row_num, ostr, settings);
break;
default:
__builtin_unreachable();
}
}
void TemplateRowOutputStream::write(const Block & block, size_t row_num)
{
size_t columns = format_idx_to_column_idx.size();
for (size_t i = 0; i < columns; ++i)
{
writeString(delimiters[i], ostr);
size_t col_idx = format_idx_to_column_idx[i];
const ColumnWithTypeAndName & col = block.getByPosition(col_idx);
serializeField(col, row_num, formats[i]);
}
writeString(delimiters[columns], ostr);
}
void TemplateBlockOutputStream::write(const Block & block)
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
row_output->write(block, i);
}
void registerOutputFormatTemplate(FormatFactory &factory)
{
factory.registerOutputFormat("Template", [](
WriteBuffer &buf,
const Block &sample,
const Context & context,
const FormatSettings &settings) {
auto format_template = context.getSettingsRef().format_schema.toString();
return std::make_shared<TemplateBlockOutputStream>(
std::make_shared<TemplateRowOutputStream>(buf, sample, settings, format_template), sample);
});
}
}

View File

@ -0,0 +1,52 @@
#pragma once
#include <Core/Block.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
{
class TemplateRowOutputStream : public IRowOutputStream
{
public:
enum class ColumnFormat
{
Default,
Escaped,
Quoted,
Json,
Xml,
Raw
};
TemplateRowOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_, const String & format_template);
void write(const Block & block, size_t row_num) override;
void writeField(const IColumn &, const IDataType &, size_t) override {};
void flush() override;
private:
ColumnFormat stringToFormat(const String & format);
void parseFormatString(const String & s, const Block & sample);
void serializeField(const ColumnWithTypeAndName & col, size_t row_num, ColumnFormat format);
private:
WriteBuffer & ostr;
const FormatSettings settings;
std::vector<String> delimiters;
std::vector<ColumnFormat> formats;
std::vector<size_t> format_idx_to_column_idx;
};
class TemplateBlockOutputStream : public BlockOutputStreamFromRowOutputStream
{
public:
TemplateBlockOutputStream(RowOutputStreamPtr row_output_, const Block & header_)
: BlockOutputStreamFromRowOutputStream(row_output_, header_) {};
void write(const Block & block) override;
};
}