Filter out some insert-settings

This commit is contained in:
Ivan Lezhankin 2021-06-17 17:59:28 +03:00
parent 37365589ed
commit 14c5eff6dd
3 changed files with 82 additions and 3 deletions

View File

@ -0,0 +1,56 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/copyData.h>
namespace DB
{
class IBlockOutputStream;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
/** An empty stream of blocks.
* But at the first read attempt, copies the data from the passed `input` to the `output`.
* This is necessary to execute the query INSERT SELECT - the query copies data, but returns nothing.
* The query could be executed without wrapping it in an empty BlockInputStream,
* but the progress of query execution and the ability to cancel the query would not work.
*/
class NullAndDoCopyBlockInputStream : public IBlockInputStream
{
public:
NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_)
: input(std::move(input_))
, output(std::move(output_))
{
children.push_back(input);
}
/// Suppress readPrefix and readSuffix, because they are called by copyData.
void readPrefix() override {}
void readSuffix() override {}
String getName() const override { return "NullAndDoCopy"; }
Block getHeader() const override { return {}; }
Block getTotals() override { return {}; }
Block getExtremes() override { return {}; }
protected:
Block readImpl() override
{
/// We do not use cancel flag here.
/// If query was cancelled, it will be processed by child streams.
/// Part of the data will be processed.
copyData(*input, *output);
return Block();
}
private:
BlockInputStreamPtr input;
BlockOutputStreamPtr output;
};
}

View File

@ -441,7 +441,28 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query && insert_query->settings_ast)
{
/// FIXME: it's not the best solution, should be implemented in better way.
if (insert_query->expectsNativeData())
{
auto * set_query = insert_query->settings_ast->as<ASTSetQuery>();
assert(set_query);
set_query->changes.erase(
std::remove_if(
set_query->changes.begin(),
set_query->changes.end(),
[](const SettingChange & change)
{
return change.name == "format_template_row" || change.name == "format_template_rows_between_delimiter"
|| change.name == "format_template_resultset";
}),
set_query->changes.end());
}
InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();
}
if (insert_query)
{
@ -579,7 +600,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto * queue = context->getAsynchronousInsertQueue();
const bool async_insert
= queue && insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.async_insert_mode;
= queue && insert_query && !insert_query->select && !insert_query->expectsNativeData() && settings.async_insert_mode;
if (async_insert && queue->push(insert_query, settings))
{

View File

@ -8,8 +8,7 @@ namespace DB
class ReadBuffer;
/** INSERT query
*/
/// Insert Query
class ASTInsertQuery : public IAST
{
public:
@ -24,12 +23,15 @@ public:
ASTPtr watch;
/// Data to insert
/// FIXME: maybe merge this with 'tail' buffer and make unified 'data' buffer for non-Native data?
const char * data = nullptr;
const char * end = nullptr;
/// Query may have additional data if buffer is not nullptr
ReadBuffer * tail;
bool expectsNativeData() const { return !data && !tail; }
/// Try to find table function input() in SELECT part
void tryFindInputFunction(ASTPtr & input_function) const;