Customizable prefix, suffix and row delimiter

This commit is contained in:
Alexander Tokmakov 2019-02-18 00:23:44 +03:00 committed by Alexander Tokmakov
parent 4f7720139a
commit 05d6e23373
5 changed files with 218 additions and 58 deletions

View File

@ -212,6 +212,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingMilliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.") \
M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from streaming storages.") \
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
M(SettingString, format_schema_rows, "", "Row format string for Template format") \
M(SettingString, format_schema_rows_between_delimiter, "\n", "Delimiter between rows for Template format") \
M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \

View File

@ -65,6 +65,9 @@ static FormatSettings getOutputFormatSetting(const Settings & settings)
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.template_settings.format = settings.format_schema;
format_settings.template_settings.row_format = settings.format_schema_rows;
format_settings.template_settings.row_between_delimiter = settings.format_schema_rows_between_delimiter;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;

View File

@ -48,6 +48,15 @@ struct FormatSettings
Values values;
struct Template
{
String format;
String row_format;
String row_between_delimiter;
};
Template template_settings;
bool skip_unknown_fields = false;
bool with_names_use_header = false;
bool write_statistics = true;

View File

@ -2,6 +2,7 @@
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
@ -12,15 +13,50 @@ namespace ErrorCodes
extern const int INVALID_TEMPLATE_FORMAT;
}
TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer &ostr_, const Block &sample,
const FormatSettings &settings_, const String& format_template)
TemplateBlockOutputStream::TemplateBlockOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_)
: ostr(ostr_), header(sample), settings(settings_)
{
parseFormatString(format_template);
static const String default_format("${result}");
const String & format_str = settings.template_settings.format.empty() ? default_format : settings.template_settings.format;
format = parseFormatString(format_str, [&](const String & partName)
{
return static_cast<size_t>(stringToOutputPart(partName));
});
size_t resultIdx = format.format_idx_to_column_idx.size() + 1;
for (size_t i = 0; i < format.format_idx_to_column_idx.size(); ++i)
{
switch (static_cast<OutputPart>(format.format_idx_to_column_idx[i]))
{
case OutputPart::Result:
resultIdx = i;
BOOST_FALLTHROUGH;
case OutputPart::Totals:
case OutputPart::ExtremesMin:
case OutputPart::ExtremesMax:
if (format.formats[i] != ColumnFormat::Default)
throw Exception("invalid template: wrong serialization type for result, totals, min or max",
ErrorCodes::INVALID_TEMPLATE_FORMAT);
break;
default:
break;
}
}
if (resultIdx != 0)
throw Exception("invalid template: ${result} must be the first output part", ErrorCodes::INVALID_TEMPLATE_FORMAT);
row_format = parseFormatString(settings.template_settings.row_format, [&](const String & colName)
{
return header.getPositionByName(colName);
});
if (row_format.delimiters.size() == 1)
throw Exception("invalid template: no columns specified", ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
void TemplateBlockOutputStream::parseFormatString(const String & s)
TemplateBlockOutputStream::ParsedFormat TemplateBlockOutputStream::parseFormatString(const String & s, const ColumnIdxGetter & idxByName)
{
enum ParserState
{
@ -28,10 +64,11 @@ void TemplateBlockOutputStream::parseFormatString(const String & s)
Column,
Format
};
ParsedFormat parsed_format;
const char * pos = s.c_str();
const char * token_begin = pos;
ParserState state = Delimiter;
delimiters.emplace_back();
parsed_format.delimiters.emplace_back();
for (; *pos; ++pos)
{
switch (state)
@ -39,7 +76,7 @@ void TemplateBlockOutputStream::parseFormatString(const String & s)
case Delimiter:
if (*pos == '$')
{
delimiters.back().append(token_begin, pos - token_begin);
parsed_format.delimiters.back().append(token_begin, pos - token_begin);
++pos;
if (*pos == '{')
{
@ -61,17 +98,17 @@ void TemplateBlockOutputStream::parseFormatString(const String & s)
case Column:
if (*pos == ':')
{
size_t column_idx = header.getPositionByName(String(token_begin, pos - token_begin));
format_idx_to_column_idx.push_back(column_idx);
size_t column_idx = idxByName(String(token_begin, pos - token_begin));
parsed_format.format_idx_to_column_idx.push_back(column_idx);
token_begin = pos + 1;
state = Format;
}
else if (*pos == '}')
{
size_t column_idx = header.getPositionByName(String(token_begin, pos - token_begin));
format_idx_to_column_idx.push_back(column_idx);
formats.push_back(ColumnFormat::Default);
delimiters.emplace_back();
size_t column_idx = idxByName(String(token_begin, pos - token_begin));
parsed_format.format_idx_to_column_idx.push_back(column_idx);
parsed_format.formats.push_back(ColumnFormat::Default);
parsed_format.delimiters.emplace_back();
token_begin = pos + 1;
state = Delimiter;
}
@ -80,38 +117,60 @@ void TemplateBlockOutputStream::parseFormatString(const String & s)
case Format:
if (*pos == '}')
{
formats.push_back(stringToFormat(String(token_begin, pos - token_begin)));
parsed_format.formats.push_back(stringToFormat(String(token_begin, pos - token_begin)));
token_begin = pos + 1;
delimiters.emplace_back();
parsed_format.delimiters.emplace_back();
state = Delimiter;
}
}
}
if (state != Delimiter)
throw Exception("invalid template: check parentheses balance", ErrorCodes::INVALID_TEMPLATE_FORMAT);
if (delimiters.size() == 1)
throw Exception("invalid template: no columns specified", ErrorCodes::INVALID_TEMPLATE_FORMAT);
delimiters.back().append(token_begin, pos - token_begin);
parsed_format.delimiters.back().append(token_begin, pos - token_begin);
return parsed_format;
}
TemplateBlockOutputStream::ColumnFormat TemplateBlockOutputStream::stringToFormat(const String & format)
TemplateBlockOutputStream::ColumnFormat TemplateBlockOutputStream::stringToFormat(const String & col_format)
{
if (format.empty())
if (col_format.empty())
return ColumnFormat::Default;
else if (format == "Escaped")
else if (col_format == "Escaped")
return ColumnFormat::Escaped;
else if (format == "Quoted")
else if (col_format == "Quoted")
return ColumnFormat::Quoted;
else if (format == "JSON")
else if (col_format == "JSON")
return ColumnFormat::Json;
else if (format == "XML")
else if (col_format == "XML")
return ColumnFormat::Xml;
else if (format == "Raw")
else if (col_format == "Raw")
return ColumnFormat::Raw;
else
throw Exception("invalid template: unknown field format " + format, ErrorCodes::INVALID_TEMPLATE_FORMAT);
throw Exception("invalid template: unknown field format " + col_format, ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
TemplateBlockOutputStream::OutputPart TemplateBlockOutputStream::stringToOutputPart(const String & part)
{
if (part == "result")
return OutputPart::Result;
else if (part == "totals")
return OutputPart::Totals;
else if (part == "min")
return OutputPart::ExtremesMin;
else if (part == "max")
return OutputPart::ExtremesMax;
else if (part == "rows")
return OutputPart::Rows;
else if (part == "rows_before_limit")
return OutputPart::RowsBeforeLimit;
else if (part == "time")
return OutputPart::TimeElapsed;
else if (part == "rows_read")
return OutputPart::RowsRead;
else if (part == "bytes_read")
return OutputPart::BytesRead;
else
throw Exception("invalid template: unknown output part " + part, ErrorCodes::INVALID_TEMPLATE_FORMAT);
}
void TemplateBlockOutputStream::flush()
@ -119,58 +178,117 @@ void TemplateBlockOutputStream::flush()
ostr.next();
}
void TemplateBlockOutputStream::serializeField(const ColumnWithTypeAndName & col, size_t row_num, ColumnFormat format)
void TemplateBlockOutputStream::writeRow(const Block & block, size_t row_num)
{
switch (format)
size_t columns = row_format.format_idx_to_column_idx.size();
for (size_t j = 0; j < columns; ++j)
{
writeString(row_format.delimiters[j], ostr);
size_t col_idx = row_format.format_idx_to_column_idx[j];
const ColumnWithTypeAndName & col = block.getByPosition(col_idx);
serializeField(*col.column, *col.type, row_num, row_format.formats[j]);
}
writeString(row_format.delimiters[columns], ostr);
}
void TemplateBlockOutputStream::serializeField(const IColumn & column, const IDataType & type, size_t row_num, ColumnFormat col_format)
{
switch (col_format)
{
case ColumnFormat::Default:
case ColumnFormat::Escaped:
col.type->serializeAsTextEscaped(*col.column, row_num, ostr, settings);
type.serializeAsTextEscaped(column, row_num, ostr, settings);
break;
case ColumnFormat::Quoted:
col.type->serializeAsTextQuoted(*col.column, row_num, ostr, settings);
type.serializeAsTextQuoted(column, row_num, ostr, settings);
break;
case ColumnFormat::Json:
col.type->serializeAsTextJSON(*col.column, row_num, ostr, settings);
type.serializeAsTextJSON(column, row_num, ostr, settings);
break;
case ColumnFormat::Xml:
col.type->serializeAsTextXML(*col.column, row_num, ostr, settings);
type.serializeAsTextXML(column, row_num, ostr, settings);
break;
case ColumnFormat::Raw:
col.type->serializeAsText(*col.column, row_num, ostr, settings);
type.serializeAsText(column, row_num, ostr, settings);
break;
default:
__builtin_unreachable();
}
}
template <typename U, typename V> void TemplateBlockOutputStream::writeValue(U value, ColumnFormat col_format)
{
auto type = std::make_unique<V>();
auto col = type->createColumn();
col->insert(value);
serializeField(*col, *type, 0, col_format);
}
void TemplateBlockOutputStream::write(const Block & block)
{
size_t rows = block.rows();
size_t columns = format_idx_to_column_idx.size();
for (size_t i = 0; i < rows; ++i)
{
for (size_t j = 0; j < columns; ++j)
{
writeString(delimiters[j], ostr);
if (row_count)
writeString(settings.template_settings.row_between_delimiter, ostr);
size_t col_idx = format_idx_to_column_idx[j];
const ColumnWithTypeAndName & col = block.getByPosition(col_idx);
serializeField(col, i, formats[j]);
}
writeString(delimiters[columns], ostr);
writeRow(block, i);
++row_count;
}
}
void TemplateBlockOutputStream::writePrefix()
{
// TODO
writeString(format.delimiters.front(), ostr);
}
void TemplateBlockOutputStream::writeSuffix()
{
// TODO
size_t parts = format.format_idx_to_column_idx.size();
for (size_t j = 0; j < parts; ++j)
{
auto type = std::make_shared<DataTypeUInt64>();
ColumnWithTypeAndName col(type->createColumnConst(1, row_count), type, String("tmp"));
switch (static_cast<OutputPart>(format.format_idx_to_column_idx[j]))
{
case OutputPart::Totals:
if (!totals)
throw Exception("invalid template: cannot print totals for this request", ErrorCodes::INVALID_TEMPLATE_FORMAT);
writeRow(totals, 0);
break;
case OutputPart::ExtremesMin:
if (!extremes)
throw Exception("invalid template: cannot print extremes for this request", ErrorCodes::INVALID_TEMPLATE_FORMAT);
writeRow(extremes, 0);
break;
case OutputPart::ExtremesMax:
if (!extremes)
throw Exception("invalid template: cannot print extremes for this request", ErrorCodes::INVALID_TEMPLATE_FORMAT);
writeRow(extremes, 1);
break;
case OutputPart::Rows:
writeValue<size_t, DataTypeUInt64>(row_count, format.formats[j]);
break;
case OutputPart::RowsBeforeLimit:
writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.formats[j]);
break;
case OutputPart::TimeElapsed:
writeValue<double, DataTypeFloat64>(watch.elapsedSeconds(), format.formats[j]);
break;
case OutputPart::RowsRead:
writeValue<size_t, DataTypeUInt64>(progress.rows.load(), format.formats[j]);
break;
case OutputPart::BytesRead:
writeValue<size_t, DataTypeUInt64>(progress.bytes.load(), format.formats[j]);
break;
default:
break;
}
writeString(format.delimiters[j + 1], ostr);
}
}
@ -179,11 +297,10 @@ void registerOutputFormatTemplate(FormatFactory &factory)
factory.registerOutputFormat("Template", [](
WriteBuffer & buf,
const Block & sample,
const Context & context,
const Context &,
const FormatSettings & settings)
{
auto format_template = context.getSettingsRef().format_schema.toString();
return std::make_shared<TemplateBlockOutputStream>(buf, sample, settings, format_template);
return std::make_shared<TemplateBlockOutputStream>(buf, sample, settings);
});
}
}

View File

@ -4,6 +4,7 @@
#include <Formats/FormatSettings.h>
#include <DataStreams/IBlockOutputStream.h>
#include <IO/Progress.h>
#include <Common/Stopwatch.h>
namespace DB
@ -22,7 +23,7 @@ public:
Raw
};
TemplateBlockOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_, const String & format_template);
TemplateBlockOutputStream(WriteBuffer & ostr_, const Block & sample, const FormatSettings & settings_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -37,22 +38,50 @@ public:
void onProgress(const Progress & progress_) override { progress.incrementPiecewiseAtomically(progress_); }
private:
enum class OutputPart : size_t
{
Result,
Totals,
ExtremesMin,
ExtremesMax,
Rows,
RowsBeforeLimit,
TimeElapsed,
RowsRead,
BytesRead
};
struct ParsedFormat
{
std::vector<String> delimiters;
std::vector<ColumnFormat> formats;
std::vector<size_t> format_idx_to_column_idx;
};
typedef std::function<size_t(const String &)> ColumnIdxGetter;
ColumnFormat stringToFormat(const String & format);
void parseFormatString(const String & s);
void serializeField(const ColumnWithTypeAndName & col, size_t row_num, ColumnFormat format);
OutputPart stringToOutputPart(const String & part);
ParsedFormat parseFormatString(const String & s, const ColumnIdxGetter & idxByName);
void writeRow(const Block & block, size_t row_num);
void serializeField(const IColumn & column, const IDataType & type, size_t row_num, ColumnFormat format);
template <typename U, typename V> void writeValue(U value, ColumnFormat col_format);
private:
WriteBuffer & ostr;
Block header;
const FormatSettings settings;
std::vector<String> delimiters;
std::vector<ColumnFormat> formats;
std::vector<size_t> format_idx_to_column_idx;
ParsedFormat format;
ParsedFormat row_format;
size_t rows_before_limit;
Block totals;
Block extremes;
Progress progress;
Stopwatch watch;
size_t row_count = 0;
};
}