Added some improvements after code review

This commit is contained in:
palasonicq 2019-05-30 23:12:44 +03:00
parent cf6f771fd2
commit ea86a758a1
9 changed files with 69 additions and 46 deletions

View File

@ -829,9 +829,17 @@ private:
connection->forceConnected();
/// INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately.
if (insert && (!insert->select || insert->input_function))
ASTPtr input_function;
if (insert && insert->select)
insert->tryFindInputFunction(insert->select, input_function);
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
if (insert && (!insert->select || input_function))
{
if (input_function && insert->format.empty())
throw Exception("FORMAT must be specified for function input()", ErrorCodes::LOGICAL_ERROR);
processInsertQuery();
}
else
processOrdinaryQuery();
}

View File

@ -188,7 +188,7 @@ void TCPHandler::runImpl()
/// Send structure of columns to client for function input()
query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
{
if (&context != &*query_context)
if (&context != &query_context.value())
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
state.need_receive_data_for_input = true;
@ -207,7 +207,7 @@ void TCPHandler::runImpl()
query_context->setInputBlocksReaderCallback([&global_settings, this] (Context & context) -> Block
{
if (&context != &*query_context)
if (&context != &query_context.value())
throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);
size_t poll_interval;

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context)
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context, const ASTPtr & input_function)
{
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
@ -29,7 +29,11 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
String format = ast_insert_query->format;
if (format.empty())
{
if (input_function)
throw Exception("FORMAT must be specified for function input()", ErrorCodes::LOGICAL_ERROR);
format = "Values";
}
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
@ -51,7 +55,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->input_function)
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !input_function)
{
StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table);
auto column_defaults = storage->getColumns().getDefaults();

View File

@ -19,7 +19,11 @@ class Context;
class InputStreamFromASTInsertQuery : public IBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context);
InputStreamFromASTInsertQuery(const ASTPtr & ast,
ReadBuffer * input_buffer_tail_part,
const Block & header,
const Context & context,
const ASTPtr & input_function);
Block readImpl() override { return res_stream->read(); }
void readPrefixImpl() override { return res_stream->readPrefix(); }

View File

@ -147,7 +147,7 @@ BlockIO InterpreterInsertQuery::execute()
}
else if (query.data && !query.has_tail) /// can execute without additional data
{
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context);
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context, nullptr);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
res.out = nullptr;
}

View File

@ -224,18 +224,22 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Load external tables if they were provided
context.initializeExternalTablesIfSet();
/// Prepare Input storage before executing interpreter.
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query && insert_query->input_function)
if (insert_query && insert_query->select)
{
/// If we already got a buffer with data then initialize input stream.
/// Prepare Input storage before executing interpreter if we already got a buffer with data.
if (istr)
{
StoragePtr storage = context.executeTableFunction(insert_query->input_function);
auto * input_storage = dynamic_cast<StorageInput *>(storage.get());
BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
input_storage->getSampleBlock(), context);
input_storage->setInputStream(input_stream);
ASTPtr input_function;
insert_query->tryFindInputFunction(insert_query->select, input_function);
if (input_function)
{
StoragePtr storage = context.executeTableFunction(input_function);
auto & input_storage = dynamic_cast<StorageInput &>(*storage);
BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
input_storage.getSampleBlock(), context, input_function);
input_storage.setInputStream(input_stream);
}
}
}
else
@ -513,7 +517,7 @@ void executeQuery(
{
if (streams.out)
{
InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context);
InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
copyData(in, *streams.out);
}

View File

@ -1,10 +1,17 @@
#include <iomanip>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.need_parens = false;
@ -50,4 +57,23 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
}
}
void ASTInsertQuery::tryFindInputFunction(const ASTPtr & ast, ASTPtr & input_function) const
{
if (!ast)
return;
for (const auto & child : ast->children)
tryFindInputFunction(child, input_function);
if (const auto * table_function = ast->as<ASTFunction>())
{
if (table_function->name == "input")
{
if (input_function)
throw Exception("You can use 'input()' function only once per request.", ErrorCodes::LOGICAL_ERROR);
input_function = ast;
}
}
}
}

View File

@ -17,7 +17,6 @@ public:
ASTPtr columns;
String format;
ASTPtr select;
ASTPtr input_function;
ASTPtr table_function;
ASTPtr settings_ast;
@ -31,6 +30,9 @@ public:
/// Query has additional data, which will be sent later
bool has_tail = false;
/// Try to find table function input() in SELECT part
void tryFindInputFunction(const ASTPtr & ast, ASTPtr & input_function) const;
/** Get the text that identifies this element. */
String getID(char delim) const override { return "InsertQuery" + (delim + database) + delim + table; }
@ -41,7 +43,6 @@ public:
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (input_function) { res->input_function = input_function->clone(); res->children.push_back(res->input_function); }
if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); }
if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); }

View File

@ -8,7 +8,6 @@
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserInsertQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h>
@ -20,24 +19,6 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}
void tryFindInputFunction(const ASTPtr & ast, ASTPtr & input_function)
{
if (!ast)
return;
for (const auto & child : ast->children)
tryFindInputFunction(child, input_function);
if (const auto * table_function = ast->as<ASTFunction>())
{
if (table_function->name == "input")
{
if (input_function)
throw Exception("You can use 'input()' function only once per request.", ErrorCodes::SYNTAX_ERROR);
input_function = ast;
}
}
}
bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
@ -63,7 +44,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr select;
ASTPtr table_function;
ASTPtr settings_ast;
ASTPtr input_function;
/// Insertion data
const char * data = nullptr;
@ -118,12 +98,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserSelectWithUnionQuery select_p;
select_p.parse(pos, select, expected);
/// Check if we have INSERT SELECT FROM input().
tryFindInputFunction(select, input_function);
/// FORMAT section is required if we have input() in SELECT part
if (input_function)
if (!s_format.ignore(pos, expected) || !name_p.parse(pos, format, expected))
return false;
/// FORMAT section is expected if we have input() in SELECT part
if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected))
return false;
}
else
{
@ -182,7 +159,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->columns = columns;
query->select = select;
query->input_function = input_function;
query->settings_ast = settings_ast;
query->data = data != end ? data : nullptr;
query->end = end;