diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 2aba27dfc67..af17a026927 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -305,7 +305,9 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp job.local_context = std::make_unique(context); InterpreterInsertQuery interp(query_ast, *job.local_context); - job.stream = interp.execute().out; + auto block_io = interp.execute(); + assertBlocksHaveEqualStructure(block_io.out->getHeader(), shard_block, "flushing shard block for " + storage.getStorageID().getNameForLogs()); + job.stream = block_io.out; job.stream->writePrefix(); } @@ -544,6 +546,9 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_ InterpreterInsertQuery interp(query_ast, context); auto block_io = interp.execute(); + + assertBlocksHaveEqualStructure(block_io.out->getHeader(), block, "flushing " + storage.getStorageID().getNameForLogs()); + block_io.out->writePrefix(); for (size_t i = 0; i < repeats; ++i) diff --git a/tests/queries/0_stateless/00967_insert_into_distributed_different_types.reference b/tests/queries/0_stateless/00967_insert_into_distributed_different_types.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql b/tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql new file mode 100644 index 00000000000..6b23c72981a --- /dev/null +++ b/tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql @@ -0,0 +1,9 @@ +DROP TABLE IF EXISTS dist_00967; +DROP TABLE IF EXISTS underlying_00967; + +CREATE TABLE dist_00967 (key UInt64) Engine=Distributed('test_shard_localhost', currentDatabase(), underlying_00967); +-- fails for TinyLog()/MergeTree()/... but not for Memory() +CREATE TABLE underlying_00967 (key Nullable(UInt64)) Engine=TinyLog(); +INSERT INTO dist_00967 SELECT toUInt64(number) FROM system.numbers LIMIT 1; -- { serverError 171; } + +SELECT * FROM dist_00967;