Merge pull request #22237 from ClickHouse/protocol-compression-auto

Autodetect compression #22234
This commit is contained in:
alexey-milovidov 2021-04-18 03:13:54 +03:00 committed by GitHub
commit 7f3a40e1b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 32 additions and 7 deletions

View File

@ -7,6 +7,8 @@
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/isLocalAddress.h>
#include <Common/DNSResolver.h>
#include <common/setTerminalEcho.h> #include <common/setTerminalEcho.h>
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
@ -60,7 +62,9 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
#endif #endif
} }
compression = config.getBool("compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable; /// By default compression is disabled if address looks like localhost.
compression = config.getBool("compression", !isLocalAddress(DNSResolver::instance().resolveHost(host)))
? Protocol::Compression::Enable : Protocol::Compression::Disable;
timeouts = ConnectionTimeouts( timeouts = ConnectionTimeouts(
Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0), Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),

View File

@ -139,6 +139,8 @@ public:
UInt16 getPort() const; UInt16 getPort() const;
const String & getDefaultDatabase() const; const String & getDefaultDatabase() const;
Protocol::Compression getCompression() const { return compression; }
/// If last flag is true, you need to call sendExternalTablesData after. /// If last flag is true, you need to call sendExternalTablesData after.
void sendQuery( void sendQuery(
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,

View File

@ -103,10 +103,14 @@ Cluster::Address::Address(
password = config.getString(config_prefix + ".password", ""); password = config.getString(config_prefix + ".password", "");
default_database = config.getString(config_prefix + ".default_database", ""); default_database = config.getString(config_prefix + ".default_database", "");
secure = config.getBool(config_prefix + ".secure", false) ? Protocol::Secure::Enable : Protocol::Secure::Disable; secure = config.getBool(config_prefix + ".secure", false) ? Protocol::Secure::Enable : Protocol::Secure::Disable;
compression = config.getBool(config_prefix + ".compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
priority = config.getInt(config_prefix + ".priority", 1); priority = config.getInt(config_prefix + ".priority", 1);
const char * port_type = secure == Protocol::Secure::Enable ? "tcp_port_secure" : "tcp_port"; const char * port_type = secure == Protocol::Secure::Enable ? "tcp_port_secure" : "tcp_port";
is_local = isLocal(config.getInt(port_type, 0)); is_local = isLocal(config.getInt(port_type, 0));
/// By default compression is disabled if address looks like localhost.
/// NOTE: it's still enabled when interacting with servers on different port, but we don't want to complicate the logic.
compression = config.getBool(config_prefix + ".compression", !is_local)
? Protocol::Compression::Enable : Protocol::Compression::Disable;
} }

View File

@ -231,7 +231,12 @@ namespace
block_in.readSuffix(); block_in.readSuffix();
} }
void writeRemoteConvert(const DistributedHeader & distributed_header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log) void writeRemoteConvert(
const DistributedHeader & distributed_header,
RemoteBlockOutputStream & remote,
bool compression_expected,
ReadBufferFromFile & in,
Poco::Logger * log)
{ {
if (!remote.getHeader()) if (!remote.getHeader())
{ {
@ -262,6 +267,14 @@ namespace
return; return;
} }
/// If connection does not use compression, we have to uncompress the data.
if (!compression_expected)
{
writeAndConvert(remote, in);
return;
}
/// Otherwise write data as it was already prepared (more efficient path).
CheckingCompressedReadBuffer checking_in(in); CheckingCompressedReadBuffer checking_in(in);
remote.writePrepared(checking_in); remote.writePrepared(checking_in);
} }
@ -545,7 +558,8 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
distributed_header.insert_settings, distributed_header.insert_settings,
distributed_header.client_info}; distributed_header.client_info};
remote.writePrefix(); remote.writePrefix();
writeRemoteConvert(distributed_header, remote, in, log); bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, remote, compression_expected, in, log);
remote.writeSuffix(); remote.writeSuffix();
} }
catch (const Exception & e) catch (const Exception & e)
@ -690,7 +704,8 @@ struct StorageDistributedDirectoryMonitor::Batch
distributed_header.client_info); distributed_header.client_info);
remote->writePrefix(); remote->writePrefix();
} }
writeRemoteConvert(distributed_header, *remote, in, parent.log); bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
} }
if (remote) if (remote)

View File

@ -1,2 +1,2 @@
SET max_memory_usage = 1; SET max_memory_usage = 1, max_untracked_memory = 1000000;
select 'test', count(*) from zeros_mt(1000000) where not ignore(zero); -- { serverError 241 } select 'test', count(*) from zeros_mt(1000000) where not ignore(zero); -- { serverError 241 }