Fix "Block structure mismatch" for INSERT into Distributed

Add the missing conversion (via ConvertingBlockInputStream) for INSERT into
remote nodes (covering sync insert, async insert and async batch insert),
as is already done for local nodes (in DistributedBlockOutputStream::writeBlockConvert).

This is required when the structure of the Distributed table differs
from the structure of the local table.

Also add a warning message to make this visible in the logs (since the
implicit conversion is slower than a plain insert).

Fixes: #19888
Azat Khuzhin 2021-02-01 21:02:36 +03:00
parent da367a500e
commit f53c9a6b25
4 changed files with 86 additions and 28 deletions
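
For context, a condensed SQL sketch of the mismatch being fixed, reusing the table names and the test_cluster_two_shards cluster from the regression test added at the bottom of this commit (illustrative only, not part of the diff):

    -- The Distributed table and the underlying local table deliberately disagree on the column type.
    CREATE TABLE tmp_01683 (n Int8) ENGINE = Memory;
    CREATE TABLE dist_01683 (n UInt64)
        ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);

    -- prefer_localhost_replica=0 forces the insert through the remote path; before this
    -- patch that path failed with "Block structure mismatch", now the block is converted
    -- by column name and a warning is logged.
    SET prefer_localhost_replica=0;
    INSERT INTO dist_01683 VALUES (1),(2);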


@@ -1,5 +1,7 @@
 #include <DataStreams/RemoteBlockOutputStream.h>
 #include <DataStreams/NativeBlockInputStream.h>
+#include <DataStreams/ConvertingBlockInputStream.h>
+#include <DataStreams/OneBlockInputStream.h>
 #include <Common/escapeForFileName.h>
 #include <Common/CurrentMetrics.h>
 #include <Common/StringUtils/StringUtils.h>
@@ -184,6 +186,37 @@ namespace
         return disk->getDirectorySyncGuard(path);
     return nullptr;
 }
+
+void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
+{
+    if (remote.getHeader() && header.header != remote.getHeader().dumpStructure())
+    {
+        LOG_WARNING(log,
+            "Structure does not match (remote: {}, local: {}), implicit conversion will be done",
+            remote.getHeader().dumpStructure(), header.header);
+
+        CompressedReadBuffer decompressing_in(in);
+        /// Lack of header, requires to read blocks
+        NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
+
+        block_in.readPrefix();
+        while (Block block = block_in.read())
+        {
+            ConvertingBlockInputStream convert(
+                std::make_shared<OneBlockInputStream>(block),
+                remote.getHeader(),
+                ConvertingBlockInputStream::MatchColumnsMode::Name);
+
+            auto adopted_block = convert.read();
+            remote.write(adopted_block);
+        }
+        block_in.readSuffix();
+    }
+    else
+    {
+        CheckingCompressedReadBuffer checking_in(in);
+        remote.writePrepared(checking_in);
+    }
+}
 }
@@ -438,11 +471,8 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
         auto connection = pool->get(timeouts, &header.insert_settings);
         RemoteBlockOutputStream remote{*connection, timeouts,
             header.insert_query, header.insert_settings, header.client_info};
-        CheckingCompressedReadBuffer checking_in(in);
         remote.writePrefix();
-        remote.writePrepared(checking_in);
+        writeRemoteConvert(header, remote, in, log);
         remote.writeSuffix();
     }
     catch (const Exception & e)
@@ -560,7 +590,6 @@ struct StorageDistributedDirectoryMonitor::Batch
         try
         {
             std::unique_ptr<RemoteBlockOutputStream> remote;
-            bool first = true;

            for (UInt64 file_idx : file_indices)
            {
@@ -575,16 +604,14 @@ struct StorageDistributedDirectoryMonitor::Batch
                 ReadBufferFromFile in(file_path->second);
                 const auto & header = readDistributedHeader(in, parent.log);

-                if (first)
+                if (!remote)
                 {
-                    first = false;
                     remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
                         header.insert_query, header.insert_settings, header.client_info);
                     remote->writePrefix();
                 }

-                CheckingCompressedReadBuffer checking_in(in);
-                remote->writePrepared(checking_in);
+                writeRemoteConvert(header, *remote, in, parent.log);
             }

             if (remote)


@@ -60,25 +60,27 @@ namespace ErrorCodes
     extern const int TIMEOUT_EXCEEDED;
 }

-static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats)
-{
-    if (!blocksHaveEqualStructure(out->getHeader(), block))
-    {
-        ConvertingBlockInputStream convert(
-            std::make_shared<OneBlockInputStream>(block),
-            out->getHeader(),
-            ConvertingBlockInputStream::MatchColumnsMode::Name);
-        auto adopted_block = convert.read();
-
-        for (size_t i = 0; i < repeats; ++i)
-            out->write(adopted_block);
-    }
-    else
-    {
-        for (size_t i = 0; i < repeats; ++i)
-            out->write(block);
-    }
-}
+static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
+{
+    if (blocksHaveEqualStructure(header, block))
+        return block;
+
+    LOG_WARNING(log,
+        "Structure does not match (remote: {}, local: {}), implicit conversion will be done.",
+        header.dumpStructure(), block.dumpStructure());
+
+    ConvertingBlockInputStream convert(
+        std::make_shared<OneBlockInputStream>(block),
+        header,
+        ConvertingBlockInputStream::MatchColumnsMode::Name);
+    return convert.read();
+}
+
+static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats, Poco::Logger * log)
+{
+    Block adopted_block = adoptBlock(out->getHeader(), block, log);
+    for (size_t i = 0; i < repeats; ++i)
+        out->write(adopted_block);
+}

 DistributedBlockOutputStream::DistributedBlockOutputStream(
@@ -343,7 +345,9 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
             }

             CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
-            job.stream->write(shard_block);
+
+            Block adopted_shard_block = adoptBlock(job.stream->getHeader(), shard_block, log);
+            job.stream->write(adopted_shard_block);
         }
         else // local
         {
@@ -367,7 +371,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
                 job.stream->writePrefix();
             }

-            writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount());
+            writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount(), log);
         }

         job.blocks_written += 1;
@@ -589,7 +593,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
     auto block_io = interp.execute();

     block_io.out->writePrefix();
-    writeBlockConvert(block_io.out, block, repeats);
+    writeBlockConvert(block_io.out, block, repeats, log);
     block_io.out->writeSuffix();
 }


@@ -0,0 +1,23 @@
+DROP TABLE IF EXISTS tmp_01683;
+DROP TABLE IF EXISTS dist_01683;
+
+SET prefer_localhost_replica=0;
+-- To suppress "Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done."
+SET send_logs_level='error';
+
+CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory;
+CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);
+
+SET insert_distributed_sync=1;
+INSERT INTO dist_01683 VALUES (1),(2);
+
+SET insert_distributed_sync=0;
+INSERT INTO dist_01683 VALUES (1),(2);
+SYSTEM FLUSH DISTRIBUTED dist_01683;
+
+-- TODO: cover distributed_directory_monitor_batch_inserts=1
+
+SELECT * FROM tmp_01683 ORDER BY n;
+
+DROP TABLE tmp_01683;
+DROP TABLE dist_01683;
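
One possible shape for the TODO above (a sketch only, not part of this commit and untested): repeat the async case with the batching setting named in the TODO enabled, placed after the existing async INSERT and before the final SELECT and DROPs. The test's reference output would have to be extended for the extra rows, and whether a session-level SET is enough for the directory monitor to pick up batching here would need to be verified.

    SET distributed_directory_monitor_batch_inserts=1;
    SET insert_distributed_sync=0;
    INSERT INTO dist_01683 VALUES (1),(2);
    SYSTEM FLUSH DISTRIBUTED dist_01683;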