Removed bad code around general compression methods

This commit is contained in:
Alexey Milovidov 2020-01-04 10:31:00 +03:00
parent 42226b1a96
commit 0f4a58ecaa
16 changed files with 199 additions and 195 deletions

View File

@ -20,8 +20,6 @@
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/BrotliReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
@ -317,13 +315,11 @@ void HTTPHandler::processQuery(
client_supports_http_compression = true;
http_response_compression_method = CompressionMethod::Zlib;
}
#if USE_BROTLI
else if (http_response_compression_methods == "br")
{
client_supports_http_compression = true;
http_response_compression_method = CompressionMethod::Brotli;
}
#endif
}
/// Client can pass a 'compress' flag in the query string. In this case the query result is
@ -400,32 +396,9 @@ void HTTPHandler::processQuery(
std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr);
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
std::unique_ptr<ReadBuffer> in_post;
String http_request_compression_method_str = request.get("Content-Encoding", "");
if (!http_request_compression_method_str.empty())
{
if (http_request_compression_method_str == "gzip")
{
in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Gzip);
}
else if (http_request_compression_method_str == "deflate")
{
in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Zlib);
}
#if USE_BROTLI
else if (http_request_compression_method_str == "br")
{
in_post = std::make_unique<BrotliReadBuffer>(std::move(in_post_raw));
}
#endif
else
{
throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str,
ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
}
else
in_post = std::move(in_post_raw);
std::unique_ptr<ReadBuffer> in_post = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromIStream>(istr), chooseCompressionMethod({}, http_request_compression_method_str));
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.

View File

