Merge pull request #27655 from FArthur-cmd/import_from_infile_syntax

Import `FROM INFILE`
This commit is contained in:
tavplubix 2021-08-18 15:36:32 +03:00 committed by GitHub
commit 9b45b26dd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 106 additions and 17 deletions

View File

@ -2,6 +2,7 @@
#include "Common/MemoryTracker.h"
#include "Columns/ColumnsNumber.h"
#include "ConnectionParameters.h"
#include "IO/CompressionMethod.h"
#include "QueryFuzzer.h"
#include "Suggest.h"
#include "TestHint.h"
@ -1823,7 +1824,7 @@ private:
void processInsertQuery()
{
const auto parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
if ((!parsed_insert_query.data && !parsed_insert_query.infile) && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(
@ -1894,7 +1895,24 @@ private:
if (!parsed_insert_query)
return;
if (parsed_insert_query->data)
if (parsed_insert_query->infile)
{
const auto & in_file_node = parsed_insert_query->infile->as<ASTLiteral &>();
const auto in_file = in_file_node.value.safeGet<std::string>();
auto in_buffer = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(in_file), chooseCompressionMethod(in_file, ""));
try
{
sendDataFrom(*in_buffer, sample, columns_description);
}
catch (Exception & e)
{
e.addMessage("data for INSERT was parsed from file");
throw;
}
}
else if (parsed_insert_query->data)
{
/// Send data contained in the query.
ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data);

View File

@ -1,6 +1,7 @@
#include <iomanip>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/quoteString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
@ -48,11 +49,15 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
}
else
{
if (infile)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM INFILE " << (settings.hilite ? hilite_none : "") << infile->as<ASTLiteral &>().value.safeGet<std::string>();
}
if (!format.empty())
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FORMAT " << (settings.hilite ? hilite_none : "") << format;
}
else
else if (!infile)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " VALUES" << (settings.hilite ? hilite_none : "");
}

View File

