diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index c5e43449557..5b204751ba0 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -197,7 +197,7 @@ private: std::unique_ptr pager_cmd; /// The user can specify to redirect query output to a file. - std::optional out_file_buf; + std::unique_ptr out_file_buf; BlockOutputStreamPtr block_out_stream; /// The user could specify special file for server logs (stderr by default) @@ -2238,8 +2238,11 @@ private: const auto & out_file_node = query_with_output->out_file->as(); const auto & out_file = out_file_node.value.safeGet(); - out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); - out_buf = &*out_file_buf; + out_file_buf = wrapWriteBufferWithCompressionMethod( + std::make_unique(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT), + chooseCompressionMethod(out_file, ""), + /* compression level = */ 3 + ); // We are writing to file, so default format is the same as in non-interactive mode. if (is_interactive && is_default_format) @@ -2259,9 +2262,9 @@ private: /// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly. if (!need_render_progress) - block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block); + block_out_stream = context->getOutputStreamParallelIfPossible(current_format, out_file_buf ? *out_file_buf : *out_buf, block); else - block_out_stream = context->getOutputStream(current_format, *out_buf, block); + block_out_stream = context->getOutputStream(current_format, out_file_buf ? *out_file_buf : *out_buf, block); block_out_stream->writePrefix(); } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index e02ba6ebad7..a6e50f54a9e 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -1011,22 +1011,31 @@ void executeQuery( const auto * ast_query_with_output = dynamic_cast(ast.get()); WriteBuffer * out_buf = &ostr; - std::optional out_file_buf; + std::unique_ptr compressed_buffer; if (ast_query_with_output && ast_query_with_output->out_file) { if (!allow_into_outfile) throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED); const auto & out_file = ast_query_with_output->out_file->as().value.safeGet(); - out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); - out_buf = &*out_file_buf; + compressed_buffer = wrapWriteBufferWithCompressionMethod( + std::make_unique(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT), + chooseCompressionMethod(out_file, ""), + /* compression level = */ 3 + ); } String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr) ? getIdentifierName(ast_query_with_output->format) : context->getDefaultFormat(); - auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(format_name, *out_buf, streams.in->getHeader(), context, {}, output_format_settings); + auto out = FormatFactory::instance().getOutputStreamParallelIfPossible( + format_name, + compressed_buffer ? *compressed_buffer : *out_buf, + streams.in->getHeader(), + context, + {}, + output_format_settings); /// Save previous progress callback if any. TODO Do it more conveniently. auto previous_progress_callback = context->getProgressCallback(); @@ -1050,15 +1059,18 @@ void executeQuery( const ASTQueryWithOutput * ast_query_with_output = dynamic_cast(ast.get()); WriteBuffer * out_buf = &ostr; - std::optional out_file_buf; + std::unique_ptr compressed_buffer; if (ast_query_with_output && ast_query_with_output->out_file) { if (!allow_into_outfile) throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED); const auto & out_file = typeid_cast(*ast_query_with_output->out_file).value.safeGet(); - out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); - out_buf = &*out_file_buf; + compressed_buffer = wrapWriteBufferWithCompressionMethod( + std::make_unique(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT), + chooseCompressionMethod(out_file, ""), + /* compression level = */ 3 + ); } String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr) @@ -1072,7 +1084,14 @@ void executeQuery( return std::make_shared(header); }); - auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name, *out_buf, pipeline.getHeader(), context, {}, output_format_settings); + auto out = FormatFactory::instance().getOutputFormatParallelIfPossible( + format_name, + compressed_buffer ? *compressed_buffer : *out_buf, + pipeline.getHeader(), + context, + {}, + output_format_settings); + out->setAutoFlush(); /// Save previous progress callback if any. TODO Do it more conveniently. diff --git a/tests/queries/0_stateless/02001_compress_output_file.reference b/tests/queries/0_stateless/02001_compress_output_file.reference new file mode 100644 index 00000000000..6f51dfc24e1 --- /dev/null +++ b/tests/queries/0_stateless/02001_compress_output_file.reference @@ -0,0 +1,2 @@ +Hello, World! From client. +Hello, World! From local. diff --git a/tests/queries/0_stateless/02001_compress_output_file.sh b/tests/queries/0_stateless/02001_compress_output_file.sh new file mode 100755 index 00000000000..11df227cc14 --- /dev/null +++ b/tests/queries/0_stateless/02001_compress_output_file.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -e + +[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz + +${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz'" +gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz +cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client + +rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client" + +[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz + +${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz'" +gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz +cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local + +rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local"