This commit is contained in:
avogar 2021-11-09 16:14:07 +03:00
parent 1cd1677b0c
commit 73d1918410
26 changed files with 883 additions and 400 deletions

View File

@ -617,7 +617,7 @@ class IColumn;
M(String, format_template_row, "", "Path to file which contains format string for rows (for Template format)", 0) \ M(String, format_template_row, "", "Path to file which contains format string for rows (for Template format)", 0) \
M(String, format_template_rows_between_delimiter, "\n", "Delimiter between rows (for Template format)", 0) \ M(String, format_template_rows_between_delimiter, "\n", "Delimiter between rows (for Template format)", 0) \
\ \
M(String, format_custom_escaping_rule, "Escaped", "Field escaping rule (for CustomSeparated format)", 0) \ M(EscapingRule, format_custom_escaping_rule, "Escaped", "Field escaping rule (for CustomSeparated format)", 0) \
M(String, format_custom_field_delimiter, "\t", "Delimiter between fields (for CustomSeparated format)", 0) \ M(String, format_custom_field_delimiter, "\t", "Delimiter between fields (for CustomSeparated format)", 0) \
M(String, format_custom_row_before_delimiter, "", "Delimiter before field of the first column (for CustomSeparated format)", 0) \ M(String, format_custom_row_before_delimiter, "", "Delimiter before field of the first column (for CustomSeparated format)", 0) \
M(String, format_custom_row_after_delimiter, "\n", "Delimiter after field of the last column (for CustomSeparated format)", 0) \ M(String, format_custom_row_after_delimiter, "\n", "Delimiter after field of the last column (for CustomSeparated format)", 0) \
@ -626,7 +626,7 @@ class IColumn;
M(String, format_custom_result_after_delimiter, "", "Suffix after result set (for CustomSeparated format)", 0) \ M(String, format_custom_result_after_delimiter, "", "Suffix after result set (for CustomSeparated format)", 0) \
\ \
M(String, format_regexp, "", "Regular expression (for Regexp format)", 0) \ M(String, format_regexp, "", "Regular expression (for Regexp format)", 0) \
M(String, format_regexp_escaping_rule, "Raw", "Field escaping rule (for Regexp format)", 0) \ M(EscapingRule, format_regexp_escaping_rule, "Raw", "Field escaping rule (for Regexp format)", 0) \
M(Bool, format_regexp_skip_unmatched, false, "Skip lines unmatched by regular expression (for Regexp format", 0) \ M(Bool, format_regexp_skip_unmatched, false, "Skip lines unmatched by regular expression (for Regexp format", 0) \
\ \
M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \ M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \

View File

@ -121,4 +121,13 @@ IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES}, {{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES}, {"by_values", FormatSettings::EnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}}) {"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})
IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
{{"None", FormatSettings::EscapingRule::None},
{"Escaped", FormatSettings::EscapingRule::Escaped},
{"Quoted", FormatSettings::EscapingRule::Quoted},
{"CSV", FormatSettings::EscapingRule::CSV},
{"JSON", FormatSettings::EscapingRule::JSON},
{"XML", FormatSettings::EscapingRule::XML},
{"Raw", FormatSettings::EscapingRule::Raw}})
} }

View File

@ -170,4 +170,6 @@ DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)
DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode) DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)
} }

View File

@ -0,0 +1,224 @@
#include <Formats/EscapingRuleUtils.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_READ_ALL_DATA;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
FormatSettings::EscapingRule stringToEscapingRule(const String & escaping_rule)
{
if (escaping_rule.empty())
return FormatSettings::EscapingRule::None;
else if (escaping_rule == "None")
return FormatSettings::EscapingRule::None;
else if (escaping_rule == "Escaped")
return FormatSettings::EscapingRule::Escaped;
else if (escaping_rule == "Quoted")
return FormatSettings::EscapingRule::Quoted;
else if (escaping_rule == "CSV")
return FormatSettings::EscapingRule::CSV;
else if (escaping_rule == "JSON")
return FormatSettings::EscapingRule::JSON;
else if (escaping_rule == "XML")
return FormatSettings::EscapingRule::XML;
else if (escaping_rule == "Raw")
return FormatSettings::EscapingRule::Raw;
else
throw Exception("Unknown escaping rule \"" + escaping_rule + "\"", ErrorCodes::BAD_ARGUMENTS);
}
String escapingRuleToString(FormatSettings::EscapingRule escaping_rule)
{
switch (escaping_rule)
{
case FormatSettings::EscapingRule::None:
return "None";
case FormatSettings::EscapingRule::Escaped:
return "Escaped";
case FormatSettings::EscapingRule::Quoted:
return "Quoted";
case FormatSettings::EscapingRule::CSV:
return "CSV";
case FormatSettings::EscapingRule::JSON:
return "JSON";
case FormatSettings::EscapingRule::XML:
return "XML";
case FormatSettings::EscapingRule::Raw:
return "Raw";
}
__builtin_unreachable();
}
void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
{
String tmp;
constexpr const char * field_name = "<SKIPPED COLUMN>";
constexpr size_t field_name_len = 16;
switch (escaping_rule)
{
case FormatSettings::EscapingRule::None:
/// Empty field, just skip spaces
break;
case FormatSettings::EscapingRule::Escaped:
readEscapedString(tmp, buf);
break;
case FormatSettings::EscapingRule::Quoted:
readQuotedString(tmp, buf);
break;
case FormatSettings::EscapingRule::CSV:
readCSVString(tmp, buf, format_settings.csv);
break;
case FormatSettings::EscapingRule::JSON:
skipJSONField(buf, StringRef(field_name, field_name_len));
break;
case FormatSettings::EscapingRule::Raw:
readString(tmp, buf);
break;
default:
__builtin_unreachable();
}
}
bool deserializeFieldByEscapingRule(
const DataTypePtr & type,
const SerializationPtr & serialization,
IColumn & column,
ReadBuffer & buf,
FormatSettings::EscapingRule escaping_rule,
const FormatSettings & format_settings)
{
bool read = true;
bool parse_as_nullable = format_settings.null_as_default && !type->isNullable() && !type->isLowCardinalityNullable();
switch (escaping_rule)
{
case FormatSettings::EscapingRule::Escaped:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextEscapedImpl(column, buf, format_settings, serialization);
else
serialization->deserializeTextEscaped(column, buf, format_settings);
break;
case FormatSettings::EscapingRule::Quoted:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextQuotedImpl(column, buf, format_settings, serialization);
else
serialization->deserializeTextQuoted(column, buf, format_settings);
break;
case FormatSettings::EscapingRule::CSV:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextCSVImpl(column, buf, format_settings, serialization);
else
serialization->deserializeTextCSV(column, buf, format_settings);
break;
case FormatSettings::EscapingRule::JSON:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextJSONImpl(column, buf, format_settings, serialization);
else
serialization->deserializeTextJSON(column, buf, format_settings);
break;
case FormatSettings::EscapingRule::Raw:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextRawImpl(column, buf, format_settings, serialization);
else
serialization->deserializeTextRaw(column, buf, format_settings);
break;
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Escaping rule {} is not suitable for deserialization", escapingRuleToString(escaping_rule));
}
return read;
}
void serializeFieldByEscapingRule(
const IColumn & column,
const ISerialization & serialization,
WriteBuffer & out,
size_t row_num,
FormatSettings::EscapingRule escaping_rule,
const FormatSettings & format_settings)
{
switch (escaping_rule)
{
case FormatSettings::EscapingRule::Escaped:
serialization.serializeTextEscaped(column, row_num, out, format_settings);
break;
case FormatSettings::EscapingRule::Quoted:
serialization.serializeTextQuoted(column, row_num, out, format_settings);
break;
case FormatSettings::EscapingRule::CSV:
serialization.serializeTextCSV(column, row_num, out, format_settings);
break;
case FormatSettings::EscapingRule::JSON:
serialization.serializeTextJSON(column, row_num, out, format_settings);
break;
case FormatSettings::EscapingRule::XML:
serialization.serializeTextXML(column, row_num, out, format_settings);
break;
case FormatSettings::EscapingRule::Raw:
serialization.serializeTextRaw(column, row_num, out, format_settings);
break;
case FormatSettings::EscapingRule::None:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot serialize field with None escaping rule");
}
}
void writeStringByEscapingRule(const String & value, WriteBuffer & out, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
{
switch (escaping_rule)
{
case FormatSettings::EscapingRule::Quoted:
writeQuotedString(value, out);
break;
case FormatSettings::EscapingRule::JSON:
writeJSONString(value, out, format_settings);
break;
case FormatSettings::EscapingRule::Raw:
writeString(value, out);
break;
case FormatSettings::EscapingRule::CSV:
writeCSVString(value, out);
break;
case FormatSettings::EscapingRule::Escaped:
writeEscapedString(value, out);
break;
case FormatSettings::EscapingRule::XML:
writeXMLStringForTextElement(value, out);
break;
case FormatSettings::EscapingRule::None:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot serialize string with None escaping rule");
}
}
String readStringByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
{
String result;
switch (escaping_rule)
{
case FormatSettings::EscapingRule::Quoted:
readQuotedString(result, buf);
break;
case FormatSettings::EscapingRule::JSON:
readJSONString(result, buf);
break;
case FormatSettings::EscapingRule::Raw:
readString(result, buf);
break;
case FormatSettings::EscapingRule::CSV:
readCSVString(result, buf, format_settings.csv);
break;
case FormatSettings::EscapingRule::Escaped:
readEscapedString(result, buf);
break;
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot read string with {} escaping rule", escapingRuleToString(escaping_rule));
}
return result;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Formats/FormatSettings.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <IO/ReadBuffer.h>
namespace DB
{
FormatSettings::EscapingRule stringToEscapingRule(const String & escaping_rule);
String escapingRuleToString(FormatSettings::EscapingRule escaping_rule);
void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings);
bool deserializeFieldByEscapingRule(
const DataTypePtr & type,
const SerializationPtr & serialization,
IColumn & column,
ReadBuffer & buf,
FormatSettings::EscapingRule escaping_rule,
const FormatSettings & format_settings);
void serializeFieldByEscapingRule(
const IColumn & column,
const ISerialization & serialization,
WriteBuffer & out,
size_t row_num,
FormatSettings::EscapingRule escaping_rule,
const FormatSettings & format_settings);
void writeStringByEscapingRule(const String & value, WriteBuffer & out, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings);
String readStringByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings);
}

