Fix block structure mismatch in MergingSorted stream.

This commit is contained in:
Nikolai Kochetov 2018-11-06 13:11:37 +03:00
parent 5367b1b8d0
commit d7992b11d8
2 changed files with 19 additions and 9 deletions

View File

@ -1236,6 +1236,17 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
/// If there are several streams, then we merge them into one /// If there are several streams, then we merge them into one
if (pipeline.hasMoreThanOneStream()) if (pipeline.hasMoreThanOneStream())
{ {
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
/** MergingSortedBlockInputStream reads the sources sequentially. /** MergingSortedBlockInputStream reads the sources sequentially.
* To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream. * To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream.
*/ */

View File

@ -15,8 +15,9 @@ def started_cluster():
try: try:
cluster.start() cluster.start()
node1.query(''' for node in (node1, node2):
CREATE TABLE local_test ON CLUSTER testcluster ( node.query('''
CREATE TABLE local_test (
t UInt64, t UInt64,
date Date MATERIALIZED toDate(t/1000), date Date MATERIALIZED toDate(t/1000),
shard UInt64, shard UInt64,
@ -26,19 +27,17 @@ CREATE TABLE local_test ON CLUSTER testcluster (
PARTITION BY toRelativeDayNum(date) PARTITION BY toRelativeDayNum(date)
ORDER BY (t) ORDER BY (t)
SETTINGS index_granularity=8192 SETTINGS index_granularity=8192
''') ''')
node1.query(''' node.query('''
CREATE TABLE dist_test ON CLUSTER testcluster ( CREATE TABLE dist_test (
t UInt64, t UInt64,
shard UInt64, shard UInt64,
date Date MATERIALIZED toDate(t/1000), date Date MATERIALIZED toDate(t/1000),
col1 String, col1 String,
col2 String col2 String
) Engine = Distributed(testcluster, default, local_test, shard) ) Engine = Distributed(testcluster, default, local_test, shard)
''') ''')
time.sleep(0.5)
yield cluster yield cluster
@ -47,5 +46,5 @@ CREATE TABLE dist_test ON CLUSTER testcluster (
def test(started_cluster): def test(started_cluster):
node1.query("INSERT INTO dist_test (t, shard, col1, col2) VALUES (1000, 1, 'foo', 'bar'), (1000, 2, 'x', 'y')") node1.query("INSERT INTO dist_test (t, shard, col1, col2) VALUES (1000, 1, 'foo', 'bar'), (1000, 2, 'x', 'y')")
time.sleep(3) #time.sleep(3)
assert node1.query("SELECT col1, col2 FROM dist_test WHERE (t < 3600000) AND (col1 = 'foo') ORDER BY t ASC") == "foo\tbar\n" assert node1.query("SELECT col1, col2 FROM dist_test WHERE (t < 3600000) AND (col1 = 'foo') ORDER BY t ASC") == "foo\tbar\n"