From d31358a280808e1ce9d662b587a1e12552ffe44f Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 2 Sep 2019 19:26:22 +0300 Subject: [PATCH] change ValuesBlockInputStream to ValuesBlockInputFormat --- .../Processors/Formats/IRowInputFormat.cpp | 2 +- dbms/src/Processors/Formats/IRowInputFormat.h | 1 + .../Impl}/ConstantExpressionTemplate.cpp | 2 +- .../Impl}/ConstantExpressionTemplate.h | 0 .../Formats/Impl/ValuesBlockInputFormat.cpp} | 114 +++++++++--------- .../Formats/Impl/ValuesBlockInputFormat.h} | 33 ++--- .../Formats/Impl/ValuesRowInputFormat.cpp | 4 +- 7 files changed, 81 insertions(+), 75 deletions(-) rename dbms/src/{Formats => Processors/Formats/Impl}/ConstantExpressionTemplate.cpp (99%) rename dbms/src/{Formats => Processors/Formats/Impl}/ConstantExpressionTemplate.h (100%) rename dbms/src/{Formats/ValuesBlockInputStream.cpp => Processors/Formats/Impl/ValuesBlockInputFormat.cpp} (70%) rename dbms/src/{Formats/ValuesBlockInputStream.h => Processors/Formats/Impl/ValuesBlockInputFormat.h} (63%) diff --git a/dbms/src/Processors/Formats/IRowInputFormat.cpp b/dbms/src/Processors/Formats/IRowInputFormat.cpp index 8ceca472032..b80f91f5089 100644 --- a/dbms/src/Processors/Formats/IRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/IRowInputFormat.cpp @@ -48,7 +48,7 @@ static bool handleOverflowMode(OverflowMode mode, const String & message, int co } -static bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch) +bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch) { if (params.max_execution_time != 0 && stopwatch.elapsed() > static_cast(params.max_execution_time.totalMicroseconds()) * 1000) diff --git a/dbms/src/Processors/Formats/IRowInputFormat.h b/dbms/src/Processors/Formats/IRowInputFormat.h index 5676819bcc2..c59b92fcab9 100644 --- a/dbms/src/Processors/Formats/IRowInputFormat.h +++ b/dbms/src/Processors/Formats/IRowInputFormat.h @@ -37,6 +37,7 @@ struct RowInputFormatParams }; bool isParseError(int code); +bool checkTimeLimit(const RowInputFormatParams & params, const Stopwatch & stopwatch); ///Row oriented input format: reads data row by row. class IRowInputFormat : public IInputFormat diff --git a/dbms/src/Formats/ConstantExpressionTemplate.cpp b/dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.cpp similarity index 99% rename from dbms/src/Formats/ConstantExpressionTemplate.cpp rename to dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.cpp index c2cae3429ac..23ff0771abe 100644 --- a/dbms/src/Formats/ConstantExpressionTemplate.cpp +++ b/dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.cpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include diff --git a/dbms/src/Formats/ConstantExpressionTemplate.h b/dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.h similarity index 100% rename from dbms/src/Formats/ConstantExpressionTemplate.h rename to dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.h diff --git a/dbms/src/Formats/ValuesBlockInputStream.cpp b/dbms/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp similarity index 70% rename from dbms/src/Formats/ValuesBlockInputStream.cpp rename to dbms/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp index 253609eca03..63d557379e9 100644 --- a/dbms/src/Formats/ValuesBlockInputStream.cpp +++ b/dbms/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp @@ -4,9 +4,8 @@ #include #include #include -#include +#include #include -#include #include #include #include @@ -32,41 +31,42 @@ namespace ErrorCodes } -ValuesBlockInputStream::ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, - const FormatSettings & format_settings_, UInt64 max_block_size_, UInt64 rows_portion_size_) - : istr(istr_), header(header_), context(std::make_unique(context_)), format_settings(format_settings_), - max_block_size(max_block_size_), rows_portion_size(rows_portion_size_), num_columns(header.columns()), - attempts_to_generate_template(num_columns), rows_parsed_using_template(num_columns) +ValuesBlockInputFormat::ValuesBlockInputFormat(ReadBuffer & in_, const Block & header_, const RowInputFormatParams & params_, + const Context & context_, const FormatSettings & format_settings_) + : IInputFormat(header_, buf), buf(in_), params(params_), context(std::make_unique(context_)), + format_settings(format_settings_), num_columns(header_.columns()), + attempts_to_generate_template(num_columns), rows_parsed_using_template(num_columns), templates(num_columns) { - templates.resize(header.columns()); /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. - skipBOMIfExists(istr); + skipBOMIfExists(buf); + /// TODO remove before merge const_cast(this->format_settings).values.interpret_expressions = false; } -Block ValuesBlockInputStream::readImpl() +Chunk ValuesBlockInputFormat::generate() { + const Block & header = getPort().getHeader(); MutableColumns columns = header.cloneEmptyColumns(); - for (size_t rows_in_block = 0, batch = 0; rows_in_block < max_block_size; ++rows_in_block, ++batch) + for (size_t rows_in_block = 0, batch = 0; rows_in_block < params.max_block_size; ++rows_in_block, ++batch) { - if (rows_portion_size && batch == rows_portion_size) + if (params.rows_portion_size && batch == params.rows_portion_size) { batch = 0; - if (!checkTimeLimit() || isCancelled()) + if (!checkTimeLimit(params, total_stopwatch) || isCancelled()) break; } try { - skipWhitespaceIfAny(istr); - if (istr.eof() || *istr.position() == ';') + skipWhitespaceIfAny(buf); + if (buf.eof() || *buf.position() == ';') break; - assertChar('(', istr); + assertChar('(', buf); for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) { - skipWhitespaceIfAny(istr); - PeekableReadBufferCheckpoint checkpoint{istr}; + skipWhitespaceIfAny(buf); + PeekableReadBufferCheckpoint checkpoint{buf}; bool parse_separate_value = !parseExpressionUsingTemplate(columns[column_idx], column_idx); @@ -77,9 +77,9 @@ Block ValuesBlockInputStream::readImpl() readValue(*columns[column_idx], column_idx, shouldGenerateNewTemplate(column_idx)); } - skipWhitespaceIfAny(istr); - if (!istr.eof() && *istr.position() == ',') - ++istr.position(); + skipWhitespaceIfAny(buf); + if (!buf.eof() && *buf.position() == ',') + ++buf.position(); ++total_rows; } @@ -106,20 +106,24 @@ Block ValuesBlockInputStream::readImpl() } if (columns.empty() || columns[0]->empty()) + { + readSuffix(); return {}; + } - return header.cloneWithColumns(std::move(columns)); + size_t rows_in_block = columns[0]->size(); + return Chunk{std::move(columns), rows_in_block}; } -bool ValuesBlockInputStream::parseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx) +bool ValuesBlockInputFormat::parseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx) { if (templates[column_idx]) { /// Try to parse expression using template if one was successfully generated while parsing the first row try { - templates[column_idx].value().parseExpression(istr, format_settings); - assertDelimAfterValue(column_idx); + templates[column_idx].value().parseExpression(buf, format_settings); + assertDelimiterAfterValue(column_idx); ++rows_parsed_using_template[column_idx]; return true; } @@ -139,23 +143,24 @@ bool ValuesBlockInputStream::parseExpressionUsingTemplate(MutableColumnPtr & col /// Do not use the template anymore and fallback to slow SQL parser templates[column_idx].reset(); ++attempts_to_generate_template[column_idx]; - istr.rollbackToCheckpoint(); + buf.rollbackToCheckpoint(); } } return false; } -void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool generate_template) +void ValuesBlockInputFormat::readValue(IColumn & column, size_t column_idx, bool generate_template) { bool rollback_on_exception = false; try { - header.getByPosition(column_idx).type->deserializeAsTextQuoted(column, istr, format_settings); + const Block & header = getPort().getHeader(); + header.getByPosition(column_idx).type->deserializeAsTextQuoted(column, buf, format_settings); rollback_on_exception = true; - skipWhitespaceIfAny(istr); + skipWhitespaceIfAny(buf); - assertDelimAfterValue(column_idx); + assertDelimiterAfterValue(column_idx); } catch (const Exception & e) { @@ -176,7 +181,7 @@ void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool if (rollback_on_exception) column.popBack(1); - istr.rollbackToCheckpoint(); + buf.rollbackToCheckpoint(); parseExpression(column, column_idx, generate_template); } else @@ -185,22 +190,23 @@ void ValuesBlockInputStream::readValue(IColumn & column, size_t column_idx, bool } void -ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, bool generate_template) +ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx, bool generate_template) { + const Block & header = getPort().getHeader(); const IDataType & type = *header.getByPosition(column_idx).type; Expected expected; // TODO make tokenizer to work with buffers, not only with continuous memory - Tokens tokens(istr.position(), istr.buffer().end()); + Tokens tokens(buf.position(), buf.buffer().end()); IParser::Pos token_iterator(tokens); ASTPtr ast; if (!parser.parse(token_iterator, ast, expected)) { - istr.rollbackToCheckpoint(); + buf.rollbackToCheckpoint(); throw Exception("Cannot parse expression of type " + type.getName() + " here: " - + String(istr.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, istr.buffer().end() - istr.position())), + + String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())), ErrorCodes::SYNTAX_ERROR); } @@ -210,15 +216,15 @@ ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, boo /// Check that we are indeed allowed to insert a NULL. if (value.isNull() && !type.isNullable()) { - istr.rollbackToCheckpoint(); + buf.rollbackToCheckpoint(); throw Exception{"Expression returns value " + applyVisitor(FieldVisitorToString(), value) + ", that is out of range of type " + type.getName() + ", at: " + - String(istr.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, istr.buffer().end() - istr.position())), + String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())), ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE}; } - istr.position() = const_cast(token_iterator->begin); + buf.position() = const_cast(token_iterator->begin); if (format_settings.values.deduce_templates_of_expressions && generate_template) { @@ -228,9 +234,9 @@ ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, boo try { templates[column_idx] = ConstantExpressionTemplate(type, TokenIterator(tokens), token_iterator, ast, *context); - istr.rollbackToCheckpoint(); - templates[column_idx].value().parseExpression(istr, format_settings); - assertDelimAfterValue(column_idx); + buf.rollbackToCheckpoint(); + templates[column_idx].value().parseExpression(buf, format_settings); + assertDelimiterAfterValue(column_idx); return; } catch (...) @@ -239,25 +245,25 @@ ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, boo throw; /// Continue parsing without template templates[column_idx].reset(); - istr.position() = const_cast(token_iterator->begin); + buf.position() = const_cast(token_iterator->begin); } } - assertDelimAfterValue(column_idx); + assertDelimiterAfterValue(column_idx); column.insert(value); } -void ValuesBlockInputStream::assertDelimAfterValue(size_t column_idx) +void ValuesBlockInputFormat::assertDelimiterAfterValue(size_t column_idx) { - skipWhitespaceIfAny(istr); + skipWhitespaceIfAny(buf); if (column_idx + 1 != num_columns) - assertChar(',', istr); + assertChar(',', buf); else - assertChar(')', istr); + assertChar(')', buf); } -bool ValuesBlockInputStream::shouldGenerateNewTemplate(size_t column_idx) +bool ValuesBlockInputFormat::shouldGenerateNewTemplate(size_t column_idx) { // TODO better heuristic constexpr size_t max_attempts = 3; @@ -275,18 +281,16 @@ bool ValuesBlockInputStream::shouldGenerateNewTemplate(size_t column_idx) } -void registerInputFormatValues(FormatFactory & factory) +void registerInputFormatProcessorValues(FormatFactory & factory) { - factory.registerInputFormat("Values", []( + factory.registerInputFormatProcessor("Values", []( ReadBuffer & buf, - const Block & sample, + const Block & header, const Context & context, - UInt64 max_block_size, - UInt64 rows_portion_size, - FormatFactory::ReadCallback, + const RowInputFormatParams & params, const FormatSettings & settings) { - return std::make_shared(buf, sample, context, settings, max_block_size, rows_portion_size); + return std::make_shared(buf, header, params, context, settings); }); } diff --git a/dbms/src/Formats/ValuesBlockInputStream.h b/dbms/src/Processors/Formats/Impl/ValuesBlockInputFormat.h similarity index 63% rename from dbms/src/Formats/ValuesBlockInputStream.h rename to dbms/src/Processors/Formats/Impl/ValuesBlockInputFormat.h index 1d1ac03031e..3cb6c65b33f 100644 --- a/dbms/src/Formats/ValuesBlockInputStream.h +++ b/dbms/src/Processors/Formats/Impl/ValuesBlockInputFormat.h @@ -1,9 +1,10 @@ #pragma once #include -#include +#include +#include #include -#include +#include #include #include @@ -17,41 +18,41 @@ class ReadBuffer; /** Stream to read data in VALUES format (as in INSERT query). */ -class ValuesBlockInputStream : public IBlockInputStream +class ValuesBlockInputFormat : public IInputFormat { public: /** Data is parsed using fast, streaming parser. * If interpret_expressions is true, it will, in addition, try to use SQL parser and interpreter * in case when streaming parser could not parse field (this is very slow). */ - ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings_, UInt64 max_block_size_, UInt64 rows_portion_size_); + ValuesBlockInputFormat(ReadBuffer & in_, const Block & header_, const RowInputFormatParams & params_, + const Context & context_, const FormatSettings & format_settings_); - String getName() const override { return "ValuesBlockOutputStream"; } - Block getHeader() const override { return header; } - - - void readPrefix() override { } - void readSuffix() override { } + String getName() const override { return "ValuesBlockInputFormat"; } private: typedef std::vector> ConstantExpressionTemplates; - Block readImpl() override; + Chunk generate() override; bool parseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx); void readValue(IColumn & column, size_t column_idx, bool generate_template); void parseExpression(IColumn & column, size_t column_idx, bool generate_template); - inline void assertDelimAfterValue(size_t column_idx); + inline void assertDelimiterAfterValue(size_t column_idx); bool shouldGenerateNewTemplate(size_t column_idx); + void readSuffix() { buf.assertCanBeDestructed(); } + private: - PeekableReadBuffer istr; - Block header; + PeekableReadBuffer buf; + + RowInputFormatParams params; + Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; + std::unique_ptr context; /// pimpl const FormatSettings format_settings; - UInt64 max_block_size; - UInt64 rows_portion_size; + size_t num_columns; size_t total_rows = 0; diff --git a/dbms/src/Processors/Formats/Impl/ValuesRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/ValuesRowInputFormat.cpp index a0340fe4e25..df8473d122b 100644 --- a/dbms/src/Processors/Formats/Impl/ValuesRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/ValuesRowInputFormat.cpp @@ -149,9 +149,9 @@ bool ValuesRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) } -void registerInputFormatProcessorValues(FormatFactory & factory) +void _registerInputFormatProcessorValues(FormatFactory & factory) { - factory.registerInputFormatProcessor("Values", []( + factory.registerInputFormatProcessor("_Values", []( ReadBuffer & buf, const Block & sample, const Context & context,