Fix HTTP insert

This commit is contained in:
alesapin 2019-02-08 16:24:24 +03:00
parent 0a4c99efe4
commit dc98b50f14
6 changed files with 23 additions and 21 deletions

View File

@ -17,7 +17,7 @@ namespace ErrorCodes
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context)
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
@ -36,7 +36,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
buffers.push_back(&input_buffer_tail_part);
if (input_buffer_tail_part)
buffers.push_back(input_buffer_tail_part);
/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
@ -44,7 +46,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
auto columns_description = ColumnsDescription::loadFromContext(context, ast_insert_query->database, ast_insert_query->table);
if (columns_description && !columns_description->defaults.empty())

View File

@ -19,7 +19,7 @@ class Context;
class InputStreamFromASTInsertQuery : public IBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context);
Block readImpl() override { return res_stream->read(); }
void readPrefixImpl() override { return res_stream->readPrefix(); }

View File

@ -11,6 +11,7 @@
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/copyData.h>
#include <Parsers/ASTInsertQuery.h>
@ -109,11 +110,12 @@ BlockIO InterpreterInsertQuery::execute()
out = std::make_shared<SquashingBlockOutputStream>(
out, table->getSampleBlock(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
}
auto query_sample_block = getSampleBlock(query, table);
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, getSampleBlock(query, table), table->getSampleBlock(), table->getColumns().defaults, context);
out, query_sample_block, table->getSampleBlock(), table->getColumns().defaults, context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
@ -143,17 +145,10 @@ BlockIO InterpreterInsertQuery::execute()
throw Exception("Cannot insert column " + name_type.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
else if (query.data)
else if (query.data && !query.has_tail) /// can execute without additional data
{
auto data_in = std::make_unique<ReadBufferFromMemory>(query.data, query.end - query.data);
std::string format = "Values";
if (!query.format.empty())
format = query.format;
res.in = context.getInputFormat(format, *data_in, table->getSampleBlock(), context.getSettingsRef().max_insert_block_size);
res.in = std::make_shared<OwningBlockInputStream<ReadBufferFromMemory>>(res.in, std::move(data_in));
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
res.out = nullptr;
}

View File

@ -141,7 +141,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * end,
Context & context,
bool internal,
QueryProcessingStage::Enum stage)
QueryProcessingStage::Enum stage,
bool has_query_tail)
{
time_t current_time = time(nullptr);
@ -164,9 +165,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// TODO Parser should fail early when max_query_size limit is reached.
ast = parseQuery(parser, begin, end, "", max_query_size);
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
auto * insert_query = dynamic_cast<ASTInsertQuery *>(ast.get());
if (insert_query && insert_query->data)
{
query_end = insert_query->data;
insert_query->has_tail = has_query_tail;
}
else
query_end = end;
}
@ -434,7 +438,7 @@ BlockIO executeQuery(
QueryProcessingStage::Enum stage)
{
BlockIO streams;
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage);
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage, false);
return streams;
}
@ -479,13 +483,13 @@ void executeQuery(
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete);
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, !istr.eof());
try
{
if (streams.out)
{
InputStreamFromASTInsertQuery in(ast, istr, streams, context);
InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context);
copyData(in, *streams.out);
}

View File

@ -26,6 +26,9 @@ public:
const char * data = nullptr;
const char * end = nullptr;
/// Query has additional data, which will be sent later
bool has_tail = false;
/** Get the text that identifies this element. */
String getID(char delim) const override { return "InsertQuery" + (delim + database) + delim + table; }

View File

@ -4,8 +4,6 @@
<create_query>CREATE TABLE IF NOT EXISTS whitespaces(value String) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple()</create_query>
<fill_query> INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)</fill_query>
<fill_query> INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)</fill_query>
<fill_query> INSERT INTO whitespaces SELECT value FROM (SELECT arrayStringConcat(groupArray(' ')) AS spaces, concat(spaces, toString(any(number)), spaces) AS value FROM numbers(100000000) GROUP BY pow(number, intHash32(number) % 4) % 12345678)</fill_query>
<stop_conditions>
<all_of>