Merge pull request #10135 from azat/distributed-insert-converting

Convert blocks if structure does not match on INSERT into Distributed()
This commit is contained in:
alexey-milovidov 2020-04-09 05:51:36 +03:00 committed by GitHub
commit 6ff4c3aa16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 11 deletions

View File

@ -12,6 +12,8 @@
#include <IO/WriteBufferFromString.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/createBlockSelector.h>
#include <Interpreters/ExpressionActions.h>
@ -59,6 +61,26 @@ namespace ErrorCodes
extern const int CANNOT_LINK;
}
static void writeBlockConvert(const Context & context, const BlockOutputStreamPtr & out, const Block & block, const size_t repeats)
{
if (!blocksHaveEqualStructure(out->getHeader(), block))
{
ConvertingBlockInputStream convert(context,
std::make_shared<OneBlockInputStream>(block),
out->getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name);
auto adopted_block = convert.read();
for (size_t i = 0; i < repeats; ++i)
out->write(adopted_block);
}
else
{
for (size_t i = 0; i < repeats; ++i)
out->write(block);
}
}
DistributedBlockOutputStream::DistributedBlockOutputStream(
const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_,
@ -306,14 +328,12 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
InterpreterInsertQuery interp(query_ast, *job.local_context);
auto block_io = interp.execute();
assertBlocksHaveEqualStructure(block_io.out->getHeader(), shard_block, "flushing shard block for " + storage.getStorageID().getNameForLogs());
job.stream = block_io.out;
job.stream->writePrefix();
}
size_t num_repetitions = shard_info.getLocalNodeCount();
for (size_t i = 0; i < num_repetitions; ++i)
job.stream->write(shard_block);
writeBlockConvert(context, job.stream, shard_block, shard_info.getLocalNodeCount());
}
job.blocks_written += 1;
@ -547,13 +567,8 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
auto block_io = interp.execute();
assertBlocksHaveEqualStructure(block_io.out->getHeader(), block, "flushing " + storage.getStorageID().getNameForLogs());
block_io.out->writePrefix();
for (size_t i = 0; i < repeats; ++i)
block_io.out->write(block);
writeBlockConvert(context, block_io.out, block, repeats);
block_io.out->writeSuffix();
}

View File

@ -4,6 +4,6 @@ DROP TABLE IF EXISTS underlying_00967;
CREATE TABLE dist_00967 (key UInt64) Engine=Distributed('test_shard_localhost', currentDatabase(), underlying_00967);
-- fails for TinyLog()/MergeTree()/... but not for Memory()
CREATE TABLE underlying_00967 (key Nullable(UInt64)) Engine=TinyLog();
INSERT INTO dist_00967 SELECT toUInt64(number) FROM system.numbers LIMIT 1; -- { serverError 171; }
INSERT INTO dist_00967 SELECT toUInt64(number) FROM system.numbers LIMIT 1;
SELECT * FROM dist_00967;