diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index d81c3b42bc0..fa6b94017fe 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1236,6 +1236,17 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
     /// If there are several streams, then we merge them into one
     if (pipeline.hasMoreThanOneStream())
     {
+        /// Unify streams in case they have different headers.
+        auto first_header = pipeline.streams.at(0)->getHeader();
+        for (size_t i = 1; i < pipeline.streams.size(); ++i)
+        {
+            auto & stream = pipeline.streams[i];
+            auto header = stream->getHeader();
+            auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
+            if (!blocksHaveEqualStructure(first_header, header))
+                stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
+        }
+
         /** MergingSortedBlockInputStream reads the sources sequentially.
           * To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream.
           */
diff --git a/dbms/tests/integration/test_block_structure_mismatch/test.py b/dbms/tests/integration/test_block_structure_mismatch/test.py
index 4262c312fe4..44989c53da5 100644
--- a/dbms/tests/integration/test_block_structure_mismatch/test.py
+++ b/dbms/tests/integration/test_block_structure_mismatch/test.py
@@ -15,8 +15,9 @@ def started_cluster():
     try:
         cluster.start()
 
-        node1.query('''
-CREATE TABLE local_test ON CLUSTER testcluster (
+        for node in (node1, node2):
+            node.query('''
+CREATE TABLE local_test (
     t UInt64,
     date Date MATERIALIZED toDate(t/1000),
     shard UInt64,
@@ -26,19 +27,17 @@ CREATE TABLE local_test ON CLUSTER testcluster (
 PARTITION BY toRelativeDayNum(date)
 ORDER BY (t)
 SETTINGS index_granularity=8192
-    ''')
+                ''')
 
-        node1.query('''
-CREATE TABLE dist_test ON CLUSTER testcluster (
+            node.query('''
+CREATE TABLE dist_test (
     t UInt64,
     shard UInt64,
     date Date MATERIALIZED toDate(t/1000),
     col1 String,
     col2 String
 ) Engine = Distributed(testcluster, default, local_test, shard)
-    ''')
-
-        time.sleep(0.5)
+                ''')
 
     yield cluster
 
@@ -47,5 +46,5 @@ CREATE TABLE dist_test ON CLUSTER testcluster (
 
 def test(started_cluster):
     node1.query("INSERT INTO dist_test (t, shard, col1, col2) VALUES (1000, 1, 'foo', 'bar'), (1000, 2, 'x', 'y')")
-    time.sleep(3)
+    #time.sleep(3)
     assert node1.query("SELECT col1, col2 FROM dist_test WHERE (t < 3600000) AND (col1 = 'foo') ORDER BY t ASC") == "foo\tbar\n"
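
For context, here is a minimal standalone sketch (not part of the patch) of the pattern the C++ hunk introduces: before merging sorted streams, any stream whose block structure differs from the first stream's header is replaced by a converting wrapper that matches columns by name. Stream, Header and convertByName below are simplified stand-ins for illustration, not the real IBlockInputStream / ConvertingBlockInputStream API.

// Sketch of the header-unification idea from executeMergeSorted(), with
// simplified stand-in types instead of the real ClickHouse stream classes.
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

using Header = std::vector<std::string>;   /// column names only, for illustration

struct Stream { Header header; };          /// stand-in for IBlockInputStream
using StreamPtr = std::shared_ptr<Stream>;

bool blocksHaveEqualStructure(const Header & lhs, const Header & rhs) { return lhs == rhs; }

/// Stand-in for wrapping a source stream so it exposes the target header,
/// analogous to ConvertingBlockInputStream with MatchColumnsMode::Name
/// (the real one also permutes the columns of every block it reads).
StreamPtr convertByName(const StreamPtr & source, const Header & target)
{
    auto converted = std::make_shared<Stream>(*source);
    converted->header = target;
    return converted;
}

int main()
{
    std::vector<StreamPtr> streams = {
        std::make_shared<Stream>(Stream{{"t", "col1", "col2"}}),
        std::make_shared<Stream>(Stream{{"col1", "t", "col2"}}),   /// same columns, different order
    };

    /// Same shape as the loop added in the patch: convert only the streams
    /// whose structure differs from the first stream's header.
    const Header & first_header = streams.at(0)->header;
    for (std::size_t i = 1; i < streams.size(); ++i)
        if (!blocksHaveEqualStructure(first_header, streams[i]->header))
            streams[i] = convertByName(streams[i], first_header);

    for (const auto & stream : streams)
    {
        for (const auto & name : stream->header)
            std::cout << name << ' ';
        std::cout << '\n';    /// both lines now read: t col1 col2
    }
}

In the real patch the conversion is applied right before MergingSortedBlockInputStream consumes the streams, so that, as the added comment puts it, streams with different headers are unified before the merge.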