ClickHouse/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp

62 lines
2.3 KiB
C++
Raw Normal View History

2018-01-10 00:04:08 +00:00
#include <Parsers/ASTInsertQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
2018-01-10 00:04:08 +00:00
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
2018-01-10 00:04:08 +00:00
#include <DataStreams/BlockIO.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
2018-07-10 17:20:55 +00:00
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Storages/ColumnsDescription.h>
2017-01-03 01:42:17 +00:00
2016-12-09 10:10:12 +00:00
namespace DB
{
2018-01-10 00:04:08 +00:00
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
2016-12-09 10:10:12 +00:00
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
2019-03-29 14:50:48 +00:00
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context)
2016-12-09 10:10:12 +00:00
{
2019-03-11 13:22:51 +00:00
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
2016-12-09 10:10:12 +00:00
if (!ast_insert_query)
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
2016-12-09 10:10:12 +00:00
String format = ast_insert_query->format;
if (format.empty())
format = "Values";
2016-12-09 10:10:12 +00:00
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
2016-12-09 10:10:12 +00:00
input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
2016-12-09 10:10:12 +00:00
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
2019-02-08 13:24:24 +00:00
if (input_buffer_tail_part)
buffers.push_back(input_buffer_tail_part);
2016-12-09 10:10:12 +00:00
/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
*/
2016-12-09 10:10:12 +00:00
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
2016-12-09 10:10:12 +00:00
2019-02-08 13:24:24 +00:00
res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
2018-07-13 12:46:29 +00:00
auto columns_description = ColumnsDescription::loadFromContext(context, ast_insert_query->database, ast_insert_query->table);
2019-03-15 17:40:16 +00:00
if (columns_description)
{
auto column_defaults = columns_description->getDefaults();
if (!column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
}
2016-12-09 10:10:12 +00:00
}
}