ISSUES-3134 fix merge and distributed engine query stage

This commit is contained in:
zhang2014 2018-09-18 19:09:21 +08:00
parent 8bfdcc6464
commit 882125dd6d
4 changed files with 261 additions and 129 deletions

View File

@ -21,6 +21,7 @@
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <Databases/IDatabase.h>
#include <DataStreams/MaterializingBlockInputStream.h>
namespace DB
@ -138,8 +139,8 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
{
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
auto database = context.getDatabase(source_database);
auto iterator = database->getIterator(context);
DatabasePtr database = context.getDatabase(source_database);
DatabaseIteratorPtr iterator = database->getIterator(context);
size_t selected_table_size = 0;
@ -149,14 +150,18 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
{
auto & table = iterator->table();
if (table.get() != this)
{
++selected_table_size;
stage_in_source_tables = table->getQueryProcessingStage(context);
}
}
iterator->next();
}
auto fetch_or_mergeable_stage = std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
return selected_table_size == 1 ? stage_in_source_tables : fetch_or_mergeable_stage;
auto fetch_stage = std::max(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
return selected_table_size == 1 ? fetch_stage : QueryProcessingStage::WithMergeableState;
}
@ -184,34 +189,6 @@ BlockInputStreams StorageMerge::read(
real_column_names.push_back(name);
}
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables();
const ASTPtr & query = query_info.query;
for (const auto & elem : selected_tables)
{
/// If PREWHERE is used in query, you need to make sure that all tables support this.
if (typeid_cast<const ASTSelectQuery &>(*query).prewhere_expression)
if (!elem.first->supportsPrewhere())
throw Exception("Storage " + elem.first->getName() + " doesn't support PREWHERE.",
ErrorCodes::ILLEGAL_PREWHERE);
}
Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
/// If _table column is requested, try filtering
if (has_table_virtual_column)
{
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list
selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); });
}
/** Just in case, turn off optimization "transfer to PREWHERE",
* since there is no certainty that it works when one of table is MergeTree and other is not.
*/
@ -219,111 +196,48 @@ BlockInputStreams StorageMerge::read(
modified_context.getSettingsRef().optimize_move_to_prewhere = false;
/// What will be result structure depending on query processed stage in source tables?
Block header;
Block header = getQueryHeader(column_names, query_info, context, processed_stage);
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables(query_info.query, has_table_virtual_column, true);
size_t remaining_streams = num_streams;
size_t tables_count = selected_tables.size();
size_t curr_table_number = 0;
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it, ++curr_table_number)
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
StoragePtr table = it->first;
auto & table_lock = it->second;
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.size() == 0)
real_column_names.push_back(ExpressionActions::getSmallestColumn(table->getColumns().getAllPhysical()));
/// Substitute virtual column for its value when querying tables.
ASTPtr modified_query_ast = query->clone();
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_table", table->getTableName());
SelectQueryInfo modified_query_info;
modified_query_info.query = modified_query_ast;
modified_query_info.prewhere_info = query_info.prewhere_info;
modified_query_info.sets = query_info.sets;
StoragePtr storage = it->first;
TableStructureReadLockPtr struct_lock = it->second;
BlockInputStreams source_streams;
if (curr_table_number < num_streams)
if (current_streams)
{
source_streams = table->read(
real_column_names,
modified_query_info,
modified_context,
processed_stage,
max_block_size,
tables_count >= num_streams ? 1 : (num_streams / tables_count));
if (!header)
{
switch (processed_stage)
{
case QueryProcessingStage::FetchColumns:
{
header = getSampleBlockForColumns(column_names);
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
}
break;
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
header = materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
processed_stage, true).getSampleBlock());
break;
}
}
if (has_table_virtual_column)
for (auto & stream : source_streams)
stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
stream, std::make_shared<DataTypeString>(), table->getTableName(), "_table");
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
for (auto & stream : source_streams)
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
source_streams = createSourceStreams(
query_info, processed_stage, max_block_size, real_column_names, modified_context, header, storage,
struct_lock, current_streams, has_table_virtual_column);
}
else
{
/// If many streams, initialize it lazily, to avoid long delay before start of query processing.
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=]() -> BlockInputStreamPtr
{
BlockInputStreams streams = table->read(
real_column_names,
modified_query_info,
modified_context,
processed_stage,
max_block_size,
1);
BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size, real_column_names,
modified_context, header, storage,
struct_lock, current_streams, has_table_virtual_column);
if (streams.empty())
{
return std::make_shared<NullBlockInputStream>(header);
}
else
{
BlockInputStreamPtr stream = streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(streams) : streams[0];
if (streams.size() != 1)
throw Exception("LogicalError: the lazy stream size must to be one.", ErrorCodes::LOGICAL_ERROR);
if (has_table_virtual_column)
stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
stream, std::make_shared<DataTypeString>(), table->getTableName(), "_table");
return std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
}
return streams[0];
}));
}
for (auto & stream : source_streams)
stream->addTableLock(table_lock);
res.insert(res.end(), source_streams.begin(), source_streams.end());
}
@ -334,15 +248,63 @@ BlockInputStreams StorageMerge::read(
return res;
}
/// Construct a block consisting only of possible values of virtual columns
Block StorageMerge::getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size, const Names & real_column_names,
const Context & modified_context, const Block & header, const StoragePtr & storage,
const TableStructureReadLockPtr & struct_lock, size_t current_streams,
bool has_table_virtual_column) const
{
auto column = ColumnString::create();
SelectQueryInfo modified_query_info;
modified_query_info.query = query_info.query->clone();
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage->getTableName());
modified_query_info.prewhere_info = query_info.prewhere_info;
modified_query_info.sets = query_info.sets;
for (const auto & elem : selected_tables)
column->insert(elem.first->getTableName());
BlockInputStreams source_streams;
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_table")};
if (processed_stage <= storage->getQueryProcessingStage(modified_context))
{
source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size,
current_streams);
}
else if (processed_stage > storage->getQueryProcessingStage(modified_context))
{
typeid_cast<ASTSelectQuery *>(modified_query_info.query.get())->replaceDatabaseAndTable(source_database, storage->getTableName());
/// TODO: set num_streams
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, Names{}, processed_stage};
BlockInputStreamPtr interpreter_stream = interpreter.execute().in;
/** Materialization is needed, since from distributed storage the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream));
}
if (!current_streams)
{
BlockInputStreamPtr stream =
source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
source_streams.resize(1);
source_streams[0] = stream;
}
for (BlockInputStreamPtr & source_stream : source_streams)
{
if (has_table_virtual_column)
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
source_stream = std::make_shared<ConvertingBlockInputStream>(
modified_context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
source_stream->addTableLock(struct_lock);
}
return source_streams;
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
@ -367,6 +329,47 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock) const
{
StorageListWithLocks selected_tables;
DatabasePtr database = context.getDatabase(source_database);
DatabaseIteratorPtr iterator = database->getIterator(context);
auto virtual_column = ColumnString::create();
while (iterator->isValid())
{
if (table_name_regexp.match(iterator->name()))
{
StoragePtr storage = iterator->table();
if (query && typeid_cast<ASTSelectQuery *>(query.get())->prewhere_expression && !storage->supportsPrewhere())
throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.",
ErrorCodes::ILLEGAL_PREWHERE);
if (storage.get() != this)
{
virtual_column->insert(storage->getTableName());
selected_tables.emplace_back(storage, get_lock ? storage->lockStructure(false, __PRETTY_FUNCTION__) : TableStructureReadLockPtr{});
}
}
iterator->next();
}
if (has_virtual_column)
{
Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list
selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); });
}
return selected_tables;
}
void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
{
for (const auto & param : params)
@ -381,6 +384,32 @@ void StorageMerge::alter(const AlterCommands & params, const String & database_n
setColumns(new_columns);
}
Block StorageMerge::getQueryHeader(
const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
{
switch (processed_stage)
{
case QueryProcessingStage::FetchColumns:
{
Block header = getSampleBlockForColumns(column_names);
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
}
return header;
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
return materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
processed_stage, true).getSampleBlock());
}
throw Exception("Logical Error: unknow processed stage.", ErrorCodes::LOGICAL_ERROR);
}
void registerStorageMerge(StorageFactory & factory)
{

View File

@ -58,7 +58,7 @@ private:
StorageListWithLocks getSelectedTables() const;
Block getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const;
StorageMerge::StorageListWithLocks getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;
@ -70,6 +70,14 @@ protected:
const String & source_database_,
const String & table_name_regexp_,
const Context & context_);
Block getQueryHeader(const Names & column_names, const SelectQueryInfo & query_info,
const Context & context, QueryProcessingStage::Enum processed_stage);
BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size, const Names &real_column_names, const Context & modified_context,
const Block & header, const StoragePtr & storage, const TableStructureReadLockPtr & struct_lock,
size_t current_streams, bool has_table_virtual_column) const;
};
}

View File

@ -1,2 +1,55 @@
300
300
--------------Single Local------------
2018-08-01 100
2018-08-01 100 test_local_1
100 test_local_1
2018-08-01 100
2018-08-01 100
2018-08-01 100
2018-08-01 100
--------------Single Distributed------------
2018-08-01 100
2018-08-01 100 test_distributed_1
100 test_distributed_1
2018-08-01 100
2018-08-01 100
2018-08-01 100
2018-08-01 100
--------------Local Merge Local------------
2018-08-01 100
2018-08-01 200
2018-08-01 100 test_local_1
2018-08-01 200 test_local_2
100 test_local_1
200 test_local_2
2018-08-01 100
2018-08-01 100
2018-08-01 100
2018-08-01 200
2018-08-01 100
2018-08-01 200
--------------Local Merge Distributed------------
2018-08-01 200
2018-08-01 100
2018-08-01 200 test_distributed_2
2018-08-01 100 test_local_1
200 test_distributed_2
100 test_local_1
2018-08-01 100
2018-08-01 100
2018-08-01 100
2018-08-01 200
2018-08-01 100
2018-08-01 200
--------------Distributed Merge Distributed------------
2018-08-01 100
2018-08-01 200
2018-08-01 100 test_distributed_1
2018-08-01 200 test_distributed_2
100 test_distributed_1
200 test_distributed_2
2018-08-01 100
2018-08-01 100
2018-08-01 100
2018-08-01 200
2018-08-01 100
2018-08-01 200

View File

@ -11,8 +11,50 @@ CREATE TABLE test.test_distributed_2 AS test.test_local_2 ENGINE = Distributed('
INSERT INTO test.test_local_1 VALUES ('2018-08-01',100);
INSERT INTO test.test_local_2 VALUES ('2018-08-01',200);
SELECT sum(value) FROM merge('test', 'test_local_1|test_distributed_2');
SELECT sum(value) FROM merge('test', 'test_distributed_1|test_distributed_2');
SELECT '--------------Single Local------------';
SELECT * FROM merge('test', 'test_local_1');
SELECT *, _table FROM merge('test', 'test_local_1') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_local_1') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_local_1') WHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1') WHERE _table in ('test_local_1', 'test_local_2');
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2');
SELECT '--------------Single Distributed------------';
SELECT * FROM merge('test', 'test_distributed_1');
SELECT *, _table FROM merge('test', 'test_distributed_1') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_distributed_1') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_distributed_1') WHERE _table = 'test_distributed_1';
SELECT * FROM merge('test', 'test_distributed_1') PREWHERE _table = 'test_distributed_1';
SELECT * FROM merge('test', 'test_distributed_1') WHERE _table in ('test_distributed_1', 'test_distributed_2');
SELECT * FROM merge('test', 'test_distributed_1') PREWHERE _table in ('test_distributed_1', 'test_distributed_2');
SELECT '--------------Local Merge Local------------';
SELECT * FROM merge('test', 'test_local_1|test_local_2');
SELECT *, _table FROM merge('test', 'test_local_1|test_local_2') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_local_1|test_local_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
SELECT '--------------Local Merge Distributed------------';
SELECT * FROM merge('test', 'test_local_1|test_distributed_2');
SELECT *, _table FROM merge('test', 'test_local_1|test_distributed_2') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_local_1|test_distributed_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') PREWHERE _table = 'test_local_1';
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table in ('test_local_1', 'test_distributed_2') ORDER BY value;
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') PREWHERE _table in ('test_local_1', 'test_distributed_2') ORDER BY value;
SELECT '--------------Distributed Merge Distributed------------';
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2');
SELECT *, _table FROM merge('test', 'test_distributed_1|test_distributed_2') ORDER BY _table;
SELECT sum(value), _table FROM merge('test', 'test_distributed_1|test_distributed_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') WHERE _table = 'test_distributed_1';
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') PREWHERE _table = 'test_distributed_1';
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') WHERE _table in ('test_distributed_1', 'test_distributed_2') ORDER BY value;
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') PREWHERE _table in ('test_distributed_1', 'test_distributed_2') ORDER BY value;
DROP TABLE IF EXISTS test.test_local_1;
DROP TABLE IF EXISTS test.test_local_2;