From a7c43c3e2dccace1d22cedbb6153a5d8a80e15a5 Mon Sep 17 00:00:00 2001 From: Artur <613623@mail.ru> Date: Fri, 10 Sep 2021 13:59:22 +0000 Subject: [PATCH] add compression key-word and some tests --- programs/client/Client.cpp | 17 +++- src/Parsers/ASTInsertQuery.cpp | 2 + src/Parsers/ASTInsertQuery.h | 1 + src/Parsers/ParserInsertQuery.cpp | 47 ++++++++++- .../getSourceFromFromASTInsertQuery.cpp | 17 +++- .../02024_compression_in_query.reference | 10 +++ .../0_stateless/02024_compression_in_query.sh | 84 +++++++++++++++++++ 7 files changed, 172 insertions(+), 6 deletions(-) create mode 100644 tests/queries/0_stateless/02024_compression_in_query.reference create mode 100755 tests/queries/0_stateless/02024_compression_in_query.sh diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index e5b6df666db..9f06387799a 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -1913,16 +1913,30 @@ private: { /// If INSERT data must be sent. auto * parsed_insert_query = parsed_query->as(); + /// If query isn't parsed, no information can be got from it. 
if (!parsed_insert_query) return; + /// If data is got from file (maybe compressed file) if (parsed_insert_query->infile) { + /// Get name of this file (path to file) const auto & in_file_node = parsed_insert_query->infile->as(); const auto in_file = in_file_node.value.safeGet(); - auto in_buffer = wrapReadBufferWithCompressionMethod(std::make_unique(in_file), chooseCompressionMethod(in_file, "")); + std::string compression_method; + /// Compression method can be specified in query + if (parsed_insert_query->compression) + { + const auto & compression_method_node = parsed_insert_query->compression->as(); + compression_method = compression_method_node.value.safeGet(); + } + /// Otherwise, it will be detected from file name automatically (by chooseCompressionMethod) + /// Buffer for reading from file is created and wrapped with appropriate compression method + auto in_buffer = wrapReadBufferWithCompressionMethod(std::make_unique(in_file), chooseCompressionMethod(in_file, compression_method)); + + /// Now data is ready to be sent to the server. try { sendDataFrom(*in_buffer, sample, columns_description); @@ -1933,6 +1947,7 @@ private: throw; } } + /// If query already has data to send else if (parsed_insert_query->data) { /// Send data contained in the query. diff --git a/src/Parsers/ASTInsertQuery.cpp b/src/Parsers/ASTInsertQuery.cpp index 745585ae175..d167373a6b5 100644 --- a/src/Parsers/ASTInsertQuery.cpp +++ b/src/Parsers/ASTInsertQuery.cpp @@ -57,6 +57,8 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s if (infile) { settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM INFILE " << (settings.hilite ? hilite_none : "") << infile->as().value.safeGet(); + if (compression) + settings.ostr << (settings.hilite ? hilite_keyword : "") << " COMPRESSION " << (settings.hilite ? 
hilite_none : "") << compression->as().value.safeGet(); } if (!format.empty()) { diff --git a/src/Parsers/ASTInsertQuery.h b/src/Parsers/ASTInsertQuery.h index 6eab3e7acac..9d2e7aa9865 100644 --- a/src/Parsers/ASTInsertQuery.h +++ b/src/Parsers/ASTInsertQuery.h @@ -18,6 +18,7 @@ public: String format; ASTPtr select; ASTPtr infile; + ASTPtr compression; ASTPtr watch; ASTPtr table_function; ASTPtr partition_by; diff --git a/src/Parsers/ParserInsertQuery.cpp b/src/Parsers/ParserInsertQuery.cpp index 19457f027bf..1bb0a174231 100644 --- a/src/Parsers/ParserInsertQuery.cpp +++ b/src/Parsers/ParserInsertQuery.cpp @@ -25,8 +25,10 @@ namespace ErrorCodes bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { + /// Create parsers ParserKeyword s_insert_into("INSERT INTO"); ParserKeyword s_from_infile("FROM INFILE"); + ParserKeyword s_compression("COMPRESSION"); ParserKeyword s_table("TABLE"); ParserKeyword s_function("FUNCTION"); ParserToken s_dot(TokenType::Dot); @@ -45,6 +47,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserStringLiteral infile_name_p; ParserExpressionWithOptionalAlias exp_elem_p(false); + /// create ASTPtr variables (result of parsing will be put in them). + /// They will be used to initialize ASTInsertQuery's fields. ASTPtr database; ASTPtr table; ASTPtr infile; @@ -55,20 +59,28 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ASTPtr table_function; ASTPtr settings_ast; ASTPtr partition_by_expr; + ASTPtr compression; /// Insertion data const char * data = nullptr; + /// Check for key words `INSERT INTO`. If it isn't found, the query can't be parsed as insert query. if (!s_insert_into.ignore(pos, expected)) return false; + /// try to find 'TABLE' s_table.ignore(pos, expected); + /// Search for 'FUNCTION'. If this key word is in query, read fields for insertion into 'TABLE FUNCTION'. + /// Word table is optional for table functions. 
(for example, s3 table function) + /// Otherwise fill 'TABLE' fields. if (s_function.ignore(pos, expected)) { + /// Read function name if (!table_function_p.parse(pos, table_function, expected)) return false; + /// Support insertion values with partition by. if (s_partition_by.ignore(pos, expected)) { if (!exp_elem_p.parse(pos, partition_by_expr, expected)) @@ -77,9 +89,12 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } else { + /// Read one word. It can be table or database name. if (!name_p.parse(pos, table, expected)) return false; + /// If there is a dot, previous name was database name, + /// so read table name after dot. if (s_dot.ignore(pos, expected)) { database = table; @@ -98,26 +113,41 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) return false; } - Pos before_values = pos; - + /// Check if file is a source of data. if (s_from_infile.ignore(pos, expected)) { + /// Read its name to process it later if (!infile_name_p.parse(pos, infile, expected)) return false; + + /// Check for 'COMPRESSION' parameter (optional) + if (s_compression.ignore(pos, expected)) + { + /// Read compression name. Create parser for this purpose. + ParserStringLiteral compression_p; + if (!compression_p.parse(pos, compression, expected)) + return false; + } } + Pos before_values = pos; + /// VALUES or FROM INFILE or FORMAT or SELECT if (!infile && s_values.ignore(pos, expected)) { + /// If VALUES is defined in query, everything except setting will be parsed as data data = pos->begin; } else if (s_format.ignore(pos, expected)) { + /// If FORMAT is defined, read format name if (!name_p.parse(pos, format, expected)) return false; } else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected)) { + /// If SELECT is defined, return to position before select and parse + /// rest of query as SELECT query. 
pos = before_values; ParserSelectWithUnionQuery select_p; select_p.parse(pos, select, expected); @@ -128,6 +158,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } else if (s_watch.ignore(pos, expected)) { + /// If WATCH is defined, return to position before WATCH and parse + /// rest of query as WATCH query. pos = before_values; ParserWatchQuery watch_p; watch_p.parse(pos, watch, expected); @@ -138,11 +170,14 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } else { + /// If all previous conditions were false, query is incorrect return false; } + /// Read SETTINGS if they are defined if (s_settings.ignore(pos, expected)) { + /// Settings are written like SET query, so parse them with ParserSetQuery ParserSetQuery parser_settings(true); if (!parser_settings.parse(pos, settings_ast, expected)) return false; @@ -155,13 +190,14 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) InsertQuerySettingsPushDownVisitor(visitor_data).visit(select); } - + /// In case of defined format, data follows it. if (format && !infile) { Pos last_token = pos; --last_token; data = last_token->end; + /// If format name is followed by ';' (end of query symbol) there is no data to insert. if (data < end && *data == ';') throw Exception("You have excessive ';' symbol before data for INSERT.\n" "Example:\n\n" @@ -184,11 +220,16 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ++data; } + /// Create query and fill its fields. 
auto query = std::make_shared(); node = query; if (infile) + { query->infile = infile; + if (compression) + query->compression = compression; + } if (table_function) { diff --git a/src/Processors/Transforms/getSourceFromFromASTInsertQuery.cpp b/src/Processors/Transforms/getSourceFromFromASTInsertQuery.cpp index 9e64bd954fa..1b09d3f7cd0 100644 --- a/src/Processors/Transforms/getSourceFromFromASTInsertQuery.cpp +++ b/src/Processors/Transforms/getSourceFromFromASTInsertQuery.cpp @@ -34,6 +34,7 @@ Pipe getSourceFromFromASTInsertQuery( ContextPtr context, const ASTPtr & input_function) { + /// get ast query const auto * ast_insert_query = ast->as(); if (!ast_insert_query) @@ -51,10 +52,10 @@ Pipe getSourceFromFromASTInsertQuery( } /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. - auto input_buffer_ast_part = std::make_unique( ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); + /// Input buffer will be defined by reading from file buffer or by ConcatReadBuffer (concatenation of data and tail) std::unique_ptr input_buffer; if (ast_insert_query->infile) @@ -63,10 +64,21 @@ Pipe getSourceFromFromASTInsertQuery( const auto & in_file_node = ast_insert_query->infile->as(); const auto in_file = in_file_node.value.safeGet(); - input_buffer = wrapReadBufferWithCompressionMethod(std::make_unique(in_file), chooseCompressionMethod(in_file, "")); + /// It can be compressed and compression method may be specified in query + std::string compression_method; + if (ast_insert_query->compression) + { + const auto & compression_method_node = ast_insert_query->compression->as(); + compression_method = compression_method_node.value.safeGet(); + } + + /// Otherwise, it will be detected from file name automatically (by chooseCompressionMethod) + /// Buffer for reading from file is created and wrapped with appropriate compression method + input_buffer = 
wrapReadBufferWithCompressionMethod(std::make_unique(in_file), chooseCompressionMethod(in_file, compression_method)); } else { + /// concatenation of data and tail ConcatReadBuffer::ReadBuffers buffers; if (ast_insert_query->data) buffers.push_back(input_buffer_ast_part.get()); @@ -81,6 +93,7 @@ Pipe getSourceFromFromASTInsertQuery( input_buffer = std::make_unique(buffers); } + /// Create a source from input buffer using format from query auto source = FormatFactory::instance().getInput(format, *input_buffer, header, context, context->getSettings().max_insert_block_size); Pipe pipe(source); diff --git a/tests/queries/0_stateless/02024_compression_in_query.reference b/tests/queries/0_stateless/02024_compression_in_query.reference new file mode 100644 index 00000000000..616c9e56a32 --- /dev/null +++ b/tests/queries/0_stateless/02024_compression_in_query.reference @@ -0,0 +1,10 @@ +Hello, World! From client first. +Hello, World! From client second. +Hello, World! From client first. +Hello, World! From client first. +Hello, World! From client second. +Hello, World! From local first. +Hello, World! From local second. +Hello, World! From local first. +Hello, World! From local first. +Hello, World! From local second. diff --git a/tests/queries/0_stateless/02024_compression_in_query.sh b/tests/queries/0_stateless/02024_compression_in_query.sh new file mode 100755 index 00000000000..86d6c428b7a --- /dev/null +++ b/tests/queries/0_stateless/02024_compression_in_query.sh @@ -0,0 +1,84 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -e + +#____________________CLIENT__________________ +# clear files from previous tests. 
+[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz_to_decomp ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz_to_decomp +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_to_decomp ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_to_decomp + +# create files using compression method and without it to check that both queries work correctly +${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client first.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' FORMAT TabSeparated;" +${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! 
From client second.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'GZ' FORMAT TabSeparated;" + +# check content of files +cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz +gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz +cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp + +cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz +gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz +cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp + +# create table to check inserts +${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_compression_keyword;" +${CLICKHOUSE_CLIENT} --query "CREATE TABLE test_compression_keyword (text String) Engine=Memory;" + +# insert them +${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' FORMAT TabSeparated;" +${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' COMPRESSION 'gz' FORMAT TabSeparated;" +${CLICKHOUSE_CLIENT} --query "INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'gz' FORMAT TabSeparated;" + +# check result +${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_compression_keyword;" + +# delete all created elements +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp" +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz" +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp" +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz" +${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_compression_keyword;" + +#____________________LOCAL__________________ +# 
clear files from previous tests. +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output.gz +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz +[ -e "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz ] && rm "${CLICKHOUSE_TMP}"/test_comp_for_input_and_output_without_gz.gz + +# create files using compression method and without it to check that both queries work correctly +${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local first.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' FORMAT TabSeparated;" +${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local second.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'GZ' FORMAT TabSeparated;" + +# check content of files +cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz +gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp.gz +cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp + +cp ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz +gunzip ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp.gz +cat ${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp + +# create table to check inserts +${CLICKHOUSE_LOCAL} --query " +DROP TABLE IF EXISTS test_compression_keyword; +CREATE TABLE test_compression_keyword (text String) Engine=Memory; +INSERT INTO TABLE test_compression_keyword FROM INFILE 
'${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz' COMPRESSION 'gz' FORMAT TabSeparated; +INSERT INTO TABLE test_compression_keyword FROM INFILE '${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz' COMPRESSION 'gz' FORMAT TabSeparated; +SELECT * FROM test_compression_keyword; +" + +# delete all created elements +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_to_decomp" +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output.gz" +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz_to_decomp" +rm -f "${CLICKHOUSE_TMP}/test_comp_for_input_and_output_without_gz" \ No newline at end of file