Compress INTO OUTFILE output using the COMPRESSION parameter

This commit is contained in:
Artur 2021-08-03 11:33:52 +00:00
parent 0b5e3ce195
commit c55ead0fdb
4 changed files with 69 additions and 13 deletions

View File

@ -1,4 +1,6 @@
#include <string>
#include "ConnectionParameters.h"
#include "IO/CompressionMethod.h"
#include "QueryFuzzer.h"
#include "Suggest.h"
#include "TestHint.h"
@ -197,7 +199,7 @@ private:
std::unique_ptr<ShellCommand> pager_cmd;
/// The user can specify to redirect query output to a file.
std::optional<WriteBufferFromFile> out_file_buf;
std::unique_ptr<WriteBuffer> out_file_buf;
BlockOutputStreamPtr block_out_stream;
/// The user could specify special file for server logs (stderr by default)
@ -2238,8 +2240,18 @@ private:
const auto & out_file_node = query_with_output->out_file->as<ASTLiteral &>();
const auto & out_file = out_file_node.value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
std::string compression_method;
if (query_with_output->compression)
{
const auto & compression_method_node = query_with_output->compression->as<ASTLiteral &>();
compression_method = compression_method_node.value.safeGet<std::string>();
}
out_file_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, compression_method),
/* compression level = */ 3
);
// We are writing to file, so default format is the same as in non-interactive mode.
if (is_interactive && is_default_format)
@ -2259,9 +2271,9 @@ private:
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
if (!need_render_progress)
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
else
block_out_stream = context->getOutputStream(current_format, *out_buf, block);
block_out_stream = context->getOutputStream(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
block_out_stream->writePrefix();
}

View File

@ -49,6 +49,7 @@
#include <Common/ProfileEvents.h>
#include <Common/SensitiveDataMasker.h>
#include "IO/CompressionMethod.h"
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
@ -1010,22 +1011,38 @@ void executeQuery(
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
std::unique_ptr<WriteBuffer> compressed_buffer;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
std::string compression_method;
if (ast_query_with_output->compression)
{
const auto & compression_method_node = ast_query_with_output->compression->as<ASTLiteral &>();
compression_method = compression_method_node.value.safeGet<std::string>();
}
compressed_buffer = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, compression_method),
/* compression level = */ 3
);
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(format_name, *out_buf, streams.in->getHeader(), context, {}, output_format_settings);
auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
streams.in->getHeader(),
context,
{},
output_format_settings);
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();
@ -1049,15 +1066,25 @@ void executeQuery(
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
std::unique_ptr<WriteBuffer> compressed_buffer;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
std::string compression_method;
if (ast_query_with_output->compression)
{
const auto & compression_method_node = ast_query_with_output->compression->as<ASTLiteral &>();
compression_method = compression_method_node.value.safeGet<std::string>();
}
compressed_buffer = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, compression_method),
/* compression level = */ 3
);
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
@ -1071,7 +1098,13 @@ void executeQuery(
return std::make_shared<MaterializingTransform>(header);
});
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name, *out_buf, pipeline.getHeader(), context, {}, output_format_settings);
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
pipeline.getHeader(),
context,
{},
output_format_settings);
out->setAutoFlush();
/// Save previous progress callback if any. TODO Do it more conveniently.

View File

@ -2,6 +2,7 @@
#include <Parsers/IAST.h>
#include <IO/Operators.h>
#include "Parsers/IAST_fwd.h"
namespace DB
@ -16,6 +17,7 @@ public:
ASTPtr out_file;
ASTPtr format;
ASTPtr settings_ast;
ASTPtr compression;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const final;

View File

@ -23,6 +23,7 @@
#include <Parsers/ParserShowPrivilegesQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/QueryWithOutputSettingsPushDownVisitor.h>
#include "Common/Exception.h"
namespace DB
@ -86,6 +87,14 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
if (!out_file_p.parse(pos, query_with_output.out_file, expected))
return false;
ParserKeyword s_compression_method("COMPRESSION");
if (s_compression_method.ignore(pos, expected))
{
ParserStringLiteral compression;
if (!compression.parse(pos, query_with_output.compression, expected))
return false;
}
query_with_output.children.push_back(query_with_output.out_file);
}