Added setting 'input_format_values_interpret_expressions' [#METR-23720].

This commit is contained in:
Alexey Milovidov 2016-12-08 03:25:52 +03:00
parent 59da0fd12a
commit 6546b73dcc
4 changed files with 27 additions and 11 deletions

View File

@ -11,18 +11,23 @@ class Context;
class ReadBuffer;
/** Stream to read data in VALUES format (as in INSERT query).
 */
class ValuesRowInputStream : public IRowInputStream
{
public:
/// NOTE(review): two-argument declaration. The out-of-line constructor definition takes
/// interpret_expressions_ as a third argument - confirm whether this overload is still meant to exist.
ValuesRowInputStream(ReadBuffer & istr_, const Context & context_);
/** Data is parsed using a fast, streaming parser.
 * If interpret_expressions_ is true, it will, in addition, try to use the SQL parser and interpreter
 * when the streaming parser could not parse a field (this is very slow).
 * If it is false, the exception thrown while parsing the field is rethrown instead.
 */
ValuesRowInputStream(ReadBuffer & istr_, const Context & context_, bool interpret_expressions_);
/// Overrides the base-class read.
/// NOTE(review): the return value presumably tells whether a row was read - confirm against IRowInputStream.
bool read(Block & block) override;
private:
/// Input buffer given to the constructor; a BOM at its beginning is skipped on construction.
ReadBuffer & istr;
/// Context given to the constructor.
/// NOTE(review): presumably needed for the SQL parser / interpreter fallback - confirm in read().
const Context & context;
/// Whether a field the streaming parser failed on is retried as an SQL expression (true)
/// or the parse exception is propagated to the caller (false).
bool interpret_expressions;
};
}

View File

@ -227,6 +227,9 @@ struct Settings
/** Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats). */ \
M(SettingBool, input_format_skip_unknown_fields, false) \
\
/** For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression. */ \
M(SettingBool, input_format_values_interpret_expressions, true) \
\
/** Controls quoting of 64-bit integers in JSON output format. */ \
M(SettingBool, output_format_json_quote_64bit_integers, true) \
\

View File

@ -44,6 +44,8 @@ namespace ErrorCodes
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, size_t max_block_size) const
{
const Settings & settings = context.getSettingsRef();
if (name == "Native")
return std::make_shared<NativeBlockInputStream>(buf);
else if (name == "RowBinary")
@ -55,19 +57,20 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
else if (name == "TabSeparatedWithNamesAndTypes")
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, true), sample, max_block_size);
else if (name == "Values")
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<ValuesRowInputStream>(buf, context), sample, max_block_size);
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<ValuesRowInputStream>(
buf, context, settings.input_format_values_interpret_expressions), sample, max_block_size);
else if (name == "CSV")
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<CSVRowInputStream>(buf, sample, ','), sample, max_block_size);
else if (name == "CSVWithNames")
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<CSVRowInputStream>(buf, sample, ',', true), sample, max_block_size);
else if (name == "TSKV")
{
auto row_stream = std::make_shared<TSKVRowInputStream>(buf, sample, context.getSettingsRef().input_format_skip_unknown_fields);
auto row_stream = std::make_shared<TSKVRowInputStream>(buf, sample, settings.input_format_skip_unknown_fields);
return std::make_shared<BlockInputStreamFromRowInputStream>(std::move(row_stream), sample, max_block_size);
}
else if (name == "JSONEachRow")
{
auto row_stream = std::make_shared<JSONEachRowRowInputStream>(buf, sample, context.getSettingsRef().input_format_skip_unknown_fields);
auto row_stream = std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings.input_format_skip_unknown_fields);
return std::make_shared<BlockInputStreamFromRowInputStream>(std::move(row_stream), sample, max_block_size);
}
else if (name == "TabSeparatedRaw"
@ -95,6 +98,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context)
{
const Settings & settings = context.getSettingsRef();
if (name == "Native")
return std::make_shared<NativeBlockOutputStream>(buf);
else if (name == "RowBinary")
@ -135,16 +140,16 @@ static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf));
else if (name == "JSON")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_write_statistics, context.getSettingsRef().output_format_json_quote_64bit_integers));
settings.output_format_write_statistics, settings.output_format_json_quote_64bit_integers));
else if (name == "JSONCompact")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONCompactRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_write_statistics, context.getSettingsRef().output_format_json_quote_64bit_integers));
settings.output_format_write_statistics, settings.output_format_json_quote_64bit_integers));
else if (name == "JSONEachRow")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONEachRowRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_json_quote_64bit_integers));
settings.output_format_json_quote_64bit_integers));
else if (name == "XML")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<XMLRowOutputStream>(buf, sample,
context.getSettingsRef().output_format_write_statistics));
settings.output_format_write_statistics));
else if (name == "TSKV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TSKVRowOutputStream>(buf, sample));
else if (name == "ODBCDriver")

View File

@ -23,8 +23,8 @@ namespace ErrorCodes
}
ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Context & context_)
: istr(istr_), context(context_)
ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Context & context_, bool interpret_expressions_)
: istr(istr_), context(context_), interpret_expressions(interpret_expressions_)
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
@ -71,6 +71,9 @@ bool ValuesRowInputStream::read(Block & block)
}
catch (const Exception & e)
{
if (!interpret_expressions)
throw;
/** The regular streaming parser could not parse the value.
* Let's try to parse it with the SQL parser as a constant expression.
* This is an exceptional case.