Uncompress data in Distributed sends if needed

This commit is contained in:
Alexey Milovidov 2021-04-14 00:53:39 +03:00
parent e08c8a3d2c
commit 6f56c3280f
3 changed files with 20 additions and 4 deletions

View File

@ -139,6 +139,8 @@ public:
UInt16 getPort() const;
const String & getDefaultDatabase() const;
Protocol::Compression getCompression() const { return compression; }
/// If last flag is true, you need to call sendExternalTablesData after.
void sendQuery(
const ConnectionTimeouts & timeouts,

View File

@ -111,7 +111,6 @@ Cluster::Address::Address(
/// NOTE: it's still enabled when interacting with servers on different port, but we don't want to complicate the logic.
compression = config.getBool(config_prefix + ".compression", !is_local)
? Protocol::Compression::Enable : Protocol::Compression::Disable;
}

View File

@ -231,7 +231,12 @@ namespace
block_in.readSuffix();
}
void writeRemoteConvert(const DistributedHeader & distributed_header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
void writeRemoteConvert(
const DistributedHeader & distributed_header,
RemoteBlockOutputStream & remote,
bool compression_expected,
ReadBufferFromFile & in,
Poco::Logger * log)
{
if (!remote.getHeader())
{
@ -262,6 +267,14 @@ namespace
return;
}
/// If connection does not use compression, we have to uncompress the data.
if (!compression_expected)
{
writeAndConvert(remote, in);
return;
}
/// Otherwise write data as it was already prepared (more efficient path).
CheckingCompressedReadBuffer checking_in(in);
remote.writePrepared(checking_in);
}
@ -545,7 +558,8 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
distributed_header.insert_settings,
distributed_header.client_info};
remote.writePrefix();
writeRemoteConvert(distributed_header, remote, in, log);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, remote, compression_expected, in, log);
remote.writeSuffix();
}
catch (const Exception & e)
@ -690,7 +704,8 @@ struct StorageDistributedDirectoryMonitor::Batch
distributed_header.client_info);
remote->writePrefix();
}
writeRemoteConvert(distributed_header, *remote, in, parent.log);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
}
if (remote)