mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 07:01:59 +00:00
Merge pull request #27134 from FArthur-cmd/compress_output_3473
add compression for INTO OUTFILE
This commit is contained in:
commit
71e5cfe3ca
@ -197,7 +197,7 @@ private:
|
||||
std::unique_ptr<ShellCommand> pager_cmd;
|
||||
|
||||
/// The user can specify to redirect query output to a file.
|
||||
std::optional<WriteBufferFromFile> out_file_buf;
|
||||
std::unique_ptr<WriteBuffer> out_file_buf;
|
||||
BlockOutputStreamPtr block_out_stream;
|
||||
|
||||
/// The user could specify special file for server logs (stderr by default)
|
||||
@ -2238,8 +2238,11 @@ private:
|
||||
const auto & out_file_node = query_with_output->out_file->as<ASTLiteral &>();
|
||||
const auto & out_file = out_file_node.value.safeGet<std::string>();
|
||||
|
||||
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
|
||||
out_buf = &*out_file_buf;
|
||||
out_file_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
|
||||
chooseCompressionMethod(out_file, ""),
|
||||
/* compression level = */ 3
|
||||
);
|
||||
|
||||
// We are writing to file, so default format is the same as in non-interactive mode.
|
||||
if (is_interactive && is_default_format)
|
||||
@ -2259,9 +2262,9 @@ private:
|
||||
|
||||
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
|
||||
if (!need_render_progress)
|
||||
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
|
||||
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
||||
else
|
||||
block_out_stream = context->getOutputStream(current_format, *out_buf, block);
|
||||
block_out_stream = context->getOutputStream(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
|
||||
|
||||
block_out_stream->writePrefix();
|
||||
}
|
||||
|
@ -1011,22 +1011,31 @@ void executeQuery(
|
||||
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
|
||||
|
||||
WriteBuffer * out_buf = &ostr;
|
||||
std::optional<WriteBufferFromFile> out_file_buf;
|
||||
std::unique_ptr<WriteBuffer> compressed_buffer;
|
||||
if (ast_query_with_output && ast_query_with_output->out_file)
|
||||
{
|
||||
if (!allow_into_outfile)
|
||||
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
|
||||
|
||||
const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
|
||||
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
|
||||
out_buf = &*out_file_buf;
|
||||
compressed_buffer = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
|
||||
chooseCompressionMethod(out_file, ""),
|
||||
/* compression level = */ 3
|
||||
);
|
||||
}
|
||||
|
||||
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
|
||||
? getIdentifierName(ast_query_with_output->format)
|
||||
: context->getDefaultFormat();
|
||||
|
||||
auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(format_name, *out_buf, streams.in->getHeader(), context, {}, output_format_settings);
|
||||
auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(
|
||||
format_name,
|
||||
compressed_buffer ? *compressed_buffer : *out_buf,
|
||||
streams.in->getHeader(),
|
||||
context,
|
||||
{},
|
||||
output_format_settings);
|
||||
|
||||
/// Save previous progress callback if any. TODO Do it more conveniently.
|
||||
auto previous_progress_callback = context->getProgressCallback();
|
||||
@ -1050,15 +1059,18 @@ void executeQuery(
|
||||
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
|
||||
|
||||
WriteBuffer * out_buf = &ostr;
|
||||
std::optional<WriteBufferFromFile> out_file_buf;
|
||||
std::unique_ptr<WriteBuffer> compressed_buffer;
|
||||
if (ast_query_with_output && ast_query_with_output->out_file)
|
||||
{
|
||||
if (!allow_into_outfile)
|
||||
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
|
||||
|
||||
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
|
||||
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
|
||||
out_buf = &*out_file_buf;
|
||||
compressed_buffer = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
|
||||
chooseCompressionMethod(out_file, ""),
|
||||
/* compression level = */ 3
|
||||
);
|
||||
}
|
||||
|
||||
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
|
||||
@ -1072,7 +1084,14 @@ void executeQuery(
|
||||
return std::make_shared<MaterializingTransform>(header);
|
||||
});
|
||||
|
||||
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name, *out_buf, pipeline.getHeader(), context, {}, output_format_settings);
|
||||
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
|
||||
format_name,
|
||||
compressed_buffer ? *compressed_buffer : *out_buf,
|
||||
pipeline.getHeader(),
|
||||
context,
|
||||
{},
|
||||
output_format_settings);
|
||||
|
||||
out->setAutoFlush();
|
||||
|
||||
/// Save previous progress callback if any. TODO Do it more conveniently.
|
||||
|
@ -0,0 +1,2 @@
|
||||
Hello, World! From client.
|
||||
Hello, World! From local.
|
23
tests/queries/0_stateless/02001_compress_output_file.sh
Executable file
23
tests/queries/0_stateless/02001_compress_output_file.sh
Executable file
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env bash
# Checks that INTO OUTFILE produces a gzip-compressed file when the target
# name ends in ".gz", for both clickhouse-client and clickhouse-local.
# Each result is written to a .gz file, decompressed with gunzip and printed,
# so the reference output is the two plain "Hello, World!" lines.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

set -e

# Paths of the decompressed files; the compressed ones carry a ".gz" suffix.
client_out="${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client"
local_out="${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local"

# Remove leftovers from a previous (possibly interrupted) run:
#  - INTO OUTFILE opens its target with O_EXCL, so an existing .gz makes the query fail;
#  - gunzip refuses to overwrite an existing decompressed file.
rm -f "${client_out}.gz" "${client_out}" "${local_out}.gz" "${local_out}"

# CLICKHOUSE_CLIENT / CLICKHOUSE_LOCAL are deliberately left unquoted:
# they may carry extra command-line arguments (set by shell_config.sh).
${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.') INTO OUTFILE '${client_out}.gz'"
gunzip "${client_out}.gz"
cat "${client_out}"

rm -f "${client_out}"

${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.') INTO OUTFILE '${local_out}.gz'"
gunzip "${local_out}.gz"
cat "${local_out}"

rm -f "${local_out}"
|
Loading…
Reference in New Issue
Block a user