diff --git a/src/DataStreams/NullAndDoCopyBlockInputStream.h b/src/DataStreams/NullAndDoCopyBlockInputStream.h new file mode 100644 index 00000000000..8bfb3538f3a --- /dev/null +++ b/src/DataStreams/NullAndDoCopyBlockInputStream.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +class IBlockOutputStream; +using BlockOutputStreamPtr = std::shared_ptr; + + +/** An empty stream of blocks. + * But at the first read attempt, copies the data from the passed `input` to the `output`. + * This is necessary to execute the query INSERT SELECT - the query copies data, but returns nothing. + * The query could be executed without wrapping it in an empty BlockInputStream, + * but the progress of query execution and the ability to cancel the query would not work. + */ +class NullAndDoCopyBlockInputStream : public IBlockInputStream +{ +public: + NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_) + : input(std::move(input_)) + , output(std::move(output_)) + { + children.push_back(input); + } + + /// Suppress readPrefix and readSuffix, because they are called by copyData. + void readPrefix() override {} + void readSuffix() override {} + + String getName() const override { return "NullAndDoCopy"; } + + Block getHeader() const override { return {}; } + Block getTotals() override { return {}; } + Block getExtremes() override { return {}; } + +protected: + Block readImpl() override + { + /// We do not use cancel flag here. + /// If query was cancelled, it will be processed by child streams. + /// Part of the data will be processed. + + copyData(*input, *output); + return Block(); + } + +private: + BlockInputStreamPtr input; + BlockOutputStreamPtr output; +}; + +} diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 4a533a6937e..243456bbb40 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -441,7 +441,28 @@ static std::tuple executeQueryImpl( auto * insert_query = ast->as(); if (insert_query && insert_query->settings_ast) + { + /// FIXME: it's not the best solution, should be implemented in better way. + if (insert_query->expectsNativeData()) + { + auto * set_query = insert_query->settings_ast->as(); + + assert(set_query); + + set_query->changes.erase( + std::remove_if( + set_query->changes.begin(), + set_query->changes.end(), + [](const SettingChange & change) + { + return change.name == "format_template_row" || change.name == "format_template_rows_between_delimiter" + || change.name == "format_template_resultset"; + }), + set_query->changes.end()); + } + InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext(); + } if (insert_query) { @@ -579,7 +600,7 @@ static std::tuple executeQueryImpl( auto * queue = context->getAsynchronousInsertQueue(); const bool async_insert - = queue && insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.async_insert_mode; + = queue && insert_query && !insert_query->select && !insert_query->expectsNativeData() && settings.async_insert_mode; if (async_insert && queue->push(insert_query, settings)) { diff --git a/src/Parsers/ASTInsertQuery.h b/src/Parsers/ASTInsertQuery.h index 9a6a6053c69..61c401be492 100644 --- a/src/Parsers/ASTInsertQuery.h +++ b/src/Parsers/ASTInsertQuery.h @@ -8,8 +8,7 @@ namespace DB class ReadBuffer; -/** INSERT query - */ +/// Insert Query class ASTInsertQuery : public IAST { public: @@ -24,12 +23,15 @@ public: ASTPtr watch; /// Data to insert + /// FIXME: maybe merge this with 'tail' buffer and make unified 'data' buffer for non-Native data? const char * data = nullptr; const char * end = nullptr; /// Query may have additional data if buffer is not nullptr ReadBuffer * tail; + bool expectsNativeData() const { return !data && !tail; } + /// Try to find table function input() in SELECT part void tryFindInputFunction(ASTPtr & input_function) const;