@ -2,6 +2,7 @@
#include <Parsers/IAST.h>
#include <Interpreters/StorageID.h>
#include "Parsers/IAST_fwd.h"
namespace DB
{
@ -16,6 +17,7 @@ public:
ASTPtr columns;
String format;
ASTPtr select;
ASTPtr infile;
ASTPtr watch;
ASTPtr table_function;
ASTPtr settings_ast;

View File

@ -11,6 +11,7 @@
#include <Parsers/ParserSetQuery.h>
#include <Parsers/InsertQuerySettingsPushDownVisitor.h>
#include <Common/typeid_cast.h>
#include "Parsers/IAST_fwd.h"
namespace DB
@ -25,6 +26,7 @@ namespace ErrorCodes
bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_insert_into("INSERT INTO");
ParserKeyword s_from_infile("FROM INFILE");
ParserKeyword s_table("TABLE");
ParserKeyword s_function("FUNCTION");
ParserToken s_dot(TokenType::Dot);
@ -39,9 +41,11 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserIdentifier name_p;
ParserList columns_p(std::make_unique<ParserInsertElement>(), std::make_unique<ParserToken>(TokenType::Comma), false);
ParserFunction table_function_p{false};
ParserStringLiteral infile_name_p;
ASTPtr database;
ASTPtr table;
ASTPtr infile;
ASTPtr columns;
ASTPtr format;
ASTPtr select;
@ -86,8 +90,14 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
Pos before_values = pos;
/// VALUES or FORMAT or SELECT
if (s_values.ignore(pos, expected))
if (s_from_infile.ignore(pos, expected))
{
if (!infile_name_p.parse(pos, infile, expected))
return false;
}
/// VALUES or FROM INFILE or FORMAT or SELECT
if (!infile && s_values.ignore(pos, expected))
{
data = pos->begin;
}
@ -136,7 +146,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
if (format)
if (format && !infile)
{
Pos last_token = pos;
--last_token;
@ -167,6 +177,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto query = std::make_shared<ASTInsertQuery>();
node = query;
if (infile)
query->infile = infile;
if (table_function)
{
query->table_function = table_function;

View File

@ -4,6 +4,7 @@
#include <Formats/FormatFactory.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromFile.h>
#include <DataStreams/BlockIO.h>
#include <Processors/Transforms/getSourceFromFromASTInsertQuery.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
@ -11,6 +12,8 @@
#include <Storages/IStorage.h>
#include <Processors/Pipe.h>
#include <Processors/Formats/IInputFormat.h>
#include "IO/CompressionMethod.h"
#include "Parsers/ASTLiteral.h"
namespace DB
@ -20,6 +23,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_USAGE_OF_INPUT;
extern const int UNKNOWN_TYPE_OF_QUERY;
}
@ -35,6 +39,9 @@ Pipe getSourceFromFromASTInsertQuery(
if (!ast_insert_query)
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
if (ast_insert_query->infile && context->getApplicationType() == Context::ApplicationType::SERVER)
throw Exception("Query has infile and was send directly to server", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
String format = ast_insert_query->format;
if (format.empty())
{
@ -48,20 +55,33 @@ Pipe getSourceFromFromASTInsertQuery(
auto input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
std::unique_ptr<ReadBuffer> input_buffer;
if (input_buffer_tail_part)
buffers.push_back(input_buffer_tail_part);
if (ast_insert_query->infile)
{
/// Data can be from infile
const auto & in_file_node = ast_insert_query->infile->as<ASTLiteral &>();
const auto in_file = in_file_node.value.safeGet<std::string>();
/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
*/
input_buffer = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(in_file), chooseCompressionMethod(in_file, ""));
}
else
{
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
auto input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
if (input_buffer_tail_part)
buffers.push_back(input_buffer_tail_part);
auto source = FormatFactory::instance().getInput(format, *input_buffer_contacenated, header, context, context->getSettings().max_insert_block_size);
/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
*/
input_buffer = std::make_unique<ConcatReadBuffer>(buffers);
}
auto source = FormatFactory::instance().getInput(format, *input_buffer, header, context, context->getSettings().max_insert_block_size);
Pipe pipe(source);
if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
@ -79,7 +99,7 @@ Pipe getSourceFromFromASTInsertQuery(
}
source->addBuffer(std::move(input_buffer_ast_part));
source->addBuffer(std::move(input_buffer_contacenated));
source->addBuffer(std::move(input_buffer));
return pipe;
}

View File

@ -0,0 +1,3 @@
Hello
Hello
Correct URL

View File

@ -0,0 +1,28 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
[ -e "${CLICKHOUSE_TMP}"/test_infile.gz ] && rm "${CLICKHOUSE_TMP}"/test_infile.gz
[ -e "${CLICKHOUSE_TMP}"/test_infile ] && rm "${CLICKHOUSE_TMP}"/test_infile
echo "Hello" > "${CLICKHOUSE_TMP}"/test_infile
gzip "${CLICKHOUSE_TMP}"/test_infile
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_infile;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE test_infile (word String) ENGINE=Memory();"
${CLICKHOUSE_CLIENT} --query "INSERT INTO test_infile FROM INFILE '${CLICKHOUSE_TMP}/test_infile.gz' FORMAT CSV;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_infile;"
# if it not fails, select will print information
${CLICKHOUSE_LOCAL} --query "CREATE TABLE test_infile (word String) ENGINE=Memory(); INSERT INTO test_infile FROM INFILE '${CLICKHOUSE_TMP}/test_infile.gz' FORMAT CSV; SELECT * from test_infile;"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TABLE" -d 'IF EXISTS test_infile_url'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=CREATE" -d 'TABLE test_infile_url (x String) ENGINE = Memory'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" -d "INSERT INTO test_infile_url FROM INFILE '${CLICKHOUSE_TMP}/test_infile.gz' FORMAT CSV" 2>&1 | grep -q "UNKNOWN_TYPE_OF_QUERY" && echo "Correct URL" || echo 'Fail'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" -d 'SELECT x FROM test_infile_url'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TABLE" -d 'test_infile_url'