#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { StorageSystemProcesses::StorageSystemProcesses(const std::string & name_) : name(name_) { setColumns(ColumnsDescription({ { "is_initial_query", std::make_shared() }, { "user", std::make_shared() }, { "query_id", std::make_shared() }, { "address", std::make_shared() }, { "port", std::make_shared() }, { "initial_user", std::make_shared() }, { "initial_query_id", std::make_shared() }, { "initial_address", std::make_shared() }, { "initial_port", std::make_shared() }, { "interface", std::make_shared() }, { "os_user", std::make_shared() }, { "client_hostname", std::make_shared() }, { "client_name", std::make_shared() }, { "client_version_major", std::make_shared() }, { "client_version_minor", std::make_shared() }, { "client_revision", std::make_shared() }, { "http_method", std::make_shared() }, { "http_user_agent", std::make_shared() }, { "quota_key", std::make_shared() }, { "elapsed", std::make_shared() }, { "is_cancelled", std::make_shared() }, { "read_rows", std::make_shared() }, { "read_bytes", std::make_shared() }, { "total_rows_approx", std::make_shared() }, { "written_rows", std::make_shared() }, { "written_bytes", std::make_shared() }, { "memory_usage", std::make_shared() }, { "peak_memory_usage", std::make_shared() }, { "query", std::make_shared() }, { "thread_numbers", std::make_shared(std::make_shared()) }, { "ProfileEvents.Names", std::make_shared(std::make_shared()) }, { "ProfileEvents.Values", std::make_shared(std::make_shared()) }, { "Settings.Names", std::make_shared(std::make_shared()) }, { "Settings.Values", std::make_shared(std::make_shared()) } })); } BlockInputStreams StorageSystemProcesses::read( const Names & column_names, const SelectQueryInfo &, const Context & context, QueryProcessingStage::Enum & processed_stage, const size_t /*max_block_size*/, const unsigned /*num_streams*/) { processed_stage = QueryProcessingStage::FetchColumns; check(column_names); Block res_block = getSampleBlock().cloneEmpty(); MutableColumns res_columns = res_block.cloneEmptyColumns(); ProcessList::Info info = context.getProcessList().getInfo(true, true, true); for (const auto & process : info) { size_t i = 0; res_columns[i++]->insert(UInt64(process.client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)); res_columns[i++]->insert(process.client_info.current_user); res_columns[i++]->insert(process.client_info.current_query_id); res_columns[i++]->insert(process.client_info.current_address.host().toString()); res_columns[i++]->insert(UInt64(process.client_info.current_address.port())); res_columns[i++]->insert(process.client_info.initial_user); res_columns[i++]->insert(process.client_info.initial_query_id); res_columns[i++]->insert(process.client_info.initial_address.host().toString()); res_columns[i++]->insert(UInt64(process.client_info.initial_address.port())); res_columns[i++]->insert(UInt64(process.client_info.interface)); res_columns[i++]->insert(process.client_info.os_user); res_columns[i++]->insert(process.client_info.client_hostname); res_columns[i++]->insert(process.client_info.client_name); res_columns[i++]->insert(process.client_info.client_version_major); res_columns[i++]->insert(process.client_info.client_version_minor); res_columns[i++]->insert(UInt64(process.client_info.client_revision)); res_columns[i++]->insert(UInt64(process.client_info.http_method)); res_columns[i++]->insert(process.client_info.http_user_agent); res_columns[i++]->insert(process.client_info.quota_key); res_columns[i++]->insert(process.elapsed_seconds); res_columns[i++]->insert(UInt64(process.is_cancelled)); res_columns[i++]->insert(UInt64(process.read_rows)); res_columns[i++]->insert(UInt64(process.read_bytes)); res_columns[i++]->insert(UInt64(process.total_rows)); res_columns[i++]->insert(UInt64(process.written_rows)); res_columns[i++]->insert(UInt64(process.written_bytes)); res_columns[i++]->insert(process.memory_usage); res_columns[i++]->insert(process.peak_memory_usage); res_columns[i++]->insert(process.query); { Array threads_array; threads_array.reserve(process.thread_numbers.size()); for (const UInt32 thread_number : process.thread_numbers) threads_array.emplace_back(UInt64(thread_number)); res_columns[i++]->insert(threads_array); } { IColumn * column_profile_events_names = res_columns[i++].get(); IColumn * column_profile_events_values = res_columns[i++].get(); if (process.profile_counters) ProfileEvents::dumpToArrayColumns(*process.profile_counters, column_profile_events_names, column_profile_events_values, true); else { column_profile_events_names->insertDefault(); column_profile_events_values->insertDefault(); } } { IColumn * column_settings_names = res_columns[i++].get(); IColumn * column_settings_values = res_columns[i++].get(); if (process.query_settings) process.query_settings->dumpToArrayColumns(column_settings_names, column_settings_values, true); else { column_settings_names->insertDefault(); column_settings_values->insertDefault(); } } } return BlockInputStreams(1, std::make_shared(res_block.cloneWithColumns(std::move(res_columns)))); } }