View File

@ -47,6 +47,17 @@ struct FormatSettings
UnixTimestamp UnixTimestamp
}; };
enum class EscapingRule
{
None,
Escaped,
Quoted,
CSV,
JSON,
XML,
Raw
};
DateTimeOutputFormat date_time_output_format = DateTimeOutputFormat::Simple; DateTimeOutputFormat date_time_output_format = DateTimeOutputFormat::Simple;
UInt64 input_allow_errors_num = 0; UInt64 input_allow_errors_num = 0;
@ -89,7 +100,7 @@ struct FormatSettings
std::string row_after_delimiter; std::string row_after_delimiter;
std::string row_between_delimiter; std::string row_between_delimiter;
std::string field_delimiter; std::string field_delimiter;
std::string escaping_rule; EscapingRule escaping_rule = EscapingRule::Escaped;
} custom; } custom;
struct struct
@ -148,7 +159,7 @@ struct FormatSettings
struct struct
{ {
std::string regexp; std::string regexp;
std::string escaping_rule; EscapingRule escaping_rule = EscapingRule::Raw;
bool skip_unmatched = false; bool skip_unmatched = false;
} regexp; } regexp;

View File

@ -1,9 +1,9 @@
#include <Formats/ParsedTemplateFormatString.h> #include <Formats/ParsedTemplateFormatString.h>
#include <Formats/verbosePrintString.h> #include <Formats/verbosePrintString.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/ReadBufferFromMemory.h> #include <IO/ReadBufferFromMemory.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
namespace DB namespace DB
@ -83,7 +83,7 @@ void ParsedTemplateFormatString::parse(const String & format_string, const Colum
state = Format; state = Format;
else if (*pos == '}') else if (*pos == '}')
{ {
formats.push_back(ColumnFormat::None); escaping_rules.push_back(EscapingRule::None);
delimiters.emplace_back(); delimiters.emplace_back();
state = Delimiter; state = Delimiter;
} }
@ -108,7 +108,7 @@ void ParsedTemplateFormatString::parse(const String & format_string, const Colum
case Format: case Format:
if (*pos == '}') if (*pos == '}')
{ {
formats.push_back(stringToFormat(String(token_begin, pos - token_begin))); escaping_rules.push_back(stringToEscapingRule(String(token_begin, pos - token_begin)));
token_begin = pos + 1; token_begin = pos + 1;
delimiters.emplace_back(); delimiters.emplace_back();
state = Delimiter; state = Delimiter;
@ -120,56 +120,11 @@ void ParsedTemplateFormatString::parse(const String & format_string, const Colum
delimiters.back().append(token_begin, pos - token_begin); delimiters.back().append(token_begin, pos - token_begin);
} }
ParsedTemplateFormatString::ColumnFormat ParsedTemplateFormatString::stringToFormat(const String & col_format)
{
if (col_format.empty())
return ColumnFormat::None;
else if (col_format == "None")
return ColumnFormat::None;
else if (col_format == "Escaped")
return ColumnFormat::Escaped;
else if (col_format == "Quoted")
return ColumnFormat::Quoted;
else if (col_format == "CSV")
return ColumnFormat::Csv;
else if (col_format == "JSON")
return ColumnFormat::Json;
else if (col_format == "XML")
return ColumnFormat::Xml;
else if (col_format == "Raw")
return ColumnFormat::Raw;
else
throw Exception("Unknown field format \"" + col_format + "\"", ErrorCodes::BAD_ARGUMENTS);
}
size_t ParsedTemplateFormatString::columnsCount() const size_t ParsedTemplateFormatString::columnsCount() const
{ {
return format_idx_to_column_idx.size(); return format_idx_to_column_idx.size();
} }
String ParsedTemplateFormatString::formatToString(ParsedTemplateFormatString::ColumnFormat format)
{
switch (format)
{
case ColumnFormat::None:
return "None";
case ColumnFormat::Escaped:
return "Escaped";
case ColumnFormat::Quoted:
return "Quoted";
case ColumnFormat::Csv:
return "CSV";
case ColumnFormat::Json:
return "Json";
case ColumnFormat::Xml:
return "Xml";
case ColumnFormat::Raw:
return "Raw";
}
__builtin_unreachable();
}
const char * ParsedTemplateFormatString::readMayBeQuotedColumnNameInto(const char * pos, size_t size, String & s) const char * ParsedTemplateFormatString::readMayBeQuotedColumnNameInto(const char * pos, size_t size, String & s)
{ {
s.clear(); s.clear();
@ -197,7 +152,7 @@ String ParsedTemplateFormatString::dump() const
res << "\nDelimiter " << 0 << ": "; res << "\nDelimiter " << 0 << ": ";
verbosePrintString(delimiters.front().c_str(), delimiters.front().c_str() + delimiters.front().size(), res); verbosePrintString(delimiters.front().c_str(), delimiters.front().c_str() + delimiters.front().size(), res);
size_t num_columns = std::max(formats.size(), format_idx_to_column_idx.size()); size_t num_columns = std::max(escaping_rules.size(), format_idx_to_column_idx.size());
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
{ {
res << "\nColumn " << i << ": \""; res << "\nColumn " << i << ": \"";
@ -216,7 +171,7 @@ String ParsedTemplateFormatString::dump() const
else else
res << *format_idx_to_column_idx[i]; res << *format_idx_to_column_idx[i];
res << "), Format " << (i < formats.size() ? formatToString(formats[i]) : "<ERROR>"); res << "), Format " << (i < escaping_rules.size() ? escapingRuleToString(escaping_rules[i]) : "<ERROR>");
res << "\nDelimiter " << i + 1 << ": "; res << "\nDelimiter " << i + 1 << ": ";
if (delimiters.size() <= i + 1) if (delimiters.size() <= i + 1)
@ -235,34 +190,4 @@ void ParsedTemplateFormatString::throwInvalidFormat(const String & message, size
ErrorCodes::INVALID_TEMPLATE_FORMAT); ErrorCodes::INVALID_TEMPLATE_FORMAT);
} }
ParsedTemplateFormatString ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(const FormatSettings::Custom & settings)
{
/// Set resultset format to "result_before_delimiter ${data} result_after_delimiter"
ParsedTemplateFormatString resultset_format;
resultset_format.delimiters.emplace_back(settings.result_before_delimiter);
resultset_format.delimiters.emplace_back(settings.result_after_delimiter);
resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None);
resultset_format.format_idx_to_column_idx.emplace_back(0);
resultset_format.column_names.emplace_back("data");
return resultset_format;
}
ParsedTemplateFormatString ParsedTemplateFormatString::setupCustomSeparatedRowFormat(const FormatSettings::Custom & settings, const Block & sample)
{
/// Set row format to
/// "row_before_delimiter ${Col0:escaping} field_delimiter ${Col1:escaping} field_delimiter ... ${ColN:escaping} row_after_delimiter"
ParsedTemplateFormatString::ColumnFormat escaping = ParsedTemplateFormatString::stringToFormat(settings.escaping_rule);
ParsedTemplateFormatString row_format;
row_format.delimiters.emplace_back(settings.row_before_delimiter);
for (size_t i = 0; i < sample.columns(); ++i)
{
row_format.formats.emplace_back(escaping);
row_format.format_idx_to_column_idx.emplace_back(i);
row_format.column_names.emplace_back(sample.getByPosition(i).name);
bool last_column = i == sample.columns() - 1;
row_format.delimiters.emplace_back(last_column ? settings.row_after_delimiter : settings.field_delimiter);
}
return row_format;
}
} }

View File

@ -15,23 +15,14 @@ using Strings = std::vector<String>;
struct ParsedTemplateFormatString struct ParsedTemplateFormatString
{ {
enum class ColumnFormat using EscapingRule = FormatSettings::EscapingRule;
{
None,
Escaped,
Quoted,
Csv,
Json,
Xml,
Raw
};
/// Format string has syntax: "Delimiter0 ${ColumnName0:Format0} Delimiter1 ${ColumnName1:Format1} Delimiter2" /// Format string has syntax: "Delimiter0 ${ColumnName0:Format0} Delimiter1 ${ColumnName1:Format1} Delimiter2"
/// The following vectors is filled with corresponding values, delimiters.size() - 1 = formats.size() = format_idx_to_column_idx.size() /// The following vectors is filled with corresponding values, delimiters.size() - 1 = formats.size() = format_idx_to_column_idx.size()
/// If format_idx_to_column_idx[i] has no value, then TemplateRowInputFormat will skip i-th column. /// If format_idx_to_column_idx[i] has no value, then TemplateRowInputFormat will skip i-th column.
std::vector<String> delimiters; std::vector<String> delimiters;
std::vector<ColumnFormat> formats; std::vector<EscapingRule> escaping_rules;
std::vector<std::optional<size_t>> format_idx_to_column_idx; std::vector<std::optional<size_t>> format_idx_to_column_idx;
/// For diagnostic info /// For diagnostic info
@ -44,16 +35,11 @@ struct ParsedTemplateFormatString
void parse(const String & format_string, const ColumnIdxGetter & idx_by_name); void parse(const String & format_string, const ColumnIdxGetter & idx_by_name);
static ColumnFormat stringToFormat(const String & format);
static String formatToString(ColumnFormat format);
static const char * readMayBeQuotedColumnNameInto(const char * pos, size_t size, String & s); static const char * readMayBeQuotedColumnNameInto(const char * pos, size_t size, String & s);
size_t columnsCount() const; size_t columnsCount() const;
String dump() const; String dump() const;
[[noreturn]] void throwInvalidFormat(const String & message, size_t column) const; [[noreturn]] void throwInvalidFormat(const String & message, size_t column) const;
static ParsedTemplateFormatString setupCustomSeparatedResultsetFormat(const FormatSettings::Custom & settings);
static ParsedTemplateFormatString setupCustomSeparatedRowFormat(const FormatSettings::Custom & settings, const Block & sample);
}; };
} }

