// ClickHouse/dbms/src/Storages/StorageMerge.cpp
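//
// The Merge table engine: a table that transparently reads from all tables
// of one source database whose names match a given regexp.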

#include <DataStreams/AddingConstColumnBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <Databases/IDatabase.h>
#include <Interpreters/SettingsCommon.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <ext/range.h>
#include <algorithm>
#include <Parsers/ASTFunction.h>
#include <Parsers/queryToString.h>

namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_PREWHERE;
extern const int INCOMPATIBLE_SOURCE_TABLES;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
extern const int LOGICAL_ERROR;
}
StorageMerge::StorageMerge(
const std::string & name_,
const ColumnsDescription & columns_,
const String & source_database_,
const String & table_name_regexp_,
const Context & context_)
: IStorage{columns_},
name(name_), source_database(source_database_),
table_name_regexp(table_name_regexp_), global_context(context_)
{
}
/// NOTE: Neither the structure of the underlying tables nor their set is constant,
/// so the results of these methods may become obsolete as soon as the call returns.
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{
/// virtual column of the Merge table itself
if (column_name == "_table")
return { column_name, std::make_shared<DataTypeString>() };
if (IStorage::hasColumn(column_name))
return IStorage::getColumn(column_name);
/// virtual (and real) columns of the underlying tables
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->getColumn(column_name);
throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}

bool StorageMerge::hasColumn(const String & column_name) const
{
if (column_name == "_table")
return true;
if (IStorage::hasColumn(column_name))
return true;
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->hasColumn(column_name);
return false;
}
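
/// Returns the first table from the source database (other than this Merge table itself)
/// whose name matches the regexp and that satisfies the predicate.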
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
auto database = global_context.getDatabase(source_database);
auto iterator = database->getIterator(global_context);
while (iterator->isValid())
{
if (table_name_regexp.match(iterator->name()))
{
auto & table = iterator->table();
if (table.get() != this && predicate(table))
return table;
}
iterator->next();
}
return {};
}
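
/// The Merge table is considered remote if at least one of its source tables is remote.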
bool StorageMerge::isRemote() const
{
auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); });
return first_remote_table != nullptr;
}

bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
{
/// It's beneficial if it is true for at least one table.
StorageListWithLocks selected_tables = getSelectedTables(query_context.getCurrentQueryId());
size_t i = 0;
for (const auto & table : selected_tables)
{
if (table.first->mayBenefitFromIndexForIn(left_in_operand, query_context))
return true;
++i;
/// For simplicity, check only the first ten tables.
if (i >= 10)
break;
}
return false;
}
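
/// If exactly one table is selected, its own processing stage can be forwarded as is;
/// with several tables, partial results still have to be merged,
/// so the stage is capped at WithMergeableState.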
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
{
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
DatabasePtr database = context.getDatabase(source_database);
DatabaseIteratorPtr iterator = database->getIterator(context);
size_t selected_table_size = 0;
while (iterator->isValid())
{
if (table_name_regexp.match(iterator->name()))
{
auto & table = iterator->table();
if (table.get() != this)
{
++selected_table_size;
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
}
}
iterator->next();
}
return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
}

BlockInputStreams StorageMerge::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
BlockInputStreams res;
bool has_table_virtual_column = false;
Names real_column_names;
real_column_names.reserve(column_names.size());
for (const auto & column_name : column_names)
{
if (column_name == "_table")
has_table_virtual_column = true;
else
real_column_names.push_back(column_name);
}
/** Just in case, disable the "move to PREWHERE" optimization,
* since there is no certainty that it works correctly when some of the tables are MergeTree and others are not.
*/
Context modified_context = context;
modified_context.getSettingsRef().optimize_move_to_prewhere = false;
/// What the result structure will be, depending on the stage up to which the query is processed in the source tables.
Block header = getQueryHeader(column_names, query_info, context, processed_stage);
/** First, make a list of the selected tables, to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables(
query_info.query, has_table_virtual_column, true, context.getCurrentQueryId());
if (selected_tables.empty())
return createSourceStreams(
query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);
size_t remaining_streams = num_streams;
size_t tables_count = selected_tables.size();
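
/// Distribute the requested number of streams across the selected tables;
/// each table gets at least one stream.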
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
current_streams = std::max(size_t(1), current_streams);
StoragePtr storage = it->first;
TableStructureReadLockHolder struct_lock = it->second;
BlockInputStreams source_streams;
if (current_streams)
{
source_streams = createSourceStreams(
query_info, processed_stage, max_block_size, header, storage,
struct_lock, real_column_names, modified_context, current_streams, has_table_virtual_column);
}
else
{
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(
header, [=]() mutable -> BlockInputStreamPtr
{
BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size,
header, storage, struct_lock, real_column_names,
modified_context, current_streams, has_table_virtual_column, true);
if (streams.size() > 1)
throw Exception("Logical error: a lazy source must produce at most one stream.", ErrorCodes::LOGICAL_ERROR);
return streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams[0];
}));
}
res.insert(res.end(), source_streams.begin(), source_streams.end());
}
if (res.empty())
return res;
res = narrowBlockInputStreams(res, num_streams);
return res;
}
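
/// Creates the input streams for one source table. When no storage is given (no tables matched),
/// the query is instead evaluated over a single block with the expected header,
/// so that the result structure is still correct.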
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
const TableStructureReadLockHolder & struct_lock, Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams)
{
SelectQueryInfo modified_query_info = query_info;
modified_query_info.query = query_info.query->clone();
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage ? storage->getTableName() : "");
if (!storage)
return BlockInputStreams{
InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().in};
BlockInputStreams source_streams;
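
/// If the source table itself processes the query at least up to the requested stage, read from it directly.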
if (processed_stage <= storage->getQueryProcessingStage(modified_context))
{
/// If the query asks only for virtual columns, we must request at least one physical column,
/// so that the underlying storage has something to read.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size,
UInt32(streams_num));
}
else if (processed_stage > storage->getQueryProcessingStage(modified_context))
{
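/// The requested stage is beyond what the source table does by itself:
/// run InterpreterSelectQuery on top of it to process the query further.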
modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, storage->getTableName());
/// Maximum permissible parallelism is streams_num
modified_context.getSettingsRef().max_threads = UInt64(streams_num);
modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
BlockInputStreamPtr interpreter_stream = interpreter.execute().in;
/** Materialization is needed, since constants arrive already materialized from distributed storage.
* If we did not materialize here, columns of different kinds (Const and non-Const) would be produced by different threads,
* and this is not allowed, since all code assumes that every block in a stream has the same structure.
*/
source_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(interpreter_stream));
}
if (!source_streams.empty())
{
if (concat_streams)
{
BlockInputStreamPtr stream =
source_streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(source_streams) : source_streams[0];
source_streams.resize(1);
source_streams[0] = stream;
}
for (BlockInputStreamPtr & source_stream : source_streams)
{
if (has_table_virtual_column)
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), storage->getTableName(), "_table");
/// Subordinate tables may have different but convertible types, e.g. numeric types of different widths.
/// We must return streams whose structure equals the structure of the Merge table.
convertingSourceStream(header, modified_context, modified_query_info.query, source_stream, processed_stage);
source_stream->addTableLock(struct_lock);
}
}
return source_streams;
}
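
/// Returns all tables of the source database that match the regexp,
/// together with read locks on their structure.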
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id) const
{
StorageListWithLocks selected_tables;
auto database = global_context.getDatabase(source_database);
auto iterator = database->getIterator(global_context);
while (iterator->isValid())
{
if (table_name_regexp.match(iterator->name()))
{
auto & table = iterator->table();
if (table.get() != this)
selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id));
}
iterator->next();
}
return selected_tables;
}

StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock, const String & query_id) const
{
StorageListWithLocks selected_tables;
DatabasePtr database = global_context.getDatabase(source_database);
DatabaseIteratorPtr iterator = database->getIterator(global_context);
auto virtual_column = ColumnString::create();
while (iterator->isValid())
{
if (table_name_regexp.match(iterator->name()))
{
StoragePtr storage = iterator->table();
if (query && query->as<ASTSelectQuery>()->prewhere_expression && !storage->supportsPrewhere())
throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
if (storage.get() != this)
{
virtual_column->insert(storage->getTableName());
selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{});
}
}
iterator->next();
}
if (has_virtual_column)
{
Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list
selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); });
}
return selected_tables;
}

void StorageMerge::alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());

auto new_columns = getColumns();
auto new_indices = getIndicesDescription();
params.apply(new_columns);
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, {});
setColumns(new_columns);
}
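
/// Header of the result block, depending on the stage up to which the query
/// is processed in the source tables.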
Block StorageMerge::getQueryHeader(
const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
{
switch (processed_stage)
{
case QueryProcessingStage::FetchColumns:
{
Block header = getSampleBlockForColumns(column_names);
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
}
return header;
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
return materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
}
throw Exception("Logical error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}
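
/// Converts the structure of a source stream to the structure of the Merge table.
/// If a column type had to be converted while the query was processed beyond FetchColumns,
/// a WHERE expression referring to that column was evaluated on a different type,
/// so this case is detected and reported as an error.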
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = source_stream->getHeader();
source_stream = std::make_shared<ConvertingBlockInputStream>(context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
auto where_expression = query->as<ASTSelectQuery>()->where_expression;
if (!where_expression)
return;
for (size_t column_index : ext::range(0, header.columns()))
{
ColumnWithTypeAndName header_column = header.getByPosition(column_index);
ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name);
/// If processed_stage is greater than FetchColumns and the block structure differs between streams,
/// the WHERE expression may have become invalid because of the ConvertingBlockInputStream,
/// so we need to throw an exception.
if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns)
{
NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList();
NameAndTypePair virtual_column = getColumn("_table");
source_columns.insert(source_columns.end(), virtual_column);
auto syntax_result = SyntaxAnalyzer(context).analyze(where_expression, source_columns);
ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, context}.getActions(false, false);
Names required_columns = actions->getRequiredColumns();
for (const auto & required_column : required_columns)
{
if (required_column == header_column.name)
throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure()
+ "\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
}
}
}
void registerStorageMerge(StorageFactory & factory)
{
factory.registerStorage("Merge", [](const StorageFactory::Arguments & args)
{
/** In the query, the table engine arguments specify the name of the database that contains the source tables,
* as well as a regexp for the source table names.
*/
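/// Usage example (database and table names are illustrative):
///     CREATE TABLE all_watch_logs (date Date, UserID Int64) ENGINE = Merge(test_db, '^WatchLog');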
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2)
throw Exception("Storage Merge requires exactly 2 parameters"
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
String source_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String table_name_regexp = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageMerge::create(
args.table_name, args.columns,
source_database, table_name_regexp, args.context);
});
}
}