#include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { NamesAndTypesList StorageSystemProcesses::getNamesAndTypes() { return { {"is_initial_query", std::make_shared()}, {"user", std::make_shared()}, {"query_id", std::make_shared()}, {"address", DataTypeFactory::instance().get("IPv6")}, {"port", std::make_shared()}, {"initial_user", std::make_shared()}, {"initial_query_id", std::make_shared()}, {"initial_address", DataTypeFactory::instance().get("IPv6")}, {"initial_port", std::make_shared()}, {"interface", std::make_shared()}, {"os_user", std::make_shared()}, {"client_hostname", std::make_shared()}, {"client_name", std::make_shared()}, {"client_revision", std::make_shared()}, {"client_version_major", std::make_shared()}, {"client_version_minor", std::make_shared()}, {"client_version_patch", std::make_shared()}, {"http_method", std::make_shared()}, {"http_user_agent", std::make_shared()}, {"http_referer", std::make_shared()}, {"forwarded_for", std::make_shared()}, {"quota_key", std::make_shared()}, {"elapsed", std::make_shared()}, {"is_cancelled", std::make_shared()}, {"read_rows", std::make_shared()}, {"read_bytes", std::make_shared()}, {"total_rows_approx", std::make_shared()}, {"written_rows", std::make_shared()}, {"written_bytes", std::make_shared()}, {"memory_usage", std::make_shared()}, {"peak_memory_usage", std::make_shared()}, {"query", std::make_shared()}, {"thread_ids", std::make_shared(std::make_shared())}, {"ProfileEvents.Names", std::make_shared(std::make_shared())}, {"ProfileEvents.Values", std::make_shared(std::make_shared())}, {"Settings.Names", std::make_shared(std::make_shared())}, {"Settings.Values", std::make_shared(std::make_shared())}, {"current_database", std::make_shared()}, }; } void StorageSystemProcesses::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const { ProcessList::Info info = context->getProcessList().getInfo(true, true, true); for (const auto & process : info) { size_t i = 0; res_columns[i++]->insert(process.client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY); res_columns[i++]->insert(process.client_info.current_user); res_columns[i++]->insert(process.client_info.current_query_id); res_columns[i++]->insertData(IPv6ToBinary(process.client_info.current_address.host()).data(), 16); res_columns[i++]->insert(process.client_info.current_address.port()); res_columns[i++]->insert(process.client_info.initial_user); res_columns[i++]->insert(process.client_info.initial_query_id); res_columns[i++]->insertData(IPv6ToBinary(process.client_info.initial_address.host()).data(), 16); res_columns[i++]->insert(process.client_info.initial_address.port()); res_columns[i++]->insert(UInt64(process.client_info.interface)); res_columns[i++]->insert(process.client_info.os_user); res_columns[i++]->insert(process.client_info.client_hostname); res_columns[i++]->insert(process.client_info.client_name); res_columns[i++]->insert(process.client_info.client_tcp_protocol_version); res_columns[i++]->insert(process.client_info.client_version_major); res_columns[i++]->insert(process.client_info.client_version_minor); res_columns[i++]->insert(process.client_info.client_version_patch); res_columns[i++]->insert(UInt64(process.client_info.http_method)); res_columns[i++]->insert(process.client_info.http_user_agent); res_columns[i++]->insert(process.client_info.http_referer); res_columns[i++]->insert(process.client_info.forwarded_for); res_columns[i++]->insert(process.client_info.quota_key); res_columns[i++]->insert(process.elapsed_seconds); res_columns[i++]->insert(process.is_cancelled); res_columns[i++]->insert(process.read_rows); res_columns[i++]->insert(process.read_bytes); res_columns[i++]->insert(process.total_rows); res_columns[i++]->insert(process.written_rows); res_columns[i++]->insert(process.written_bytes); res_columns[i++]->insert(process.memory_usage); res_columns[i++]->insert(process.peak_memory_usage); res_columns[i++]->insert(process.query); { Array threads_array; threads_array.reserve(process.thread_ids.size()); for (const UInt64 thread_id : process.thread_ids) threads_array.emplace_back(thread_id); res_columns[i++]->insert(threads_array); } { IColumn * column_profile_events_names = res_columns[i++].get(); IColumn * column_profile_events_values = res_columns[i++].get(); if (process.profile_counters) ProfileEvents::dumpToArrayColumns(*process.profile_counters, column_profile_events_names, column_profile_events_values, true); else { column_profile_events_names->insertDefault(); column_profile_events_values->insertDefault(); } } { IColumn * column_settings_names = res_columns[i++].get(); IColumn * column_settings_values = res_columns[i++].get(); if (process.query_settings) process.query_settings->dumpToArrayColumns(column_settings_names, column_settings_values, true); else { column_settings_names->insertDefault(); column_settings_values->insertDefault(); } } res_columns[i++]->insert(process.current_database); } } }