View File

@ -50,6 +50,8 @@ void registerInputFormatAvro(FormatFactory & factory);
void registerOutputFormatAvro(FormatFactory & factory); void registerOutputFormatAvro(FormatFactory & factory);
void registerInputFormatRawBLOB(FormatFactory & factory); void registerInputFormatRawBLOB(FormatFactory & factory);
void registerOutputFormatRawBLOB(FormatFactory & factory); void registerOutputFormatRawBLOB(FormatFactory & factory);
void registerInputFormatCustomSeparated(FormatFactory & factory);
void registerOutputFormatCustomSeparated(FormatFactory & factory);
/// Output only (presentational) formats. /// Output only (presentational) formats.
@ -115,6 +117,8 @@ void registerFormats()
registerOutputFormatMsgPack(factory); registerOutputFormatMsgPack(factory);
registerInputFormatRawBLOB(factory); registerInputFormatRawBLOB(factory);
registerOutputFormatRawBLOB(factory); registerOutputFormatRawBLOB(factory);
registerInputFormatCustomSeparated(factory);
registerOutputFormatCustomSeparated(factory);
registerInputFormatORC(factory); registerInputFormatORC(factory);
registerOutputFormatORC(factory); registerOutputFormatORC(factory);

View File

@ -5,6 +5,7 @@ namespace DB
{ {
class WriteBuffer; class WriteBuffer;
class ReadBuffer;
/** Print string in double quotes and with control characters in "<NAME>" form - for output diagnostic info to user. /** Print string in double quotes and with control characters in "<NAME>" form - for output diagnostic info to user.

View File

@ -1166,4 +1166,50 @@ bool loadAtPosition(ReadBuffer & in, Memory<> & memory, char * & current)
return loaded_more; return loaded_more;
} }
/// Searches for delimiter in input stream and sets buffer position after delimiter (if found) or EOF (if not)
static void findAndSkipNextDelimiter(PeekableReadBuffer & buf, const String & delimiter)
{
if (delimiter.empty())
return;
while (!buf.eof())
{
void * pos = memchr(buf.position(), delimiter[0], buf.available());
if (!pos)
{
buf.position() += buf.available();
continue;
}
buf.position() = static_cast<ReadBuffer::Position>(pos);
PeekableReadBufferCheckpoint checkpoint{buf};
if (checkString(delimiter, buf))
return;
buf.rollbackToCheckpoint();
++buf.position();
}
}
void skipToNextRowOrEof(PeekableReadBuffer & buf, const String & row_after_delimiter, const String & row_between_delimiter, bool skip_spaces)
{
if (row_after_delimiter.empty())
{
findAndSkipNextDelimiter(buf, row_between_delimiter);
return;
}
while (true)
{
findAndSkipNextDelimiter(buf, row_after_delimiter);
if (skip_spaces)
skipWhitespaceIfAny(buf);
if (checkString(row_between_delimiter, buf))
break;
}
}
} }

View File

@ -30,6 +30,7 @@
#include <IO/CompressionMethod.h> #include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h> #include <IO/ReadBufferFromMemory.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/VarInt.h> #include <IO/VarInt.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
@ -1324,6 +1325,9 @@ void saveUpToPosition(ReadBuffer & in, Memory<Allocator<false>> & memory, char *
*/ */
bool loadAtPosition(ReadBuffer & in, Memory<Allocator<false>> & memory, char * & current); bool loadAtPosition(ReadBuffer & in, Memory<Allocator<false>> & memory, char * & current);
/// Skip data until start of the next row or eof (the end of row is determined by two delimiters:
/// row_after_delimiter and row_between_delimiter).
void skipToNextRowOrEof(PeekableReadBuffer & buf, const String & row_after_delimiter, const String & row_between_delimiter, bool skip_spaces);
struct PcgDeserializer struct PcgDeserializer
{ {

View File

@ -0,0 +1,233 @@
#include <Processors/Formats/Impl/CustomSeparatedRowInputFormat.h>
#include <Processors/Formats/Impl/TemplateRowInputFormat.h>
#include <Formats/EscapingRuleUtils.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <IO/Operators.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
CustomSeparatedRowInputFormat::CustomSeparatedRowInputFormat(
const Block & header_,
ReadBuffer & in_,
const Params & params_,
bool with_names_,
bool with_types_,
bool ignore_spaces_,
const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(header_, in_, params_, with_names_, with_types_, format_settings_)
, buf(in_)
, ignore_spaces(ignore_spaces_)
, escaping_rule(format_settings_.custom.escaping_rule)
{
/// In case of CustomSeparatedWithNames(AndTypes) formats and enabled setting input_format_with_names_use_header we don't know
/// the exact number of columns in data (because it can contain unknown columns). So, if field_delimiter and row_after_delimiter are
/// the same and row_between_delimiter is empty, we won't be able to determine the end of row while reading column names or types.
if ((with_types_ || with_names_) && format_settings_.with_names_use_header
&& format_settings_.custom.field_delimiter == format_settings_.custom.row_after_delimiter
&& format_settings_.custom.row_between_delimiter.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Input format CustomSeparatedWithNames(AndTypes) cannot work properly with enabled setting input_format_with_names_use_header, "
"when format_custom_field_delimiter and format_custom_row_after_delimiter are the same and format_custom_row_between_delimiter is empty.");
}
}
void CustomSeparatedRowInputFormat::skipPrefixBeforeHeader()
{
skipSpaces();
assertString(format_settings.custom.result_before_delimiter, buf);
}
void CustomSeparatedRowInputFormat::skipRowStartDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_before_delimiter, buf);
}
void CustomSeparatedRowInputFormat::skipFieldDelimiter()
{
skipSpaces();
assertString(format_settings.custom.field_delimiter, buf);
}
void CustomSeparatedRowInputFormat::skipRowEndDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_after_delimiter, buf);
}
void CustomSeparatedRowInputFormat::skipRowBetweenDelimiter()
{
skipSpaces();
assertString(format_settings.custom.row_between_delimiter, buf);
}
void CustomSeparatedRowInputFormat::skipField()
{
skipSpaces();
skipFieldByEscapingRule(buf, escaping_rule, format_settings);
}
bool CustomSeparatedRowInputFormat::checkEndOfRow()
{
PeekableReadBufferCheckpoint checkpoint{buf, true};
skipSpaces();
if (!checkString(format_settings.custom.row_after_delimiter, buf))
return false;
skipSpaces();
/// At the end of row after row_after_delimiter we expect result_after_delimiter or row_between_delimiter.
if (checkString(format_settings.custom.row_between_delimiter, buf))
return true;
buf.rollbackToCheckpoint();
skipSpaces();
buf.ignore(format_settings.custom.row_after_delimiter.size());
return checkForSuffixImpl(true);
}
std::vector<String> CustomSeparatedRowInputFormat::readHeaderRow()
{
std::vector<String> values;
skipRowStartDelimiter();
do
{
if (!values.empty())
skipFieldDelimiter();
skipSpaces();
values.push_back(readStringByEscapingRule(buf, escaping_rule, format_settings));
}
while (!checkEndOfRow());
skipRowEndDelimiter();
return values;
}
void CustomSeparatedRowInputFormat::skipHeaderRow()
{
size_t columns = getPort().getHeader().columns();
skipRowStartDelimiter();
for (size_t i = 0; i != columns; ++i)
{
skipField();
if (i + 1 != columns)
skipFieldDelimiter();
}
skipRowEndDelimiter();
}
bool CustomSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, bool, const String &)
{
skipSpaces();
return deserializeFieldByEscapingRule(type, serialization, column, buf, escaping_rule, format_settings);
}
bool CustomSeparatedRowInputFormat::checkForSuffixImpl(bool check_eof)
{
skipSpaces();
if (unlikely(checkString(format_settings.custom.result_after_delimiter, buf)))
{
skipSpaces();
if (!check_eof)
return true;
if (buf.eof())
return true;
}
return false;
}
bool CustomSeparatedRowInputFormat::tryParseSuffixWithDiagnosticInfo(WriteBuffer & out)
{
PeekableReadBufferCheckpoint checkpoint{buf};
if (checkForSuffixImpl(false))
{
if (in->eof())
out << "<End of stream>\n";
else
out << " There is some data after suffix\n";
return false;
}
buf.rollbackToCheckpoint();
return true;
}
bool CustomSeparatedRowInputFormat::checkForSuffix()
{
PeekableReadBufferCheckpoint checkpoint{buf};
if (checkForSuffixImpl(true))
return true;
buf.rollbackToCheckpoint();
return false;
}
bool CustomSeparatedRowInputFormat::allowSyncAfterError() const
{
return !format_settings.custom.row_after_delimiter.empty() || !format_settings.custom.row_between_delimiter.empty();
}
void CustomSeparatedRowInputFormat::syncAfterError()
{
skipToNextRowOrEof(buf, format_settings.custom.row_after_delimiter, format_settings.custom.row_between_delimiter, ignore_spaces);
end_of_stream = buf.eof();
/// It can happen that buf.position() is not at the beginning of row
/// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter.
/// It will cause another parsing error.
}
bool CustomSeparatedRowInputFormat::parseRowStartWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_before_delimiter, "delimiter before first firld", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.field_delimiter, "delimiter between fields", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_after_delimiter, "delimiter after last field", ignore_spaces);
}
bool CustomSeparatedRowInputFormat::parseRowBetweenDelimiterWithDiagnosticInfo(WriteBuffer & out)
{
return parseDelimiterWithDiagnosticInfo(out, buf, format_settings.custom.row_between_delimiter, "delimiter between rows", ignore_spaces);
}
void CustomSeparatedRowInputFormat::resetParser()
{
RowInputFormatWithNamesAndTypes::resetParser();
buf.reset();
}
void registerInputFormatCustomSeparated(FormatFactory & factory)
{
for (bool ignore_spaces : {false, true})
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
{
factory.registerInputFormat(format_name, [=](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<CustomSeparatedRowInputFormat>(sample, buf, params, with_names, with_types, ignore_spaces, settings);
});
};
registerWithNamesAndTypes(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", register_func);
}
}
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
#include <Formats/ParsedTemplateFormatString.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/ReadHelpers.h>
namespace DB
{
class CustomSeparatedRowInputFormat : public RowInputFormatWithNamesAndTypes
{
public:
CustomSeparatedRowInputFormat(
const Block & header_,
ReadBuffer & in_,
const Params & params_,
bool with_names_, bool with_types_, bool ignore_spaces_, const FormatSettings & format_settings_);
void resetParser() override;
String getName() const override { return "CustomSeparatedRowInputFormat"; }
private:
using EscapingRule = FormatSettings::EscapingRule;
bool readField(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, bool is_last_file_column, const String & column_name) override;
void skipField(size_t /*file_column*/) override { skipField(); }
void skipField();
void skipNames() override { skipHeaderRow(); }
void skipTypes() override { skipHeaderRow(); }
void skipHeaderRow();
void skipPrefixBeforeHeader() override;
void skipRowStartDelimiter() override;
void skipFieldDelimiter() override;
void skipRowEndDelimiter() override;
void skipRowBetweenDelimiter() override;
bool checkForSuffix() override;
bool allowSyncAfterError() const override;
void syncAfterError() override;
bool parseRowStartWithDiagnosticInfo(WriteBuffer & out) override;
bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out) override;
bool parseRowEndWithDiagnosticInfo(WriteBuffer & out) override;
bool parseRowBetweenDelimiterWithDiagnosticInfo(WriteBuffer & out) override;
bool tryParseSuffixWithDiagnosticInfo(WriteBuffer & out) override;
std::vector<String> readNames() override { return readHeaderRow(); }
std::vector<String> readTypes() override { return readHeaderRow(); }
std::vector<String> readHeaderRow();
bool checkEndOfRow();
bool checkForSuffixImpl(bool check_eof);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); }
PeekableReadBuffer buf;
bool ignore_spaces;
EscapingRule escaping_rule;
};
}

