#!/usr/bin/env python3
"""
Collect diagnostics data from a ClickHouse server (version, configuration,
schema, replication state, merges, mutations, query activity and stack
traces) and dump it as wiki markup, JSON or YAML.
"""
import argparse
import gzip
import io
import json
import socket
import subprocess
import sys
from collections.abc import MutableMapping
from copy import deepcopy
from datetime import datetime

import jinja2
import requests
import sqlparse
import tenacity
import xmltodict
import yaml

SELECT_VERSION = r'SELECT version()'

SELECT_UPTIME = r'''
{% if version_ge('21.3') -%}
SELECT formatReadableTimeDelta(uptime())
{% else -%}
SELECT
    toString(floor(uptime() / 3600 / 24)) || ' days ' ||
    toString(floor(uptime() % (24 * 3600) / 3600, 1)) || ' hours'
{% endif -%}
'''

SELECT_SYSTEM_TABLES = "SELECT name FROM system.tables WHERE database = 'system'"

SELECT_DATABASE_ENGINES = r'''SELECT
    engine,
    count() "count"
FROM system.databases
GROUP BY engine
'''

SELECT_DATABASES = r'''SELECT
    name,
    engine,
    tables,
    partitions,
    parts,
    formatReadableSize(bytes_on_disk) "disk_size"
FROM system.databases db
LEFT JOIN (
    SELECT
        database,
        uniq(table) "tables",
        uniq(table, partition) "partitions",
        count() AS parts,
        sum(bytes_on_disk) "bytes_on_disk"
    FROM system.parts
    WHERE active
    GROUP BY database
) AS db_stats ON db.name = db_stats.database
ORDER BY bytes_on_disk DESC
LIMIT 10
'''

SELECT_TABLE_ENGINES = r'''SELECT
    engine,
    count() "count"
FROM system.tables
WHERE database != 'system'
GROUP BY engine
'''

SELECT_DICTIONARIES = r'''SELECT
    source,
    type,
    status,
    count() "count"
FROM system.dictionaries
GROUP BY source, type, status
ORDER BY status DESC, source
'''

SELECT_ACCESS = "SHOW ACCESS"

SELECT_QUOTA_USAGE = "SHOW QUOTA"

SELECT_REPLICAS = r'''SELECT
    database,
    table,
    is_leader,
    is_readonly,
    absolute_delay,
    queue_size,
    inserts_in_queue,
    merges_in_queue
FROM system.replicas
ORDER BY absolute_delay DESC
LIMIT 10
'''

SELECT_REPLICATION_QUEUE = r'''SELECT
    database,
    table,
    replica_name,
    position,
    node_name,
    type,
    source_replica,
    parts_to_merge,
    new_part_name,
    create_time,
    required_quorum,
    is_detach,
    is_currently_executing,
    num_tries,
    last_attempt_time,
    last_exception,
    concat('time: ', toString(last_postpone_time),
           ', number: ', toString(num_postponed),
           ', reason: ', postpone_reason) postpone
FROM system.replication_queue
ORDER BY create_time ASC
LIMIT 20
'''

SELECT_REPLICATED_FETCHES = r'''SELECT
    database,
    table,
    round(elapsed, 1) "elapsed",
    round(100 * progress, 1) "progress",
    partition_id,
    result_part_name,
    result_part_path,
    total_size_bytes_compressed,
    bytes_read_compressed,
    source_replica_path,
    source_replica_hostname,
    source_replica_port,
    interserver_scheme,
    to_detached,
    thread_id
FROM system.replicated_fetches
'''

SELECT_PARTS_PER_TABLE = r'''SELECT
    database,
    table,
    count() "partitions",
    sum(part_count) "parts",
    max(part_count) "max_parts_per_partition"
FROM (
    SELECT
        database,
        table,
        partition,
        count() "part_count"
    FROM system.parts
    WHERE active
    GROUP BY database, table, partition
) partitions
GROUP BY database, table
ORDER BY max_parts_per_partition DESC
LIMIT 10
'''

SELECT_MERGES = r'''SELECT
    database,
    table,
    round(elapsed, 1) "elapsed",
    round(100 * progress, 1) "progress",
    is_mutation,
    partition_id,
{% if version_ge('20.3') -%}
    result_part_path,
    source_part_paths,
{% endif -%}
    num_parts,
    formatReadableSize(total_size_bytes_compressed) "total_size_compressed",
    formatReadableSize(bytes_read_uncompressed) "read_uncompressed",
    formatReadableSize(bytes_written_uncompressed) "written_uncompressed",
    columns_written,
{% if version_ge('20.3') -%}
    formatReadableSize(memory_usage) "memory_usage",
    thread_id
{% else -%}
    formatReadableSize(memory_usage) "memory_usage"
{% endif -%}
FROM system.merges
'''

SELECT_MUTATIONS = r'''SELECT
    database,
    table,
    mutation_id,
    command,
    create_time,
{% if version_ge('20.3') -%}
    parts_to_do_names,
{% endif -%}
    parts_to_do,
    is_done,
    latest_failed_part,
    latest_fail_time,
    latest_fail_reason
FROM system.mutations
WHERE NOT is_done
ORDER BY create_time DESC
'''

SELECT_RECENT_DATA_PARTS = r'''SELECT
    database,
    table,
    engine,
    partition_id,
    name,
{% if version_ge('20.3') -%}
    part_type,
{% endif -%}
    active,
    level,
{% if version_ge('20.3') -%}
    disk_name,
{% endif -%}
    path,
    marks,
    rows,
    bytes_on_disk,
    data_compressed_bytes,
    data_uncompressed_bytes,
    marks_bytes,
    modification_time,
    remove_time,
    refcount,
    is_frozen,
    min_date,
    max_date,
    min_time,
    max_time,
    min_block_number,
    max_block_number
FROM system.parts
WHERE modification_time > now() - INTERVAL 3 MINUTE
ORDER BY modification_time DESC
'''

SELECT_DETACHED_DATA_PARTS = r'''SELECT
    database,
    table,
    partition_id,
    name,
    disk,
    reason,
    min_block_number,
    max_block_number,
    level
FROM system.detached_parts
'''

SELECT_PROCESSES = r'''SELECT
    elapsed,
    query_id,
{% if normalize_queries -%}
    normalizeQuery(query) AS normalized_query,
{% else -%}
    query,
{% endif -%}
    is_cancelled,
    concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
    concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
    formatReadableSize(memory_usage) AS "memory usage",
    user,
    multiIf(empty(client_name),
            http_user_agent,
            concat(client_name, ' ',
                   toString(client_version_major), '.',
                   toString(client_version_minor), '.',
                   toString(client_version_patch))) AS client,
{% if version_ge('21.3') -%}
    thread_ids,
{% endif -%}
{% if version_ge('21.8') -%}
    ProfileEvents,
    Settings
{% else -%}
    ProfileEvents.Names,
    ProfileEvents.Values,
    Settings.Names,
    Settings.Values
{% endif -%}
FROM system.processes
ORDER BY elapsed DESC
'''

SELECT_TOP_QUERIES_BY_DURATION = r'''SELECT
    type,
    query_start_time,
    query_duration_ms,
    query_id,
    query_kind,
    is_initial_query,
{% if normalize_queries -%}
    normalizeQuery(query) AS normalized_query,
{% else -%}
    query,
{% endif -%}
    concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
    concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
    concat(toString(result_rows), ' rows / ', formatReadableSize(result_bytes)) AS result,
    formatReadableSize(memory_usage) AS "memory usage",
    exception,
    '\n' || stack_trace AS stack_trace,
    user,
    initial_user,
    multiIf(empty(client_name),
            http_user_agent,
            concat(client_name, ' ',
                   toString(client_version_major), '.',
                   toString(client_version_minor), '.',
                   toString(client_version_patch))) AS client,
    client_hostname,
{% if version_ge('21.3') -%}
    databases,
    tables,
    columns,
    used_aggregate_functions,
    used_aggregate_function_combinators,
    used_database_engines,
    used_data_type_families,
    used_dictionaries,
    used_formats,
    used_functions,
    used_storages,
    used_table_functions,
    thread_ids,
{% endif -%}
{% if version_ge('21.8') -%}
    ProfileEvents,
    Settings
{% else -%}
    ProfileEvents.Names,
    ProfileEvents.Values,
    Settings.Names,
    Settings.Values
{% endif -%}
FROM system.query_log
WHERE type != 'QueryStart'
  AND event_date >= today() - 1
  AND event_time >= now() - INTERVAL 1 DAY
ORDER BY query_duration_ms DESC
LIMIT 10
'''

SELECT_TOP_QUERIES_BY_MEMORY_USAGE = r'''SELECT
    type,
    query_start_time,
    query_duration_ms,
    query_id,
    query_kind,
    is_initial_query,
{% if normalize_queries -%}
    normalizeQuery(query) AS normalized_query,
{% else -%}
    query,
{% endif -%}
    concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
    concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
    concat(toString(result_rows), ' rows / ', formatReadableSize(result_bytes)) AS result,
    formatReadableSize(memory_usage) AS "memory usage",
    exception,
    '\n' || stack_trace AS stack_trace,
    user,
    initial_user,
    multiIf(empty(client_name),
            http_user_agent,
            concat(client_name, ' ',
                   toString(client_version_major), '.',
                   toString(client_version_minor), '.',
                   toString(client_version_patch))) AS client,
    client_hostname,
{% if version_ge('21.3') -%}
    databases,
    tables,
    columns,
    used_aggregate_functions,
    used_aggregate_function_combinators,
    used_database_engines,
    used_data_type_families,
    used_dictionaries,
    used_formats,
    used_functions,
    used_storages,
    used_table_functions,
    thread_ids,
{% endif -%}
{% if version_ge('21.8') -%}
    ProfileEvents,
    Settings
{% else -%}
    ProfileEvents.Names,
    ProfileEvents.Values,
    Settings.Names,
    Settings.Values
{% endif -%}
FROM system.query_log
WHERE type != 'QueryStart'
  AND event_date >= today() - 1
  AND event_time >= now() - INTERVAL 1 DAY
ORDER BY memory_usage DESC
LIMIT 10
'''

SELECT_FAILED_QUERIES = r'''SELECT
    type,
    query_start_time,
    query_duration_ms,
    query_id,
    query_kind,
    is_initial_query,
{% if normalize_queries -%}
    normalizeQuery(query) AS normalized_query,
{% else -%}
    query,
{% endif -%}
    concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
    concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
    concat(toString(result_rows), ' rows / ', formatReadableSize(result_bytes)) AS result,
    formatReadableSize(memory_usage) AS "memory usage",
    exception,
    '\n' || stack_trace AS stack_trace,
    user,
    initial_user,
    multiIf(empty(client_name),
            http_user_agent,
            concat(client_name, ' ',
                   toString(client_version_major), '.',
                   toString(client_version_minor), '.',
                   toString(client_version_patch))) AS client,
    client_hostname,
{% if version_ge('21.3') -%}
    databases,
    tables,
    columns,
    used_aggregate_functions,
    used_aggregate_function_combinators,
    used_database_engines,
    used_data_type_families,
    used_dictionaries,
    used_formats,
    used_functions,
    used_storages,
    used_table_functions,
    thread_ids,
{% endif -%}
{% if version_ge('21.8') -%}
    ProfileEvents,
    Settings
{% else -%}
    ProfileEvents.Names,
    ProfileEvents.Values,
    Settings.Names,
    Settings.Values
{% endif -%}
FROM system.query_log
WHERE type != 'QueryStart'
  AND event_date >= today() - 1
  AND event_time >= now() - INTERVAL 1 DAY
  AND exception != ''
ORDER BY query_start_time DESC
LIMIT 10
'''

SELECT_STACK_TRACES = r'''SELECT
    '\n' || arrayStringConcat(
        arrayMap(
            (x, y) -> concat(x, ': ', y),
            arrayMap(x -> addressToLine(x), trace),
            arrayMap(x -> demangle(addressToSymbol(x)), trace)),
        '\n') AS trace
FROM system.stack_trace
'''

SELECT_CRASH_LOG = r'''SELECT
    event_time,
    signal,
    thread_id,
    query_id,
    '\n' || arrayStringConcat(trace_full, '\n') AS trace,
    version
FROM system.crash_log
ORDER BY event_time DESC
'''


def retry(exception_types, max_attempts=5, max_interval=5):
    """
    Function decorator that retries wrapped function on failures.
    """
    return tenacity.retry(
        retry=tenacity.retry_if_exception_type(exception_types),
        wait=tenacity.wait_random_exponential(multiplier=0.5, max=max_interval),
        stop=tenacity.stop_after_attempt(max_attempts),
        reraise=True)


class ClickhouseError(Exception):
    """
    ClickHouse interaction error.
    """

    def __init__(self, response):
        self.response = response
        super().__init__(self.response.text.strip())


class ClickhouseClient:
    """
    ClickHouse client.
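
    Wraps the ClickHouse HTTP interface and renders Jinja2 query templates
    before execution. A minimal usage sketch (host and credentials here are
    illustrative assumptions):

        client = ClickhouseClient(host='localhost', user='default', password=None)
        print(client.clickhouse_version)
        print(client.query('SELECT 1', format='JSONCompact'))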
""" def __init__(self, *, host="localhost", port=8123, user="default", password): self._session = requests.Session() if user: self._session.headers['X-ClickHouse-User'] = user self._session.headers['X-ClickHouse-Key'] = password self._url = f'http://{host}:{port}' self._timeout = 60 self._ch_version = None @property def clickhouse_version(self): if self._ch_version is None: self._ch_version = self.query(SELECT_VERSION) return self._ch_version @retry(requests.exceptions.ConnectionError) def query(self, query, query_args=None, format=None, post_data=None, timeout=None, echo=False, dry_run=False): """ Execute query. """ if query_args: query = self.render_query(query, **query_args) if format: query += f' FORMAT {format}' if timeout is None: timeout = self._timeout if echo: print(sqlparse.format(query, reindent=True), '\n') if dry_run: return None try: response = self._session.post(self._url, params={ 'query': query, }, json=post_data, timeout=timeout) response.raise_for_status() if format in ('JSON', 'JSONCompact'): return response.json() return response.text.strip() except requests.exceptions.HTTPError as e: raise ClickhouseError(e.response) from None def render_query(self, query, **kwargs): env = jinja2.Environment() env.globals['version_ge'] = lambda version: version_ge(self.clickhouse_version, version) template = env.from_string(query) return template.render(kwargs) class ClickhouseConfig: """ ClickHouse server configuration. """ def __init__(self, config): self._config = config def dump(self, mask_secrets=True): config = deepcopy(self._config) if mask_secrets: self._mask_secrets(config) return xmltodict.unparse(config, pretty=True) @classmethod def load(cls): return ClickhouseConfig(cls._load_config('/var/lib/clickhouse/preprocessed_configs/config.xml')) @staticmethod def _load_config(config_path): with open(config_path, 'r') as file: return xmltodict.parse(file.read()) @classmethod def _mask_secrets(cls, config): if isinstance(config, MutableMapping): for key, value in list(config.items()): if isinstance(value, MutableMapping): cls._mask_secrets(config[key]) elif key in ('password', 'secret_access_key', 'header', 'identity'): config[key] = '*****' class DiagnosticsData: """ Diagnostics data. """ def __init__(self, args): self.args = args self.host = args.host self._sections = [{'section': None, 'data': {}}] def add_string(self, name, value, section=None): self._section(section)[name] = { 'type': 'string', 'value': value, } def add_xml_document(self, name, document, section=None): self._section(section)[name] = { 'type': 'xml', 'value': document, } def add_query(self, name, query, result, section=None): self._section(section)[name] = { 'type': 'query', 'query': query, 'result': result, } def add_command(self, name, command, result, section=None): self._section(section)[name] = { 'type': 'command', 'command': command, 'result': result, } def dump(self, format): if format.startswith('json'): result = self._dump_json() elif format.startswith('yaml'): result = self._dump_yaml() else: result = self._dump_wiki() if format.endswith('.gz'): compressor = gzip.GzipFile(mode='wb', fileobj=sys.stdout.buffer) compressor.write(result.encode()) else: print(result) def _section(self, name=None): if self._sections[-1]['section'] != name: self._sections.append({'section': name, 'data': {}}) return self._sections[-1]['data'] def _dump_json(self): """ Dump diagnostic data in JSON format. 
""" return json.dumps(self._sections, indent=2, ensure_ascii=False) def _dump_yaml(self): """ Dump diagnostic data in YAML format. """ return yaml.dump(self._sections, default_flow_style=False, allow_unicode=True) def _dump_wiki(self): """ Dump diagnostic data in Wiki format. """ def _write_title(buffer, value): buffer.write(f'### {value}\n') def _write_subtitle(buffer, value): buffer.write(f'#### {value}\n') def _write_string_item(buffer, name, item): value = item['value'] if value != '': value = f'**{value}**' buffer.write(f'{name}: {value}\n') def _write_xml_item(buffer, section_name, name, item): if section_name: buffer.write(f'##### {name}\n') else: _write_subtitle(buffer, name) _write_result(buffer, item['value'], format='XML') def _write_query_item(buffer, section_name, name, item): if section_name: buffer.write(f'##### {name}\n') else: _write_subtitle(buffer, name) _write_query(buffer, item['query']) _write_result(buffer, item['result']) def _write_command_item(buffer, section_name, name, item): if section_name: buffer.write(f'##### {name}\n') else: _write_subtitle(buffer, name) _write_command(buffer, item['command']) _write_result(buffer, item['result']) def _write_unknown_item(buffer, section_name, name, item): if section_name: buffer.write(f'**{name}**\n') else: _write_subtitle(buffer, name) json.dump(item, buffer, indent=2) def _write_query(buffer, query): buffer.write('**query**\n') buffer.write('```sql\n') buffer.write(query) buffer.write('\n```\n') def _write_command(buffer, command): buffer.write('**command**\n') buffer.write('```\n') buffer.write(command) buffer.write('\n```\n') def _write_result(buffer, result, format=None): buffer.write('**result**\n') buffer.write(f'```{format}\n' if format else '```\n') buffer.write(result) buffer.write('\n```\n') buffer = io.StringIO() _write_title(buffer, f'Diagnostics data for host {self.host}') for section in self._sections: section_name = section['section'] if section_name: _write_subtitle(buffer, section_name) for name, item in section['data'].items(): if item['type'] == 'string': _write_string_item(buffer, name, item) elif item['type'] == 'query': _write_query_item(buffer, section_name, name, item) elif item['type'] == 'command': _write_command_item(buffer, section_name, name, item) elif item['type'] == 'xml': _write_xml_item(buffer, section_name, name, item) else: _write_unknown_item(buffer, section_name, name, item) return buffer.getvalue() def main(): """ Program entry point. 
""" args = parse_args() timestamp = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S') client = ClickhouseClient(host=args.host, port=args.port, user=args.user, password=args.password) ch_config = ClickhouseConfig.load() version = client.clickhouse_version system_tables = [row[0] for row in execute_query(client, SELECT_SYSTEM_TABLES, format='JSONCompact')['data']] diagnostics = DiagnosticsData(args) diagnostics.add_string('Version', version) diagnostics.add_string('Timestamp', timestamp) diagnostics.add_string('Uptime', execute_query(client, SELECT_UPTIME)) diagnostics.add_xml_document('ClickHouse configuration', ch_config.dump()) if version_ge(version, '20.8'): add_query(diagnostics, 'Access configuration', client=client, query=SELECT_ACCESS, format='TSVRaw') add_query(diagnostics, 'Quotas', client=client, query=SELECT_QUOTA_USAGE, format='Vertical') add_query(diagnostics, 'Database engines', client=client, query=SELECT_DATABASE_ENGINES, format='PrettyCompactNoEscapes', section='Schema') add_query(diagnostics, 'Databases (top 10 by size)', client=client, query=SELECT_DATABASES, format='PrettyCompactNoEscapes', section='Schema') add_query(diagnostics, 'Table engines', client=client, query=SELECT_TABLE_ENGINES, format='PrettyCompactNoEscapes', section='Schema') add_query(diagnostics, 'Dictionaries', client=client, query=SELECT_DICTIONARIES, format='PrettyCompactNoEscapes', section='Schema') add_query(diagnostics, 'Replicated tables (top 10 by absolute delay)', client=client, query=SELECT_REPLICAS, format='PrettyCompactNoEscapes', section='Replication') add_query(diagnostics, 'Replication queue (top 20 oldest tasks)', client=client, query=SELECT_REPLICATION_QUEUE, format='Vertical', section='Replication') if version_ge(version, '21.3'): add_query(diagnostics, 'Replicated fetches', client=client, query=SELECT_REPLICATED_FETCHES, format='Vertical', section='Replication') add_query(diagnostics, 'Top 10 tables by max parts per partition', client=client, query=SELECT_PARTS_PER_TABLE, format='PrettyCompactNoEscapes') add_query(diagnostics, 'Merges in progress', client=client, query=SELECT_MERGES, format='Vertical') add_query(diagnostics, 'Mutations in progress', client=client, query=SELECT_MUTATIONS, format='Vertical') add_query(diagnostics, 'Recent data parts (modification time within last 3 minutes)', client=client, query=SELECT_RECENT_DATA_PARTS, format='Vertical') add_query(diagnostics, 'system.detached_parts', client=client, query=SELECT_DETACHED_DATA_PARTS, format='PrettyCompactNoEscapes', section='Detached data') add_command(diagnostics, 'Disk space usage', command='du -sh -L -c /var/lib/clickhouse/data/*/*/detached/* | sort -rsh', section='Detached data') add_query(diagnostics, 'Queries in progress (process list)', client=client, query=SELECT_PROCESSES, format='Vertical', section='Queries') add_query(diagnostics, 'Top 10 queries by duration', client=client, query=SELECT_TOP_QUERIES_BY_DURATION, format='Vertical', section='Queries') add_query(diagnostics, 'Top 10 queries by memory usage', client=client, query=SELECT_TOP_QUERIES_BY_MEMORY_USAGE, format='Vertical', section='Queries') add_query(diagnostics, 'Last 10 failed queries', client=client, query=SELECT_FAILED_QUERIES, format='Vertical', section='Queries') add_query(diagnostics, 'Stack traces', client=client, query=SELECT_STACK_TRACES, format='Vertical') if 'crash_log' in system_tables: add_query(diagnostics, 'Crash log', client=client, query=SELECT_CRASH_LOG, format='Vertical') add_command(diagnostics, 'uname', 'uname -a') 
    diagnostics.dump(args.format)


def parse_args():
    """
    Parse command-line arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--format',
                        choices=['json', 'yaml', 'json.gz', 'yaml.gz', 'wiki', 'wiki.gz'],
                        default='wiki')
    parser.add_argument('--normalize-queries',
                        action='store_true',
                        default=False)
    parser.add_argument('--host', dest="host", default=socket.getfqdn(), help="clickhouse host")
    parser.add_argument('--port', dest="port", type=int, default=8123, help="clickhouse http port")
    parser.add_argument('--user', dest="user", default="default", help="clickhouse user")
    parser.add_argument('--password', dest="password", help="clickhouse password")
    return parser.parse_args()


def add_query(diagnostics, name, client, query, format, section=None):
    query_args = {
        'normalize_queries': diagnostics.args.normalize_queries,
    }
    query = client.render_query(query, **query_args)
    diagnostics.add_query(
        name=name,
        query=query,
        result=execute_query(client, query, render_query=False, format=format),
        section=section)


def execute_query(client, query, render_query=True, format=None):
    if render_query:
        query = client.render_query(query)

    try:
        return client.query(query, format=format)
    except Exception as e:
        return repr(e)


def add_command(diagnostics, name, command, section=None):
    diagnostics.add_command(
        name=name,
        command=command,
        result=execute_command(command),
        section=section)


def execute_command(command, input=None):
    proc = subprocess.Popen(command, shell=True,
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if isinstance(input, str):
        input = input.encode()

    stdout, stderr = proc.communicate(input=input)

    if proc.returncode:
        return f'failed with exit code {proc.returncode}\n{stderr.decode()}'

    return stdout.decode()


def version_ge(version1, version2):
    """
    Return True if version1 is greater than or equal to version2.
    """
    return parse_version(version1) >= parse_version(version2)


def parse_version(version):
    """
    Parse version string into a list of numeric components.
    """
    return [int(x) for x in version.strip().split('.') if x.isnumeric()]


if __name__ == '__main__':
    main()
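
# Example invocation (illustrative; the script name is an assumption, and the
# tool is meant to run on the ClickHouse host itself so the preprocessed config
# under /var/lib/clickhouse is readable):
#
#   ./clickhouse-diagnostics --host localhost --user default --format wiki.gz > report.wiki.gz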