@ -30,14 +30,14 @@ public:
BrotliEncoderState * state;
};
BrotliWriteBuffer::BrotliWriteBuffer(WriteBuffer & out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, out(out_)
BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, out(std::move(out_))
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
@ -68,9 +68,9 @@ void BrotliWriteBuffer::nextImpl()
do
{
out.nextIfAtEnd();
out_data = reinterpret_cast<unsigned char *>(out.position());
out_capacity = out.buffer().end() - out.position();
out->nextIfAtEnd();
out_data = reinterpret_cast<unsigned char *>(out->position());
out_capacity = out->buffer().end() - out->position();
int result = BrotliEncoderCompressStream(
brotli->state,
@ -81,7 +81,7 @@ void BrotliWriteBuffer::nextImpl()
&out_data,
nullptr);
out.position() = out.buffer().end() - out_capacity;
out->position() = out->buffer().end() - out_capacity;
if (result == 0)
{
@ -100,9 +100,9 @@ void BrotliWriteBuffer::finish()
while (true)
{
out.nextIfAtEnd();
out_data = reinterpret_cast<unsigned char *>(out.position());
out_capacity = out.buffer().end() - out.position();
out->nextIfAtEnd();
out_data = reinterpret_cast<unsigned char *>(out->position());
out_capacity = out->buffer().end() - out->position();
int result = BrotliEncoderCompressStream(
brotli->state,
@ -113,7 +113,7 @@ void BrotliWriteBuffer::finish()
&out_data,
nullptr);
out.position() = out.buffer().end() - out_capacity;
out->position() = out->buffer().end() - out_capacity;
if (BrotliEncoderIsFinished(brotli->state))
{

View File

@ -10,11 +10,11 @@ class BrotliWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
BrotliWriteBuffer(
WriteBuffer & out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
std::unique_ptr<WriteBuffer> out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
~BrotliWriteBuffer() override;
@ -30,9 +30,9 @@ private:
const uint8_t * in_data;
size_t out_capacity;
uint8_t * out_data;
uint8_t * out_data;
WriteBuffer & out;
std::unique_ptr<WriteBuffer> out;
bool finished = false;
};

View File

@ -0,0 +1,95 @@
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <IO/BrotliReadBuffer.h>
#include <IO/BrotliWriteBuffer.h>
#include <Common/config.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
std::string toContentEncodingName(CompressionMethod method)
{
switch (method)
{
case CompressionMethod::Gzip: return "gzip";
case CompressionMethod::Zlib: return "deflate";
case CompressionMethod::Brotli: return "br";
case CompressionMethod::None: return "";
}
__builtin_unreachable();
}
CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint)
{
std::string file_extension;
if (hint.empty() || hint == "auto")
{
auto pos = path.find_last_of('.');
if (pos != std::string::npos)
file_extension = path.substr(pos + 1, std::string::npos);
}
const std::string * method_str = file_extension.empty() ? &hint : &file_extension;
if (*method_str == "gzip" || *method_str == "gz")
return CompressionMethod::Gzip;
if (*method_str == "deflate")
return CompressionMethod::Zlib;
if (*method_str == "brotli" || *method_str == "br")
return CompressionMethod::Brotli;
if (*method_str == "none")
return CompressionMethod::None;
if (!file_extension.empty())
return CompressionMethod::None; /// Unrecognized file extension.
throw Exception("Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br' are supported as compression methods",
ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(std::unique_ptr<ReadBuffer> nested, CompressionMethod method)
{
if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibInflatingReadBuffer>(std::move(nested), method);
#if USE_BROTLI
if (method == CompressionMethod::Brotli)
return std::make_unique<BrotliReadBuffer>(std::move(nested));
#endif
if (method == CompressionMethod::None)
return nested;
throw Exception("Unsupported compression method", ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level)
{
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(nested), method, level);
#if USE_BROTLI
if (method == DB::CompressionMethod::Brotli)
return std::make_unique<BrotliWriteBuffer>(std::move(nested), level);
#endif
if (method == CompressionMethod::None)
return nested;
throw Exception("Unsupported compression method", ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -1,8 +1,20 @@
#pragma once
#include <string>
#include <memory>
namespace DB
{
class ReadBuffer;
class WriteBuffer;
/** These are "generally recognizable" compression methods for data import/export.
* Do not mess with more efficient compression methods used by ClickHouse internally
* (they use non-standard framing, indexes, checksums...)
*/
enum class CompressionMethod
{
/// DEFLATE compression with gzip header and CRC32 checksum.
@ -15,4 +27,17 @@ enum class CompressionMethod
None
};
/// How the compression method is named in HTTP.
std::string toContentEncodingName(CompressionMethod method);
/** Choose compression method from path and hint.
* if hint is "auto" or empty string, then path is analyzed,
* otherwise path parameter is ignored and hint is used as compression method name.
* path is arbitrary string that will be analyzed for file extension (gz, br...) that determines compression.
*/
CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint);
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(std::unique_ptr<ReadBuffer> nested, CompressionMethod method);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level);
}

View File

@ -29,22 +29,13 @@
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/VarInt.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <DataTypes/DataTypeDateTime.h>
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdouble-promotion"
#endif
#include <double-conversion/double-conversion.h>
#ifdef __clang__
#pragma clang diagnostic pop
#endif
/// 1 GiB
#define DEFAULT_MAX_STRING_SIZE (1ULL << 30)
@ -1024,21 +1015,11 @@ void skipToNextLineOrEOF(ReadBuffer & buf);
/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);
template <class TReadBuffer, class... Types>
std::unique_ptr<ReadBuffer> getReadBuffer(const DB::CompressionMethod method, Types&&... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto read_buf = std::make_unique<TReadBuffer>(std::forward<Types>(args)...);
return std::make_unique<ZlibInflatingReadBuffer>(std::move(read_buf), method);
}
return std::make_unique<TReadBuffer>(args...);
}
/** This function just copies the data from buffer's internal position (in.position())
* to current position (from arguments) into memory.
*/
void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current);
void saveUpToPosition(ReadBuffer & in, Memory<> & memory, char * current);
/** This function is negative to eof().
* In fact it returns whether the data was loaded to internal ReadBuffers's buffer or not.
@ -1047,6 +1028,6 @@ void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current);
* of our buffer and the current cursor in the end of the buffer. When we call eof() it calls next().
* And this function can fill the buffer with new data, so we will lose the data from previous buffer state.
*/
bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current);
bool loadAtPosition(ReadBuffer & in, Memory<> & memory, char * & current);
}

View File

@ -105,58 +105,23 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
{
if (compress)
{
if (compression_method == CompressionMethod::Gzip)
{
auto content_encoding_name = toContentEncodingName(compression_method);
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: gzip\r\n";
*response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n";
#else
response.set("Content-Encoding", "gzip");
response_body_ostr = &(response.send());
#endif
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
else if (compression_method == CompressionMethod::Zlib)
{
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: deflate\r\n";
#else
response.set("Content-Encoding", "deflate");
response_body_ostr = &(response.send());
#endif
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
#if USE_BROTLI
else if (compression_method == CompressionMethod::Brotli)
{
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: br\r\n";
#else
response.set("Content-Encoding", "br");
response_body_ostr = &(response.send());
#endif
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin());
out = &*brotli_buf;
}
response.set("Content-Encoding", content_encoding_name);
response_body_ostr = &(response.send());
#endif
else
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
ErrorCodes::LOGICAL_ERROR);
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
out = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromOStream>(*response_body_ostr), compression_method, compression_level);
}
else
{
#if !defined(POCO_CLICKHOUSE_PATCH)
response_body_ostr = &(response.send());
#endif
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &*out_raw;
out = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
}
}
@ -169,6 +134,9 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
out->position() = position();
out->next();
}
internal_buffer = out->internalBuffer();
working_buffer = out->buffer();
}
@ -177,9 +145,8 @@ WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
Poco::Net::HTTPServerResponse & response_,
unsigned keep_alive_timeout_,
bool compress_,
CompressionMethod compression_method_,
size_t size)
: BufferWithOwnMemory<WriteBuffer>(size)
CompressionMethod compression_method_)
: WriteBuffer(nullptr, 0)
, request(request_)
, response(response_)
, keep_alive_timeout(keep_alive_timeout_)

