Merge pull request #3453 from yandex/fix-union-header

Fix union header
This commit is contained in:
alexey-milovidov 2018-10-23 23:18:36 +03:00 committed by GitHub
commit 165890a3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 0 deletions

View File

@ -49,6 +49,7 @@
#include <Parsers/queryToString.h>
#include <ext/map.h>
#include <memory>
#include <DataStreams/ConvertingBlockInputStream.h>
namespace DB
@ -1293,6 +1294,17 @@ void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
/// If there are still several streams, then we combine them into one
if (pipeline.hasMoreThanOneStream())
{
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
pipeline.firstStream() = std::make_shared<UnionBlockInputStream<>>(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams);
pipeline.stream_with_non_joined_data = nullptr;
pipeline.streams.resize(1);

View File

@ -0,0 +1,18 @@
<yandex>
<remote_servers>
<two_shards>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</two_shards>
</remote_servers>
</yandex>

View File

@ -0,0 +1,44 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in (node1, node2):
node.query('''
CREATE TABLE default.t1_local
(
event_date Date DEFAULT toDate(event_time),
event_time DateTime,
log_type UInt32,
account_id String
)
ENGINE = MergeTree(event_date, (event_time, account_id), 8192);
''')
node.query('''
CREATE TABLE default.t1 AS default.t1_local
ENGINE = Distributed('two_shards', 'default', 't1_local', rand());
''')
yield cluster
finally:
cluster.shutdown()
def test_read(started_cluster):
assert node1.query('''SELECT event_date, event_time, log_type
FROM default.t1
WHERE (log_type = 30305) AND (account_id = '111111')
LIMIT 1''').strip() == ''