From 6fd1319bba4d4f4f11749317013d8f6b2e05dc73 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 16 May 2019 05:05:44 +0300 Subject: [PATCH] Fixes --- dbms/src/Formats/ValuesBlockInputStream.cpp | 220 ++++++++++---------- dbms/src/Formats/ValuesBlockInputStream.h | 9 +- 2 files changed, 112 insertions(+), 117 deletions(-) diff --git a/dbms/src/Formats/ValuesBlockInputStream.cpp b/dbms/src/Formats/ValuesBlockInputStream.cpp index 549f3fee3d5..0eccdab19ca 100644 --- a/dbms/src/Formats/ValuesBlockInputStream.cpp +++ b/dbms/src/Formats/ValuesBlockInputStream.cpp @@ -35,124 +35,67 @@ namespace ErrorCodes ValuesBlockInputStream::ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings, UInt64 max_block_size_) : istr(istr_), header(header_), context(std::make_unique(context_)), - format_settings(format_settings), max_block_size(max_block_size_) + format_settings(format_settings), max_block_size(max_block_size_), num_columns(header.columns()) { templates.resize(header.columns()); /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. skipBOMIfExists(istr); } - -bool ValuesBlockInputStream::read(MutableColumns & columns) -{ - size_t num_columns = columns.size(); - - skipWhitespaceIfAny(istr); - - if (istr.eof() || *istr.position() == ';') - return false; - - /** Typically, this is the usual format for streaming parsing. - * But as an exception, it also supports processing arbitrary expressions instead of values. - * This is very inefficient. But if there are no expressions, then there is no overhead. - */ - - assertChar('(', istr); - - for (size_t i = 0; i < num_columns; ++i) - { - skipWhitespaceIfAny(istr); - istr.setCheckpoint(); - - bool rollback_on_exception = false; - try - { - if (templates[i]) - { - templates[i].value().parseExpression(istr, format_settings); - } - else - { - header.getByPosition(i).type->deserializeAsTextQuoted(*columns[i], istr, format_settings); - rollback_on_exception = true; - } - - skipWhitespaceIfAny(istr); - - if (i != num_columns - 1) - assertChar(',', istr); - else - assertChar(')', istr); - } - catch (const Exception & e) - { - if (!format_settings.values.interpret_expressions) - throw; - - /** The normal streaming parser could not parse the value. - * Let's try to parse it with a SQL parser as a constant expression. - * This is an exceptional case. - */ - if (e.code() == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED - || e.code() == ErrorCodes::CANNOT_PARSE_QUOTED_STRING - || e.code() == ErrorCodes::CANNOT_PARSE_NUMBER - || e.code() == ErrorCodes::CANNOT_PARSE_DATE - || e.code() == ErrorCodes::CANNOT_PARSE_DATETIME - || e.code() == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT - || e.code() == ErrorCodes::CANNOT_PARSE_EXPRESSION_USING_TEMPLATE) - { - if (rollback_on_exception) - columns[i]->popBack(1); - - // TODO read(MutableColumns & columns) should not know number of rows in block an should not assign to columns - if (likely(rows_in_block)) - { - if (e.code() == ErrorCodes::CANNOT_PARSE_EXPRESSION_USING_TEMPLATE) - { - /// Expression in the current row is not match generated on the first row template. - /// Evaluate expressions, which were parsed using this template. - columns[i] = std::move(*templates[i].value().evaluateAll()).mutate(); - /// And do not use the template anymore. - templates[i].reset(); - } - else if (templates[i]) - throw; - } - - istr.rollbackToCheckpoint(); - parseExpression(columns, i, rows_in_block == 0); - - skipWhitespaceIfAny(istr); - - if (i != num_columns - 1) - assertChar(',', istr); - else - assertChar(')', istr); - } - else - throw; - } - - istr.dropCheckpoint(); - } - - skipWhitespaceIfAny(istr); - if (!istr.eof() && *istr.position() == ',') - ++istr.position(); - - return true; -} - Block ValuesBlockInputStream::readImpl() { MutableColumns columns = header.cloneEmptyColumns(); - for (rows_in_block = 0; rows_in_block < max_block_size; ++rows_in_block) + for (size_t rows_in_block = 0; rows_in_block < max_block_size; ++rows_in_block) { try { - if (!read(columns)) + skipWhitespaceIfAny(istr); + if (istr.eof() || *istr.position() == ';') break; + assertChar('(', istr); + + for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) + { + skipWhitespaceIfAny(istr); + istr.setCheckpoint(); + + bool parse_separate_value = true; + if (templates[column_idx]) + { + /// Try to parse expression using template if one was successfully generated while parsing the first row + try + { + templates[column_idx].value().parseExpression(istr, format_settings); + assertDelimAfterValue(column_idx); + parse_separate_value = false; + } + catch (DB::Exception & e) + { + if (e.code() != ErrorCodes::CANNOT_PARSE_EXPRESSION_USING_TEMPLATE) + throw; + /// Expression in the current row is not match generated on the first row template. + /// Evaluate expressions, which were parsed using this template. + columns[column_idx] = std::move(*templates[column_idx].value().evaluateAll()).mutate(); + /// Do not use the template anymore and fallback to slow SQL parser + templates[column_idx].reset(); + istr.rollbackToCheckpoint(); + } + } + + /// Parse value using fast streaming parser for literals and slow SQL parser for expressions. + /// If there is SQL expression in the first row, template of this expression will be generated, + /// so it makes possible to parse next rows much faster if expressions in next rows have the same structure + if (parse_separate_value) + readValue(*columns[column_idx], column_idx, rows_in_block == 0); + + istr.dropCheckpoint(); + } + + skipWhitespaceIfAny(istr); + if (!istr.eof() && *istr.position() == ',') + ++istr.position(); + ++total_rows; } catch (Exception & e) @@ -163,7 +106,7 @@ Block ValuesBlockInputStream::readImpl() } } - /// Evaluate expressions, which were parsed using this template, if any + /// Evaluate expressions, which were parsed using templates, if any for (size_t i = 0; i < columns.size(); ++i) { if (templates[i]) @@ -179,8 +122,47 @@ Block ValuesBlockInputStream::readImpl() return header.cloneWithColumns(std::move(columns)); } -Field -ValuesBlockInputStream::parseExpression(MutableColumns & columns, size_t column_idx, bool generate_template) +void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool generate_template) +{ + bool rollback_on_exception = false; + try + { + header.getByPosition(column_idx).type->deserializeAsTextQuoted(column, istr, format_settings); + rollback_on_exception = true; + + skipWhitespaceIfAny(istr); + + assertDelimAfterValue(column_idx); + } + catch (const Exception & e) + { + if (!format_settings.values.interpret_expressions) + throw; + + /** The normal streaming parser could not parse the value. + * Let's try to parse it with a SQL parser as a constant expression. + * This is an exceptional case. + */ + if (e.code() == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED + || e.code() == ErrorCodes::CANNOT_PARSE_QUOTED_STRING + || e.code() == ErrorCodes::CANNOT_PARSE_NUMBER + || e.code() == ErrorCodes::CANNOT_PARSE_DATE + || e.code() == ErrorCodes::CANNOT_PARSE_DATETIME + || e.code() == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT) + { + if (rollback_on_exception) + column.popBack(1); + + istr.rollbackToCheckpoint(); + parseExpression(column, column_idx, generate_template); + } + else + throw; + } +} + +void +ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, bool generate_template) { const IDataType & type = *header.getByPosition(column_idx).type; @@ -217,25 +199,37 @@ ValuesBlockInputStream::parseExpression(MutableColumns & columns, size_t column_ if (generate_template) { + if (templates[column_idx]) + throw DB::Exception("Template for column " + std::to_string(column_idx) + " already exists and it was not evaluated yet", + ErrorCodes::LOGICAL_ERROR); try { templates[column_idx] = ConstantExpressionTemplate(type, TokenIterator(tokens), token_iterator, *context); istr.rollbackToCheckpoint(); templates[column_idx].value().parseExpression(istr, format_settings); + assertDelimAfterValue(column_idx); + return; } - catch (DB::Exception &) + catch (...) { /// Continue parsing without template templates[column_idx].reset(); - columns[column_idx]->insert(value); istr.position() = const_cast(token_iterator->begin); } } + + assertDelimAfterValue(column_idx); + column.insert(value); +} + +void ValuesBlockInputStream::assertDelimAfterValue(size_t column_idx) +{ + skipWhitespaceIfAny(istr); + + if (column_idx + 1 != num_columns) + assertChar(',', istr); else - { - columns[column_idx]->insert(value); - } - return value; + assertChar(')', istr); } diff --git a/dbms/src/Formats/ValuesBlockInputStream.h b/dbms/src/Formats/ValuesBlockInputStream.h index bc4c218b5a2..649b2b6d901 100644 --- a/dbms/src/Formats/ValuesBlockInputStream.h +++ b/dbms/src/Formats/ValuesBlockInputStream.h @@ -32,14 +32,14 @@ public: void readPrefix() override { } void readSuffix() override { } - bool read(MutableColumns & columns); - private: typedef std::vector> ConstantExpressionTemplates; Block readImpl() override; + void readValue(IColumn & column, size_t column_idx, bool generate_template); - Field parseExpression(MutableColumns & columns, size_t column_idx, bool generate_template); + void parseExpression(IColumn & column, size_t column_idx, bool generate_template); + inline void assertDelimAfterValue(size_t column_idx); private: PeekableReadBuffer istr; @@ -47,8 +47,9 @@ private: std::unique_ptr context; /// pimpl const FormatSettings format_settings; UInt64 max_block_size; - UInt64 rows_in_block = 0; + size_t num_columns; size_t total_rows = 0; + ParserExpression parser; ConstantExpressionTemplates templates; };