View File

@ -6,10 +6,8 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Version.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <IO/BrotliWriteBuffer.h>
#include <IO/HTTPCommon.h>
#include <IO/Progress.h>
#include <Common/NetException.h>
@ -42,7 +40,7 @@ namespace DB
/// Also this class write and flush special X-ClickHouse-Progress HTTP headers
/// if no data was sent at the time of progress notification.
/// This allows to implement progress bar in HTTP clients.
class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromHTTPServerResponse : public WriteBuffer
{
private:
Poco::Net::HTTPServerRequest & request;
@ -52,7 +50,7 @@ private:
unsigned keep_alive_timeout = 0;
bool compress = false;
CompressionMethod compression_method;
int compression_level = Z_DEFAULT_COMPRESSION;
int compression_level = 1;
std::ostream * response_body_ostr = nullptr;
@ -60,13 +58,7 @@ private:
std::ostream * response_header_ostr = nullptr;
#endif
std::unique_ptr<WriteBufferFromOStream> out_raw;
std::optional<ZlibDeflatingWriteBuffer> deflating_buf;
#if USE_BROTLI
std::optional<BrotliWriteBuffer> brotli_buf;
#endif
WriteBuffer * out = nullptr; /// Uncompressed HTTP body is written to this buffer. Points to out_raw or possibly to deflating_buf.
std::unique_ptr<WriteBuffer> out;
bool headers_started_sending = false;
bool headers_finished_sending = false; /// If true, you could not add any headers.
@ -99,8 +91,7 @@ public:
Poco::Net::HTTPServerResponse & response_,
unsigned keep_alive_timeout_,
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
CompressionMethod compression_method_ = CompressionMethod::Gzip,
size_t size = DBMS_DEFAULT_BUFFER_SIZE);
CompressionMethod compression_method_ = CompressionMethod::Gzip);
/// Writes progess in repeating HTTP headers.
void onProgress(const Progress & progress);

View File

@ -26,7 +26,6 @@
#include <IO/VarInt.h>
#include <IO/DoubleConverter.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <Formats/FormatSettings.h>
@ -955,15 +954,4 @@ inline String toString(const T & x)
return buf.str();
}
template <class TWriteBuffer, class... Types>
std::unique_ptr<WriteBuffer> getWriteBuffer(const DB::CompressionMethod method, Types&&... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto write_buf = std::make_unique<TWriteBuffer>(std::forward<Types>(args)...);
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(write_buf), method, 1 /* compression level */);
}
return std::make_unique<TWriteBuffer>(args...);
}
}

View File

@ -425,21 +425,4 @@ BlockInputStreams IStorage::read(
return res;
}
DB::CompressionMethod IStorage::chooseCompressionMethod(const String & uri, const String & compression_method)
{
if (compression_method == "auto" || compression_method == "")
{
if (endsWith(uri, ".gz"))
return DB::CompressionMethod::Gzip;
else
return DB::CompressionMethod::None;
}
else if (compression_method == "gzip")
return DB::CompressionMethod::Gzip;
else if (compression_method == "none")
return DB::CompressionMethod::None;
else
throw Exception("Only auto, none, gzip supported as compression method", ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -5,7 +5,6 @@
#include <DataStreams/IBlockStream_fwd.h>
#include <Databases/IDatabase.h>
#include <Interpreters/CancellationCode.h>
#include <IO/CompressionMethod.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableStructureLockHolder.h>
@ -440,8 +439,6 @@ public:
return {};
}
static DB::CompressionMethod chooseCompressionMethod(const String & uri, const String & compression_method);
private:
/// You always need to take the next three locks in this order.

View File

@ -200,12 +200,12 @@ public:
}
storage->table_fd_was_used = true;
read_buf = getReadBuffer<ReadBufferFromFileDescriptor>(compression_method, storage->table_fd);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd), compression_method);
}
else
{
shared_lock = std::shared_lock(storage->rwlock);
read_buf = getReadBuffer<ReadBufferFromFile>(compression_method, file_path);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(file_path), compression_method);
}
reader = FormatFactory::instance().getInput(storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size);
@ -266,7 +266,7 @@ BlockInputStreams StorageFile::read(
for (const auto & file_path : paths)
{
BlockInputStreamPtr cur_block = std::make_shared<StorageFileBlockInputStream>(
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, IStorage::chooseCompressionMethod(file_path, compression_method));
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, chooseCompressionMethod(file_path, compression_method));
blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
}
return narrowBlockInputStreams(blocks_input, num_streams);
@ -288,13 +288,15 @@ public:
* INSERT data; SELECT *; last SELECT returns only insert_data
*/
storage.table_fd_was_used = true;
write_buf = getWriteBuffer<WriteBufferFromFileDescriptor>(compression_method, storage.table_fd);
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromFileDescriptor>(storage.table_fd), compression_method, 1);
}
else
{
if (storage.paths.size() != 1)
throw Exception("Table '" + storage.table_name + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
write_buf = getWriteBuffer<WriteBufferFromFile>(compression_method, storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
compression_method, 1);
}
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), context);
@ -333,8 +335,7 @@ BlockOutputStreamPtr StorageFile::write(
const ASTPtr & /*query*/,
const Context & context)
{
return std::make_shared<StorageFileBlockOutputStream>(*this,
IStorage::chooseCompressionMethod(paths[0], compression_method), context);
return std::make_shared<StorageFileBlockOutputStream>(*this, chooseCompressionMethod(paths[0], compression_method), context);
}
Strings StorageFile::getDataPaths() const

