#include #include #include #include #include #include #include #include #include #include #include namespace DB { StorageSystemReplicas::StorageSystemReplicas(const std::string & name_) : IStorage({"system", name_}) { setColumns(ColumnsDescription({ { "database", std::make_shared() }, { "table", std::make_shared() }, { "engine", std::make_shared() }, { "is_leader", std::make_shared() }, { "can_become_leader", std::make_shared() }, { "is_readonly", std::make_shared() }, { "is_session_expired", std::make_shared() }, { "future_parts", std::make_shared() }, { "parts_to_check", std::make_shared() }, { "zookeeper_path", std::make_shared() }, { "replica_name", std::make_shared() }, { "replica_path", std::make_shared() }, { "columns_version", std::make_shared() }, { "queue_size", std::make_shared() }, { "inserts_in_queue", std::make_shared() }, { "merges_in_queue", std::make_shared() }, { "part_mutations_in_queue", std::make_shared() }, { "queue_oldest_time", std::make_shared() }, { "inserts_oldest_time", std::make_shared() }, { "merges_oldest_time", std::make_shared() }, { "part_mutations_oldest_time", std::make_shared() }, { "oldest_part_to_get", std::make_shared() }, { "oldest_part_to_merge_to", std::make_shared() }, { "oldest_part_to_mutate_to", std::make_shared() }, { "log_max_index", std::make_shared() }, { "log_pointer", std::make_shared() }, { "last_queue_update", std::make_shared() }, { "absolute_delay", std::make_shared() }, { "total_replicas", std::make_shared() }, { "active_replicas", std::make_shared() }, })); } Pipes StorageSystemReplicas::readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum /*processed_stage*/, const size_t /*max_block_size*/, const unsigned /*num_streams*/) { check(column_names); /// We collect a set of replicated tables. std::map> replicated_tables; for (const auto & db : context.getDatabases()) { /// Lazy database can not contain replicated tables if (db.second->getEngineName() == "Lazy") continue; if (context.hasDatabaseAccessRights(db.first)) { for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) if (dynamic_cast(iterator->table().get())) replicated_tables[db.first][iterator->name()] = iterator->table(); } } /// Do you need columns that require a ZooKeeper request to compute. bool with_zk_fields = false; for (const auto & column_name : column_names) { if ( column_name == "log_max_index" || column_name == "log_pointer" || column_name == "total_replicas" || column_name == "active_replicas") { with_zk_fields = true; break; } } MutableColumnPtr col_database_mut = ColumnString::create(); MutableColumnPtr col_table_mut = ColumnString::create(); MutableColumnPtr col_engine_mut = ColumnString::create(); for (auto & db : replicated_tables) { for (auto & table : db.second) { col_database_mut->insert(db.first); col_table_mut->insert(table.first); col_engine_mut->insert(table.second->getName()); } } ColumnPtr col_database = std::move(col_database_mut); ColumnPtr col_table = std::move(col_table_mut); ColumnPtr col_engine = std::move(col_engine_mut); /// Determine what tables are needed by the conditions in the query. { Block filtered_block { { col_database, std::make_shared(), "database" }, { col_table, std::make_shared(), "table" }, { col_engine, std::make_shared(), "engine" }, }; VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context); if (!filtered_block.rows()) return Pipes(); col_database = filtered_block.getByName("database").column; col_table = filtered_block.getByName("table").column; col_engine = filtered_block.getByName("engine").column; } MutableColumns res_columns = getSampleBlock().cloneEmptyColumns(); for (size_t i = 0, size = col_database->size(); i < size; ++i) { StorageReplicatedMergeTree::Status status; dynamic_cast( *replicated_tables [(*col_database)[i].safeGet()] [(*col_table)[i].safeGet()]).getStatus(status, with_zk_fields); size_t col_num = 3; res_columns[col_num++]->insert(status.is_leader); res_columns[col_num++]->insert(status.can_become_leader); res_columns[col_num++]->insert(status.is_readonly); res_columns[col_num++]->insert(status.is_session_expired); res_columns[col_num++]->insert(status.queue.future_parts); res_columns[col_num++]->insert(status.parts_to_check); res_columns[col_num++]->insert(status.zookeeper_path); res_columns[col_num++]->insert(status.replica_name); res_columns[col_num++]->insert(status.replica_path); res_columns[col_num++]->insert(status.columns_version); res_columns[col_num++]->insert(status.queue.queue_size); res_columns[col_num++]->insert(status.queue.inserts_in_queue); res_columns[col_num++]->insert(status.queue.merges_in_queue); res_columns[col_num++]->insert(status.queue.part_mutations_in_queue); res_columns[col_num++]->insert(status.queue.queue_oldest_time); res_columns[col_num++]->insert(status.queue.inserts_oldest_time); res_columns[col_num++]->insert(status.queue.merges_oldest_time); res_columns[col_num++]->insert(status.queue.part_mutations_oldest_time); res_columns[col_num++]->insert(status.queue.oldest_part_to_get); res_columns[col_num++]->insert(status.queue.oldest_part_to_merge_to); res_columns[col_num++]->insert(status.queue.oldest_part_to_mutate_to); res_columns[col_num++]->insert(status.log_max_index); res_columns[col_num++]->insert(status.log_pointer); res_columns[col_num++]->insert(status.queue.last_queue_update); res_columns[col_num++]->insert(status.absolute_delay); res_columns[col_num++]->insert(status.total_replicas); res_columns[col_num++]->insert(status.active_replicas); } Block header = getSampleBlock(); Columns columns; columns.reserve(res_columns.size()); for (auto & col : res_columns) columns.emplace_back(std::move(col)); columns[0] = std::move(col_database); columns[1] = std::move(col_table); columns[2] = std::move(col_engine); UInt64 num_rows = columns.at(0)->size(); Chunk chunk(std::move(columns), num_rows); Pipes pipes; pipes.emplace_back(std::make_shared(getSampleBlock(), std::move(chunk))); return pipes; } }