#include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } StorageSystemColumns::StorageSystemColumns(const std::string & name_) : name(name_) { setColumns(ColumnsDescription( { { "database", std::make_shared() }, { "table", std::make_shared() }, { "name", std::make_shared() }, { "type", std::make_shared() }, { "default_kind", std::make_shared() }, { "default_expression", std::make_shared() }, { "data_compressed_bytes", std::make_shared() }, { "data_uncompressed_bytes", std::make_shared() }, { "marks_bytes", std::make_shared() }, { "comment", std::make_shared() }, { "is_in_partition_key", std::make_shared() }, { "is_in_sorting_key", std::make_shared() }, { "is_in_primary_key", std::make_shared() }, { "is_in_sampling_key", std::make_shared() }, { "compression_codec", std::make_shared() }, })); } namespace { using Storages = std::map, StoragePtr>; } class ColumnsBlockInputStream : public IBlockInputStream { public: ColumnsBlockInputStream( const std::vector & columns_mask, const Block & header, UInt64 max_block_size, ColumnPtr databases, ColumnPtr tables, Storages storages, String query_id_) : columns_mask(columns_mask), header(header), max_block_size(max_block_size) , databases(databases), tables(tables), storages(std::move(storages)) , query_id(std::move(query_id_)), total_tables(tables->size()) { } String getName() const override { return "Columns"; } Block getHeader() const override { return header; } protected: Block readImpl() override { if (db_table_num >= total_tables) return {}; Block res = header; MutableColumns res_columns = header.cloneEmptyColumns(); size_t rows_count = 0; while (rows_count < max_block_size && db_table_num < total_tables) { const std::string database_name = (*databases)[db_table_num].get(); const std::string table_name = (*tables)[db_table_num].get(); ++db_table_num; NamesAndTypesList columns; ColumnDefaults column_defaults; ColumnComments column_comments; ColumnCodecs column_codecs; Names cols_required_for_partition_key; Names cols_required_for_sorting_key; Names cols_required_for_primary_key; Names cols_required_for_sampling; MergeTreeData::ColumnSizeByName column_sizes; { StoragePtr storage = storages.at(std::make_pair(database_name, table_name)); TableStructureReadLockPtr table_lock; try { table_lock = storage->lockStructure(false, query_id); } catch (const Exception & e) { /** There are case when IStorage::drop was called, * but we still own the object. * Then table will throw exception at attempt to lock it. * Just skip the table. */ if (e.code() == ErrorCodes::TABLE_IS_DROPPED) continue; else throw; } columns = storage->getColumns().getAll(); column_codecs = storage->getColumns().codecs; column_defaults = storage->getColumns().defaults; column_comments = storage->getColumns().comments; cols_required_for_partition_key = storage->getColumnsRequiredForPartitionKey(); cols_required_for_sorting_key = storage->getColumnsRequiredForSortingKey(); cols_required_for_primary_key = storage->getColumnsRequiredForPrimaryKey(); cols_required_for_sampling = storage->getColumnsRequiredForSampling(); /** Info about sizes of columns for tables of MergeTree family. * NOTE: It is possible to add getter for this info to IStorage interface. */ if (auto storage_concrete_plain = dynamic_cast(storage.get())) { column_sizes = storage_concrete_plain->getData().getColumnSizes(); } else if (auto storage_concrete_replicated = dynamic_cast(storage.get())) { column_sizes = storage_concrete_replicated->getData().getColumnSizes(); } } for (const auto & column : columns) { size_t src_index = 0; size_t res_index = 0; if (columns_mask[src_index++]) res_columns[res_index++]->insert(database_name); if (columns_mask[src_index++]) res_columns[res_index++]->insert(table_name); if (columns_mask[src_index++]) res_columns[res_index++]->insert(column.name); if (columns_mask[src_index++]) res_columns[res_index++]->insert(column.type->getName()); { const auto it = column_defaults.find(column.name); if (it == std::end(column_defaults)) { if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); } else { if (columns_mask[src_index++]) res_columns[res_index++]->insert(toString(it->second.kind)); if (columns_mask[src_index++]) res_columns[res_index++]->insert(queryToString(it->second.expression)); } } { const auto it = column_sizes.find(column.name); if (it == std::end(column_sizes)) { if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); } else { if (columns_mask[src_index++]) res_columns[res_index++]->insert(it->second.data_compressed); if (columns_mask[src_index++]) res_columns[res_index++]->insert(it->second.data_uncompressed); if (columns_mask[src_index++]) res_columns[res_index++]->insert(it->second.marks); } } { const auto it = column_comments.find(column.name); if (it == std::end(column_comments)) { if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); } else { if (columns_mask[src_index++]) res_columns[res_index++]->insert(it->second); } } { auto find_in_vector = [&key = column.name](const Names& names) { return std::find(names.cbegin(), names.cend(), key) != names.end(); }; if (columns_mask[src_index++]) res_columns[res_index++]->insert(find_in_vector(cols_required_for_partition_key)); if (columns_mask[src_index++]) res_columns[res_index++]->insert(find_in_vector(cols_required_for_sorting_key)); if (columns_mask[src_index++]) res_columns[res_index++]->insert(find_in_vector(cols_required_for_primary_key)); if (columns_mask[src_index++]) res_columns[res_index++]->insert(find_in_vector(cols_required_for_sampling)); } { const auto it = column_codecs.find(column.name); if (it == std::end(column_codecs)) { if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); } else { if (columns_mask[src_index++]) res_columns[res_index++]->insert("CODEC(" + it->second->getCodecDesc() + ")"); } } ++rows_count; } } res.setColumns(std::move(res_columns)); return res; } private: std::vector columns_mask; Block header; UInt64 max_block_size; ColumnPtr databases; ColumnPtr tables; Storages storages; String query_id; size_t db_table_num = 0; size_t total_tables; }; BlockInputStreams StorageSystemColumns::read( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum /*processed_stage*/, const size_t max_block_size, const unsigned /*num_streams*/) { check(column_names); /// Create a mask of what columns are needed in the result. NameSet names_set(column_names.begin(), column_names.end()); Block sample_block = getSampleBlock(); Block res_block; std::vector columns_mask(sample_block.columns()); for (size_t i = 0, size = columns_mask.size(); i < size; ++i) { if (names_set.count(sample_block.getByPosition(i).name)) { columns_mask[i] = 1; res_block.insert(sample_block.getByPosition(i)); } } Block block_to_filter; Storages storages; { Databases databases = context.getDatabases(); /// Add `database` column. MutableColumnPtr database_column_mut = ColumnString::create(); for (const auto & database : databases) { if (context.hasDatabaseAccessRights(database.first)) database_column_mut->insert(database.first); } block_to_filter.insert(ColumnWithTypeAndName(std::move(database_column_mut), std::make_shared(), "database")); /// Filter block with `database` column. VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context); if (!block_to_filter.rows()) return {std::make_shared(res_block)}; ColumnPtr & database_column = block_to_filter.getByName("database").column; size_t rows = database_column->size(); /// Add `table` column. MutableColumnPtr table_column_mut = ColumnString::create(); IColumn::Offsets offsets(rows); for (size_t i = 0; i < rows; ++i) { const std::string database_name = (*database_column)[i].get(); const DatabasePtr database = databases.at(database_name); offsets[i] = i ? offsets[i - 1] : 0; for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next()) { const String & table_name = iterator->name(); storages.emplace(std::piecewise_construct, std::forward_as_tuple(database_name, table_name), std::forward_as_tuple(iterator->table())); table_column_mut->insert(table_name); ++offsets[i]; } } database_column = database_column->replicate(offsets); block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared(), "table")); } /// Filter block with `database` and `table` columns. VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context); if (!block_to_filter.rows()) return {std::make_shared(res_block)}; ColumnPtr filtered_database_column = block_to_filter.getByName("database").column; ColumnPtr filtered_table_column = block_to_filter.getByName("table").column; return {std::make_shared( std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_database_column), std::move(filtered_table_column), std::move(storages), context.getCurrentQueryId())}; } }