View File

@ -0,0 +1,98 @@
#include <Processors/Formats/Impl/CustomSeparatedRowOutputFormat.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
CustomSeparatedRowOutputFormat::CustomSeparatedRowOutputFormat(
const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_)
: IRowOutputFormat(header_, out_, params_)
, with_names(with_names_)
, with_types(with_types_)
, format_settings(format_settings_)
, escaping_rule(format_settings.custom.escaping_rule)
{
}
void CustomSeparatedRowOutputFormat::writeLine(const std::vector<String> & values)
{
for (const auto & value : values)
writeStringByEscapingRule(value, out, escaping_rule, format_settings);
}
void CustomSeparatedRowOutputFormat::writePrefix()
{
writeString(format_settings.custom.result_before_delimiter, out);
const auto & header = getPort(PortKind::Main).getHeader();
if (with_names)
{
writeLine(header.getNames());
writeRowBetweenDelimiter();
}
if (with_types)
{
writeLine(header.getDataTypeNames());
writeRowBetweenDelimiter();
}
}
void CustomSeparatedRowOutputFormat::writeSuffix()
{
writeString(format_settings.custom.result_after_delimiter, out);
}
void CustomSeparatedRowOutputFormat::writeRowStartDelimiter()
{
writeString(format_settings.custom.row_before_delimiter, out);
}
void CustomSeparatedRowOutputFormat::writeFieldDelimiter()
{
writeString(format_settings.custom.field_delimiter, out);
}
void CustomSeparatedRowOutputFormat::writeRowEndDelimiter()
{
writeString(format_settings.custom.row_after_delimiter, out);
}
void CustomSeparatedRowOutputFormat::writeRowBetweenDelimiter()
{
writeString(format_settings.custom.row_between_delimiter, out);
}
void CustomSeparatedRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
{
serializeFieldByEscapingRule(column, serialization, out, row_num, escaping_rule, format_settings);
}
void registerOutputFormatCustomSeparated(FormatFactory & factory)
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
{
factory.registerOutputFormat(format_name, [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, params, settings, with_names, with_types);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};
registerWithNamesAndTypes("CustomSeparated", register_func);
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/ParsedTemplateFormatString.h>
namespace DB
{
class WriteBuffer;
class CustomSeparatedRowOutputFormat : public IRowOutputFormat
{
public:
CustomSeparatedRowOutputFormat(const Block & header_, WriteBuffer & out_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_, bool with_names_, bool with_types_);
String getName() const override { return "CustomSeparatedRowOutputFormat"; }
private:
using EscapingRule = FormatSettings::EscapingRule;
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
void writePrefix() override;
void writeSuffix() override;
void writeLine(const std::vector<String> & values);
bool with_names;
bool with_types;
const FormatSettings format_settings;
EscapingRule escaping_rule;
};
}

View File

@ -2,6 +2,7 @@
#include <base/find_symbols.h> #include <base/find_symbols.h>
#include <Processors/Formats/Impl/RegexpRowInputFormat.h> #include <Processors/Formats/Impl/RegexpRowInputFormat.h>
#include <DataTypes/Serializations/SerializationNullable.h> #include <DataTypes/Serializations/SerializationNullable.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
namespace DB namespace DB
@ -19,7 +20,7 @@ RegexpRowInputFormat::RegexpRowInputFormat(
: IRowInputFormat(header_, in_, std::move(params_)) : IRowInputFormat(header_, in_, std::move(params_))
, buf(in_) , buf(in_)
, format_settings(format_settings_) , format_settings(format_settings_)
, field_format(stringToFormat(format_settings_.regexp.escaping_rule)) , escaping_rule(format_settings_.regexp.escaping_rule)
, regexp(format_settings_.regexp.regexp) , regexp(format_settings_.regexp.regexp)
{ {
size_t fields_count = regexp.NumberOfCapturingGroups(); size_t fields_count = regexp.NumberOfCapturingGroups();
@ -42,72 +43,19 @@ void RegexpRowInputFormat::resetParser()
buf.reset(); buf.reset();
} }
RegexpRowInputFormat::ColumnFormat RegexpRowInputFormat::stringToFormat(const String & format)
{
if (format == "Escaped")
return ColumnFormat::Escaped;
if (format == "Quoted")
return ColumnFormat::Quoted;
if (format == "CSV")
return ColumnFormat::Csv;
if (format == "JSON")
return ColumnFormat::Json;
if (format == "Raw")
return ColumnFormat::Raw;
throw Exception("Unsupported column format \"" + format + "\".", ErrorCodes::BAD_ARGUMENTS);
}
bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns) bool RegexpRowInputFormat::readField(size_t index, MutableColumns & columns)
{ {
const auto & type = getPort().getHeader().getByPosition(index).type; const auto & type = getPort().getHeader().getByPosition(index).type;
bool parse_as_nullable = format_settings.null_as_default && !type->isNullable() && !type->isLowCardinalityNullable();
bool read = true;
ReadBuffer field_buf(const_cast<char *>(matched_fields[index].data()), matched_fields[index].size(), 0); ReadBuffer field_buf(const_cast<char *>(matched_fields[index].data()), matched_fields[index].size(), 0);
try try
{ {
const auto & serialization = serializations[index]; return deserializeFieldByEscapingRule(type, serializations[index], *columns[index], field_buf, escaping_rule, format_settings);
switch (field_format)
{
case ColumnFormat::Escaped:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextEscapedImpl(*columns[index], field_buf, format_settings, serialization);
else
serialization->deserializeTextEscaped(*columns[index], field_buf, format_settings);
break;
case ColumnFormat::Quoted:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextQuotedImpl(*columns[index], field_buf, format_settings, serialization);
else
serialization->deserializeTextQuoted(*columns[index], field_buf, format_settings);
break;
case ColumnFormat::Csv:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextCSVImpl(*columns[index], field_buf, format_settings, serialization);
else
serialization->deserializeTextCSV(*columns[index], field_buf, format_settings);
break;
case ColumnFormat::Json:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextJSONImpl(*columns[index], field_buf, format_settings, serialization);
else
serialization->deserializeTextJSON(*columns[index], field_buf, format_settings);
break;
case ColumnFormat::Raw:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextRawImpl(*columns[index], field_buf, format_settings, serialization);
else
serialization->deserializeTextRaw(*columns[index], field_buf, format_settings);
break;
default:
break;
}
} }
catch (Exception & e) catch (Exception & e)
{ {
e.addMessage("(while reading the value of column " + getPort().getHeader().getByPosition(index).name + ")"); e.addMessage("(while reading the value of column " + getPort().getHeader().getByPosition(index).name + ")");
throw; throw;
} }
return read;
} }
void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext) void RegexpRowInputFormat::readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext)

