From dc98b50f146e199ebf89bf290527c7f1e276b14c Mon Sep 17 00:00:00 2001
From: alesapin
Date: Fri, 8 Feb 2019 16:24:24 +0300
Subject: [PATCH] Fix HTTP insert

---
 .../DataStreams/InputStreamFromASTInsertQuery.cpp |  8 +++++---
 .../DataStreams/InputStreamFromASTInsertQuery.h   |  2 +-
 dbms/src/Interpreters/InterpreterInsertQuery.cpp  | 15 +++++----------
 dbms/src/Interpreters/executeQuery.cpp            | 14 +++++++++-----
 dbms/src/Parsers/ASTInsertQuery.h                 |  3 +++
 dbms/tests/performance/trim/trim_whitespace.xml   |  2 --
 6 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp
index b78b7a59db6..5475b75b994 100644
--- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp
+++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp
@@ -17,7 +17,7 @@ namespace ErrorCodes
 
 
 InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
-    const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context)
+    const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context)
 {
     const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
 
@@ -36,7 +36,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
     ConcatReadBuffer::ReadBuffers buffers;
     if (ast_insert_query->data)
         buffers.push_back(input_buffer_ast_part.get());
-    buffers.push_back(&input_buffer_tail_part);
+
+    if (input_buffer_tail_part)
+        buffers.push_back(input_buffer_tail_part);
 
     /** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
       * - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
@@ -44,7 +46,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
 
     input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
 
-    res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
+    res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
 
     auto columns_description = ColumnsDescription::loadFromContext(context, ast_insert_query->database, ast_insert_query->table);
     if (columns_description && !columns_description->defaults.empty())
diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h
index 876a10e563d..3ecda33289e 100644
--- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h
+++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h
@@ -19,7 +19,7 @@ class Context;
 class InputStreamFromASTInsertQuery : public IBlockInputStream
 {
 public:
-    InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
+    InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context);
 
     Block readImpl() override { return res_stream->read(); }
     void readPrefixImpl() override { return res_stream->readPrefix(); }
diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp
index 4eef1720ef7..7f579b4f88e 100644
--- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include <DataStreams/InputStreamFromASTInsertQuery.h>
 #include
 #include
@@ -109,11 +110,12 @@ BlockIO InterpreterInsertQuery::execute()
         out = std::make_shared<SquashingBlockOutputStream>(
             out, table->getSampleBlock(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
     }
 
+    auto query_sample_block = getSampleBlock(query, table);
     /// Actually we don't know structure of input blocks from query/table,
     /// because some clients break insertion protocol (columns != header)
     out = std::make_shared<AddingDefaultBlockOutputStream>(
-        out, getSampleBlock(query, table), table->getSampleBlock(), table->getColumns().defaults, context);
+        out, query_sample_block, table->getSampleBlock(), table->getColumns().defaults, context);
 
     auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
     out_wrapper->setProcessListElement(context.getProcessListElement());
@@ -143,17 +145,10 @@ BlockIO InterpreterInsertQuery::execute()
                 throw Exception("Cannot insert column " + name_type.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
         }
     }
-    else if (query.data)
+    else if (query.data && !query.has_tail) /// can execute without additional data
     {
-        auto data_in = std::make_unique<ReadBufferFromMemory>(query.data, query.end - query.data);
-
-        std::string format = "Values";
-        if (!query.format.empty())
-            format = query.format;
-        res.in = context.getInputFormat(format, *data_in, table->getSampleBlock(), context.getSettingsRef().max_insert_block_size);
-        res.in = std::make_shared<OwningBlockInputStream<ReadBuffer>>(res.in, std::move(data_in));
+        res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context);
         res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
-
         res.out = nullptr;
     }
 
diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp
index cd59a77d9fe..d04b616efd5 100644
--- a/dbms/src/Interpreters/executeQuery.cpp
+++ b/dbms/src/Interpreters/executeQuery.cpp
@@ -141,7 +141,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
     const char * end,
     Context & context,
     bool internal,
-    QueryProcessingStage::Enum stage)
+    QueryProcessingStage::Enum stage,
+    bool has_query_tail)
 {
     time_t current_time = time(nullptr);
 
@@ -164,9 +165,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
         /// TODO Parser should fail early when max_query_size limit is reached.
         ast = parseQuery(parser, begin, end, "", max_query_size);
 
-        const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
+        auto * insert_query = dynamic_cast<ASTInsertQuery *>(ast.get());
         if (insert_query && insert_query->data)
+        {
             query_end = insert_query->data;
+            insert_query->has_tail = has_query_tail;
+        }
         else
             query_end = end;
     }
@@ -434,7 +438,7 @@ BlockIO executeQuery(
     QueryProcessingStage::Enum stage)
 {
     BlockIO streams;
-    std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage);
+    std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage, false);
     return streams;
 }
 
@@ -479,13 +483,13 @@ void executeQuery(
     ASTPtr ast;
     BlockIO streams;
 
-    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete);
+    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, !istr.eof());
 
     try
     {
         if (streams.out)
         {
-            InputStreamFromASTInsertQuery in(ast, istr, streams, context);
+            InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context);
             copyData(in, *streams.out);
         }
 
diff --git a/dbms/src/Parsers/ASTInsertQuery.h b/dbms/src/Parsers/ASTInsertQuery.h
index baf2a9fce8d..17c8214c1ba 100644
--- a/dbms/src/Parsers/ASTInsertQuery.h
+++ b/dbms/src/Parsers/ASTInsertQuery.h
@@ -26,6 +26,9 @@ public:
     const char * data = nullptr;
     const char * end = nullptr;
 
+    /// Query has additional data, which will be sent later
+    bool has_tail = false;
+
     /** Get the text that identifies this element. */
     String getID(char delim) const override { return "InsertQuery" + (delim + database) + delim + table; }
 
diff --git a/dbms/tests/performance/trim/trim_whitespace.xml b/dbms/tests/performance/trim/trim_whitespace.xml
index 41449318f85..9ef5cf92611 100644
--- a/dbms/tests/performance/trim/trim_whitespace.xml
+++ b/dbms/tests/performance/trim/trim_whitespace.xml
@@ -4,8 +4,6 @@
     CREATE TABLE IF NOT EXISTS whitespaces(value String) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple()
 
     INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)
-    INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)
-    INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)
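
Below is a minimal standalone sketch (not part of the patch) of the buffering idea the change relies on: the data inlined in the INSERT statement and the optional remainder of the request body are drained back to back, and the second buffer is simply skipped when has_tail is false. ToyReadBuffer and ToyConcatBuffer are illustrative stand-ins, not the real ReadBuffer/ConcatReadBuffer classes.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

/// Toy stand-in for a read buffer over one chunk of memory
/// (illustrative only; this is not ClickHouse's ReadBuffer API).
struct ToyReadBuffer
{
    explicit ToyReadBuffer(std::string s) : data(std::move(s)) {}

    bool read(char & c)
    {
        if (pos == data.size())
            return false;
        c = data[pos++];
        return true;
    }

    std::string data;
    size_t pos = 0;
};

/// Drains its parts strictly one after another, the way the patch feeds
/// a concatenating buffer with the inline INSERT data plus an optional tail.
struct ToyConcatBuffer
{
    bool read(char & c)
    {
        while (current < parts.size())
        {
            if (parts[current]->read(c))
                return true;
            ++current;  /// inline part exhausted -> move on to the tail, if present
        }
        return false;
    }

    std::vector<ToyReadBuffer *> parts;
    size_t current = 0;
};

int main()
{
    /// Data embedded between query.data and query.end of the INSERT statement.
    ToyReadBuffer inline_part("(1, 'a'), (2, 'b')");

    /// Remainder of the request body; it is absent when has_tail == false.
    ToyReadBuffer tail(", (3, 'c')");
    ToyReadBuffer * tail_part = &tail;  /// set to nullptr to simulate has_tail == false

    ToyConcatBuffer all;
    all.parts.push_back(&inline_part);
    if (tail_part)                      /// mirrors `if (input_buffer_tail_part)` in the patch
        all.parts.push_back(tail_part);

    char c;
    while (all.read(c))
        std::cout << c;
    std::cout << '\n';
    return 0;
}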