mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
Merge pull request #3159 from zhang2014/fix/ISSUES-3134
ISSUES-3134 fix merge and distributed engine query stage
This commit is contained in:
commit
65f7c390f0
@ -21,6 +21,12 @@
|
|||||||
#include <Columns/ColumnString.h>
|
#include <Columns/ColumnString.h>
|
||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Databases/IDatabase.h>
|
#include <Databases/IDatabase.h>
|
||||||
|
#include <Interpreters/SettingsCommon.h>
|
||||||
|
#include <DataStreams/MaterializingBlockInputStream.h>
|
||||||
|
#include <DataStreams/FilterBlockInputStream.h>
|
||||||
|
#include <ext/range.h>
|
||||||
|
#include <Parsers/ASTFunction.h>
|
||||||
|
#include <Parsers/queryToString.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -32,6 +38,7 @@ namespace ErrorCodes
|
|||||||
extern const int INCOMPATIBLE_SOURCE_TABLES;
|
extern const int INCOMPATIBLE_SOURCE_TABLES;
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int NO_SUCH_COLUMN_IN_TABLE;
|
extern const int NO_SUCH_COLUMN_IN_TABLE;
|
||||||
|
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -138,8 +145,8 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
|
|||||||
{
|
{
|
||||||
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
|
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
|
||||||
|
|
||||||
auto database = context.getDatabase(source_database);
|
DatabasePtr database = context.getDatabase(source_database);
|
||||||
auto iterator = database->getIterator(context);
|
DatabaseIteratorPtr iterator = database->getIterator(context);
|
||||||
|
|
||||||
size_t selected_table_size = 0;
|
size_t selected_table_size = 0;
|
||||||
|
|
||||||
@ -149,14 +156,17 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
|
|||||||
{
|
{
|
||||||
auto & table = iterator->table();
|
auto & table = iterator->table();
|
||||||
if (table.get() != this)
|
if (table.get() != this)
|
||||||
|
{
|
||||||
++selected_table_size;
|
++selected_table_size;
|
||||||
|
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
iterator->next();
|
iterator->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto fetch_or_mergeable_stage = std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
|
return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
|
||||||
return selected_table_size == 1 ? stage_in_source_tables : fetch_or_mergeable_stage;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -184,34 +194,6 @@ BlockInputStreams StorageMerge::read(
|
|||||||
real_column_names.push_back(name);
|
real_column_names.push_back(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** First we make list of selected tables to find out its size.
|
|
||||||
* This is necessary to correctly pass the recommended number of threads to each table.
|
|
||||||
*/
|
|
||||||
StorageListWithLocks selected_tables = getSelectedTables();
|
|
||||||
|
|
||||||
const ASTPtr & query = query_info.query;
|
|
||||||
|
|
||||||
for (const auto & elem : selected_tables)
|
|
||||||
{
|
|
||||||
/// If PREWHERE is used in query, you need to make sure that all tables support this.
|
|
||||||
if (typeid_cast<const ASTSelectQuery &>(*query).prewhere_expression)
|
|
||||||
if (!elem.first->supportsPrewhere())
|
|
||||||
throw Exception("Storage " + elem.first->getName() + " doesn't support PREWHERE.",
|
|
||||||
ErrorCodes::ILLEGAL_PREWHERE);
|
|
||||||
}
|
|
||||||
|
|
||||||
Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
|
|
||||||
|
|
||||||
/// If _table column is requested, try filtering
|
|
||||||
if (has_table_virtual_column)
|
|
||||||
{
|
|
||||||
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
|
|
||||||
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
|
|
||||||
|
|
||||||
/// Remove unused tables from the list
|
|
||||||
selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); });
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Just in case, turn off optimization "transfer to PREWHERE",
|
/** Just in case, turn off optimization "transfer to PREWHERE",
|
||||||
* since there is no certainty that it works when one of table is MergeTree and other is not.
|
* since there is no certainty that it works when one of table is MergeTree and other is not.
|
||||||
*/
|
*/
|
||||||
@ -219,111 +201,54 @@ BlockInputStreams StorageMerge::read(
|
|||||||
modified_context.getSettingsRef().optimize_move_to_prewhere = false;
|
modified_context.getSettingsRef().optimize_move_to_prewhere = false;
|
||||||
|
|
||||||
/// What will be result structure depending on query processed stage in source tables?
|
/// What will be result structure depending on query processed stage in source tables?
|
||||||
Block header;
|
Block header = getQueryHeader(column_names, query_info, context, processed_stage);
|
||||||
|
|
||||||
|
/** First we make list of selected tables to find out its size.
|
||||||
|
* This is necessary to correctly pass the recommended number of threads to each table.
|
||||||
|
*/
|
||||||
|
StorageListWithLocks selected_tables = getSelectedTables(query_info.query, has_table_virtual_column, true);
|
||||||
|
|
||||||
|
if (selected_tables.empty())
|
||||||
|
return createSourceStreams(
|
||||||
|
query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);
|
||||||
|
|
||||||
|
size_t remaining_streams = num_streams;
|
||||||
size_t tables_count = selected_tables.size();
|
size_t tables_count = selected_tables.size();
|
||||||
|
|
||||||
size_t curr_table_number = 0;
|
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
|
||||||
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it, ++curr_table_number)
|
|
||||||
{
|
{
|
||||||
StoragePtr table = it->first;
|
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
|
||||||
auto & table_lock = it->second;
|
size_t current_streams = std::min(current_need_streams, remaining_streams);
|
||||||
|
remaining_streams -= current_streams;
|
||||||
|
current_streams = std::max(1, current_streams);
|
||||||
|
|
||||||
/// If there are only virtual columns in query, you must request at least one other column.
|
StoragePtr storage = it->first;
|
||||||
if (real_column_names.size() == 0)
|
TableStructureReadLockPtr struct_lock = it->second;
|
||||||
real_column_names.push_back(ExpressionActions::getSmallestColumn(table->getColumns().getAllPhysical()));
|
|
||||||
|
|
||||||
/// Substitute virtual column for its value when querying tables.
|
|
||||||
ASTPtr modified_query_ast = query->clone();
|
|
||||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_table", table->getTableName());
|
|
||||||
|
|
||||||
SelectQueryInfo modified_query_info;
|
|
||||||
modified_query_info.query = modified_query_ast;
|
|
||||||
modified_query_info.prewhere_info = query_info.prewhere_info;
|
|
||||||
modified_query_info.sets = query_info.sets;
|
|
||||||
|
|
||||||
BlockInputStreams source_streams;
|
BlockInputStreams source_streams;
|
||||||
|
|
||||||
if (curr_table_number < num_streams)
|
if (current_streams)
|
||||||
{
|
{
|
||||||
source_streams = table->read(
|
source_streams = createSourceStreams(
|
||||||
real_column_names,
|
query_info, processed_stage, max_block_size, header, storage,
|
||||||
modified_query_info,
|
struct_lock, real_column_names, modified_context, current_streams, has_table_virtual_column);
|
||||||
modified_context,
|
|
||||||
processed_stage,
|
|
||||||
max_block_size,
|
|
||||||
tables_count >= num_streams ? 1 : (num_streams / tables_count));
|
|
||||||
|
|
||||||
if (!header)
|
|
||||||
{
|
|
||||||
switch (processed_stage)
|
|
||||||
{
|
|
||||||
case QueryProcessingStage::FetchColumns:
|
|
||||||
{
|
|
||||||
header = getSampleBlockForColumns(column_names);
|
|
||||||
|
|
||||||
if (query_info.prewhere_info)
|
|
||||||
{
|
|
||||||
query_info.prewhere_info->prewhere_actions->execute(header);
|
|
||||||
header = materializeBlock(header);
|
|
||||||
if (query_info.prewhere_info->remove_prewhere_column)
|
|
||||||
header.erase(query_info.prewhere_info->prewhere_column_name);
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case QueryProcessingStage::WithMergeableState:
|
|
||||||
case QueryProcessingStage::Complete:
|
|
||||||
header = materializeBlock(InterpreterSelectQuery(
|
|
||||||
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
|
|
||||||
processed_stage, true).getSampleBlock());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (has_table_virtual_column)
|
|
||||||
for (auto & stream : source_streams)
|
|
||||||
stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
|
|
||||||
stream, std::make_shared<DataTypeString>(), table->getTableName(), "_table");
|
|
||||||
|
|
||||||
/// Subordinary tables could have different but convertible types, like numeric types of different width.
|
|
||||||
/// We must return streams with structure equals to structure of Merge table.
|
|
||||||
for (auto & stream : source_streams)
|
|
||||||
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/// If many streams, initialize it lazily, to avoid long delay before start of query processing.
|
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(
|
||||||
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=]() -> BlockInputStreamPtr
|
header, [=, &real_column_names, &modified_context]() -> BlockInputStreamPtr
|
||||||
{
|
|
||||||
BlockInputStreams streams = table->read(
|
|
||||||
real_column_names,
|
|
||||||
modified_query_info,
|
|
||||||
modified_context,
|
|
||||||
processed_stage,
|
|
||||||
max_block_size,
|
|
||||||
1);
|
|
||||||
|
|
||||||
if (streams.empty())
|
|
||||||
{
|
{
|
||||||
return std::make_shared<NullBlockInputStream>(header);
|
BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size,
|
||||||
}
|
header, storage, struct_lock, real_column_names,
|
||||||
else
|
modified_context, current_streams, has_table_virtual_column, true);
|
||||||
{
|
|
||||||
BlockInputStreamPtr stream = streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(streams) : streams[0];
|
|
||||||
|
|
||||||
if (has_table_virtual_column)
|
if (!streams.empty() && streams.size() != 1)
|
||||||
stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
|
throw Exception("LogicalError: the lazy stream size must to be one or empty.", ErrorCodes::LOGICAL_ERROR);
|
||||||
stream, std::make_shared<DataTypeString>(), table->getTableName(), "_table");
|
|
||||||
|
|
||||||
return std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0];
|
||||||
}
|
}));
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto & stream : source_streams)
|
|
||||||
stream->addTableLock(table_lock);
|
|
||||||
|
|
||||||
res.insert(res.end(), source_streams.begin(), source_streams.end());
|
res.insert(res.end(), source_streams.begin(), source_streams.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -334,15 +259,79 @@ BlockInputStreams StorageMerge::read(
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Construct a block consisting only of possible values of virtual columns
|
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
|
||||||
Block StorageMerge::getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const
|
const size_t max_block_size, const Block & header, const StoragePtr & storage,
|
||||||
|
const TableStructureReadLockPtr & struct_lock, Names & real_column_names,
|
||||||
|
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
|
||||||
|
bool concat_streams)
|
||||||
{
|
{
|
||||||
auto column = ColumnString::create();
|
SelectQueryInfo modified_query_info;
|
||||||
|
modified_query_info.sets = query_info.sets;
|
||||||
|
modified_query_info.query = query_info.query->clone();
|
||||||
|
modified_query_info.prewhere_info = query_info.prewhere_info;
|
||||||
|
|
||||||
for (const auto & elem : selected_tables)
|
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage ? storage->getTableName() : "");
|
||||||
column->insert(elem.first->getTableName());
|
|
||||||
|
|
||||||
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_table")};
|
if (!storage)
|
||||||
|
return BlockInputStreams{
|
||||||
|
InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
|
||||||
|
processed_stage, true).execute().in};
|
||||||
|
|
||||||
|
BlockInputStreams source_streams;
|
||||||
|
|
||||||
|
if (processed_stage <= storage->getQueryProcessingStage(modified_context))
|
||||||
|
{
|
||||||
|
/// If there are only virtual columns in query, you must request at least one other column.
|
||||||
|
if (real_column_names.size() ==0)
|
||||||
|
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
|
||||||
|
|
||||||
|
source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size,
|
||||||
|
UInt32(streams_num));
|
||||||
|
}
|
||||||
|
else if (processed_stage > storage->getQueryProcessingStage(modified_context))
|
||||||
|
{
|
||||||
|
typeid_cast<ASTSelectQuery *>(modified_query_info.query.get())->replaceDatabaseAndTable(source_database, storage->getTableName());
|
||||||
|
|
||||||
|
/// Maximum permissible parallelism is streams_num
|
||||||
|
modified_context.getSettingsRef().max_threads = UInt64(streams_num);
|
||||||
|
modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
|
||||||
|
|
||||||
|
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, Names{}, processed_stage};
|
||||||
|
BlockInputStreamPtr interpreter_stream = interpreter.execute().in;
|
||||||
|
|
||||||
|
/** Materialization is needed, since from distributed storage the constants come materialized.
|
||||||
|
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
|
||||||
|
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
|
||||||
|
*/
|
||||||
|
source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!source_streams.empty())
|
||||||
|
{
|
||||||
|
if (concat_streams)
|
||||||
|
{
|
||||||
|
BlockInputStreamPtr stream =
|
||||||
|
source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
|
||||||
|
|
||||||
|
source_streams.resize(1);
|
||||||
|
source_streams[0] = stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (BlockInputStreamPtr & source_stream : source_streams)
|
||||||
|
{
|
||||||
|
if (has_table_virtual_column)
|
||||||
|
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
|
||||||
|
source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
|
||||||
|
|
||||||
|
/// Subordinary tables could have different but convertible types, like numeric types of different width.
|
||||||
|
/// We must return streams with structure equals to structure of Merge table.
|
||||||
|
convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage);
|
||||||
|
|
||||||
|
source_stream->addTableLock(struct_lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return source_streams;
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
|
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
|
||||||
@ -367,6 +356,46 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** Builds the list of source tables (each paired with an optional structure lock) for this Merge table.
  *
  * Every table of `source_database` whose name matches `table_name_regexp` is visited:
  *  - if `query` is set and has a PREWHERE expression, a matching table that does not support PREWHERE
  *    makes the whole call throw ILLEGAL_PREWHERE (the check runs before the "is it this table" test);
  *  - the Merge table itself is never added to the result;
  *  - when `get_lock` is true the table is locked with lockStructure(false, ...), otherwise an empty lock is stored.
  *
  * When `has_virtual_column` is true, the collected table names are put into a one-column block named "_table",
  * that block is passed through VirtualColumnUtils::filterBlockWithQuery, and tables whose names are not among
  * the values extracted afterwards are dropped from the result.
  */
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock) const
{
    StorageListWithLocks result;
    DatabasePtr source_db = context.getDatabase(source_database);

    /// Names of the accepted tables, in the same order as `result`.
    auto table_names = ColumnString::create();

    for (DatabaseIteratorPtr it = source_db->getIterator(context); it->isValid(); it->next())
    {
        if (!table_name_regexp.match(it->name()))
            continue;

        StoragePtr candidate = it->table();

        /// PREWHERE must be supported by every table matched by the regexp.
        if (query && typeid_cast<ASTSelectQuery *>(query.get())->prewhere_expression && !candidate->supportsPrewhere())
            throw Exception("Storage " + candidate->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);

        /// The Merge table never reads from itself.
        if (candidate.get() == this)
            continue;

        table_names->insert(candidate->getTableName());

        TableStructureReadLockPtr lock;
        if (get_lock)
            lock = candidate->lockStructure(false, __PRETTY_FUNCTION__);

        result.emplace_back(candidate, std::move(lock));
    }

    if (!has_virtual_column)
        return result;

    Block names_block = Block{ColumnWithTypeAndName(std::move(table_names), std::make_shared<DataTypeString>(), "_table")};
    VirtualColumnUtils::filterBlockWithQuery(query, names_block, context);
    auto kept_names = VirtualColumnUtils::extractSingleValueFromBlock<String>(names_block, "_table");

    /// Drop the tables whose name is not among the extracted "_table" values.
    result.remove_if([&] (const auto & entry) { return kept_names.find(entry.first->getTableName()) == kept_names.end(); });

    return result;
}
|
||||||
|
|
||||||
void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
|
void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
|
||||||
{
|
{
|
||||||
for (const auto & param : params)
|
for (const auto & param : params)
|
||||||
@ -381,6 +410,69 @@ void StorageMerge::alter(const AlterCommands & params, const String & database_n
|
|||||||
setColumns(new_columns);
|
setColumns(new_columns);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Computes the structure of blocks that read() returns for the given processing stage.
  *
  * FetchColumns: the sample block for `column_names`; if the query carries prewhere_info, the PREWHERE
  * actions are executed on it, the block is materialized, and the PREWHERE column is erased when
  * remove_prewhere_column is set.
  *
  * WithMergeableState / Complete: the materialized sample block of an InterpreterSelectQuery built over
  * a one-block input stream holding the sample block for `column_names`.
  *
  * Any other stage value throws LOGICAL_ERROR.
  */
Block StorageMerge::getQueryHeader(
    const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
{
    if (processed_stage == QueryProcessingStage::FetchColumns)
    {
        Block sample = getSampleBlockForColumns(column_names);

        if (!query_info.prewhere_info)
            return sample;

        query_info.prewhere_info->prewhere_actions->execute(sample);
        sample = materializeBlock(sample);

        if (query_info.prewhere_info->remove_prewhere_column)
            sample.erase(query_info.prewhere_info->prewhere_column_name);

        return sample;
    }

    const bool interpreted_stage = processed_stage == QueryProcessingStage::WithMergeableState
        || processed_stage == QueryProcessingStage::Complete;

    if (!interpreted_stage)
        throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);

    /// Let the interpreter derive the header from a stream that only carries the sample structure.
    auto sample_input = std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names));
    InterpreterSelectQuery interpreter(query_info.query, context, sample_input, processed_stage, true);

    return materializeBlock(interpreter.getSampleBlock());
}
|
||||||
|
|
||||||
|
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
|
||||||
|
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage)
|
||||||
|
{
|
||||||
|
Block before_block_header = source_stream->getHeader();
|
||||||
|
source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||||
|
|
||||||
|
ASTPtr where_expression = typeid_cast<ASTSelectQuery *>(query.get())->where_expression;
|
||||||
|
|
||||||
|
if (!where_expression)
|
||||||
|
return;
|
||||||
|
|
||||||
|
for (size_t column_index : ext::range(0, header.columns()))
|
||||||
|
{
|
||||||
|
ColumnWithTypeAndName header_column = header.getByPosition(column_index);
|
||||||
|
ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name);
|
||||||
|
/// If the processed_stage greater than FetchColumns and the block structure between streams is different.
|
||||||
|
/// the where expression maybe invalid because of convertingBlockInputStream.
|
||||||
|
/// So we need to throw exception.
|
||||||
|
if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns)
|
||||||
|
{
|
||||||
|
NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList();
|
||||||
|
NameAndTypePair virtual_column = getColumn("_table");
|
||||||
|
source_columns.insert(source_columns.end(), virtual_column);
|
||||||
|
ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, context, {}, source_columns}.getActions(false, false);
|
||||||
|
Names required_columns = actions->getRequiredColumns();
|
||||||
|
|
||||||
|
for (const auto required_column : required_columns)
|
||||||
|
{
|
||||||
|
if (required_column == header_column.name)
|
||||||
|
throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure()
|
||||||
|
+ "\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void registerStorageMerge(StorageFactory & factory)
|
void registerStorageMerge(StorageFactory & factory)
|
||||||
{
|
{
|
||||||
|
@ -58,7 +58,7 @@ private:
|
|||||||
|
|
||||||
StorageListWithLocks getSelectedTables() const;
|
StorageListWithLocks getSelectedTables() const;
|
||||||
|
|
||||||
Block getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const;
|
StorageMerge::StorageListWithLocks getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock) const;
|
||||||
|
|
||||||
template <typename F>
|
template <typename F>
|
||||||
StoragePtr getFirstTable(F && predicate) const;
|
StoragePtr getFirstTable(F && predicate) const;
|
||||||
@ -70,6 +70,18 @@ protected:
|
|||||||
const String & source_database_,
|
const String & source_database_,
|
||||||
const String & table_name_regexp_,
|
const String & table_name_regexp_,
|
||||||
const Context & context_);
|
const Context & context_);
|
||||||
|
|
||||||
|
Block getQueryHeader(const Names & column_names, const SelectQueryInfo & query_info,
|
||||||
|
const Context & context, QueryProcessingStage::Enum processed_stage);
|
||||||
|
|
||||||
|
BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
|
||||||
|
const size_t max_block_size, const Block & header, const StoragePtr & storage,
|
||||||
|
const TableStructureReadLockPtr & struct_lock, Names & real_column_names,
|
||||||
|
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
|
||||||
|
bool concat_streams = false);
|
||||||
|
|
||||||
|
void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
|
||||||
|
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,2 +1,56 @@
|
|||||||
300
|
--------------Single Local------------
|
||||||
300
|
2018-08-01 100
|
||||||
|
2018-08-01 100 test_local_1
|
||||||
|
100 test_local_1
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
--------------Single Distributed------------
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100 test_distributed_1
|
||||||
|
100 test_distributed_1
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
--------------Local Merge Local------------
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200
|
||||||
|
2018-08-01 100 test_local_1
|
||||||
|
2018-08-01 200 test_local_2
|
||||||
|
100 test_local_1
|
||||||
|
200 test_local_2
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200
|
||||||
|
--------------Local Merge Distributed------------
|
||||||
|
2018-08-01 200
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200 test_distributed_2
|
||||||
|
2018-08-01 100 test_local_1
|
||||||
|
200 test_distributed_2
|
||||||
|
100 test_local_1
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200
|
||||||
|
--------------Distributed Merge Distributed------------
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200
|
||||||
|
2018-08-01 100 test_distributed_1
|
||||||
|
2018-08-01 200 test_distributed_2
|
||||||
|
100 test_distributed_1
|
||||||
|
200 test_distributed_2
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200
|
||||||
|
2018-08-01 100
|
||||||
|
2018-08-01 200
|
||||||
|
--------------Implicit type conversion------------
|
||||||
|
2018-08-01 -1
|
||||||
|
2018-08-01 1
|
||||||
|
2018-08-01 -1
|
||||||
|
2018-08-01 1
|
||||||
|
2018-08-01 1
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
SET send_logs_level = 'none';
|
||||||
|
|
||||||
DROP TABLE IF EXISTS test.test_local_1;
|
DROP TABLE IF EXISTS test.test_local_1;
|
||||||
DROP TABLE IF EXISTS test.test_local_2;
|
DROP TABLE IF EXISTS test.test_local_2;
|
||||||
DROP TABLE IF EXISTS test.test_distributed_1;
|
DROP TABLE IF EXISTS test.test_distributed_1;
|
||||||
@ -11,10 +13,76 @@ CREATE TABLE test.test_distributed_2 AS test.test_local_2 ENGINE = Distributed('
|
|||||||
INSERT INTO test.test_local_1 VALUES ('2018-08-01',100);
|
INSERT INTO test.test_local_1 VALUES ('2018-08-01',100);
|
||||||
INSERT INTO test.test_local_2 VALUES ('2018-08-01',200);
|
INSERT INTO test.test_local_2 VALUES ('2018-08-01',200);
|
||||||
|
|
||||||
SELECT sum(value) FROM merge('test', 'test_local_1|test_distributed_2');
|
SELECT '--------------Single Local------------';
|
||||||
SELECT sum(value) FROM merge('test', 'test_distributed_1|test_distributed_2');
|
SELECT * FROM merge('test', 'test_local_1');
|
||||||
|
SELECT *, _table FROM merge('test', 'test_local_1') ORDER BY _table;
|
||||||
|
SELECT sum(value), _table FROM merge('test', 'test_local_1') GROUP BY _table ORDER BY _table;
|
||||||
|
SELECT * FROM merge('test', 'test_local_1') WHERE _table = 'test_local_1';
|
||||||
|
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError 8 }
|
||||||
|
SELECT * FROM merge('test', 'test_local_1') WHERE _table in ('test_local_1', 'test_local_2');
|
||||||
|
SELECT * FROM merge('test', 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError 8 }
|
||||||
|
|
||||||
|
SELECT '--------------Single Distributed------------';
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1');
|
||||||
|
SELECT *, _table FROM merge('test', 'test_distributed_1') ORDER BY _table;
|
||||||
|
SELECT sum(value), _table FROM merge('test', 'test_distributed_1') GROUP BY _table ORDER BY _table;
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1') WHERE _table = 'test_distributed_1';
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1') PREWHERE _table = 'test_distributed_1';
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1') WHERE _table in ('test_distributed_1', 'test_distributed_2');
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1') PREWHERE _table in ('test_distributed_1', 'test_distributed_2');
|
||||||
|
|
||||||
|
SELECT '--------------Local Merge Local------------';
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_local_2') ORDER BY _table;
|
||||||
|
SELECT *, _table FROM merge('test', 'test_local_1|test_local_2') ORDER BY _table;
|
||||||
|
SELECT sum(value), _table FROM merge('test', 'test_local_1|test_local_2') GROUP BY _table ORDER BY _table;
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table = 'test_local_1';
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- {serverError 8}
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- {serverError 8}
|
||||||
|
|
||||||
|
SELECT '--------------Local Merge Distributed------------';
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') ORDER BY _table;
|
||||||
|
SELECT *, _table FROM merge('test', 'test_local_1|test_distributed_2') ORDER BY _table;
|
||||||
|
SELECT sum(value), _table FROM merge('test', 'test_local_1|test_distributed_2') GROUP BY _table ORDER BY _table;
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table = 'test_local_1';
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') PREWHERE _table = 'test_local_1';
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table in ('test_local_1', 'test_distributed_2') ORDER BY value;
|
||||||
|
SELECT * FROM merge('test', 'test_local_1|test_distributed_2') PREWHERE _table in ('test_local_1', 'test_distributed_2') ORDER BY value;
|
||||||
|
|
||||||
|
SELECT '--------------Distributed Merge Distributed------------';
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') ORDER BY _table;
|
||||||
|
SELECT *, _table FROM merge('test', 'test_distributed_1|test_distributed_2') ORDER BY _table;
|
||||||
|
SELECT sum(value), _table FROM merge('test', 'test_distributed_1|test_distributed_2') GROUP BY _table ORDER BY _table;
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') WHERE _table = 'test_distributed_1';
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') PREWHERE _table = 'test_distributed_1';
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') WHERE _table in ('test_distributed_1', 'test_distributed_2') ORDER BY value;
|
||||||
|
SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') PREWHERE _table in ('test_distributed_1', 'test_distributed_2') ORDER BY value;
|
||||||
|
|
||||||
DROP TABLE IF EXISTS test.test_local_1;
|
DROP TABLE IF EXISTS test.test_local_1;
|
||||||
DROP TABLE IF EXISTS test.test_local_2;
|
DROP TABLE IF EXISTS test.test_local_2;
|
||||||
DROP TABLE IF EXISTS test.test_distributed_1;
|
DROP TABLE IF EXISTS test.test_distributed_1;
|
||||||
DROP TABLE IF EXISTS test.test_distributed_2;
|
DROP TABLE IF EXISTS test.test_distributed_2;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.test_u64_local;
|
||||||
|
DROP TABLE IF EXISTS test.test_s64_local;
|
||||||
|
DROP TABLE IF EXISTS test.test_u64_distributed;
|
||||||
|
DROP TABLE IF EXISTS test.test_s64_distributed;
|
||||||
|
|
||||||
|
CREATE TABLE test.test_s64_local (date Date, value Int64) ENGINE = MergeTree(date, date, 8192);
|
||||||
|
CREATE TABLE test.test_u64_local (date Date, value UInt64) ENGINE = MergeTree(date, date, 8192);
|
||||||
|
CREATE TABLE test.test_s64_distributed AS test.test_s64_local ENGINE = Distributed('test_shard_localhost', 'test', test_s64_local, rand());
|
||||||
|
CREATE TABLE test.test_u64_distributed AS test.test_u64_local ENGINE = Distributed('test_shard_localhost', 'test', test_u64_local, rand());
|
||||||
|
|
||||||
|
INSERT INTO test.test_s64_local VALUES ('2018-08-01', -1);
|
||||||
|
INSERT INTO test.test_u64_local VALUES ('2018-08-01', 1);
|
||||||
|
|
||||||
|
SELECT '--------------Implicit type conversion------------';
|
||||||
|
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') ORDER BY value;
|
||||||
|
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') WHERE date = '2018-08-01' ORDER BY value;
|
||||||
|
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') WHERE _table = 'test_u64_distributed' ORDER BY value;
|
||||||
|
SELECT * FROM merge('test', 'test_s64_distributed|test_u64_distributed') WHERE value = 1; -- { serverError 171}
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS test.test_u64_local;
|
||||||
|
DROP TABLE IF EXISTS test.test_s64_local;
|
||||||
|
DROP TABLE IF EXISTS test.test_u64_distributed;
|
||||||
|
DROP TABLE IF EXISTS test.test_s64_distributed;
|
||||||
|
Loading…
Reference in New Issue
Block a user