View File

@ -25,7 +25,7 @@ class ReadBuffer;
class RegexpRowInputFormat : public IRowInputFormat class RegexpRowInputFormat : public IRowInputFormat
{ {
using ColumnFormat = ParsedTemplateFormatString::ColumnFormat; using EscapingRule = FormatSettings::EscapingRule;
public: public:
RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_); RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
@ -37,11 +37,10 @@ public:
private: private:
bool readField(size_t index, MutableColumns & columns); bool readField(size_t index, MutableColumns & columns);
void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext); void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext);
static ColumnFormat stringToFormat(const String & format);
PeekableReadBuffer buf; PeekableReadBuffer buf;
const FormatSettings format_settings; const FormatSettings format_settings;
const ColumnFormat field_format; const EscapingRule escaping_rule;
const RE2 regexp; const RE2 regexp;
// The vector of fields extracted from line using regexp. // The vector of fields extracted from line using regexp.

View File

@ -30,7 +30,7 @@ void TabSeparatedRowOutputFormat::writeLine(const std::vector<String> & values)
} }
} }
void TabSeparatedRowOutputFormat::doWritePrefix() void TabSeparatedRowOutputFormat::writePrefix()
{ {
const auto & header = getPort(PortKind::Main).getHeader(); const auto & header = getPort(PortKind::Main).getHeader();

View File

@ -35,7 +35,7 @@ public:
void writeBeforeTotals() override; void writeBeforeTotals() override;
void writeBeforeExtremes() override; void writeBeforeExtremes() override;
void doWritePrefix() override; void writePrefix() override;
/// https://www.iana.org/assignments/media-types/text/tab-separated-values /// https://www.iana.org/assignments/media-types/text/tab-separated-values
String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; } String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; }

View File

@ -1,5 +1,6 @@
#include <Processors/Formats/Impl/TemplateBlockOutputFormat.h> #include <Processors/Formats/Impl/TemplateBlockOutputFormat.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -39,7 +40,7 @@ TemplateBlockOutputFormat::TemplateBlockOutputFormat(const Block & header_, Writ
case static_cast<size_t>(ResultsetPart::Totals): case static_cast<size_t>(ResultsetPart::Totals):
case static_cast<size_t>(ResultsetPart::ExtremesMin): case static_cast<size_t>(ResultsetPart::ExtremesMin):
case static_cast<size_t>(ResultsetPart::ExtremesMax): case static_cast<size_t>(ResultsetPart::ExtremesMax):
if (format.formats[i] != ColumnFormat::None) if (format.escaping_rules[i] != EscapingRule::None)
format.throwInvalidFormat("Serialization type for data, totals, min and max must be empty or None", i); format.throwInvalidFormat("Serialization type for data, totals, min and max must be empty or None", i);
break; break;
case static_cast<size_t>(ResultsetPart::Rows): case static_cast<size_t>(ResultsetPart::Rows):
@ -47,7 +48,7 @@ TemplateBlockOutputFormat::TemplateBlockOutputFormat(const Block & header_, Writ
case static_cast<size_t>(ResultsetPart::TimeElapsed): case static_cast<size_t>(ResultsetPart::TimeElapsed):
case static_cast<size_t>(ResultsetPart::RowsRead): case static_cast<size_t>(ResultsetPart::RowsRead):
case static_cast<size_t>(ResultsetPart::BytesRead): case static_cast<size_t>(ResultsetPart::BytesRead):
if (format.formats[i] == ColumnFormat::None) if (format.escaping_rules[i] == EscapingRule::None)
format.throwInvalidFormat("Serialization type for output part rows, rows_before_limit, time, " format.throwInvalidFormat("Serialization type for output part rows, rows_before_limit, time, "
"rows_read or bytes_read is not specified", i); "rows_read or bytes_read is not specified", i);
break; break;
@ -68,7 +69,7 @@ TemplateBlockOutputFormat::TemplateBlockOutputFormat(const Block & header_, Writ
if (header_.columns() <= *row_format.format_idx_to_column_idx[i]) if (header_.columns() <= *row_format.format_idx_to_column_idx[i])
row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) + row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) +
" must be less then number of columns (" + std::to_string(header_.columns()) + ")", i); " must be less then number of columns (" + std::to_string(header_.columns()) + ")", i);
if (row_format.formats[i] == ColumnFormat::None) if (row_format.escaping_rules[i] == EscapingRule::None)
row_format.throwInvalidFormat("Serialization type for file column is not specified", i); row_format.throwInvalidFormat("Serialization type for file column is not specified", i);
} }
} }
@ -105,44 +106,17 @@ void TemplateBlockOutputFormat::writeRow(const Chunk & chunk, size_t row_num)
writeString(row_format.delimiters[j], out); writeString(row_format.delimiters[j], out);
size_t col_idx = *row_format.format_idx_to_column_idx[j]; size_t col_idx = *row_format.format_idx_to_column_idx[j];
serializeField(*chunk.getColumns()[col_idx], *serializations[col_idx], row_num, row_format.formats[j]); serializeFieldByEscapingRule(*chunk.getColumns()[col_idx], *serializations[col_idx], out, row_num, row_format.escaping_rules[j], settings);
} }
writeString(row_format.delimiters[columns], out); writeString(row_format.delimiters[columns], out);
} }
void TemplateBlockOutputFormat::serializeField(const IColumn & column, const ISerialization & serialization, size_t row_num, ColumnFormat col_format) template <typename U, typename V> void TemplateBlockOutputFormat::writeValue(U value, EscapingRule escaping_rule)
{
switch (col_format)
{
case ColumnFormat::Escaped:
serialization.serializeTextEscaped(column, row_num, out, settings);
break;
case ColumnFormat::Quoted:
serialization.serializeTextQuoted(column, row_num, out, settings);
break;
case ColumnFormat::Csv:
serialization.serializeTextCSV(column, row_num, out, settings);
break;
case ColumnFormat::Json:
serialization.serializeTextJSON(column, row_num, out, settings);
break;
case ColumnFormat::Xml:
serialization.serializeTextXML(column, row_num, out, settings);
break;
case ColumnFormat::Raw:
serialization.serializeTextRaw(column, row_num, out, settings);
break;
default:
__builtin_unreachable();
}
}
template <typename U, typename V> void TemplateBlockOutputFormat::writeValue(U value, ColumnFormat col_format)
{ {
auto type = std::make_unique<V>(); auto type = std::make_unique<V>();
auto col = type->createColumn(); auto col = type->createColumn();
col->insert(value); col->insert(value);
serializeField(*col, *type->getDefaultSerialization(), 0, col_format); serializeFieldByEscapingRule(*col, *type->getDefaultSerialization(), out, 0, escaping_rule, settings);
} }
void TemplateBlockOutputFormat::consume(Chunk chunk) void TemplateBlockOutputFormat::consume(Chunk chunk)
@ -201,21 +175,21 @@ void TemplateBlockOutputFormat::finalize()
writeRow(extremes, 1); writeRow(extremes, 1);
break; break;
case ResultsetPart::Rows: case ResultsetPart::Rows:
writeValue<size_t, DataTypeUInt64>(row_count, format.formats[i]); writeValue<size_t, DataTypeUInt64>(row_count, format.escaping_rules[i]);
break; break;
case ResultsetPart::RowsBeforeLimit: case ResultsetPart::RowsBeforeLimit:
if (!rows_before_limit_set) if (!rows_before_limit_set)
format.throwInvalidFormat("Cannot print rows_before_limit for this request", i); format.throwInvalidFormat("Cannot print rows_before_limit for this request", i);
writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.formats[i]); writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.escaping_rules[i]);
break; break;
case ResultsetPart::TimeElapsed: case ResultsetPart::TimeElapsed:
writeValue<double, DataTypeFloat64>(watch.elapsedSeconds(), format.formats[i]); writeValue<double, DataTypeFloat64>(watch.elapsedSeconds(), format.escaping_rules[i]);
break; break;
case ResultsetPart::RowsRead: case ResultsetPart::RowsRead:
writeValue<size_t, DataTypeUInt64>(progress.read_rows.load(), format.formats[i]); writeValue<size_t, DataTypeUInt64>(progress.read_rows.load(), format.escaping_rules[i]);
break; break;
case ResultsetPart::BytesRead: case ResultsetPart::BytesRead:
writeValue<size_t, DataTypeUInt64>(progress.read_bytes.load(), format.formats[i]); writeValue<size_t, DataTypeUInt64>(progress.read_bytes.load(), format.escaping_rules[i]);
break; break;
default: default:
break; break;
@ -240,7 +214,7 @@ void registerOutputFormatTemplate(FormatFactory & factory)
{ {
/// Default format string: "${data}" /// Default format string: "${data}"
resultset_format.delimiters.resize(2); resultset_format.delimiters.resize(2);
resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None); resultset_format.escaping_rules.emplace_back(ParsedTemplateFormatString::EscapingRule::None);
resultset_format.format_idx_to_column_idx.emplace_back(0); resultset_format.format_idx_to_column_idx.emplace_back(0);
resultset_format.column_names.emplace_back("data"); resultset_format.column_names.emplace_back("data");
} }
@ -266,17 +240,5 @@ void registerOutputFormatTemplate(FormatFactory & factory)
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter); return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
}); });
factory.registerOutputFormat("CustomSeparated", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams &,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom);
ParsedTemplateFormatString row_format = ParsedTemplateFormatString::setupCustomSeparatedRowFormat(settings.custom, sample);
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.custom.row_between_delimiter);
});
} }
} }

