#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int CANNOT_GET_CREATE_TABLE_QUERY; extern const int TABLE_IS_DROPPED; } StorageSystemTables::StorageSystemTables(const std::string & name_) : name(name_) { setColumns(ColumnsDescription( { {"database", std::make_shared()}, {"name", std::make_shared()}, {"engine", std::make_shared()}, {"is_temporary", std::make_shared()}, {"data_path", std::make_shared()}, {"metadata_path", std::make_shared()}, {"metadata_modification_time", std::make_shared()}, {"dependencies_database", std::make_shared(std::make_shared())}, {"dependencies_table", std::make_shared(std::make_shared())}, {"create_table_query", std::make_shared()}, {"engine_full", std::make_shared()}, {"partition_key", std::make_shared()}, {"sorting_key", std::make_shared()}, {"primary_key", std::make_shared()}, {"sampling_key", std::make_shared()}, })); } static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & context) { MutableColumnPtr column = ColumnString::create(); for (const auto & db : context.getDatabases()) column->insert(db.first); Block block { ColumnWithTypeAndName(std::move(column), std::make_shared(), "database") }; VirtualColumnUtils::filterBlockWithQuery(query, block, context); return block.getByPosition(0).column; } class TablesBlockInputStream : public IBlockInputStream { public: TablesBlockInputStream( std::vector columns_mask_, Block header_, UInt64 max_block_size_, ColumnPtr databases_, const Context & context_) : columns_mask(std::move(columns_mask_)), header(std::move(header_)), max_block_size(max_block_size_), databases(std::move(databases_)), context(context_) {} String getName() const override { return "Tables"; } Block getHeader() const override { return header; } protected: Block readImpl() override { if (done) return {}; Block res = header; MutableColumns res_columns = header.cloneEmptyColumns(); size_t rows_count = 0; while (rows_count < max_block_size) { if (tables_it && !tables_it->isValid()) ++database_idx; while (database_idx < databases->size() && (!tables_it || !tables_it->isValid())) { database_name = databases->getDataAt(database_idx).toString(); database = context.tryGetDatabase(database_name); if (!database || !context.hasDatabaseAccessRights(database_name)) { /// Database was deleted just now or the user has no access. ++database_idx; continue; } break; } /// This is for temporary tables. They are output in single block regardless to max_block_size. if (database_idx >= databases->size()) { if (context.hasSessionContext()) { Tables external_tables = context.getSessionContext().getExternalTables(); for (auto table : external_tables) { size_t src_index = 0; size_t res_index = 0; if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insert(table.first); if (columns_mask[src_index++]) res_columns[res_index++]->insert(table.second->getName()); if (columns_mask[src_index++]) res_columns[res_index++]->insert(1u); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insert(table.second->getName()); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); } } res.setColumns(std::move(res_columns)); done = true; return res; } if (!tables_it || !tables_it->isValid()) tables_it = database->getIterator(context); for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next()) { auto table_name = tables_it->name(); const StoragePtr & table = tables_it->table(); TableStructureReadLockHolder lock; try { lock = table->lockStructureForShare(false, context.getCurrentQueryId()); } catch (const Exception & e) { if (e.code() == ErrorCodes::TABLE_IS_DROPPED) continue; throw; } ++rows_count; size_t src_index = 0; size_t res_index = 0; if (columns_mask[src_index++]) res_columns[res_index++]->insert(database_name); if (columns_mask[src_index++]) res_columns[res_index++]->insert(table_name); if (columns_mask[src_index++]) res_columns[res_index++]->insert(table->getName()); if (columns_mask[src_index++]) res_columns[res_index++]->insert(0u); // is_temporary if (columns_mask[src_index++]) res_columns[res_index++]->insert(table->getDataPath()); if (columns_mask[src_index++]) res_columns[res_index++]->insert(database->getTableMetadataPath(table_name)); if (columns_mask[src_index++]) res_columns[res_index++]->insert(static_cast(database->getTableMetadataModificationTime(context, table_name))); { Array dependencies_table_name_array; Array dependencies_database_name_array; if (columns_mask[src_index] || columns_mask[src_index + 1]) { const auto dependencies = context.getDependencies(database_name, table_name); dependencies_table_name_array.reserve(dependencies.size()); dependencies_database_name_array.reserve(dependencies.size()); for (const auto & dependency : dependencies) { dependencies_table_name_array.push_back(dependency.second); dependencies_database_name_array.push_back(dependency.first); } } if (columns_mask[src_index++]) res_columns[res_index++]->insert(dependencies_database_name_array); if (columns_mask[src_index++]) res_columns[res_index++]->insert(dependencies_table_name_array); } if (columns_mask[src_index] || columns_mask[src_index + 1]) { ASTPtr ast = database->tryGetCreateTableQuery(context, table_name); if (columns_mask[src_index++]) res_columns[res_index++]->insert(ast ? queryToString(ast) : ""); if (columns_mask[src_index++]) { String engine_full; if (ast) { const auto & ast_create = ast->as(); if (ast_create.storage) { engine_full = queryToString(*ast_create.storage); static const char * const extra_head = " ENGINE = "; if (startsWith(engine_full, extra_head)) engine_full = engine_full.substr(strlen(extra_head)); } } res_columns[res_index++]->insert(engine_full); } } else src_index += 2; ASTPtr expression_ptr; if (columns_mask[src_index++]) { if ((expression_ptr = table->getPartitionKeyAST())) res_columns[res_index++]->insert(queryToString(expression_ptr)); else res_columns[res_index++]->insertDefault(); } if (columns_mask[src_index++]) { if ((expression_ptr = table->getSortingKeyAST())) res_columns[res_index++]->insert(queryToString(expression_ptr)); else res_columns[res_index++]->insertDefault(); } if (columns_mask[src_index++]) { if ((expression_ptr = table->getPrimaryKeyAST())) res_columns[res_index++]->insert(queryToString(expression_ptr)); else res_columns[res_index++]->insertDefault(); } if (columns_mask[src_index++]) { if ((expression_ptr = table->getSamplingKeyAST())) res_columns[res_index++]->insert(queryToString(expression_ptr)); else res_columns[res_index++]->insertDefault(); } } } res.setColumns(std::move(res_columns)); return res; } private: std::vector columns_mask; Block header; UInt64 max_block_size; ColumnPtr databases; size_t database_idx = 0; DatabaseIteratorPtr tables_it; const Context context; bool done = false; DatabasePtr database; std::string database_name; }; BlockInputStreams StorageSystemTables::read( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum /*processed_stage*/, const size_t max_block_size, const unsigned /*num_streams*/) { check(column_names); /// Create a mask of what columns are needed in the result. NameSet names_set(column_names.begin(), column_names.end()); Block sample_block = getSampleBlock(); Block res_block; std::vector columns_mask(sample_block.columns()); for (size_t i = 0, size = columns_mask.size(); i < size; ++i) { if (names_set.count(sample_block.getByPosition(i).name)) { columns_mask[i] = 1; res_block.insert(sample_block.getByPosition(i)); } } ColumnPtr filtered_databases_column = getFilteredDatabases(query_info.query, context); return {std::make_shared( std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_databases_column), context)}; } }