diff --git a/dbms/src/Formats/ConstantExpressionTemplate.cpp b/dbms/src/Formats/ConstantExpressionTemplate.cpp index a6b6f999681..c2cae3429ac 100644 --- a/dbms/src/Formats/ConstantExpressionTemplate.cpp +++ b/dbms/src/Formats/ConstantExpressionTemplate.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -198,7 +198,7 @@ void ConstantExpressionTemplate::parseExpression(ReadBuffer & istr, const Format type_info = WhichDataType(dynamic_cast(type).getNestedType()); Tokens tokens_number(istr.position(), istr.buffer().end()); - TokenIterator iterator(tokens_number); + IParser::Pos iterator(tokens_number); Expected expected; ASTPtr ast; if (nullable && parser_null.parse(iterator, ast, expected)) diff --git a/dbms/src/Formats/ValuesBlockInputStream.cpp b/dbms/src/Formats/ValuesBlockInputStream.cpp index e16bf619060..253609eca03 100644 --- a/dbms/src/Formats/ValuesBlockInputStream.cpp +++ b/dbms/src/Formats/ValuesBlockInputStream.cpp @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -33,8 +33,8 @@ namespace ErrorCodes ValuesBlockInputStream::ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, - const FormatSettings & format_settings, UInt64 max_block_size_, UInt64 rows_portion_size_) - : istr(istr_), header(header_), context(std::make_unique(context_)), format_settings(format_settings), + const FormatSettings & format_settings_, UInt64 max_block_size_, UInt64 rows_portion_size_) + : istr(istr_), header(header_), context(std::make_unique(context_)), format_settings(format_settings_), max_block_size(max_block_size_), rows_portion_size(rows_portion_size_), num_columns(header.columns()), attempts_to_generate_template(num_columns), rows_parsed_using_template(num_columns) { @@ -48,8 +48,14 @@ Block ValuesBlockInputStream::readImpl() { MutableColumns columns = header.cloneEmptyColumns(); - for (size_t rows_in_block = 0; rows_in_block < max_block_size; ++rows_in_block) + for (size_t rows_in_block = 0, batch = 0; rows_in_block < max_block_size; ++rows_in_block, ++batch) { + if (rows_portion_size && batch == rows_portion_size) + { + batch = 0; + if (!checkTimeLimit() || isCancelled()) + break; + } try { skipWhitespaceIfAny(istr); @@ -187,7 +193,7 @@ ValuesBlockInputStream::parseExpression(IColumn & column, size_t column_idx, boo // TODO make tokenizer to work with buffers, not only with continuous memory Tokens tokens(istr.position(), istr.buffer().end()); - TokenIterator token_iterator(tokens); + IParser::Pos token_iterator(tokens); ASTPtr ast; if (!parser.parse(token_iterator, ast, expected)) @@ -276,9 +282,11 @@ void registerInputFormatValues(FormatFactory & factory) const Block & sample, const Context & context, UInt64 max_block_size, + UInt64 rows_portion_size, + FormatFactory::ReadCallback, const FormatSettings & settings) { - return std::make_shared(buf, sample, context, settings, max_block_size); + return std::make_shared(buf, sample, context, settings, max_block_size, rows_portion_size); }); } diff --git a/dbms/src/Formats/ValuesBlockInputStream.h b/dbms/src/Formats/ValuesBlockInputStream.h index 7f6a4b06a3f..1d1ac03031e 100644 --- a/dbms/src/Formats/ValuesBlockInputStream.h +++ b/dbms/src/Formats/ValuesBlockInputStream.h @@ -6,6 +6,7 @@ #include #include +#include namespace DB { @@ -23,7 +24,7 @@ public: * If interpret_expressions is true, it will, in addition, try to use SQL parser and interpreter * in case when streaming parser could not parse field (this is very slow). */ - ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings, UInt64 max_block_size_); + ValuesBlockInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings_, UInt64 max_block_size_, UInt64 rows_portion_size_); String getName() const override { return "ValuesBlockOutputStream"; } Block getHeader() const override { return header; } @@ -50,6 +51,7 @@ private: std::unique_ptr context; /// pimpl const FormatSettings format_settings; UInt64 max_block_size; + UInt64 rows_portion_size; size_t num_columns; size_t total_rows = 0; diff --git a/dbms/src/Processors/Formats/IRowInputFormat.h b/dbms/src/Processors/Formats/IRowInputFormat.h index 72a6c813701..5676819bcc2 100644 --- a/dbms/src/Processors/Formats/IRowInputFormat.h +++ b/dbms/src/Processors/Formats/IRowInputFormat.h @@ -36,6 +36,8 @@ struct RowInputFormatParams OverflowMode timeout_overflow_mode = OverflowMode::THROW; }; +bool isParseError(int code); + ///Row oriented input format: reads data row by row. class IRowInputFormat : public IInputFormat {