View File

@ -12,7 +12,7 @@ namespace DB
class TemplateBlockOutputFormat : public IOutputFormat class TemplateBlockOutputFormat : public IOutputFormat
{ {
using ColumnFormat = ParsedTemplateFormatString::ColumnFormat; using EscapingRule = FormatSettings::EscapingRule;
public: public:
TemplateBlockOutputFormat(const Block & header_, WriteBuffer & out_, const FormatSettings & settings_, TemplateBlockOutputFormat(const Block & header_, WriteBuffer & out_, const FormatSettings & settings_,
ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_, ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_,
@ -47,8 +47,7 @@ protected:
void finalize() override; void finalize() override;
void writeRow(const Chunk & chunk, size_t row_num); void writeRow(const Chunk & chunk, size_t row_num);
void serializeField(const IColumn & column, const ISerialization & serialization, size_t row_num, ColumnFormat format); template <typename U, typename V> void writeValue(U value, EscapingRule escaping_rule);
template <typename U, typename V> void writeValue(U value, ColumnFormat col_format);
protected: protected:
const FormatSettings settings; const FormatSettings settings;

View File

@ -1,6 +1,7 @@
#include <Processors/Formats/Impl/TemplateRowInputFormat.h> #include <Processors/Formats/Impl/TemplateRowInputFormat.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Formats/verbosePrintString.h> #include <Formats/verbosePrintString.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <DataTypes/DataTypeNothing.h> #include <DataTypes/DataTypeNothing.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -38,14 +39,14 @@ TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer
format.throwInvalidFormat("Invalid input part", i); format.throwInvalidFormat("Invalid input part", i);
if (has_data) if (has_data)
format.throwInvalidFormat("${data} can occur only once", i); format.throwInvalidFormat("${data} can occur only once", i);
if (format.formats[i] != ColumnFormat::None) if (format.escaping_rules[i] != EscapingRule::None)
format.throwInvalidFormat("${data} must have empty or None deserialization type", i); format.throwInvalidFormat("${data} must have empty or None deserialization type", i);
has_data = true; has_data = true;
format_data_idx = i; format_data_idx = i;
} }
else else
{ {
if (format.formats[i] == ColumnFormat::Xml) if (format.escaping_rules[i] == EscapingRule::XML)
format.throwInvalidFormat("XML deserialization is not supported", i); format.throwInvalidFormat("XML deserialization is not supported", i);
} }
} }
@ -54,7 +55,7 @@ TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer
std::vector<UInt8> column_in_format(header_.columns(), false); std::vector<UInt8> column_in_format(header_.columns(), false);
for (size_t i = 0; i < row_format.columnsCount(); ++i) for (size_t i = 0; i < row_format.columnsCount(); ++i)
{ {
if (row_format.formats[i] == ColumnFormat::Xml) if (row_format.escaping_rules[i] == EscapingRule::XML)
row_format.throwInvalidFormat("XML deserialization is not supported", i); row_format.throwInvalidFormat("XML deserialization is not supported", i);
if (row_format.format_idx_to_column_idx[i]) if (row_format.format_idx_to_column_idx[i])
@ -62,7 +63,7 @@ TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer
if (header_.columns() <= *row_format.format_idx_to_column_idx[i]) if (header_.columns() <= *row_format.format_idx_to_column_idx[i])
row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) + row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) +
" must be less then number of columns (" + std::to_string(header_.columns()) + ")", i); " must be less then number of columns (" + std::to_string(header_.columns()) + ")", i);
if (row_format.formats[i] == ColumnFormat::None) if (row_format.escaping_rules[i] == EscapingRule::None)
row_format.throwInvalidFormat("Column is not skipped, but deserialization type is None", i); row_format.throwInvalidFormat("Column is not skipped, but deserialization type is None", i);
size_t col_idx = *row_format.format_idx_to_column_idx[i]; size_t col_idx = *row_format.format_idx_to_column_idx[i];
@ -111,12 +112,12 @@ ReturnType TemplateRowInputFormat::tryReadPrefixOrSuffix(size_t & input_part_beg
{ {
skipSpaces(); skipSpaces();
if constexpr (throw_exception) if constexpr (throw_exception)
skipField(format.formats[input_part_beg]); skipField(format.escaping_rules[input_part_beg]);
else else
{ {
try try
{ {
skipField(format.formats[input_part_beg]); skipField(format.escaping_rules[input_part_beg]);
} }
catch (const Exception & e) catch (const Exception & e)
{ {
@ -176,7 +177,7 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
extra.read_columns[col_idx] = deserializeField(data_types[col_idx], serializations[col_idx], *columns[col_idx], i); extra.read_columns[col_idx] = deserializeField(data_types[col_idx], serializations[col_idx], *columns[col_idx], i);
} }
else else
skipField(row_format.formats[i]); skipField(row_format.escaping_rules[i]);
} }
@ -192,49 +193,14 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type, bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type,
const SerializationPtr & serialization, IColumn & column, size_t file_column) const SerializationPtr & serialization, IColumn & column, size_t file_column)
{ {
ColumnFormat col_format = row_format.formats[file_column]; EscapingRule escaping_rule = row_format.escaping_rules[file_column];
bool read = true; if (escaping_rule == EscapingRule::CSV)
bool parse_as_nullable = settings.null_as_default && !type->isNullable() && !type->isLowCardinalityNullable(); /// Will read unquoted string until settings.csv.delimiter
settings.csv.delimiter = row_format.delimiters[file_column + 1].empty() ? default_csv_delimiter :
row_format.delimiters[file_column + 1].front();
try try
{ {
switch (col_format) return deserializeFieldByEscapingRule(type, serialization, column, buf, escaping_rule, settings);
{
case ColumnFormat::Escaped:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextEscapedImpl(column, buf, settings, serialization);
else
serialization->deserializeTextEscaped(column, buf, settings);
break;
case ColumnFormat::Quoted:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextQuotedImpl(column, buf, settings, serialization);
else
serialization->deserializeTextQuoted(column, buf, settings);
break;
case ColumnFormat::Csv:
/// Will read unquoted string until settings.csv.delimiter
settings.csv.delimiter = row_format.delimiters[file_column + 1].empty() ? default_csv_delimiter :
row_format.delimiters[file_column + 1].front();
if (parse_as_nullable)
read = SerializationNullable::deserializeTextCSVImpl(column, buf, settings, serialization);
else
serialization->deserializeTextCSV(column, buf, settings);
break;
case ColumnFormat::Json:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextJSONImpl(column, buf, settings, serialization);
else
serialization->deserializeTextJSON(column, buf, settings);
break;
case ColumnFormat::Raw:
if (parse_as_nullable)
read = SerializationNullable::deserializeTextRawImpl(column, buf, settings, serialization);
else
serialization->deserializeTextRaw(column, buf, settings);
break;
default:
__builtin_unreachable();
}
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -242,36 +208,13 @@ bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type,
throwUnexpectedEof(); throwUnexpectedEof();
throw; throw;
} }
return read;
} }
void TemplateRowInputFormat::skipField(TemplateRowInputFormat::ColumnFormat col_format) void TemplateRowInputFormat::skipField(TemplateRowInputFormat::EscapingRule escaping_rule)
{ {
String tmp;
constexpr const char * field_name = "<SKIPPED COLUMN>";
constexpr size_t field_name_len = 16;
try try
{ {
switch (col_format) skipFieldByEscapingRule(buf, escaping_rule, settings);
{
case ColumnFormat::None:
/// Empty field, just skip spaces
break;
case ColumnFormat::Escaped:
readEscapedString(tmp, buf);
break;
case ColumnFormat::Quoted:
readQuotedString(tmp, buf);
break;
case ColumnFormat::Csv:
readCSVString(tmp, buf, settings.csv);
break;
case ColumnFormat::Json:
skipJSONField(buf, StringRef(field_name, field_name_len));
break;
default:
__builtin_unreachable();
}
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -344,29 +287,13 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
out << "\nUsing format string (from format_schema_rows): " << row_format.dump() << "\n"; out << "\nUsing format string (from format_schema_rows): " << row_format.dump() << "\n";
out << "\nTrying to parse next row, because suffix does not match:\n"; out << "\nTrying to parse next row, because suffix does not match:\n";
try if (likely(row_num != 1) && !parseDelimiterWithDiagnosticInfo(out, buf, row_between_delimiter, "delimiter between rows", ignore_spaces))
{
if (likely(row_num != 1))
assertString(row_between_delimiter, buf);
}
catch (const DB::Exception &)
{
writeErrorStringForWrongDelimiter(out, "delimiter between rows", row_between_delimiter);
return false; return false;
}
for (size_t i = 0; i < row_format.columnsCount(); ++i) for (size_t i = 0; i < row_format.columnsCount(); ++i)
{ {
skipSpaces(); if (!parseDelimiterWithDiagnosticInfo(out, buf, row_format.delimiters[i], "delimiter before field " + std::to_string(i), ignore_spaces))
try
{
assertString(row_format.delimiters[i], buf);
}
catch (const DB::Exception &)
{
writeErrorStringForWrongDelimiter(out, "delimiter before field " + std::to_string(i), row_format.delimiters[i]);
return false; return false;
}
skipSpaces(); skipSpaces();
if (row_format.format_idx_to_column_idx[i]) if (row_format.format_idx_to_column_idx[i])
@ -377,7 +304,7 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
*columns[col_idx], out, i)) *columns[col_idx], out, i))
{ {
out << "Maybe it's not possible to deserialize field " + std::to_string(i) + out << "Maybe it's not possible to deserialize field " + std::to_string(i) +
" as " + ParsedTemplateFormatString::formatToString(row_format.formats[i]); " as " + escapingRuleToString(row_format.escaping_rules[i]);
return false; return false;
} }
} }
@ -391,39 +318,39 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col
} }
} }
skipSpaces(); return parseDelimiterWithDiagnosticInfo(out, buf, row_format.delimiters.back(), "delimiter after last field", ignore_spaces);
}
bool parseDelimiterWithDiagnosticInfo(WriteBuffer & out, ReadBuffer & buf, const String & delimiter, const String & description, bool skip_spaces)
{
if (skip_spaces)
skipWhitespaceIfAny(buf);
try try
{ {
assertString(row_format.delimiters.back(), buf); assertString(delimiter, buf);
} }
catch (const DB::Exception &) catch (const DB::Exception &)
{ {
writeErrorStringForWrongDelimiter(out, "delimiter after last field", row_format.delimiters.back()); out << "ERROR: There is no " << description << ": expected ";
verbosePrintString(delimiter.data(), delimiter.data() + delimiter.size(), out);
out << ", got ";
if (buf.eof())
out << "<End of stream>";
else
verbosePrintString(buf.position(), std::min(buf.position() + delimiter.size() + 10, buf.buffer().end()), out);
out << '\n';
return false; return false;
} }
return true; return true;
} }
void TemplateRowInputFormat::writeErrorStringForWrongDelimiter(WriteBuffer & out, const String & description, const String & delim)
{
out << "ERROR: There is no " << description << ": expected ";
verbosePrintString(delim.data(), delim.data() + delim.size(), out);
out << ", got ";
if (buf.eof())
out << "<End of stream>";
else
verbosePrintString(buf.position(), std::min(buf.position() + delim.size() + 10, buf.buffer().end()), out);
out << '\n';
}
void TemplateRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) void TemplateRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
{ {
const auto & index = row_format.format_idx_to_column_idx[file_column]; const auto & index = row_format.format_idx_to_column_idx[file_column];
if (index) if (index)
deserializeField(type, serializations[*index], column, file_column); deserializeField(type, serializations[*index], column, file_column);
else else
skipField(row_format.formats[file_column]); skipField(row_format.escaping_rules[file_column]);
} }
bool TemplateRowInputFormat::isGarbageAfterField(size_t, ReadBuffer::Position) bool TemplateRowInputFormat::isGarbageAfterField(size_t, ReadBuffer::Position)
@ -439,62 +366,13 @@ bool TemplateRowInputFormat::allowSyncAfterError() const
void TemplateRowInputFormat::syncAfterError() void TemplateRowInputFormat::syncAfterError()
{ {
bool at_beginning_of_row_or_eof = false; skipToNextRowOrEof(buf, row_format.delimiters.back(), row_between_delimiter, ignore_spaces);
while (!at_beginning_of_row_or_eof) end_of_stream = buf.eof();
{
skipToNextDelimiterOrEof(row_format.delimiters.back());
if (buf.eof())
{
end_of_stream = true;
return;
}
buf.ignore(row_format.delimiters.back().size());
skipSpaces();
if (checkForSuffix())
return;
bool last_delimiter_in_row_found = !row_format.delimiters.back().empty();
if (last_delimiter_in_row_found && checkString(row_between_delimiter, buf))
at_beginning_of_row_or_eof = true;
else
skipToNextDelimiterOrEof(row_between_delimiter);
if (buf.eof())
at_beginning_of_row_or_eof = end_of_stream = true;
}
/// It can happen that buf.position() is not at the beginning of row /// It can happen that buf.position() is not at the beginning of row
/// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter. /// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter.
/// It will cause another parsing error. /// It will cause another parsing error.
} }
/// Searches for delimiter in input stream and sets buffer position to the beginning of delimiter (if found) or EOF (if not)
void TemplateRowInputFormat::skipToNextDelimiterOrEof(const String & delimiter)
{
if (delimiter.empty())
return;
while (!buf.eof())
{
void * pos = memchr(buf.position(), delimiter[0], buf.available());
if (!pos)
{
buf.position() += buf.available();
continue;
}
buf.position() = static_cast<ReadBuffer::Position>(pos);
PeekableReadBufferCheckpoint checkpoint{buf};
if (checkString(delimiter, buf))
return;
buf.rollbackToCheckpoint();
++buf.position();
}
}
void TemplateRowInputFormat::throwUnexpectedEof() void TemplateRowInputFormat::throwUnexpectedEof()
{ {
throw ParsingException("Unexpected EOF while parsing row " + std::to_string(row_num) + ". " throw ParsingException("Unexpected EOF while parsing row " + std::to_string(row_num) + ". "
@ -524,7 +402,7 @@ void registerInputFormatTemplate(FormatFactory & factory)
{ {
/// Default format string: "${data}" /// Default format string: "${data}"
resultset_format.delimiters.resize(2); resultset_format.delimiters.resize(2);
resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None); resultset_format.escaping_rules.emplace_back(ParsedTemplateFormatString::EscapingRule::None);
resultset_format.format_idx_to_column_idx.emplace_back(0); resultset_format.format_idx_to_column_idx.emplace_back(0);
resultset_format.column_names.emplace_back("data"); resultset_format.column_names.emplace_back("data");
} }
@ -554,21 +432,6 @@ void registerInputFormatTemplate(FormatFactory & factory)
return std::make_shared<TemplateRowInputFormat>(sample, buf, params, settings, ignore_spaces, resultset_format, row_format, settings.template_settings.row_between_delimiter); return std::make_shared<TemplateRowInputFormat>(sample, buf, params, settings, ignore_spaces, resultset_format, row_format, settings.template_settings.row_between_delimiter);
}); });
} }
for (bool ignore_spaces : {false, true})
{
factory.registerInputFormat(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", [=](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom);
ParsedTemplateFormatString row_format = ParsedTemplateFormatString::setupCustomSeparatedRowFormat(settings.custom, sample);
return std::make_shared<TemplateRowInputFormat>(sample, buf, params, settings, ignore_spaces, resultset_format, row_format, settings.custom.row_between_delimiter);
});
}
} }
} }

