correct infile form local

This commit is contained in:
Artur 2021-08-17 12:01:31 +00:00
parent 4c1f06258f
commit e8e650b16b
3 changed files with 29 additions and 14 deletions

View File

@ -4,6 +4,7 @@
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <IO/ConcatReadBuffer.h> #include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h> #include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromFile.h>
#include <DataStreams/BlockIO.h> #include <DataStreams/BlockIO.h>
#include <Processors/Transforms/getSourceFromFromASTInsertQuery.h> #include <Processors/Transforms/getSourceFromFromASTInsertQuery.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
@ -11,6 +12,8 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include "IO/CompressionMethod.h"
#include "Parsers/ASTLiteral.h"
namespace DB namespace DB
@ -36,7 +39,7 @@ Pipe getSourceFromFromASTInsertQuery(
if (!ast_insert_query) if (!ast_insert_query)
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
if (ast_insert_query->infile) if (ast_insert_query->infile && context->getApplicationType() == Context::ApplicationType::SERVER)
throw Exception("Query has infile and was send directly to server", ErrorCodes::UNKNOWN_TYPE_OF_QUERY); throw Exception("Query has infile and was send directly to server", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
String format = ast_insert_query->format; String format = ast_insert_query->format;
@ -52,20 +55,32 @@ Pipe getSourceFromFromASTInsertQuery(
auto input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>( auto input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
ConcatReadBuffer::ReadBuffers buffers; std::unique_ptr<ReadBuffer> input_buffer;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
if (input_buffer_tail_part) if (ast_insert_query->infile)
buffers.push_back(input_buffer_tail_part); {
/// Data can be from infile
const auto & in_file_node = ast_insert_query->infile->as<ASTLiteral &>();
const auto in_file = in_file_node.value.safeGet<std::string>();
/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'. input_buffer = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(in_file), chooseCompressionMethod(in_file, ""));
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'. } else
*/ {
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
auto input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers); if (input_buffer_tail_part)
buffers.push_back(input_buffer_tail_part);
auto source = FormatFactory::instance().getInput(format, *input_buffer_contacenated, header, context, context->getSettings().max_insert_block_size); /** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
*/
input_buffer = std::make_unique<ConcatReadBuffer>(buffers);
}
auto source = FormatFactory::instance().getInput(format, *input_buffer, header, context, context->getSettings().max_insert_block_size);
Pipe pipe(source); Pipe pipe(source);
if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function) if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
@ -83,7 +98,7 @@ Pipe getSourceFromFromASTInsertQuery(
} }
source->addBuffer(std::move(input_buffer_ast_part)); source->addBuffer(std::move(input_buffer_ast_part));
source->addBuffer(std::move(input_buffer_contacenated)); source->addBuffer(std::move(input_buffer));
return pipe; return pipe;
} }

View File

@ -1,3 +1,3 @@
Hello Hello
Correct Local Hello
Correct URL Correct URL

View File

@ -19,7 +19,7 @@ ${CLICKHOUSE_CLIENT} --query "INSERT INTO test_infile FROM INFILE '${CLICKHOUSE_
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_infile;" ${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_infile;"
# if it not fails, select will print information # if it not fails, select will print information
${CLICKHOUSE_LOCAL} --query "CREATE TABLE test_infile (word String) ENGINE=Memory(); INSERT INTO test_infile FROM INFILE '${CLICKHOUSE_TMP}/test_infile.gz' FORMAT CSV; SELECT * from test_infile;" 2>&1 | grep -q "UNKNOWN_TYPE_OF_QUERY" && echo "Correct Local" || echo 'Fail' ${CLICKHOUSE_LOCAL} --query "CREATE TABLE test_infile (word String) ENGINE=Memory(); INSERT INTO test_infile FROM INFILE '${CLICKHOUSE_TMP}/test_infile.gz' FORMAT CSV; SELECT * from test_infile;"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TABLE" -d 'IF EXISTS test_infile_url' ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TABLE" -d 'IF EXISTS test_infile_url'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=CREATE" -d 'TABLE test_infile_url (x String) ENGINE = Memory' ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=CREATE" -d 'TABLE test_infile_url (x String) ENGINE = Memory'