From 6f56c3280fec07611dd11a60dc0bbafa46384d94 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 14 Apr 2021 00:53:39 +0300 Subject: [PATCH] Uncompress data in Distributed sends if needed --- src/Client/Connection.h | 2 ++ src/Interpreters/Cluster.cpp | 1 - src/Storages/Distributed/DirectoryMonitor.cpp | 21 ++++++++++++++++--- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/Client/Connection.h b/src/Client/Connection.h index 65ed956a60b..d4cd98ea5bc 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -139,6 +139,8 @@ public: UInt16 getPort() const; const String & getDefaultDatabase() const; + Protocol::Compression getCompression() const { return compression; } + /// If last flag is true, you need to call sendExternalTablesData after. void sendQuery( const ConnectionTimeouts & timeouts, diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 2f5acad873b..48b33589087 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -111,7 +111,6 @@ Cluster::Address::Address( /// NOTE: it's still enabled when interacting with servers on different port, but we don't want to complicate the logic. compression = config.getBool(config_prefix + ".compression", !is_local) ? Protocol::Compression::Enable : Protocol::Compression::Disable; - } diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 2afa9747c60..ad23892823f 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -231,7 +231,12 @@ namespace block_in.readSuffix(); } - void writeRemoteConvert(const DistributedHeader & distributed_header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log) + void writeRemoteConvert( + const DistributedHeader & distributed_header, + RemoteBlockOutputStream & remote, + bool compression_expected, + ReadBufferFromFile & in, + Poco::Logger * log) { if (!remote.getHeader()) { @@ -262,6 +267,14 @@ namespace return; } + /// If connection does not use compression, we have to uncompress the data. + if (!compression_expected) + { + writeAndConvert(remote, in); + return; + } + + /// Otherwise write data as it was already prepared (more efficient path). CheckingCompressedReadBuffer checking_in(in); remote.writePrepared(checking_in); } @@ -545,7 +558,8 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa distributed_header.insert_settings, distributed_header.client_info}; remote.writePrefix(); - writeRemoteConvert(distributed_header, remote, in, log); + bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; + writeRemoteConvert(distributed_header, remote, compression_expected, in, log); remote.writeSuffix(); } catch (const Exception & e) @@ -690,7 +704,8 @@ struct StorageDistributedDirectoryMonitor::Batch distributed_header.client_info); remote->writePrefix(); } - writeRemoteConvert(distributed_header, *remote, in, parent.log); + bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; + writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log); } if (remote)