View File

@ -13,7 +13,7 @@ namespace DB
class TemplateRowInputFormat : public RowInputFormatWithDiagnosticInfo class TemplateRowInputFormat : public RowInputFormatWithDiagnosticInfo
{ {
using ColumnFormat = ParsedTemplateFormatString::ColumnFormat; using EscapingRule = FormatSettings::EscapingRule;
public: public:
TemplateRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_, TemplateRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
FormatSettings settings_, bool ignore_spaces_, FormatSettings settings_, bool ignore_spaces_,
@ -35,7 +35,7 @@ private:
bool deserializeField(const DataTypePtr & type, bool deserializeField(const DataTypePtr & type,
const SerializationPtr & serialization, IColumn & column, size_t file_column); const SerializationPtr & serialization, IColumn & column, size_t file_column);
void skipField(ColumnFormat col_format); void skipField(EscapingRule escaping_rule);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); } inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); }
template <typename ReturnType = void> template <typename ReturnType = void>
@ -47,11 +47,7 @@ private:
void tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) override; void tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) override;
bool isGarbageAfterField(size_t after_col_idx, ReadBuffer::Position pos) override; bool isGarbageAfterField(size_t after_col_idx, ReadBuffer::Position pos) override;
void writeErrorStringForWrongDelimiter(WriteBuffer & out, const String & description, const String & delim);
void skipToNextDelimiterOrEof(const String & delimiter);
private:
PeekableReadBuffer buf; PeekableReadBuffer buf;
const DataTypes data_types; const DataTypes data_types;
@ -68,4 +64,6 @@ private:
const std::string row_between_delimiter; const std::string row_between_delimiter;
}; };
bool parseDelimiterWithDiagnosticInfo(WriteBuffer & out, ReadBuffer & buf, const String & delimiter, const String & description, bool skip_spaces);
} }

