This commit is contained in:
Alexander Tokmakov 2019-05-16 05:05:44 +03:00 committed by Alexander Tokmakov
parent 3e214ced62
commit 6fd1319bba
2 changed files with 112 additions and 117 deletions

View File

@ -35,104 +35,60 @@ namespace ErrorCodes
ValuesBlockInputStream::ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_,
const FormatSettings & format_settings, UInt64 max_block_size_)
: istr(istr_), header(header_), context(std::make_unique<Context>(context_)),
format_settings(format_settings), max_block_size(max_block_size_)
format_settings(format_settings), max_block_size(max_block_size_), num_columns(header.columns())
{
templates.resize(header.columns());
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
}
bool ValuesBlockInputStream::read(MutableColumns & columns)
Block ValuesBlockInputStream::readImpl()
{
size_t num_columns = columns.size();
MutableColumns columns = header.cloneEmptyColumns();
for (size_t rows_in_block = 0; rows_in_block < max_block_size; ++rows_in_block)
{
try
{
skipWhitespaceIfAny(istr);
if (istr.eof() || *istr.position() == ';')
return false;
/** Typically, this is the usual format for streaming parsing.
* But as an exception, it also supports processing arbitrary expressions instead of values.
* This is very inefficient. But if there are no expressions, then there is no overhead.
*/
break;
assertChar('(', istr);
for (size_t i = 0; i < num_columns; ++i)
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
{
skipWhitespaceIfAny(istr);
istr.setCheckpoint();
bool rollback_on_exception = false;
bool parse_separate_value = true;
if (templates[column_idx])
{
/// Try to parse expression using template if one was successfully generated while parsing the first row
try
{
if (templates[i])
{
templates[i].value().parseExpression(istr, format_settings);
templates[column_idx].value().parseExpression(istr, format_settings);
assertDelimAfterValue(column_idx);
parse_separate_value = false;
}
else
catch (DB::Exception & e)
{
header.getByPosition(i).type->deserializeAsTextQuoted(*columns[i], istr, format_settings);
rollback_on_exception = true;
}
skipWhitespaceIfAny(istr);
if (i != num_columns - 1)
assertChar(',', istr);
else
assertChar(')', istr);
}
catch (const Exception & e)
{
if (!format_settings.values.interpret_expressions)
if (e.code() != ErrorCodes::CANNOT_PARSE_EXPRESSION_USING_TEMPLATE)
throw;
/** The normal streaming parser could not parse the value.
* Let's try to parse it with a SQL parser as a constant expression.
* This is an exceptional case.
*/
if (e.code() == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
|| e.code() == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
|| e.code() == ErrorCodes::CANNOT_PARSE_NUMBER
|| e.code() == ErrorCodes::CANNOT_PARSE_DATE
|| e.code() == ErrorCodes::CANNOT_PARSE_DATETIME
|| e.code() == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT
|| e.code() == ErrorCodes::CANNOT_PARSE_EXPRESSION_USING_TEMPLATE)
{
if (rollback_on_exception)
columns[i]->popBack(1);
// TODO read(MutableColumns & columns) should not know number of rows in block an should not assign to columns
if (likely(rows_in_block))
{
if (e.code() == ErrorCodes::CANNOT_PARSE_EXPRESSION_USING_TEMPLATE)
{
/// Expression in the current row is not match generated on the first row template.
/// Evaluate expressions, which were parsed using this template.
columns[i] = std::move(*templates[i].value().evaluateAll()).mutate();
/// And do not use the template anymore.
templates[i].reset();
}
else if (templates[i])
throw;
}
columns[column_idx] = std::move(*templates[column_idx].value().evaluateAll()).mutate();
/// Do not use the template anymore and fallback to slow SQL parser
templates[column_idx].reset();
istr.rollbackToCheckpoint();
parseExpression(columns, i, rows_in_block == 0);
skipWhitespaceIfAny(istr);
if (i != num_columns - 1)
assertChar(',', istr);
else
assertChar(')', istr);
}
else
throw;
}
/// Parse value using fast streaming parser for literals and slow SQL parser for expressions.
/// If there is SQL expression in the first row, template of this expression will be generated,
/// so it makes possible to parse next rows much faster if expressions in next rows have the same structure
if (parse_separate_value)
readValue(*columns[column_idx], column_idx, rows_in_block == 0);
istr.dropCheckpoint();
}
@ -140,19 +96,6 @@ bool ValuesBlockInputStream::read(MutableColumns & columns)
if (!istr.eof() && *istr.position() == ',')
++istr.position();
return true;
}
Block ValuesBlockInputStream::readImpl()
{
MutableColumns columns = header.cloneEmptyColumns();
for (rows_in_block = 0; rows_in_block < max_block_size; ++rows_in_block)
{
try
{
if (!read(columns))
break;
++total_rows;
}
catch (Exception & e)
@ -163,7 +106,7 @@ Block ValuesBlockInputStream::readImpl()
}
}
/// Evaluate expressions, which were parsed using this template, if any
/// Evaluate expressions, which were parsed using templates, if any
for (size_t i = 0; i < columns.size(); ++i)
{
if (templates[i])
@ -179,8 +122,47 @@ Block ValuesBlockInputStream::readImpl()
return header.cloneWithColumns(std::move(columns));
}
Field
ValuesBlockInputStream::parseExpression(MutableColumns & columns, size_t column_idx, bool generate_template)
void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool generate_template)
{
bool rollback_on_exception = false;
try
{
header.getByPosition(column_idx).type->deserializeAsTextQuoted(column, istr, format_settings);
rollback_on_exception = true;
skipWhitespaceIfAny(istr);
assertDelimAfterValue(column_idx);
}
catch (const Exception & e)
{
if (!format_settings.values.interpret_expressions)
throw;
/** The normal streaming parser could not parse the value.
* Let's try to parse it with a SQL parser as a constant expression.
* This is an exceptional case.
*/
if (e.code() == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
|| e.code() == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
|| e.code() == ErrorCodes::CANNOT_PARSE_NUMBER
|| e.code() == ErrorCodes::CANNOT_PARSE_DATE
|| e.code() == ErrorCodes::CANNOT_PARSE_DATETIME
|| e.code() == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT)
{
if (rollback_on_exception)
column.popBack(1);
istr.rollbackToCheckpoint();
parseExpression(column, column_idx, generate_template);
}
else
throw;
}
}
void
ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, bool generate_template)
{
const IDataType & type = *header.getByPosition(column_idx).type;
@ -217,25 +199,37 @@ ValuesBlockInputStream::parseExpression(MutableColumns & columns, size_t column_
if (generate_template)
{
if (templates[column_idx])
throw DB::Exception("Template for column " + std::to_string(column_idx) + " already exists and it was not evaluated yet",
ErrorCodes::LOGICAL_ERROR);
try
{
templates[column_idx] = ConstantExpressionTemplate(type, TokenIterator(tokens), token_iterator, *context);
istr.rollbackToCheckpoint();
templates[column_idx].value().parseExpression(istr, format_settings);
assertDelimAfterValue(column_idx);
return;
}
catch (DB::Exception &)
catch (...)
{
/// Continue parsing without template
templates[column_idx].reset();
columns[column_idx]->insert(value);
istr.position() = const_cast<char *>(token_iterator->begin);
}
}
assertDelimAfterValue(column_idx);
column.insert(value);
}
void ValuesBlockInputStream::assertDelimAfterValue(size_t column_idx)
{
skipWhitespaceIfAny(istr);
if (column_idx + 1 != num_columns)
assertChar(',', istr);
else
{
columns[column_idx]->insert(value);
}
return value;
assertChar(')', istr);
}

View File

@ -32,14 +32,14 @@ public:
void readPrefix() override { }
void readSuffix() override { }
bool read(MutableColumns & columns);
private:
typedef std::vector<std::optional<ConstantExpressionTemplate>> ConstantExpressionTemplates;
Block readImpl() override;
void readValue(IColumn & column, size_t column_idx, bool generate_template);
Field parseExpression(MutableColumns & columns, size_t column_idx, bool generate_template);
void parseExpression(IColumn & column, size_t column_idx, bool generate_template);
inline void assertDelimAfterValue(size_t column_idx);
private:
PeekableReadBuffer istr;
@ -47,8 +47,9 @@ private:
std::unique_ptr<Context> context; /// pimpl
const FormatSettings format_settings;
UInt64 max_block_size;
UInt64 rows_in_block = 0;
size_t num_columns;
size_t total_rows = 0;
ParserExpression parser;
ConstantExpressionTemplates templates;
};