diff --git a/src/DataStreams/NullAndDoCopyBlockInputStream.h b/src/DataStreams/NullAndDoCopyBlockInputStream.h deleted file mode 100644 index 8bfb3538f3a..00000000000 --- a/src/DataStreams/NullAndDoCopyBlockInputStream.h +++ /dev/null @@ -1,56 +0,0 @@ -#pragma once - -#include -#include - - -namespace DB -{ - -class IBlockOutputStream; -using BlockOutputStreamPtr = std::shared_ptr; - - -/** An empty stream of blocks. - * But at the first read attempt, copies the data from the passed `input` to the `output`. - * This is necessary to execute the query INSERT SELECT - the query copies data, but returns nothing. - * The query could be executed without wrapping it in an empty BlockInputStream, - * but the progress of query execution and the ability to cancel the query would not work. - */ -class NullAndDoCopyBlockInputStream : public IBlockInputStream -{ -public: - NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_) - : input(std::move(input_)) - , output(std::move(output_)) - { - children.push_back(input); - } - - /// Suppress readPrefix and readSuffix, because they are called by copyData. - void readPrefix() override {} - void readSuffix() override {} - - String getName() const override { return "NullAndDoCopy"; } - - Block getHeader() const override { return {}; } - Block getTotals() override { return {}; } - Block getExtremes() override { return {}; } - -protected: - Block readImpl() override - { - /// We do not use cancel flag here. - /// If query was cancelled, it will be processed by child streams. - /// Part of the data will be processed. - - copyData(*input, *output); - return Block(); - } - -private: - BlockInputStreamPtr input; - BlockOutputStreamPtr output; -}; - -} diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 243456bbb40..4a533a6937e 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -441,28 +441,7 @@ static std::tuple executeQueryImpl( auto * insert_query = ast->as(); if (insert_query && insert_query->settings_ast) - { - /// FIXME: it's not the best solution, should be implemented in better way. - if (insert_query->expectsNativeData()) - { - auto * set_query = insert_query->settings_ast->as(); - - assert(set_query); - - set_query->changes.erase( - std::remove_if( - set_query->changes.begin(), - set_query->changes.end(), - [](const SettingChange & change) - { - return change.name == "format_template_row" || change.name == "format_template_rows_between_delimiter" - || change.name == "format_template_resultset"; - }), - set_query->changes.end()); - } - InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext(); - } if (insert_query) { @@ -600,7 +579,7 @@ static std::tuple executeQueryImpl( auto * queue = context->getAsynchronousInsertQueue(); const bool async_insert - = queue && insert_query && !insert_query->select && !insert_query->expectsNativeData() && settings.async_insert_mode; + = queue && insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.async_insert_mode; if (async_insert && queue->push(insert_query, settings)) { diff --git a/src/Parsers/ASTInsertQuery.h b/src/Parsers/ASTInsertQuery.h index 61c401be492..9a6a6053c69 100644 --- a/src/Parsers/ASTInsertQuery.h +++ b/src/Parsers/ASTInsertQuery.h @@ -8,7 +8,8 @@ namespace DB class ReadBuffer; -/// Insert Query +/** INSERT query + */ class ASTInsertQuery : public IAST { public: @@ -23,15 +24,12 @@ public: ASTPtr watch; /// Data to insert - /// FIXME: maybe merge this with 'tail' buffer and make unified 'data' buffer for non-Native data? const char * data = nullptr; const char * end = nullptr; /// Query may have additional data if buffer is not nullptr ReadBuffer * tail; - bool expectsNativeData() const { return !data && !tail; } - /// Try to find table function input() in SELECT part void tryFindInputFunction(ASTPtr & input_function) const;