View File

@ -67,7 +67,7 @@ public:
UInt64 max_block_size,
const CompressionMethod compression_method)
{
auto read_buf = getReadBuffer<ReadBufferFromHDFS>(compression_method, uri);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri), compression_method);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
@ -112,7 +112,7 @@ public:
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = getWriteBuffer<WriteBufferFromHDFS>(compression_method, uri);
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri), compression_method, 1);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@ -213,7 +213,7 @@ BlockInputStreams StorageHDFS::read(
for (const auto & res_path : res_paths)
{
result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, format_name, getSampleBlock(), context_,
max_block_size, IStorage::chooseCompressionMethod(res_path, compression_method)));
max_block_size, chooseCompressionMethod(res_path, compression_method)));
}
return narrowBlockInputStreams(result, num_streams);
@ -231,7 +231,7 @@ BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const Context
format_name,
getSampleBlock(),
context,
IStorage::chooseCompressionMethod(uri, compression_method));
chooseCompressionMethod(uri, compression_method));
}
void registerStorageHDFS(StorageFactory & factory)

View File

@ -49,7 +49,7 @@ namespace
const String & key)
: name(name_)
{
read_buf = getReadBuffer<ReadBufferFromS3>(compression_method, client, bucket, key);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
@ -98,7 +98,8 @@ namespace
const String & key)
: sample_block(sample_block_)
{
write_buf = getWriteBuffer<WriteBufferFromS3>(compression_method, client, bucket, key, min_upload_part_size);
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size), compression_method, 1);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@ -173,7 +174,7 @@ BlockInputStreams StorageS3::read(
getHeaderBlock(column_names),
context,
max_block_size,
IStorage::chooseCompressionMethod(uri.endpoint, compression_method),
chooseCompressionMethod(uri.endpoint, compression_method),
client,
uri.bucket,
uri.key);
@ -194,7 +195,7 @@ BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context &
{
return std::make_shared<StorageS3BlockOutputStream>(
format_name, min_upload_part_size, getSampleBlock(), context_global,
IStorage::chooseCompressionMethod(uri.endpoint, compression_method),
chooseCompressionMethod(uri.endpoint, compression_method),
client, uri.bucket, uri.key);
}

View File

@ -60,17 +60,18 @@ namespace
const CompressionMethod compression_method)
: name(name_)
{
read_buf = getReadBuffer<ReadWriteBufferFromHTTP>(
compression_method,
uri,
method,
callback,
timeouts,
context.getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{},
context.getRemoteHostFilter());
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
uri,
method,
callback,
timeouts,
context.getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{},
context.getRemoteHostFilter()),
compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
@ -117,7 +118,9 @@ namespace
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = getWriteBuffer<WriteBufferFromHTTP>(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
compression_method, 1);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@ -196,7 +199,7 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
context,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context),
IStorage::chooseCompressionMethod(request_uri.getPath(), compression_method));
chooseCompressionMethod(request_uri.getPath(), compression_method));
auto column_defaults = getColumns().getDefaults();
if (column_defaults.empty())
@ -215,7 +218,7 @@ BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Cont
return std::make_shared<StorageURLBlockOutputStream>(
uri, format_name, getSampleBlock(), context_global,
ConnectionTimeouts::getHTTPTimeouts(context_global),
IStorage::chooseCompressionMethod(uri.toString(), compression_method));
chooseCompressionMethod(uri.toString(), compression_method));
}
void registerStorageURL(StorageFactory & factory)

View File

@ -7,7 +7,6 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>