View File

@ -75,6 +75,11 @@ void RowInputFormatWithNamesAndTypes::addInputColumn(const String & column_name,
void RowInputFormatWithNamesAndTypes::readPrefix() void RowInputFormatWithNamesAndTypes::readPrefix()
{ {
/// This is a bit of abstraction leakage, but we need it in parallel parsing:
/// we check if this InputFormat is working with the "real" beginning of the data.
if (getCurrentUnitNumber() != 0)
return;
if (with_names || with_types || data_types.at(0)->textCanContainOnlyValidUTF8()) if (with_names || with_types || data_types.at(0)->textCanContainOnlyValidUTF8())
{ {
/// We assume that column name or type cannot contain BOM, so, if format has header, /// We assume that column name or type cannot contain BOM, so, if format has header,
@ -82,9 +87,12 @@ void RowInputFormatWithNamesAndTypes::readPrefix()
skipBOMIfExists(*in); skipBOMIfExists(*in);
} }
/// Skip prefix before names and types.
skipPrefixBeforeHeader();
/// This is a bit of abstraction leakage, but we need it in parallel parsing: /// This is a bit of abstraction leakage, but we need it in parallel parsing:
/// we check if this InputFormat is working with the "real" beginning of the data. /// we check if this InputFormat is working with the "real" beginning of the data.
if (with_names && getCurrentUnitNumber() == 0) if (with_names)
{ {
if (format_settings.with_names_use_header) if (format_settings.with_names_use_header)
{ {
@ -108,8 +116,10 @@ void RowInputFormatWithNamesAndTypes::readPrefix()
else if (!column_mapping->is_set) else if (!column_mapping->is_set)
setupAllColumnsByTableSchema(); setupAllColumnsByTableSchema();
if (with_types && getCurrentUnitNumber() == 0) if (with_types)
{ {
/// Skip delimiter between names and types.
skipRowBetweenDelimiter();
if (format_settings.with_types_use_header) if (format_settings.with_types_use_header)
{ {
auto types = readTypes(); auto types = readTypes();
@ -148,10 +158,20 @@ void RowInputFormatWithNamesAndTypes::insertDefaultsForNotSeenColumns(MutableCol
bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadExtension & ext) bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadExtension & ext)
{ {
if (in->eof()) if (unlikely(end_of_stream))
return false; return false;
if (unlikely(checkForSuffix()))
{
end_of_stream = true;
return false;
}
updateDiagnosticInfo(); updateDiagnosticInfo();
if (likely(row_num != 1 || getCurrentUnitNumber() == 0 && (with_names || with_types)))
skipRowBetweenDelimiter();
skipRowStartDelimiter(); skipRowStartDelimiter();
ext.read_columns.resize(data_types.size()); ext.read_columns.resize(data_types.size());
@ -190,6 +210,7 @@ void RowInputFormatWithNamesAndTypes::resetParser()
column_mapping->column_indexes_for_input_fields.clear(); column_mapping->column_indexes_for_input_fields.clear();
column_mapping->not_presented_columns.clear(); column_mapping->not_presented_columns.clear();
column_mapping->names_of_columns.clear(); column_mapping->names_of_columns.clear();
end_of_stream = false;
} }
void RowInputFormatWithNamesAndTypes::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) void RowInputFormatWithNamesAndTypes::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
@ -215,6 +236,12 @@ bool RowInputFormatWithNamesAndTypes::parseRowAndPrintDiagnosticInfo(MutableColu
return false; return false;
} }
if (!tryParseSuffixWithDiagnosticInfo(out))
return false;
if (likely(row_num != 1) && !parseRowBetweenDelimiterWithDiagnosticInfo(out))
return false;
if (!parseRowStartWithDiagnosticInfo(out)) if (!parseRowStartWithDiagnosticInfo(out))
return false; return false;

View File

@ -46,16 +46,22 @@ protected:
virtual void skipTypes() = 0; virtual void skipTypes() = 0;
/// Skip delimiters, if any. /// Skip delimiters, if any.
virtual void skipPrefixBeforeHeader() {}
virtual void skipRowStartDelimiter() {} virtual void skipRowStartDelimiter() {}
virtual void skipFieldDelimiter() {} virtual void skipFieldDelimiter() {}
virtual void skipRowEndDelimiter() {} virtual void skipRowEndDelimiter() {}
virtual void skipRowBetweenDelimiter() {}
/// Check suffix.
virtual bool checkForSuffix() { return in->eof(); }
/// Methods for parsing with diagnostic info. /// Methods for parsing with diagnostic info.
virtual void checkNullValueForNonNullable(DataTypePtr) {} virtual void checkNullValueForNonNullable(DataTypePtr) {}
virtual bool parseRowStartWithDiagnosticInfo(WriteBuffer &) { return true; } virtual bool parseRowStartWithDiagnosticInfo(WriteBuffer &) { return true; }
virtual bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer &) { return true; } virtual bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer &) { return true; }
virtual bool parseRowEndWithDiagnosticInfo(WriteBuffer &) { return true;} virtual bool parseRowEndWithDiagnosticInfo(WriteBuffer &) { return true;}
virtual bool parseRowBetweenDelimiterWithDiagnosticInfo(WriteBuffer &) { return true;}
virtual bool tryParseSuffixWithDiagnosticInfo(WriteBuffer &) { return true; }
bool isGarbageAfterField(size_t, ReadBuffer::Position) override {return false; } bool isGarbageAfterField(size_t, ReadBuffer::Position) override {return false; }
/// Read row with names and return the list of them. /// Read row with names and return the list of them.
@ -65,6 +71,7 @@ protected:
const FormatSettings format_settings; const FormatSettings format_settings;
DataTypes data_types; DataTypes data_types;
bool end_of_stream = false;
private: private:
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) override; bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) override;