From 0f4a58ecaaa53c5a08712060c5b0acf4193bfba9 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 4 Jan 2020 10:31:00 +0300 Subject: [PATCH] Removed bad code around general compression methods --- dbms/programs/server/HTTPHandler.cpp | 31 +----- dbms/src/IO/BrotliWriteBuffer.cpp | 32 +++---- dbms/src/IO/BrotliWriteBuffer.h | 14 +-- dbms/src/IO/CompressionMethod.cpp | 95 +++++++++++++++++++ dbms/src/IO/CompressionMethod.h | 25 +++++ dbms/src/IO/ReadHelpers.h | 25 +---- .../IO/WriteBufferFromHTTPServerResponse.cpp | 57 +++-------- .../IO/WriteBufferFromHTTPServerResponse.h | 19 +--- dbms/src/IO/WriteHelpers.h | 12 --- dbms/src/Storages/IStorage.cpp | 17 ---- dbms/src/Storages/IStorage.h | 3 - dbms/src/Storages/StorageFile.cpp | 15 +-- dbms/src/Storages/StorageHDFS.cpp | 8 +- dbms/src/Storages/StorageS3.cpp | 9 +- dbms/src/Storages/StorageURL.cpp | 31 +++--- dbms/src/Storages/StorageXDBC.cpp | 1 - 16 files changed, 199 insertions(+), 195 deletions(-) create mode 100644 dbms/src/IO/CompressionMethod.cpp diff --git a/dbms/programs/server/HTTPHandler.cpp b/dbms/programs/server/HTTPHandler.cpp index 29d186def2d..4640220de45 100644 --- a/dbms/programs/server/HTTPHandler.cpp +++ b/dbms/programs/server/HTTPHandler.cpp @@ -20,8 +20,6 @@ #include #include #include -#include -#include #include #include #include @@ -317,13 +315,11 @@ void HTTPHandler::processQuery( client_supports_http_compression = true; http_response_compression_method = CompressionMethod::Zlib; } -#if USE_BROTLI else if (http_response_compression_methods == "br") { client_supports_http_compression = true; http_response_compression_method = CompressionMethod::Brotli; } -#endif } /// Client can pass a 'compress' flag in the query string. In this case the query result is @@ -400,32 +396,9 @@ void HTTPHandler::processQuery( std::unique_ptr in_post_raw = std::make_unique(istr); /// Request body can be compressed using algorithm specified in the Content-Encoding header. - std::unique_ptr in_post; String http_request_compression_method_str = request.get("Content-Encoding", ""); - if (!http_request_compression_method_str.empty()) - { - if (http_request_compression_method_str == "gzip") - { - in_post = std::make_unique(std::move(in_post_raw), CompressionMethod::Gzip); - } - else if (http_request_compression_method_str == "deflate") - { - in_post = std::make_unique(std::move(in_post_raw), CompressionMethod::Zlib); - } -#if USE_BROTLI - else if (http_request_compression_method_str == "br") - { - in_post = std::make_unique(std::move(in_post_raw)); - } -#endif - else - { - throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str, - ErrorCodes::UNKNOWN_COMPRESSION_METHOD); - } - } - else - in_post = std::move(in_post_raw); + std::unique_ptr in_post = wrapReadBufferWithCompressionMethod( + std::make_unique(istr), chooseCompressionMethod({}, http_request_compression_method_str)); /// The data can also be compressed using incompatible internal algorithm. This is indicated by /// 'decompress' query parameter. diff --git a/dbms/src/IO/BrotliWriteBuffer.cpp b/dbms/src/IO/BrotliWriteBuffer.cpp index 0a0eeb52956..ac1e2b3c188 100644 --- a/dbms/src/IO/BrotliWriteBuffer.cpp +++ b/dbms/src/IO/BrotliWriteBuffer.cpp @@ -30,14 +30,14 @@ public: BrotliEncoderState * state; }; -BrotliWriteBuffer::BrotliWriteBuffer(WriteBuffer & out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment) - : BufferWithOwnMemory(buf_size, existing_memory, alignment) - , brotli(std::make_unique()) - , in_available(0) - , in_data(nullptr) - , out_capacity(0) - , out_data(nullptr) - , out(out_) +BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment) + : BufferWithOwnMemory(buf_size, existing_memory, alignment) + , brotli(std::make_unique()) + , in_available(0) + , in_data(nullptr) + , out_capacity(0) + , out_data(nullptr) + , out(std::move(out_)) { BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast(compression_level)); // Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81) @@ -68,9 +68,9 @@ void BrotliWriteBuffer::nextImpl() do { - out.nextIfAtEnd(); - out_data = reinterpret_cast(out.position()); - out_capacity = out.buffer().end() - out.position(); + out->nextIfAtEnd(); + out_data = reinterpret_cast(out->position()); + out_capacity = out->buffer().end() - out->position(); int result = BrotliEncoderCompressStream( brotli->state, @@ -81,7 +81,7 @@ void BrotliWriteBuffer::nextImpl() &out_data, nullptr); - out.position() = out.buffer().end() - out_capacity; + out->position() = out->buffer().end() - out_capacity; if (result == 0) { @@ -100,9 +100,9 @@ void BrotliWriteBuffer::finish() while (true) { - out.nextIfAtEnd(); - out_data = reinterpret_cast(out.position()); - out_capacity = out.buffer().end() - out.position(); + out->nextIfAtEnd(); + out_data = reinterpret_cast(out->position()); + out_capacity = out->buffer().end() - out->position(); int result = BrotliEncoderCompressStream( brotli->state, @@ -113,7 +113,7 @@ void BrotliWriteBuffer::finish() &out_data, nullptr); - out.position() = out.buffer().end() - out_capacity; + out->position() = out->buffer().end() - out_capacity; if (BrotliEncoderIsFinished(brotli->state)) { diff --git a/dbms/src/IO/BrotliWriteBuffer.h b/dbms/src/IO/BrotliWriteBuffer.h index 6cc2a4ec4b7..5a294354f49 100644 --- a/dbms/src/IO/BrotliWriteBuffer.h +++ b/dbms/src/IO/BrotliWriteBuffer.h @@ -10,11 +10,11 @@ class BrotliWriteBuffer : public BufferWithOwnMemory { public: BrotliWriteBuffer( - WriteBuffer & out_, - int compression_level, - size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, - char * existing_memory = nullptr, - size_t alignment = 0); + std::unique_ptr out_, + int compression_level, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + char * existing_memory = nullptr, + size_t alignment = 0); ~BrotliWriteBuffer() override; @@ -30,9 +30,9 @@ private: const uint8_t * in_data; size_t out_capacity; - uint8_t * out_data; + uint8_t * out_data; - WriteBuffer & out; + std::unique_ptr out; bool finished = false; }; diff --git a/dbms/src/IO/CompressionMethod.cpp b/dbms/src/IO/CompressionMethod.cpp new file mode 100644 index 00000000000..542a8c541db --- /dev/null +++ b/dbms/src/IO/CompressionMethod.cpp @@ -0,0 +1,95 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + + +std::string toContentEncodingName(CompressionMethod method) +{ + switch (method) + { + case CompressionMethod::Gzip: return "gzip"; + case CompressionMethod::Zlib: return "deflate"; + case CompressionMethod::Brotli: return "br"; + case CompressionMethod::None: return ""; + } + __builtin_unreachable(); +} + + +CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint) +{ + std::string file_extension; + if (hint.empty() || hint == "auto") + { + auto pos = path.find_last_of('.'); + if (pos != std::string::npos) + file_extension = path.substr(pos + 1, std::string::npos); + } + + const std::string * method_str = file_extension.empty() ? &hint : &file_extension; + + if (*method_str == "gzip" || *method_str == "gz") + return CompressionMethod::Gzip; + if (*method_str == "deflate") + return CompressionMethod::Zlib; + if (*method_str == "brotli" || *method_str == "br") + return CompressionMethod::Brotli; + if (*method_str == "none") + return CompressionMethod::None; + if (!file_extension.empty()) + return CompressionMethod::None; /// Unrecognized file extension. + + throw Exception("Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br' are supported as compression methods", + ErrorCodes::NOT_IMPLEMENTED); +} + + +std::unique_ptr wrapReadBufferWithCompressionMethod(std::unique_ptr nested, CompressionMethod method) +{ + if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib) + return std::make_unique(std::move(nested), method); +#if USE_BROTLI + if (method == CompressionMethod::Brotli) + return std::make_unique(std::move(nested)); +#endif + + if (method == CompressionMethod::None) + return nested; + + throw Exception("Unsupported compression method", ErrorCodes::NOT_IMPLEMENTED); +} + + +std::unique_ptr wrapWriteBufferWithCompressionMethod(std::unique_ptr nested, CompressionMethod method, int level) +{ + if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib) + return std::make_unique(std::move(nested), method, level); + +#if USE_BROTLI + if (method == DB::CompressionMethod::Brotli) + return std::make_unique(std::move(nested), level); +#endif + + if (method == CompressionMethod::None) + return nested; + + throw Exception("Unsupported compression method", ErrorCodes::NOT_IMPLEMENTED); +} + +} diff --git a/dbms/src/IO/CompressionMethod.h b/dbms/src/IO/CompressionMethod.h index c54d2b581fd..b84c7e26588 100644 --- a/dbms/src/IO/CompressionMethod.h +++ b/dbms/src/IO/CompressionMethod.h @@ -1,8 +1,20 @@ #pragma once +#include +#include + + namespace DB { +class ReadBuffer; +class WriteBuffer; + +/** These are "generally recognizable" compression methods for data import/export. + * Do not mess with more efficient compression methods used by ClickHouse internally + * (they use non-standard framing, indexes, checksums...) + */ + enum class CompressionMethod { /// DEFLATE compression with gzip header and CRC32 checksum. @@ -15,4 +27,17 @@ enum class CompressionMethod None }; +/// How the compression method is named in HTTP. +std::string toContentEncodingName(CompressionMethod method); + +/** Choose compression method from path and hint. + * if hint is "auto" or empty string, then path is analyzed, + * otherwise path parameter is ignored and hint is used as compression method name. + * path is arbitrary string that will be analyzed for file extension (gz, br...) that determines compression. + */ +CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint); + +std::unique_ptr wrapReadBufferWithCompressionMethod(std::unique_ptr nested, CompressionMethod method); +std::unique_ptr wrapWriteBufferWithCompressionMethod(std::unique_ptr nested, CompressionMethod method, int level); + } diff --git a/dbms/src/IO/ReadHelpers.h b/dbms/src/IO/ReadHelpers.h index 47206039435..7e5b5ce804f 100644 --- a/dbms/src/IO/ReadHelpers.h +++ b/dbms/src/IO/ReadHelpers.h @@ -29,22 +29,13 @@ #include #include #include +#include #include -#include #include -#ifdef __clang__ -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wdouble-promotion" -#endif - #include -#ifdef __clang__ -#pragma clang diagnostic pop -#endif - /// 1 GiB #define DEFAULT_MAX_STRING_SIZE (1ULL << 30) @@ -1024,21 +1015,11 @@ void skipToNextLineOrEOF(ReadBuffer & buf); /// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences. void skipToUnescapedNextLineOrEOF(ReadBuffer & buf); -template -std::unique_ptr getReadBuffer(const DB::CompressionMethod method, Types&&... args) -{ - if (method == DB::CompressionMethod::Gzip) - { - auto read_buf = std::make_unique(std::forward(args)...); - return std::make_unique(std::move(read_buf), method); - } - return std::make_unique(args...); -} /** This function just copies the data from buffer's internal position (in.position()) * to current position (from arguments) into memory. */ -void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current); +void saveUpToPosition(ReadBuffer & in, Memory<> & memory, char * current); /** This function is negative to eof(). * In fact it returns whether the data was loaded to internal ReadBuffers's buffer or not. @@ -1047,6 +1028,6 @@ void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current); * of our buffer and the current cursor in the end of the buffer. When we call eof() it calls next(). * And this function can fill the buffer with new data, so we will lose the data from previous buffer state. */ -bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current); +bool loadAtPosition(ReadBuffer & in, Memory<> & memory, char * & current); } diff --git a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp b/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp index f8bd166a4dd..0a7f574afa1 100644 --- a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp +++ b/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp @@ -105,58 +105,23 @@ void WriteBufferFromHTTPServerResponse::nextImpl() { if (compress) { - if (compression_method == CompressionMethod::Gzip) - { + auto content_encoding_name = toContentEncodingName(compression_method); + #if defined(POCO_CLICKHOUSE_PATCH) - *response_header_ostr << "Content-Encoding: gzip\r\n"; + *response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n"; #else - response.set("Content-Encoding", "gzip"); - response_body_ostr = &(response.send()); -#endif - out_raw = std::make_unique(*response_body_ostr); - deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin()); - out = &*deflating_buf; - } - else if (compression_method == CompressionMethod::Zlib) - { -#if defined(POCO_CLICKHOUSE_PATCH) - *response_header_ostr << "Content-Encoding: deflate\r\n"; -#else - response.set("Content-Encoding", "deflate"); - response_body_ostr = &(response.send()); -#endif - out_raw = std::make_unique(*response_body_ostr); - deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin()); - out = &*deflating_buf; - } -#if USE_BROTLI - else if (compression_method == CompressionMethod::Brotli) - { -#if defined(POCO_CLICKHOUSE_PATCH) - *response_header_ostr << "Content-Encoding: br\r\n"; -#else - response.set("Content-Encoding", "br"); - response_body_ostr = &(response.send()); -#endif - out_raw = std::make_unique(*response_body_ostr); - brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin()); - out = &*brotli_buf; - } + response.set("Content-Encoding", content_encoding_name); + response_body_ostr = &(response.send()); #endif - else - throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse", - ErrorCodes::LOGICAL_ERROR); - /// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy. + out = wrapWriteBufferWithCompressionMethod(std::make_unique(*response_body_ostr), compression_method, compression_level); } else { #if !defined(POCO_CLICKHOUSE_PATCH) response_body_ostr = &(response.send()); #endif - - out_raw = std::make_unique(*response_body_ostr, working_buffer.size(), working_buffer.begin()); - out = &*out_raw; + out = std::make_unique(*response_body_ostr); } } @@ -169,6 +134,9 @@ void WriteBufferFromHTTPServerResponse::nextImpl() out->position() = position(); out->next(); } + + internal_buffer = out->internalBuffer(); + working_buffer = out->buffer(); } @@ -177,9 +145,8 @@ WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse( Poco::Net::HTTPServerResponse & response_, unsigned keep_alive_timeout_, bool compress_, - CompressionMethod compression_method_, - size_t size) - : BufferWithOwnMemory(size) + CompressionMethod compression_method_) + : WriteBuffer(nullptr, 0) , request(request_) , response(response_) , keep_alive_timeout(keep_alive_timeout_) diff --git a/dbms/src/IO/WriteBufferFromHTTPServerResponse.h b/dbms/src/IO/WriteBufferFromHTTPServerResponse.h index 642e59e4921..9cbcab1d626 100644 --- a/dbms/src/IO/WriteBufferFromHTTPServerResponse.h +++ b/dbms/src/IO/WriteBufferFromHTTPServerResponse.h @@ -6,10 +6,8 @@ #include #include #include -#include +#include #include -#include -#include #include #include #include @@ -42,7 +40,7 @@ namespace DB /// Also this class write and flush special X-ClickHouse-Progress HTTP headers /// if no data was sent at the time of progress notification. /// This allows to implement progress bar in HTTP clients. -class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory +class WriteBufferFromHTTPServerResponse : public WriteBuffer { private: Poco::Net::HTTPServerRequest & request; @@ -52,7 +50,7 @@ private: unsigned keep_alive_timeout = 0; bool compress = false; CompressionMethod compression_method; - int compression_level = Z_DEFAULT_COMPRESSION; + int compression_level = 1; std::ostream * response_body_ostr = nullptr; @@ -60,13 +58,7 @@ private: std::ostream * response_header_ostr = nullptr; #endif - std::unique_ptr out_raw; - std::optional deflating_buf; -#if USE_BROTLI - std::optional brotli_buf; -#endif - - WriteBuffer * out = nullptr; /// Uncompressed HTTP body is written to this buffer. Points to out_raw or possibly to deflating_buf. + std::unique_ptr out; bool headers_started_sending = false; bool headers_finished_sending = false; /// If true, you could not add any headers. @@ -99,8 +91,7 @@ public: Poco::Net::HTTPServerResponse & response_, unsigned keep_alive_timeout_, bool compress_ = false, /// If true - set Content-Encoding header and compress the result. - CompressionMethod compression_method_ = CompressionMethod::Gzip, - size_t size = DBMS_DEFAULT_BUFFER_SIZE); + CompressionMethod compression_method_ = CompressionMethod::Gzip); /// Writes progess in repeating HTTP headers. void onProgress(const Progress & progress); diff --git a/dbms/src/IO/WriteHelpers.h b/dbms/src/IO/WriteHelpers.h index 082bf63e6b7..328f7b030cc 100644 --- a/dbms/src/IO/WriteHelpers.h +++ b/dbms/src/IO/WriteHelpers.h @@ -26,7 +26,6 @@ #include #include #include -#include #include @@ -955,15 +954,4 @@ inline String toString(const T & x) return buf.str(); } -template -std::unique_ptr getWriteBuffer(const DB::CompressionMethod method, Types&&... args) -{ - if (method == DB::CompressionMethod::Gzip) - { - auto write_buf = std::make_unique(std::forward(args)...); - return std::make_unique(std::move(write_buf), method, 1 /* compression level */); - } - return std::make_unique(args...); -} - } diff --git a/dbms/src/Storages/IStorage.cpp b/dbms/src/Storages/IStorage.cpp index 9dabfe0b604..e48e9896597 100644 --- a/dbms/src/Storages/IStorage.cpp +++ b/dbms/src/Storages/IStorage.cpp @@ -425,21 +425,4 @@ BlockInputStreams IStorage::read( return res; } -DB::CompressionMethod IStorage::chooseCompressionMethod(const String & uri, const String & compression_method) -{ - if (compression_method == "auto" || compression_method == "") - { - if (endsWith(uri, ".gz")) - return DB::CompressionMethod::Gzip; - else - return DB::CompressionMethod::None; - } - else if (compression_method == "gzip") - return DB::CompressionMethod::Gzip; - else if (compression_method == "none") - return DB::CompressionMethod::None; - else - throw Exception("Only auto, none, gzip supported as compression method", ErrorCodes::NOT_IMPLEMENTED); -} - } diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index 8f8a363aec1..69bbca86879 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -440,8 +439,6 @@ public: return {}; } - static DB::CompressionMethod chooseCompressionMethod(const String & uri, const String & compression_method); - private: /// You always need to take the next three locks in this order. diff --git a/dbms/src/Storages/StorageFile.cpp b/dbms/src/Storages/StorageFile.cpp index 64a603717e2..c473d4cd5f9 100644 --- a/dbms/src/Storages/StorageFile.cpp +++ b/dbms/src/Storages/StorageFile.cpp @@ -200,12 +200,12 @@ public: } storage->table_fd_was_used = true; - read_buf = getReadBuffer(compression_method, storage->table_fd); + read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(storage->table_fd), compression_method); } else { shared_lock = std::shared_lock(storage->rwlock); - read_buf = getReadBuffer(compression_method, file_path); + read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(file_path), compression_method); } reader = FormatFactory::instance().getInput(storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size); @@ -266,7 +266,7 @@ BlockInputStreams StorageFile::read( for (const auto & file_path : paths) { BlockInputStreamPtr cur_block = std::make_shared( - std::static_pointer_cast(shared_from_this()), context, max_block_size, file_path, IStorage::chooseCompressionMethod(file_path, compression_method)); + std::static_pointer_cast(shared_from_this()), context, max_block_size, file_path, chooseCompressionMethod(file_path, compression_method)); blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared(cur_block, column_defaults, context)); } return narrowBlockInputStreams(blocks_input, num_streams); @@ -288,13 +288,15 @@ public: * INSERT data; SELECT *; last SELECT returns only insert_data */ storage.table_fd_was_used = true; - write_buf = getWriteBuffer(compression_method, storage.table_fd); + write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique(storage.table_fd), compression_method, 1); } else { if (storage.paths.size() != 1) throw Exception("Table '" + storage.table_name + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED); - write_buf = getWriteBuffer(compression_method, storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT); + write_buf = wrapWriteBufferWithCompressionMethod( + std::make_unique(storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT), + compression_method, 1); } writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), context); @@ -333,8 +335,7 @@ BlockOutputStreamPtr StorageFile::write( const ASTPtr & /*query*/, const Context & context) { - return std::make_shared(*this, - IStorage::chooseCompressionMethod(paths[0], compression_method), context); + return std::make_shared(*this, chooseCompressionMethod(paths[0], compression_method), context); } Strings StorageFile::getDataPaths() const diff --git a/dbms/src/Storages/StorageHDFS.cpp b/dbms/src/Storages/StorageHDFS.cpp index 3f1386cca5e..1f6ca6d4893 100644 --- a/dbms/src/Storages/StorageHDFS.cpp +++ b/dbms/src/Storages/StorageHDFS.cpp @@ -67,7 +67,7 @@ public: UInt64 max_block_size, const CompressionMethod compression_method) { - auto read_buf = getReadBuffer(compression_method, uri); + auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(uri), compression_method); auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); reader = std::make_shared>(input_stream, std::move(read_buf)); @@ -112,7 +112,7 @@ public: const CompressionMethod compression_method) : sample_block(sample_block_) { - write_buf = getWriteBuffer(compression_method, uri); + write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique(uri), compression_method, 1); writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); } @@ -213,7 +213,7 @@ BlockInputStreams StorageHDFS::read( for (const auto & res_path : res_paths) { result.push_back(std::make_shared(uri_without_path + res_path, format_name, getSampleBlock(), context_, - max_block_size, IStorage::chooseCompressionMethod(res_path, compression_method))); + max_block_size, chooseCompressionMethod(res_path, compression_method))); } return narrowBlockInputStreams(result, num_streams); @@ -231,7 +231,7 @@ BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const Context format_name, getSampleBlock(), context, - IStorage::chooseCompressionMethod(uri, compression_method)); + chooseCompressionMethod(uri, compression_method)); } void registerStorageHDFS(StorageFactory & factory) diff --git a/dbms/src/Storages/StorageS3.cpp b/dbms/src/Storages/StorageS3.cpp index cf0b3df44fd..e848eb655a0 100644 --- a/dbms/src/Storages/StorageS3.cpp +++ b/dbms/src/Storages/StorageS3.cpp @@ -49,7 +49,7 @@ namespace const String & key) : name(name_) { - read_buf = getReadBuffer(compression_method, client, bucket, key); + read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(client, bucket, key), compression_method); reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); } @@ -98,7 +98,8 @@ namespace const String & key) : sample_block(sample_block_) { - write_buf = getWriteBuffer(compression_method, client, bucket, key, min_upload_part_size); + write_buf = wrapWriteBufferWithCompressionMethod( + std::make_unique(client, bucket, key, min_upload_part_size), compression_method, 1); writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); } @@ -173,7 +174,7 @@ BlockInputStreams StorageS3::read( getHeaderBlock(column_names), context, max_block_size, - IStorage::chooseCompressionMethod(uri.endpoint, compression_method), + chooseCompressionMethod(uri.endpoint, compression_method), client, uri.bucket, uri.key); @@ -194,7 +195,7 @@ BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & { return std::make_shared( format_name, min_upload_part_size, getSampleBlock(), context_global, - IStorage::chooseCompressionMethod(uri.endpoint, compression_method), + chooseCompressionMethod(uri.endpoint, compression_method), client, uri.bucket, uri.key); } diff --git a/dbms/src/Storages/StorageURL.cpp b/dbms/src/Storages/StorageURL.cpp index 907e18b21cf..c8deeb24bc1 100644 --- a/dbms/src/Storages/StorageURL.cpp +++ b/dbms/src/Storages/StorageURL.cpp @@ -60,17 +60,18 @@ namespace const CompressionMethod compression_method) : name(name_) { - read_buf = getReadBuffer( - compression_method, - uri, - method, - callback, - timeouts, - context.getSettingsRef().max_http_get_redirects, - Poco::Net::HTTPBasicCredentials{}, - DBMS_DEFAULT_BUFFER_SIZE, - ReadWriteBufferFromHTTP::HTTPHeaderEntries{}, - context.getRemoteHostFilter()); + read_buf = wrapReadBufferWithCompressionMethod( + std::make_unique( + uri, + method, + callback, + timeouts, + context.getSettingsRef().max_http_get_redirects, + Poco::Net::HTTPBasicCredentials{}, + DBMS_DEFAULT_BUFFER_SIZE, + ReadWriteBufferFromHTTP::HTTPHeaderEntries{}, + context.getRemoteHostFilter()), + compression_method); reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); } @@ -117,7 +118,9 @@ namespace const CompressionMethod compression_method) : sample_block(sample_block_) { - write_buf = getWriteBuffer(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts); + write_buf = wrapWriteBufferWithCompressionMethod( + std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts), + compression_method, 1); writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); } @@ -196,7 +199,7 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names, context, max_block_size, ConnectionTimeouts::getHTTPTimeouts(context), - IStorage::chooseCompressionMethod(request_uri.getPath(), compression_method)); + chooseCompressionMethod(request_uri.getPath(), compression_method)); auto column_defaults = getColumns().getDefaults(); if (column_defaults.empty()) @@ -215,7 +218,7 @@ BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Cont return std::make_shared( uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global), - IStorage::chooseCompressionMethod(uri.toString(), compression_method)); + chooseCompressionMethod(uri.toString(), compression_method)); } void registerStorageURL(StorageFactory & factory) diff --git a/dbms/src/Storages/StorageXDBC.cpp b/dbms/src/Storages/StorageXDBC.cpp index 222eebd6377..0dcbf372b28 100644 --- a/dbms/src/Storages/StorageXDBC.cpp +++ b/dbms/src/Storages/StorageXDBC.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include