2017-12-12 15:54:03 +00:00
|
|
|
#include <Storages/System/StorageSystemPartsBase.h>
|
|
|
|
#include <Common/escapeForFileName.h>
|
|
|
|
#include <Columns/ColumnString.h>
|
|
|
|
#include <DataTypes/DataTypeString.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <DataTypes/DataTypeDateTime.h>
|
|
|
|
#include <DataTypes/DataTypeDate.h>
|
|
|
|
#include <DataStreams/OneBlockInputStream.h>
|
|
|
|
#include <Storages/StorageMergeTree.h>
|
|
|
|
#include <Storages/StorageReplicatedMergeTree.h>
|
|
|
|
#include <Storages/VirtualColumnUtils.h>
|
|
|
|
#include <Databases/IDatabase.h>
|
|
|
|
#include <Parsers/queryToString.h>
|
2018-03-26 14:18:04 +00:00
|
|
|
#include <Parsers/ASTIdentifier.h>
|
2017-12-12 15:54:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
bool StorageSystemPartsBase::hasStateColumn(const Names & column_names)
|
|
|
|
{
|
|
|
|
bool has_state_column = false;
|
|
|
|
Names real_column_names;
|
|
|
|
|
|
|
|
for (const String & column_name : column_names)
|
|
|
|
{
|
|
|
|
if (column_name == "_state")
|
|
|
|
has_state_column = true;
|
|
|
|
else
|
|
|
|
real_column_names.emplace_back(column_name);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Do not check if only _state column is requested
|
|
|
|
if (!(has_state_column && real_column_names.empty()))
|
|
|
|
check(real_column_names);
|
|
|
|
|
|
|
|
return has_state_column;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class StoragesInfoStream
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context, bool has_state_column)
|
|
|
|
: has_state_column(has_state_column)
|
|
|
|
{
|
|
|
|
/// Will apply WHERE to subset of columns and then add more columns.
|
|
|
|
/// This is kind of complicated, but we use WHERE to do less work.
|
|
|
|
|
|
|
|
Block block_to_filter;
|
|
|
|
|
2017-12-28 18:20:53 +00:00
|
|
|
MutableColumnPtr table_column_mut = ColumnString::create();
|
|
|
|
MutableColumnPtr engine_column_mut = ColumnString::create();
|
|
|
|
MutableColumnPtr active_column_mut = ColumnUInt8::create();
|
2017-12-12 15:54:03 +00:00
|
|
|
|
|
|
|
{
|
|
|
|
Databases databases = context.getDatabases();
|
|
|
|
|
|
|
|
/// Add column 'database'.
|
2017-12-28 18:20:53 +00:00
|
|
|
MutableColumnPtr database_column_mut = ColumnString::create();
|
2017-12-12 15:54:03 +00:00
|
|
|
for (const auto & database : databases)
|
2018-08-13 09:11:58 +00:00
|
|
|
{
|
2018-08-14 02:18:57 +00:00
|
|
|
if (context.hasDatabaseAccessRights(database.first))
|
2018-08-13 09:11:58 +00:00
|
|
|
database_column_mut->insert(database.first);
|
|
|
|
}
|
2017-12-12 15:54:03 +00:00
|
|
|
block_to_filter.insert(ColumnWithTypeAndName(
|
2017-12-28 18:20:53 +00:00
|
|
|
std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
|
2017-12-12 15:54:03 +00:00
|
|
|
|
|
|
|
/// Filter block_to_filter with column 'database'.
|
|
|
|
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
|
|
|
rows = block_to_filter.rows();
|
|
|
|
|
|
|
|
/// Block contains new columns, update database_column.
|
2017-12-28 18:20:53 +00:00
|
|
|
ColumnPtr database_column = block_to_filter.getByName("database").column;
|
2017-12-12 15:54:03 +00:00
|
|
|
|
|
|
|
if (rows)
|
|
|
|
{
|
|
|
|
/// Add columns 'table', 'engine', 'active'
|
|
|
|
|
2017-12-28 18:20:53 +00:00
|
|
|
IColumn::Offsets offsets(rows);
|
2017-12-12 15:54:03 +00:00
|
|
|
|
|
|
|
for (size_t i = 0; i < rows; ++i)
|
|
|
|
{
|
2017-12-28 18:20:53 +00:00
|
|
|
String database_name = (*database_column)[i].get<String>();
|
2017-12-12 15:54:03 +00:00
|
|
|
const DatabasePtr database = databases.at(database_name);
|
|
|
|
|
|
|
|
offsets[i] = i ? offsets[i - 1] : 0;
|
|
|
|
for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next())
|
|
|
|
{
|
|
|
|
String table_name = iterator->name();
|
|
|
|
StoragePtr storage = iterator->table();
|
|
|
|
String engine_name = storage->getName();
|
|
|
|
|
|
|
|
if (!dynamic_cast<StorageMergeTree *>(&*storage) &&
|
|
|
|
!dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
|
|
|
|
continue;
|
|
|
|
|
|
|
|
storages[std::make_pair(database_name, iterator->name())] = storage;
|
|
|
|
|
|
|
|
/// Add all combinations of flag 'active'.
|
|
|
|
for (UInt64 active : {0, 1})
|
|
|
|
{
|
2017-12-28 18:20:53 +00:00
|
|
|
table_column_mut->insert(table_name);
|
|
|
|
engine_column_mut->insert(engine_name);
|
|
|
|
active_column_mut->insert(active);
|
2017-12-12 15:54:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
offsets[i] += 2;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (size_t i = 0; i < block_to_filter.columns(); ++i)
|
|
|
|
{
|
|
|
|
ColumnPtr & column = block_to_filter.safeGetByPosition(i).column;
|
|
|
|
column = column->replicate(offsets);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-03-10 00:26:17 +00:00
|
|
|
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
|
|
|
|
block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine"));
|
|
|
|
block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active"));
|
|
|
|
|
2017-12-12 15:54:03 +00:00
|
|
|
if (rows)
|
|
|
|
{
|
|
|
|
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
|
|
|
|
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
|
|
|
rows = block_to_filter.rows();
|
|
|
|
}
|
|
|
|
|
|
|
|
database_column = block_to_filter.getByName("database").column;
|
|
|
|
table_column = block_to_filter.getByName("table").column;
|
|
|
|
active_column = block_to_filter.getByName("active").column;
|
|
|
|
|
|
|
|
next_row = 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
StorageSystemPartsBase::StoragesInfo next()
|
|
|
|
{
|
|
|
|
StorageSystemPartsBase::StoragesInfo info;
|
|
|
|
info.storage = nullptr;
|
|
|
|
|
|
|
|
while (next_row < rows)
|
|
|
|
{
|
|
|
|
|
|
|
|
info.database = (*database_column)[next_row].get<String>();
|
|
|
|
info.table = (*table_column)[next_row].get<String>();
|
|
|
|
|
|
|
|
auto isSameTable = [& info, this] (size_t next_row) -> bool
|
|
|
|
{
|
|
|
|
return (*database_column)[next_row].get<String>() == info.database &&
|
|
|
|
(*table_column)[next_row].get<String>() == info.table;
|
|
|
|
};
|
|
|
|
|
|
|
|
/// What 'active' value we need.
|
|
|
|
bool need[2]{}; /// [active]
|
|
|
|
for (; next_row < rows && isSameTable(next_row); ++next_row)
|
|
|
|
{
|
|
|
|
bool active = (*active_column)[next_row].get<UInt64>() != 0;
|
|
|
|
need[active] = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
info.storage = storages.at(std::make_pair(info.database, info.table));
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2018-03-26 14:18:04 +00:00
|
|
|
/// For table not to be dropped and set of columns to remain constant.
|
2017-12-12 15:54:03 +00:00
|
|
|
info.table_lock = info.storage->lockStructure(false, __PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
/** There are case when IStorage::drop was called,
|
|
|
|
* but we still own the object.
|
|
|
|
* Then table will throw exception at attempt to lock it.
|
|
|
|
* Just skip the table.
|
|
|
|
*/
|
|
|
|
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
|
|
|
|
continue;
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
|
|
|
info.engine = info.storage->getName();
|
|
|
|
|
|
|
|
info.data = nullptr;
|
|
|
|
|
|
|
|
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*info.storage))
|
|
|
|
{
|
|
|
|
info.data = &merge_tree->getData();
|
|
|
|
}
|
|
|
|
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*info.storage))
|
|
|
|
{
|
|
|
|
info.data = &replicated_merge_tree->getData();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
|
|
|
using State = MergeTreeDataPart::State;
|
|
|
|
auto & all_parts_state = info.all_parts_state;
|
|
|
|
auto & all_parts = info.all_parts;
|
|
|
|
|
|
|
|
if (need[0])
|
|
|
|
{
|
|
|
|
/// If has_state_column is requested, return all states.
|
|
|
|
if (!has_state_column)
|
|
|
|
all_parts = info.data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state);
|
|
|
|
else
|
|
|
|
all_parts = info.data->getAllDataPartsVector(&all_parts_state);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
all_parts = info.data->getDataPartsVector({State::Committed}, &all_parts_state);
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
return info;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
bool has_state_column;
|
|
|
|
|
|
|
|
ColumnPtr database_column;
|
|
|
|
ColumnPtr table_column;
|
|
|
|
ColumnPtr active_column;
|
|
|
|
|
|
|
|
size_t next_row;
|
|
|
|
size_t rows;
|
|
|
|
|
|
|
|
using StoragesMap = std::map<std::pair<String, String>, StoragePtr>;
|
|
|
|
StoragesMap storages;
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
BlockInputStreams StorageSystemPartsBase::read(
|
|
|
|
const Names & column_names,
|
|
|
|
const SelectQueryInfo & query_info,
|
|
|
|
const Context & context,
|
2018-04-19 15:18:26 +00:00
|
|
|
QueryProcessingStage::Enum processed_stage,
|
2017-12-12 15:54:03 +00:00
|
|
|
const size_t /*max_block_size*/,
|
|
|
|
const unsigned /*num_streams*/)
|
|
|
|
{
|
|
|
|
bool has_state_column = hasStateColumn(column_names);
|
2018-04-19 15:18:26 +00:00
|
|
|
checkQueryProcessingStage(processed_stage, context);
|
2017-12-12 15:54:03 +00:00
|
|
|
|
|
|
|
StoragesInfoStream stream(query_info, context, has_state_column);
|
|
|
|
|
2017-12-28 18:20:53 +00:00
|
|
|
/// Create the result.
|
2017-12-12 15:54:03 +00:00
|
|
|
|
2018-03-06 20:18:34 +00:00
|
|
|
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
|
2017-12-12 15:54:03 +00:00
|
|
|
if (has_state_column)
|
2018-03-06 20:18:34 +00:00
|
|
|
res_columns.push_back(ColumnString::create());
|
2017-12-12 15:54:03 +00:00
|
|
|
|
|
|
|
while (StoragesInfo info = stream.next())
|
|
|
|
{
|
2018-03-06 20:18:34 +00:00
|
|
|
processNextStorage(res_columns, info, has_state_column);
|
2017-12-12 15:54:03 +00:00
|
|
|
}
|
|
|
|
|
2017-12-28 18:20:53 +00:00
|
|
|
Block block = getSampleBlock();
|
|
|
|
if (has_state_column)
|
|
|
|
block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
|
|
|
|
|
2018-03-06 20:18:34 +00:00
|
|
|
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block.cloneWithColumns(std::move(res_columns))));
|
2017-12-12 15:54:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
NameAndTypePair StorageSystemPartsBase::getColumn(const String & column_name) const
|
|
|
|
{
|
|
|
|
if (column_name == "_state")
|
|
|
|
return NameAndTypePair("_state", std::make_shared<DataTypeString>());
|
|
|
|
|
|
|
|
return ITableDeclaration::getColumn(column_name);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool StorageSystemPartsBase::hasColumn(const String & column_name) const
|
|
|
|
{
|
|
|
|
if (column_name == "_state")
|
|
|
|
return true;
|
|
|
|
|
|
|
|
return ITableDeclaration::hasColumn(column_name);
|
|
|
|
}
|
|
|
|
|
2018-01-25 14:42:39 +00:00
|
|
|
StorageSystemPartsBase::StorageSystemPartsBase(std::string name_, NamesAndTypesList && columns_)
|
|
|
|
: name(std::move(name_))
|
|
|
|
{
|
2018-03-26 14:18:04 +00:00
|
|
|
NamesAndTypesList aliases;
|
|
|
|
ColumnDefaults defaults;
|
|
|
|
auto add_alias = [&](const String & alias_name, const String & column_name)
|
|
|
|
{
|
|
|
|
DataTypePtr type;
|
|
|
|
for (const NameAndTypePair & col : columns_)
|
|
|
|
{
|
|
|
|
if (col.name == column_name)
|
|
|
|
{
|
|
|
|
type = col.type;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (!type)
|
|
|
|
throw Exception("No column " + column_name + " in table system." + name, ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
aliases.push_back({alias_name, type});
|
|
|
|
defaults[alias_name] = ColumnDefault{ColumnDefaultKind::Alias, std::make_shared<ASTIdentifier>(column_name)};
|
|
|
|
};
|
|
|
|
|
|
|
|
/// Add aliases for old column names for backwards compatibility.
|
|
|
|
add_alias("bytes", "bytes_on_disk");
|
|
|
|
add_alias("marks_size", "marks_bytes");
|
|
|
|
|
|
|
|
setColumns(ColumnsDescription(std::move(columns_), {}, std::move(aliases), std::move(defaults)));
|
2018-01-25 14:42:39 +00:00
|
|
|
}
|
|
|
|
|
2017-12-12 15:54:03 +00:00
|
|
|
}
|