mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Support for max_block_size in system.tables and system.columns tables #2447
This commit is contained in:
parent
046137f9c5
commit
068e50329e
@ -1,3 +1,4 @@
|
||||
#include <optional>
|
||||
#include <Storages/System/StorageSystemColumns.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
@ -6,18 +7,27 @@
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
#include <DataStreams/NullBlockInputStream.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
NamesAndTypesList StorageSystemColumns::getNamesAndTypes()
|
||||
namespace ErrorCodes
|
||||
{
|
||||
return {
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
StorageSystemColumns::StorageSystemColumns(const std::string & name_)
|
||||
: name(name_)
|
||||
{
|
||||
setColumns(ColumnsDescription(
|
||||
{
|
||||
{ "database", std::make_shared<DataTypeString>() },
|
||||
{ "table", std::make_shared<DataTypeString>() },
|
||||
{ "name", std::make_shared<DataTypeString>() },
|
||||
@ -27,14 +37,207 @@ NamesAndTypesList StorageSystemColumns::getNamesAndTypes()
|
||||
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
|
||||
{ "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
|
||||
{ "marks_bytes", std::make_shared<DataTypeUInt64>() },
|
||||
};
|
||||
}));
|
||||
}
|
||||
|
||||
void StorageSystemColumns::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
|
||||
{
|
||||
Block block_to_filter;
|
||||
|
||||
std::map<std::pair<std::string, std::string>, StoragePtr> storages;
|
||||
namespace
|
||||
{
|
||||
using Storages = std::map<std::pair<std::string, std::string>, StoragePtr>;
|
||||
}
|
||||
|
||||
|
||||
class ColumnsBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
ColumnsBlockInputStream(
|
||||
const std::vector<UInt8> & columns_mask,
|
||||
const Block & header,
|
||||
size_t max_block_size,
|
||||
ColumnPtr databases,
|
||||
ColumnPtr tables,
|
||||
Storages storages,
|
||||
const Context & context)
|
||||
: columns_mask(columns_mask), header(header), max_block_size(max_block_size),
|
||||
databases(databases), tables(tables), storages(std::move(storages)), total_tables(tables->size()), context(context) {}
|
||||
|
||||
String getName() const override { return "Columns"; }
|
||||
Block getHeader() const override { return header; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
if (db_table_num >= total_tables)
|
||||
return {};
|
||||
|
||||
Block res = header;
|
||||
MutableColumns res_columns = header.cloneEmptyColumns();
|
||||
size_t rows_count = 0;
|
||||
|
||||
while (rows_count < max_block_size && db_table_num < total_tables)
|
||||
{
|
||||
const std::string database_name = (*databases)[db_table_num].get<std::string>();
|
||||
const std::string table_name = (*tables)[db_table_num].get<std::string>();
|
||||
++db_table_num;
|
||||
|
||||
NamesAndTypesList columns;
|
||||
ColumnDefaults column_defaults;
|
||||
MergeTreeData::ColumnSizeByName column_sizes;
|
||||
|
||||
{
|
||||
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
|
||||
TableStructureReadLockPtr table_lock;
|
||||
|
||||
try
|
||||
{
|
||||
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/** There are case when IStorage::drop was called,
|
||||
* but we still own the object.
|
||||
* Then table will throw exception at attempt to lock it.
|
||||
* Just skip the table.
|
||||
*/
|
||||
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
|
||||
continue;
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
columns = storage->getColumns().getAll();
|
||||
column_defaults = storage->getColumns().defaults;
|
||||
|
||||
/** Info about sizes of columns for tables of MergeTree family.
|
||||
* NOTE: It is possible to add getter for this info to IStorage interface.
|
||||
*/
|
||||
if (auto storage_concrete_plain = dynamic_cast<StorageMergeTree *>(storage.get()))
|
||||
{
|
||||
column_sizes = storage_concrete_plain->getData().getColumnSizes();
|
||||
}
|
||||
else if (auto storage_concrete_replicated = dynamic_cast<StorageReplicatedMergeTree *>(storage.get()))
|
||||
{
|
||||
column_sizes = storage_concrete_replicated->getData().getColumnSizes();
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & column : columns)
|
||||
{
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database_name);
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table_name);
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(column.name);
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(column.type->getName());
|
||||
|
||||
{
|
||||
const auto it = column_defaults.find(column.name);
|
||||
if (it == std::end(column_defaults))
|
||||
{
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(toString(it->second.kind));
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(queryToString(it->second.expression));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
const auto it = column_sizes.find(column.name);
|
||||
if (it == std::end(column_sizes))
|
||||
{
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(it->second.data_compressed));
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(it->second.data_uncompressed));
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(it->second.marks));
|
||||
}
|
||||
}
|
||||
|
||||
++rows_count;
|
||||
}
|
||||
}
|
||||
|
||||
res.setColumns(std::move(res_columns));
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<UInt8> columns_mask;
|
||||
Block header;
|
||||
size_t max_block_size;
|
||||
ColumnPtr databases;
|
||||
ColumnPtr tables;
|
||||
Storages storages;
|
||||
size_t db_table_num = 0;
|
||||
size_t total_tables;
|
||||
const Context context;
|
||||
};
|
||||
|
||||
|
||||
BlockInputStreams StorageSystemColumns::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
check(column_names);
|
||||
|
||||
/// Create a mask of what columns are needed in the result.
|
||||
|
||||
NameSet names_set(column_names.begin(), column_names.end());
|
||||
|
||||
Block sample_block = getSampleBlock();
|
||||
Block res_block;
|
||||
|
||||
std::vector<UInt8> columns_mask(sample_block.columns());
|
||||
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
|
||||
{
|
||||
if (names_set.count(sample_block.getByPosition(i).name))
|
||||
{
|
||||
columns_mask[i] = 1;
|
||||
res_block.insert(sample_block.getByPosition(i));
|
||||
}
|
||||
}
|
||||
|
||||
/// Whe should exit quickly in case of LIMIT. This helps when we have extraordinarily huge number of tables.
|
||||
std::optional<UInt64> limit;
|
||||
{
|
||||
const ASTSelectQuery * select = typeid_cast<const ASTSelectQuery *>(query_info.query.get());
|
||||
if (!select)
|
||||
throw Exception("Logical error: not a SELECT query in StorageSystemColumns::read method", ErrorCodes::LOGICAL_ERROR);
|
||||
if (select->limit_length)
|
||||
limit = typeid_cast<const ASTLiteral &>(*select->limit_length).value.get<UInt64>();
|
||||
if (select->limit_offset)
|
||||
*limit += typeid_cast<const ASTLiteral &>(*select->limit_offset).value.get<UInt64>();
|
||||
}
|
||||
|
||||
Block block_to_filter;
|
||||
Storages storages;
|
||||
|
||||
{
|
||||
Databases databases = context.getDatabases();
|
||||
@ -53,9 +256,9 @@ void StorageSystemColumns::fillData(MutableColumns & res_columns, const Context
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
|
||||
if (!block_to_filter.rows())
|
||||
return;
|
||||
return {std::make_shared<NullBlockInputStream>(res_block)};
|
||||
|
||||
ColumnPtr database_column = block_to_filter.getByName("database").column;
|
||||
ColumnPtr & database_column = block_to_filter.getByName("database").column;
|
||||
size_t rows = database_column->size();
|
||||
|
||||
/// Add `table` column.
|
||||
@ -74,16 +277,21 @@ void StorageSystemColumns::fillData(MutableColumns & res_columns, const Context
|
||||
std::forward_as_tuple(database_name, table_name),
|
||||
std::forward_as_tuple(iterator->table()));
|
||||
table_column_mut->insert(table_name);
|
||||
offsets[i] += 1;
|
||||
++offsets[i];
|
||||
|
||||
if (limit && offsets[i] >= *limit)
|
||||
break;
|
||||
}
|
||||
|
||||
if (limit && offsets[i] >= *limit)
|
||||
{
|
||||
offsets.resize(i);
|
||||
database_column = database_column->cut(0, i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < block_to_filter.columns(); ++i)
|
||||
{
|
||||
ColumnPtr & column = block_to_filter.safeGetByPosition(i).column;
|
||||
column = column->replicate(offsets);
|
||||
}
|
||||
|
||||
database_column = database_column->replicate(offsets);
|
||||
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
|
||||
}
|
||||
|
||||
@ -91,99 +299,14 @@ void StorageSystemColumns::fillData(MutableColumns & res_columns, const Context
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
|
||||
if (!block_to_filter.rows())
|
||||
return;
|
||||
return {std::make_shared<NullBlockInputStream>(res_block)};
|
||||
|
||||
ColumnPtr filtered_database_column = block_to_filter.getByName("database").column;
|
||||
ColumnPtr filtered_table_column = block_to_filter.getByName("table").column;
|
||||
|
||||
/// We compose the result.
|
||||
size_t rows = filtered_database_column->size();
|
||||
for (size_t row_no = 0; row_no < rows; ++row_no)
|
||||
{
|
||||
const std::string database_name = (*filtered_database_column)[row_no].get<std::string>();
|
||||
const std::string table_name = (*filtered_table_column)[row_no].get<std::string>();
|
||||
|
||||
NamesAndTypesList columns;
|
||||
ColumnDefaults column_defaults;
|
||||
MergeTreeData::ColumnSizeByName column_sizes;
|
||||
|
||||
{
|
||||
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
|
||||
TableStructureReadLockPtr table_lock;
|
||||
|
||||
try
|
||||
{
|
||||
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/** There are case when IStorage::drop was called,
|
||||
* but we still own the object.
|
||||
* Then table will throw exception at attempt to lock it.
|
||||
* Just skip the table.
|
||||
*/
|
||||
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
|
||||
continue;
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
columns = storage->getColumns().getAll();
|
||||
column_defaults = storage->getColumns().defaults;
|
||||
|
||||
/** Info about sizes of columns for tables of MergeTree family.
|
||||
* NOTE: It is possible to add getter for this info to IStorage interface.
|
||||
*/
|
||||
if (auto storage_concrete_plain = dynamic_cast<StorageMergeTree *>(storage.get()))
|
||||
{
|
||||
column_sizes = storage_concrete_plain->getData().getColumnSizes();
|
||||
}
|
||||
else if (auto storage_concrete_replicated = dynamic_cast<StorageReplicatedMergeTree *>(storage.get()))
|
||||
{
|
||||
column_sizes = storage_concrete_replicated->getData().getColumnSizes();
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & column : columns)
|
||||
{
|
||||
size_t i = 0;
|
||||
|
||||
res_columns[i++]->insert(database_name);
|
||||
res_columns[i++]->insert(table_name);
|
||||
res_columns[i++]->insert(column.name);
|
||||
res_columns[i++]->insert(column.type->getName());
|
||||
|
||||
{
|
||||
const auto it = column_defaults.find(column.name);
|
||||
if (it == std::end(column_defaults))
|
||||
{
|
||||
res_columns[i++]->insertDefault();
|
||||
res_columns[i++]->insertDefault();
|
||||
}
|
||||
else
|
||||
{
|
||||
res_columns[i++]->insert(toString(it->second.kind));
|
||||
res_columns[i++]->insert(queryToString(it->second.expression));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
const auto it = column_sizes.find(column.name);
|
||||
if (it == std::end(column_sizes))
|
||||
{
|
||||
res_columns[i++]->insertDefault();
|
||||
res_columns[i++]->insertDefault();
|
||||
res_columns[i++]->insertDefault();
|
||||
}
|
||||
else
|
||||
{
|
||||
res_columns[i++]->insert(static_cast<UInt64>(it->second.data_compressed));
|
||||
res_columns[i++]->insert(static_cast<UInt64>(it->second.data_uncompressed));
|
||||
res_columns[i++]->insert(static_cast<UInt64>(it->second.marks));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return {std::make_shared<ColumnsBlockInputStream>(
|
||||
std::move(columns_mask), std::move(res_block), max_block_size,
|
||||
std::move(filtered_database_column), std::move(filtered_table_column), std::move(storages), context)};
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -11,17 +11,26 @@ class Context;
|
||||
|
||||
/** Implements system table 'columns', that allows to get information about columns for every table.
|
||||
*/
|
||||
class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>, public IStorageSystemOneBlock<StorageSystemColumns>
|
||||
class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>, public IStorage
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemColumns"; }
|
||||
|
||||
static NamesAndTypesList getNamesAndTypes();
|
||||
std::string getTableName() const override { return name; }
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
private:
|
||||
const std::string name;
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
|
||||
StorageSystemColumns(const std::string & name_);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -56,16 +56,213 @@ static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & cont
|
||||
}
|
||||
|
||||
|
||||
class TablesBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
TablesBlockInputStream(
|
||||
std::vector<UInt8> columns_mask,
|
||||
Block header,
|
||||
size_t max_block_size,
|
||||
ColumnPtr databases,
|
||||
const Context & context)
|
||||
: columns_mask(columns_mask), header(header), max_block_size(max_block_size), databases(std::move(databases)), context(context) {}
|
||||
|
||||
String getName() const override { return "Tables"; }
|
||||
Block getHeader() const override { return header; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
if (done)
|
||||
return {};
|
||||
|
||||
Block res = header;
|
||||
MutableColumns res_columns = header.cloneEmptyColumns();
|
||||
|
||||
size_t rows_count = 0;
|
||||
while (rows_count < max_block_size)
|
||||
{
|
||||
if (tables_it && !tables_it->isValid())
|
||||
++database_idx;
|
||||
|
||||
while (database_idx < databases->size() && (!tables_it || !tables_it->isValid()))
|
||||
{
|
||||
database_name = databases->getDataAt(database_idx).toString();
|
||||
database = context.tryGetDatabase(database_name);
|
||||
|
||||
if (!database || !context.hasDatabaseAccessRights(database_name))
|
||||
{
|
||||
/// Database was deleted just now or the user has no access.
|
||||
++database_idx;
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/// This is for temporary tables. They are output in single block regardless to max_block_size.
|
||||
if (database_idx >= databases->size())
|
||||
{
|
||||
if (context.hasSessionContext())
|
||||
{
|
||||
Tables external_tables = context.getSessionContext().getExternalTables();
|
||||
|
||||
for (auto table : external_tables)
|
||||
{
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.first);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(UInt64(1));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
}
|
||||
}
|
||||
|
||||
res.setColumns(std::move(res_columns));
|
||||
done = true;
|
||||
return res;
|
||||
}
|
||||
|
||||
if (!tables_it || !tables_it->isValid())
|
||||
tables_it = database->getIterator(context);
|
||||
|
||||
for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next())
|
||||
{
|
||||
++rows_count;
|
||||
auto table_name = tables_it->name();
|
||||
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(tables_it->table()->getName());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(UInt64(0));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(tables_it->table()->getDataPath());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database->getTableMetadataPath(table_name));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(database->getTableMetadataModificationTime(context, table_name)));
|
||||
|
||||
{
|
||||
Array dependencies_table_name_array;
|
||||
Array dependencies_database_name_array;
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1])
|
||||
{
|
||||
const auto dependencies = context.getDependencies(database_name, table_name);
|
||||
|
||||
dependencies_table_name_array.reserve(dependencies.size());
|
||||
dependencies_database_name_array.reserve(dependencies.size());
|
||||
for (const auto & dependency : dependencies)
|
||||
{
|
||||
dependencies_table_name_array.push_back(dependency.second);
|
||||
dependencies_database_name_array.push_back(dependency.first);
|
||||
}
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_database_name_array);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_table_name_array);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1])
|
||||
{
|
||||
ASTPtr ast = database->tryGetCreateTableQuery(context, table_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(ast ? queryToString(ast) : "");
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
String engine_full;
|
||||
|
||||
if (ast)
|
||||
{
|
||||
const ASTCreateQuery & ast_create = typeid_cast<const ASTCreateQuery &>(*ast);
|
||||
if (ast_create.storage)
|
||||
{
|
||||
engine_full = queryToString(*ast_create.storage);
|
||||
|
||||
static const char * const extra_head = " ENGINE = ";
|
||||
if (startsWith(engine_full, extra_head))
|
||||
engine_full = engine_full.substr(strlen(extra_head));
|
||||
}
|
||||
}
|
||||
|
||||
res_columns[res_index++]->insert(engine_full);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res.setColumns(std::move(res_columns));
|
||||
return res;
|
||||
}
|
||||
private:
|
||||
std::vector<UInt8> columns_mask;
|
||||
Block header;
|
||||
size_t max_block_size;
|
||||
ColumnPtr databases;
|
||||
size_t database_idx = 0;
|
||||
DatabaseIteratorPtr tables_it;
|
||||
const Context context;
|
||||
bool done = false;
|
||||
DatabasePtr database;
|
||||
std::string database_name;
|
||||
};
|
||||
|
||||
|
||||
BlockInputStreams StorageSystemTables::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const size_t /*max_block_size*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
check(column_names);
|
||||
|
||||
/// Create a mask of what columns are needed in the result.
|
||||
@ -85,151 +282,9 @@ BlockInputStreams StorageSystemTables::read(
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumns res_columns = res_block.cloneEmptyColumns();
|
||||
|
||||
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info.query, context);
|
||||
|
||||
for (size_t row_number = 0; row_number < filtered_databases_column->size(); ++row_number)
|
||||
{
|
||||
std::string database_name = filtered_databases_column->getDataAt(row_number).toString();
|
||||
|
||||
auto database = context.tryGetDatabase(database_name);
|
||||
|
||||
if (!database || !context.hasDatabaseAccessRights(database_name))
|
||||
{
|
||||
/// Database was deleted just now.
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
auto table_name = iterator->name();
|
||||
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(iterator->table()->getName());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(UInt64(0));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(iterator->table()->getDataPath());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database->getTableMetadataPath(table_name));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(database->getTableMetadataModificationTime(context, table_name)));
|
||||
|
||||
{
|
||||
Array dependencies_table_name_array;
|
||||
Array dependencies_database_name_array;
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1])
|
||||
{
|
||||
const auto dependencies = context.getDependencies(database_name, table_name);
|
||||
|
||||
dependencies_table_name_array.reserve(dependencies.size());
|
||||
dependencies_database_name_array.reserve(dependencies.size());
|
||||
for (const auto & dependency : dependencies)
|
||||
{
|
||||
dependencies_table_name_array.push_back(dependency.second);
|
||||
dependencies_database_name_array.push_back(dependency.first);
|
||||
}
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_database_name_array);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_table_name_array);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1])
|
||||
{
|
||||
ASTPtr ast = database->tryGetCreateTableQuery(context, table_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(ast ? queryToString(ast) : "");
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
String engine_full;
|
||||
|
||||
if (ast)
|
||||
{
|
||||
const ASTCreateQuery & ast_create = typeid_cast<const ASTCreateQuery &>(*ast);
|
||||
if (ast_create.storage)
|
||||
{
|
||||
engine_full = queryToString(*ast_create.storage);
|
||||
|
||||
static const char * const extra_head = " ENGINE = ";
|
||||
if (startsWith(engine_full, extra_head))
|
||||
engine_full = engine_full.substr(strlen(extra_head));
|
||||
}
|
||||
}
|
||||
|
||||
res_columns[res_index++]->insert(engine_full);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is for temporary tables.
|
||||
|
||||
if (context.hasSessionContext())
|
||||
{
|
||||
Tables external_tables = context.getSessionContext().getExternalTables();
|
||||
|
||||
for (auto table : external_tables)
|
||||
{
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.first);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(UInt64(1));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
}
|
||||
}
|
||||
|
||||
res_block.setColumns(std::move(res_columns));
|
||||
return {std::make_shared<OneBlockInputStream>(res_block)};
|
||||
return {std::make_shared<TablesBlockInputStream>(
|
||||
std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_databases_column), context)};
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user