Fix the issue with async Distributed INSERTs and network_compression_method #18741

This commit is contained in:
Alexey Milovidov 2021-01-06 03:24:42 +03:00
parent 176358f0b4
commit 438f0f971b
4 changed files with 25 additions and 4 deletions

View File

@ -16,8 +16,8 @@ private:
bool nextImpl() override;
public:
CompressedReadBuffer(ReadBuffer & in_)
: CompressedReadBufferBase(&in_), BufferWithOwnMemory<ReadBuffer>(0)
CompressedReadBuffer(ReadBuffer & in_, bool allow_different_codecs_ = false)
: CompressedReadBufferBase(&in_, allow_different_codecs_), BufferWithOwnMemory<ReadBuffer>(0)
{
}

View File

@ -1181,7 +1181,7 @@ void TCPHandler::receiveUnexpectedData()
std::shared_ptr<ReadBuffer> maybe_compressed_in;
if (last_block_in.compression == Protocol::Compression::Enable)
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
else
maybe_compressed_in = in;
@ -1198,8 +1198,11 @@ void TCPHandler::initBlockInput()
{
if (!state.block_in)
{
/// 'allow_different_codecs' is set to true, because some parts of compressed data can be precompressed in advance
/// with another codec that the rest of the data. Example: data sent by Distributed tables.
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
else
state.maybe_compressed_in = in;

View File

@ -0,0 +1,16 @@
DROP TABLE IF EXISTS local;
DROP TABLE IF EXISTS distributed;
CREATE TABLE local (x UInt8) ENGINE = Memory;
CREATE TABLE distributed AS local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), local, x);
SET insert_distributed_sync = 0, network_compression_method = 'zstd';
INSERT INTO distributed SELECT number FROM numbers(256);
SYSTEM FLUSH DISTRIBUTED distributed;
SELECT count() FROM local;
SELECT count() FROM distributed;
DROP TABLE local;
DROP TABLE distributed;