ISSUES-3134 fix merge and distributed engine query stage

This commit is contained in:
zhang2014 2018-09-20 13:40:06 +08:00
parent 8a8189c7e9
commit e0e805b1f6
5 changed files with 112 additions and 97 deletions

View File

@ -25,6 +25,8 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <ext/range.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/queryToString.h>
namespace DB
@ -36,6 +38,7 @@ namespace ErrorCodes
extern const int INCOMPATIBLE_SOURCE_TABLES;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@ -155,7 +158,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
if (table.get() != this)
{
++selected_table_size;
stage_in_source_tables = table->getQueryProcessingStage(context);
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
}
}
@ -163,8 +166,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
iterator->next();
}
auto fetch_stage = std::max(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
return selected_table_size == 1 ? fetch_stage : QueryProcessingStage::WithMergeableState;
return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
}
@ -208,7 +210,7 @@ BlockInputStreams StorageMerge::read(
if (selected_tables.empty())
return createSourceStreams(
query_info, processed_stage, max_block_size, modified_context, header, {}, {}, real_column_names, 0, has_table_virtual_column);
query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);
size_t remaining_streams = num_streams;
size_t tables_count = selected_tables.size();
@ -218,6 +220,7 @@ BlockInputStreams StorageMerge::read(
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
current_streams = std::max(1, current_streams);
StoragePtr storage = it->first;
TableStructureReadLockPtr struct_lock = it->second;
@ -227,25 +230,23 @@ BlockInputStreams StorageMerge::read(
if (current_streams)
{
source_streams = createSourceStreams(
query_info, processed_stage, max_block_size, modified_context, header, storage,
struct_lock, real_column_names, current_streams, has_table_virtual_column);
query_info, processed_stage, max_block_size, header, storage,
struct_lock, real_column_names, modified_context, current_streams, has_table_virtual_column);
}
else
{
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=, &real_column_names]() -> BlockInputStreamPtr
{
BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size,
modified_context, header, storage,
struct_lock, real_column_names, current_streams, has_table_virtual_column);
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(
header, [=, &real_column_names, &modified_context]() -> BlockInputStreamPtr
{
BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size,
header, storage, struct_lock, real_column_names,
modified_context, current_streams, has_table_virtual_column, true);
if (streams.empty())
return std::make_shared<NullBlockInputStream>(header);
if (!streams.empty() && streams.size() != 1)
throw Exception("LogicalError: the lazy stream size must to be one or empty.", ErrorCodes::LOGICAL_ERROR);
if (streams.size() != 1)
throw Exception("LogicalError: the lazy stream size must to be one.", ErrorCodes::LOGICAL_ERROR);
return streams[0];
}));
return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0];
}));
}
res.insert(res.end(), source_streams.begin(), source_streams.end());
@ -259,15 +260,15 @@ BlockInputStreams StorageMerge::read(
}
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size, const Context & modified_context,
const Block & header, const StoragePtr & storage,
const size_t max_block_size, const Block & header, const StoragePtr & storage,
const TableStructureReadLockPtr & struct_lock, Names & real_column_names,
size_t current_streams, bool has_table_virtual_column)
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams)
{
SelectQueryInfo modified_query_info;
modified_query_info.sets = query_info.sets;
modified_query_info.query = query_info.query->clone();
modified_query_info.prewhere_info = query_info.prewhere_info;
modified_query_info.sets = query_info.sets;
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage ? storage->getTableName() : "");
@ -285,13 +286,16 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size,
current_streams);
UInt32(streams_num));
}
else if (processed_stage > storage->getQueryProcessingStage(modified_context))
{
typeid_cast<ASTSelectQuery *>(modified_query_info.query.get())->replaceDatabaseAndTable(source_database, storage->getTableName());
/// TODO: set num_streams
/// Maximum permissible parallelism is streams_num
modified_context.getSettingsRef().max_threads = UInt64(streams_num);
modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, Names{}, processed_stage};
BlockInputStreamPtr interpreter_stream = interpreter.execute().in;
@ -304,7 +308,7 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
if (!source_streams.empty())
{
if (!current_streams)
if (concat_streams)
{
BlockInputStreamPtr stream =
source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
@ -321,7 +325,7 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
convertingSourceStream(header, modified_context, storage, modified_query_info, source_stream, processed_stage);
convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage);
source_stream->addTableLock(struct_lock);
}
@ -429,59 +433,43 @@ Block StorageMerge::getQueryHeader(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
processed_stage, true).getSampleBlock());
}
throw Exception("Logical Error: unknow processed stage.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, const StoragePtr & storage,
const SelectQueryInfo & query_info, BlockInputStreamPtr & source_stream,
QueryProcessingStage::Enum processed_stage)
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage)
{
Block before_convert_header = source_stream->getHeader();
Block before_block_header = source_stream->getHeader();
source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
Block after_convert_header = source_stream->getHeader();
bool different_header = false;
ASTPtr where_expression = typeid_cast<ASTSelectQuery *>(query.get())->where_expression;
for (size_t column_index : ext::range(0, before_convert_header.columns()))
{
ColumnWithTypeAndName before_column = before_convert_header.getByPosition(column_index);
if (after_convert_header.has(before_column.name))
{
ColumnWithTypeAndName after_column = after_convert_header.getByName(before_column.name);
if (before_column.type != after_column.type)
{
different_header = true;
break;
}
}
}
if (!different_header)
if (!where_expression)
return;
/// HACK: Re-run filter stream for align processed_stage
if (processed_stage > QueryProcessingStage::FetchColumns)
for (size_t column_index : ext::range(0, header.columns()))
{
ASTSelectQuery * select_query = typeid_cast<ASTSelectQuery *>(query_info.query.get());
if (select_query->where_expression)
ColumnWithTypeAndName header_column = header.getByPosition(column_index);
ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name);
/// If the processed_stage greater than FetchColumns and the block structure between streams is different.
/// the where expression maybe invalid because of convertingBlockInputStream.
/// So we need to throw exception.
if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns)
{
Block expression_input = header.cloneEmpty();
NameAndTypePair name_and_type = getColumn("_table");
expression_input.insert({ name_and_type.type->createColumn(), name_and_type.type, name_and_type.name });
ExpressionAnalyzer analyzer = ExpressionAnalyzer{select_query->where_expression, context, {}, expression_input.getNamesAndTypesList()};
NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList();
NameAndTypePair virtual_column = getColumn("_table");
source_columns.insert(source_columns.end(), virtual_column);
ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, context, {}, source_columns}.getActions(false, false);
Names required_columns = actions->getRequiredColumns();
ExpressionActionsPtr where_actions = analyzer.getActions(false, false);
if (!after_convert_header.has("_table"))
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
source_stream = std::make_shared<FilterBlockInputStream>(
source_stream, where_actions, select_query->where_expression->getColumnName(), true);
if (!after_convert_header.has("_table") && !header.has("_table"))
source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
for (const auto required_column : required_columns)
{
if (required_column == header_column.name)
throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure()
+ "\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
}
}
}

View File

@ -75,13 +75,13 @@ protected:
const Context & context, QueryProcessingStage::Enum processed_stage);
BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size, const Context & modified_context,
const Block & header, const StoragePtr & storage, const TableStructureReadLockPtr & struct_lock,
Names & real_column_names, size_t current_streams, bool has_table_virtual_column);
const size_t max_block_size, const Block & header, const StoragePtr & storage,
const TableStructureReadLockPtr & struct_lock, Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams = false);
void convertingSourceStream(const Block & header, const Context & context, const StoragePtr & storage,
const SelectQueryInfo & query_info, BlockInputStreamPtr & source_stream,
QueryProcessingStage::Enum processed_stage);
void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage);
};
}

View File

@ -1,4 +1,4 @@
UInt32 | UInt64
UInt32 | UInt64
= 1:
1
1
@ -10,24 +10,24 @@
4294967290
4294967299:
4294967299
Int64 | UInt64
Int64 | UInt64
1:
1
1
-1:
Int32 | UInt64
-1:
Int32 | UInt64
1
1
2147483650
String | FixedString(16)
String | FixedString(16)
1
1
DateTime | UInt64
DateTime | UInt64
1
1
Array(UInt32) | Array(UInt64)
[1] [0]
[1] [0]
[4294967290] [4294967290]
[4294967290] [4294967290]
[4294967299] [4294967299]
Array(UInt32) | Array(UInt64)
[1] [0]
[1] [0]
[4294967290] [4294967290]
[4294967290] [4294967290]
[4294967299] [4294967299]

View File

