Merge pull request #2 from FArthur-cmd/compress_output_3473

add compression key-word and some tests
This commit is contained in:
Filatenkov Artur 2021-09-10 17:04:56 +03:00 committed by GitHub
commit a117804914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 172 additions and 6 deletions

View File

@ -1913,16 +1913,30 @@ private:
{
/// If INSERT data must be sent.
auto * parsed_insert_query = parsed_query->as<ASTInsertQuery>();
/// If query isn't parsed, no information can be gor from it.
if (!parsed_insert_query)
return;
/// If data is got from file (maybe compressed file)
if (parsed_insert_query->infile)
{
/// Get name of this file (path to file)
const auto & in_file_node = parsed_insert_query->infile->as<ASTLiteral &>();
const auto in_file = in_file_node.value.safeGet<std::string>();
auto in_buffer = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(in_file), chooseCompressionMethod(in_file, ""));
std::string compression_method;
/// Compression method can be specified in query
if (parsed_insert_query->compression)
{
const auto & compression_method_node = parsed_insert_query->compression->as<ASTLiteral &>();
compression_method = compression_method_node.value.safeGet<std::string>();
}
/// Otherwise, it will be detected from file name automaticly (by chooseCompressionMethod)
/// Buffer for reading from file is created and wrapped with appropriate compression method
auto in_buffer = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(in_file), chooseCompressionMethod(in_file, compression_method));
/// Now data is ready to be sent on server.
try
{
sendDataFrom(*in_buffer, sample, columns_description);
@ -1933,6 +1947,7 @@ private:
throw;
}
}
/// If query already has data to sent
else if (parsed_insert_query->data)
{
/// Send data contained in the query.

View File

@ -57,6 +57,8 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
if (infile)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM INFILE " << (settings.hilite ? hilite_none : "") << infile->as<ASTLiteral &>().value.safeGet<std::string>();
if (compression)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " COMPRESSION " << (settings.hilite ? hilite_none : "") << compression->as<ASTLiteral &>().value.safeGet<std::string>();
}
if (!format.empty())
{

View File

@ -18,6 +18,7 @@ public:
String format;
ASTPtr select;
ASTPtr infile;
ASTPtr compression;
ASTPtr watch;
ASTPtr table_function;
ASTPtr partition_by;

View File

@ -25,8 +25,10 @@ namespace ErrorCodes
bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
/// Create parsers
ParserKeyword s_insert_into("INSERT INTO");
ParserKeyword s_from_infile("FROM INFILE");
ParserKeyword s_compression("COMPRESSION");
ParserKeyword s_table("TABLE");
ParserKeyword s_function("FUNCTION");
ParserToken s_dot(TokenType::Dot);
@ -45,6 +47,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserStringLiteral infile_name_p;
ParserExpressionWithOptionalAlias exp_elem_p(false);
/// create ASTPtr variables (result of parsing will be put in them).
/// They will be used to initialize ASTInsertQuery's fields.
ASTPtr database;
ASTPtr table;
ASTPtr infile;
@ -55,20 +59,28 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr table_function;
ASTPtr settings_ast;
ASTPtr partition_by_expr;
ASTPtr compression;
/// Insertion data
const char * data = nullptr;
/// Check for key words `INSERT INTO`. If it isn't found, the query can't be parsed as insert query.
if (!s_insert_into.ignore(pos, expected))
return false;
/// try to find 'TABLE'
s_table.ignore(pos, expected);
/// Search for 'FUNCTION'. If this key word is in query, read fields for insertion into 'TABLE FUNCTION'.
/// Word table is optional for table functions. (for example, s3 table function)
/// Otherwise fill 'TABLE' fields.
if (s_function.ignore(pos, expected))
{
/// Read function name
if (!table_function_p.parse(pos, table_function, expected))
return false;
/// Support insertion values with partition by.
if (s_partition_by.ignore(pos, expected))
{
if (!exp_elem_p.parse(pos, partition_by_expr, expected))
@ -77,9 +89,12 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
else
{
/// Read one word. It can be table or database name.
if (!name_p.parse(pos, table, expected))
return false;
/// If there is a dot, previous name was database name,
/// so read table name after dot.
if (s_dot.ignore(pos, expected))
{
database = table;
@ -98,26 +113,41 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}
Pos before_values = pos;
/// Check if file is a source of data.
if (s_from_infile.ignore(pos, expected))
{
/// Read its name to process it later
if (!infile_name_p.parse(pos, infile, expected))
return false;
/// Check for 'COMPRESSION' parameter (optional)
if (s_compression.ignore(pos, expected))
{
/// Read compression name. Create parser for this purpose.
ParserStringLiteral compression_p;
if (!compression_p.parse(pos, compression, expected))
return false;
}
}
Pos before_values = pos;
/// VALUES or FROM INFILE or FORMAT or SELECT
if (!infile && s_values.ignore(pos, expected))
{
/// If VALUES is defined in query, everything except setting will be parsed as data
data = pos->begin;
}
else if (s_format.ignore(pos, expected))
{
/// If FORMAT is defined, read format name
if (!name_p.parse(pos, format, expected))
return false;
}
else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected))
{
/// If SELECT is defined, return to position before select and parse
/// rest of query as SELECT query.
pos = before_values;
ParserSelectWithUnionQuery select_p;
select_p.parse(pos, select, expected);
@ -128,6 +158,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
else if (s_watch.ignore(pos, expected))
{
/// If WATCH is defind, return to position before WATCH and parse
/// rest of query as WATCH query.
pos = before_values;
ParserWatchQuery watch_p;
watch_p.parse(pos, watch, expected);
@ -138,11 +170,14 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
else
{
/// If all previous conditions were false, query is incorrect
return false;
}
/// Read SETTINGS if they are defined
if (s_settings.ignore(pos, expected))
{
/// Settings are written like SET query, so parse them with ParserSetQuery
ParserSetQuery parser_settings(true);
if (!parser_settings.parse(pos, settings_ast, expected))
return false;
@ -155,13 +190,14 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
InsertQuerySettingsPushDownVisitor(visitor_data).visit(select);
}
/// In case of defined format, data follows it.
if (format && !infile)
{
Pos last_token = pos;
--last_token;
data = last_token->end;
/// If format name is followed by ';' (end of query symbol) there is no data to insert.
if (data < end && *data == ';')
throw Exception("You have excessive ';' symbol before data for INSERT.\n"
"Example:\n\n"
@ -184,11 +220,16 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
++data;
}
/// Create query and fill its fields.
auto query = std::make_shared<ASTInsertQuery>();
node = query;
if (infile)
{
query->infile = infile;
if (compression)
query->compression = compression;
}
if (table_function)
{

View File

@ -34,6 +34,7 @@ Pipe getSourceFromFromASTInsertQuery(
ContextPtr context,
const ASTPtr & input_function)
{
/// get ast query
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
if (!ast_insert_query)
@ -51,10 +52,10 @@ Pipe getSourceFromFromASTInsertQuery(
}
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
auto input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
/// Input buffer will be defind by reading from file buffer or by ConcatReadBuffer (concatenation of data and tail)
std::unique_ptr<ReadBuffer> input_buffer;
if (ast_insert_query->infile)
@ -63,10 +64,21 @@ Pipe getSourceFromFromASTInsertQuery(
const auto & in_file_node = ast_insert_query->infile->as<ASTLiteral &>();
const auto in_file = in_file_node.value.safeGet<std::string>();
input_buffer = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(in_file), chooseCompressionMethod(in_file, ""));
/// It can be compressed and compression method maybe specified in query
std::string compression_method;
if (ast_insert_query->compression)
{
const auto & compression_method_node = ast_insert_query->compression->as<ASTLiteral &>();
compression_method = compression_method_node.value.safeGet<std::string>();
}
/// Otherwise, it will be detected from file name automaticly (by chooseCompressionMethod)
/// Buffer for reading from file is created and wrapped with appropriate compression method
input_buffer = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(in_file), chooseCompressionMethod(in_file, compression_method));
}
else
{
/// concatenation of data and tail
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
@ -81,6 +93,7 @@ Pipe getSourceFromFromASTInsertQuery(
input_buffer = std::make_unique<ConcatReadBuffer>(buffers);
}
/// Create a source from input buffer using format from query
auto source = FormatFactory::instance().getInput(format, *input_buffer, header, context, context->getSettings().max_insert_block_size);
Pipe pipe(source);

View File

@ -0,0 +1,10 @@
Hello, World! From client first.
Hello, World! From client second.
Hello, World! From client first.
Hello, World! From client first.
Hello, World! From client second.
Hello, World! From local first.
Hello, World! From local second.
Hello, World! From local first.
Hello, World! From local first.
Hello, World! From local second.

View File

@ -0,0 +1,84 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
#____________________CLIENT__________________
# clear files from previous tests.
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz_to_decomp ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz_to_decomp
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_to_decomp ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_to_decomp
# create files using compression method and without it to check that both queries work correct
${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client first.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' FORMAT TabSeparated;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client second.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'GZ' FORMAT TabSeparated;"
# check content of files
cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz
gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz
cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp
cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz
gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz
cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp
# create table to check inserts
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_compression_keyword;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE test_compression_keyword (text String) Engine=Memory;"
# insert them
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' FORMAT TabSeparated;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' COMPRESSION 'gz' FORMAT TabSeparated;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'gz' FORMAT TabSeparated;"
# check result
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_compression_keyword;"
# delete all created elements
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp"
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz"
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp"
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_compression_keyword;"
#____________________LOCAL__________________
# clear files from previous tests.
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz
[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz
# create files using compression method and without it to check that both queries work correct
${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local first.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' FORMAT TabSeparated;"
${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local second.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'GZ' FORMAT TabSeparated;"
# check content of files
cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz
gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz
cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp
cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz
gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz
cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp
# create table to check inserts
${CLICKHOUSE_LOCAL} --query "
DROP TABLE IF EXISTS test_compression_keyword;
CREATE TABLE test_compression_keyword (text String) Engine=Memory;
INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' FORMAT TabSeparated;
INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' COMPRESSION 'gz' FORMAT TabSeparated;
INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'gz' FORMAT TabSeparated;
SELECT * FROM test_compression_keyword;
"
# delete all created elements
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp"
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz"
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp"
rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz"