ISSUES-3134 fix merge and distributed engine query stage

This commit is contained in:
zhang2014 2018-09-19 18:16:30 +08:00
parent 80d83bb50f
commit 8a8189c7e9
4 changed files with 108 additions and 44 deletions

View File

@ -23,6 +23,8 @@
#include <Databases/IDatabase.h>
#include <Interpreters/SettingsCommon.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <ext/range.h>
namespace DB
@ -205,10 +207,8 @@ BlockInputStreams StorageMerge::read(
StorageListWithLocks selected_tables = getSelectedTables(query_info.query, has_table_virtual_column, true);
if (selected_tables.empty())
{
BlockInputStreams streams{std::make_shared<NullBlockInputStream>(header)};
return streams;
}
return createSourceStreams(
query_info, processed_stage, max_block_size, modified_context, header, {}, {}, real_column_names, 0, has_table_virtual_column);
size_t remaining_streams = num_streams;
size_t tables_count = selected_tables.size();
@ -238,6 +238,9 @@ BlockInputStreams StorageMerge::read(
modified_context, header, storage,
struct_lock, real_column_names, current_streams, has_table_virtual_column);
if (streams.empty())
return std::make_shared<NullBlockInputStream>(header);
if (streams.size() != 1)
throw Exception("LogicalError: the lazy stream size must to be one.", ErrorCodes::LOGICAL_ERROR);
@ -263,10 +266,16 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
{
SelectQueryInfo modified_query_info;
modified_query_info.query = query_info.query->clone();
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage->getTableName());
modified_query_info.prewhere_info = query_info.prewhere_info;
modified_query_info.sets = query_info.sets;
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage ? storage->getTableName() : "");
if (!storage)
return BlockInputStreams{
InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
processed_stage, true).execute().in};
BlockInputStreams source_streams;
if (processed_stage <= storage->getQueryProcessingStage(modified_context))
@ -293,34 +302,31 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream));
}
if (source_streams.empty())
if (!source_streams.empty())
{
source_streams.emplace_back(std::make_shared<OneBlockInputStream>(header));
return source_streams;
if (!current_streams)
{
BlockInputStreamPtr stream =
source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
source_streams.resize(1);
source_streams[0] = stream;
}
for (BlockInputStreamPtr & source_stream : source_streams)
{
if (has_table_virtual_column)
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
convertingSourceStream(header, modified_context, storage, modified_query_info, source_stream, processed_stage);
source_stream->addTableLock(struct_lock);
}
}
if (!current_streams)
{
BlockInputStreamPtr stream =
source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
source_streams.resize(1);
source_streams[0] = stream;
}
for (BlockInputStreamPtr & source_stream : source_streams)
{
if (has_table_virtual_column)
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
source_stream = std::make_shared<ConvertingBlockInputStream>(
modified_context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
source_stream->addTableLock(struct_lock);
}
return source_streams;
}
@ -361,8 +367,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
StoragePtr storage = iterator->table();
if (query && typeid_cast<ASTSelectQuery *>(query.get())->prewhere_expression && !storage->supportsPrewhere())
throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.",
ErrorCodes::ILLEGAL_PREWHERE);
throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
if (storage.get() != this)
{
@ -427,6 +432,59 @@ Block StorageMerge::getQueryHeader(
throw Exception("Logical Error: unknow processed stage.", ErrorCodes::LOGICAL_ERROR);
}
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, const StoragePtr & storage,
const SelectQueryInfo & query_info, BlockInputStreamPtr & source_stream,
QueryProcessingStage::Enum processed_stage)
{
Block before_convert_header = source_stream->getHeader();
source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
Block after_convert_header = source_stream->getHeader();
bool different_header = false;
for (size_t column_index : ext::range(0, before_convert_header.columns()))
{
ColumnWithTypeAndName before_column = before_convert_header.getByPosition(column_index);
if (after_convert_header.has(before_column.name))
{
ColumnWithTypeAndName after_column = after_convert_header.getByName(before_column.name);
if (before_column.type != after_column.type)
{
different_header = true;
break;
}
}
}
if (!different_header)
return;
/// HACK: Re-run filter stream for align processed_stage
if (processed_stage > QueryProcessingStage::FetchColumns)
{
ASTSelectQuery * select_query = typeid_cast<ASTSelectQuery *>(query_info.query.get());
if (select_query->where_expression)
{
Block expression_input = header.cloneEmpty();
NameAndTypePair name_and_type = getColumn("_table");
expression_input.insert({ name_and_type.type->createColumn(), name_and_type.type, name_and_type.name });
ExpressionAnalyzer analyzer = ExpressionAnalyzer{select_query->where_expression, context, {}, expression_input.getNamesAndTypesList()};
ExpressionActionsPtr where_actions = analyzer.getActions(false, false);
if (!after_convert_header.has("_table"))
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
source_stream = std::make_shared<FilterBlockInputStream>(
source_stream, where_actions, select_query->where_expression->getColumnName(), true);
if (!after_convert_header.has("_table") && !header.has("_table"))
source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
}
}
}
void registerStorageMerge(StorageFactory & factory)
{

View File

@ -78,6 +78,10 @@ protected:
const size_t max_block_size, const Context & modified_context,
const Block & header, const StoragePtr & storage, const TableStructureReadLockPtr & struct_lock,
Names & real_column_names, size_t current_streams, bool has_table_virtual_column);
void convertingSourceStream(const Block & header, const Context & context, const StoragePtr & storage,
const SelectQueryInfo & query_info, BlockInputStreamPtr & source_stream,
QueryProcessingStage::Enum processed_stage);
};
}

View File

@ -1,4 +1,4 @@
UInt32 | UInt64
UInt32 | UInt64
= 1:
1
1
@ -10,25 +10,24 @@
4294967290
4294967299:
4294967299
Int64 | UInt64
Int64 | UInt64
1:
1
1
-1:
18446744073709551615
Int32 | UInt64
Int32 | UInt64
1
1
2147483650
String | FixedString(16)
String | FixedString(16)
1
1
DateTime | UInt64
DateTime | UInt64
1
1
Array(UInt32) | Array(UInt64)
[1] [0]
[1] [0]
[4294967290] [4294967290]
[4294967290] [4294967290]
[4294967299] [4294967299]
Array(UInt32) | Array(UInt64)
[1] [0]
[1] [0]
[4294967290] [4294967290]
[4294967290] [4294967290]
[4294967299] [4294967299]