@ -4,8 +4,6 @@
100 test_local_1
2018-08-01 100
2018-08-01 100
2018-08-01 100
2018-08-01 100
--------------Single Distributed------------
2018-08-01 100
2018-08-01 100 test_distributed_1
@ -23,9 +21,6 @@
200 test_local_2
2018-08-01 100
2018-08-01 100
2018-08-01 100
2018-08-01 200
2018-08-01 100
2018-08-01 200
--------------Local Merge Distributed------------
2018-08-01 200
@ -53,3 +48,9 @@
2018-08-01 200
2018-08-01 100
2018-08-01 200
--------------Implicit type conversion------------
2018-08-01 -1
2018-08-01 1
2018-08-01 -1
2018-08-01 1
2018-08-01 1

View File

@ -1,3 +1,5 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.test_local_1;
DROP TABLE IF EXISTS test.test_local_2;
DROP TABLE IF EXISTS test.test_distributed_1;
@ -16,9 +18,9 @@ SELECT * FROM merge('test', 'test_local_1');
SELECT *, _table FROM merge('test', 'test_local_1') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_local_1') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_local_1') WHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError 8 }
SELECT * FROM merge('test', 'test_local_1') WHERE _table in ('test_local_1', 'test_local_2');
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2');
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError 8 }
SELECT '--------------Single Distributed------------';
SELECT * FROM merge('test', 'test_distributed_1');
@ -30,16 +32,16 @@ SELECT * FROM merge('test', 'test_distributed_1') WHERE _table in ('test_distrib
SELECT * FROM merge('test', 'test_distributed_1') PREWHERE _table in ('test_distributed_1', 'test_distributed_2');
SELECT '--------------Local Merge Local------------';
SELECT * FROM merge('test', 'test_local_1|test_local_2');
SELECT * FROM merge('test', 'test_local_1|test_local_2') ORDER BY _table;
SELECT *, _table FROM merge('test', 'test_local_1|test_local_2') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_local_1|test_local_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- {serverError 8}
SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- {serverError 8}
SELECT '--------------Local Merge Distributed------------';
SELECT * FROM merge('test', 'test_local_1|test_distributed_2');
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') ORDER BY _table;
SELECT *, _table FROM merge('test', 'test_local_1|test_distributed_2') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_local_1|test_distributed_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table = 'test_local_1';
@ -48,7 +50,7 @@ SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table in (
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') PREWHERE _table in ('test_local_1', 'test_distributed_2') ORDER BY value;
SELECT '--------------Distributed Merge Distributed------------';
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2');
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') ORDER BY _table;
SELECT *, _table FROM merge('test', 'test_distributed_1|test_distributed_2') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_distributed_1|test_distributed_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') WHERE _table = 'test_distributed_1';
@ -60,3 +62,27 @@ DROP TABLE IF EXISTS test.test_local_1;
DROP TABLE IF EXISTS test.test_local_2;
DROP TABLE IF EXISTS test.test_distributed_1;
DROP TABLE IF EXISTS test.test_distributed_2;
DROP TABLE IF EXISTS test.test_u64_local;
DROP TABLE IF EXISTS test.test_s64_local;
DROP TABLE IF EXISTS test.test_u64_distributed;
DROP TABLE IF EXISTS test.test_s64_distributed;
CREATE TABLE test.test_s64_local (date Date, value Int64) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE test.test_u64_local (date Date, value UInt64) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE test.test_s64_distributed AS test.test_s64_local ENGINE = Distributed('test_shard_localhost', 'test', test_s64_local, rand());
CREATE TABLE test.test_u64_distributed AS test.test_u64_local ENGINE = Distributed('test_shard_localhost', 'test', test_u64_local, rand());
INSERT INTO test.test_s64_local VALUES ('2018-08-01', -1);
INSERT INTO test.test_u64_local VALUES ('2018-08-01', 1);
SELECT '--------------Implicit type conversion------------';
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') ORDER BY value;
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') WHERE date = '2018-08-01' ORDER BY value;
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') WHERE _table = 'test_u64_distributed' ORDER BY value;
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') WHERE value = 1; -- { serverError 171}
DROP TABLE IF EXISTS test.test_u64_local;
DROP TABLE IF EXISTS test.test_s64_local;
DROP TABLE IF EXISTS test.test_u64_distributed;
DROP TABLE IF EXISTS test.test_s64_distributed;