mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 11:32:03 +00:00
1060 lines
27 KiB
Python
Executable File
1060 lines
27 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import gzip
|
|
import io
|
|
import json
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
from copy import deepcopy
|
|
from datetime import datetime
|
|
from typing import MutableMapping
|
|
|
|
import jinja2
|
|
from pandas import describe_option
|
|
import requests
|
|
import sqlparse
|
|
import tenacity
|
|
import xmltodict
|
|
import yaml
|
|
|
|
SELECT_VERSION = r"SELECT version()"
|
|
|
|
SELECT_UPTIME = r"""
|
|
{% if version_ge('21.3') -%}
|
|
SELECT formatReadableTimeDelta(uptime())
|
|
{% else -%}
|
|
SELECT
|
|
toString(floor(uptime() / 3600 / 24)) || ' days ' ||
|
|
toString(floor(uptime() % (24 * 3600) / 3600, 1)) || ' hours'
|
|
{% endif -%}
|
|
"""
|
|
|
|
SELECT_SYSTEM_TABLES = "SELECT name FROM system.tables WHERE database = 'system'"
|
|
|
|
SELECT_DATABASE_ENGINES = r"""SELECT
|
|
engine,
|
|
count() "count"
|
|
FROM system.databases
|
|
GROUP BY engine
|
|
"""
|
|
|
|
SELECT_DATABASES = r"""SELECT
|
|
name,
|
|
engine,
|
|
tables,
|
|
partitions,
|
|
parts,
|
|
formatReadableSize(bytes_on_disk) "disk_size"
|
|
FROM system.databases db
|
|
LEFT JOIN
|
|
(
|
|
SELECT
|
|
database,
|
|
uniq(table) "tables",
|
|
uniq(table, partition) "partitions",
|
|
count() AS parts,
|
|
sum(bytes_on_disk) "bytes_on_disk"
|
|
FROM system.parts
|
|
WHERE active
|
|
GROUP BY database
|
|
) AS db_stats ON db.name = db_stats.database
|
|
ORDER BY bytes_on_disk DESC
|
|
LIMIT 10
|
|
"""
|
|
|
|
SELECT_TABLE_ENGINES = r"""SELECT
|
|
engine,
|
|
count() "count"
|
|
FROM system.tables
|
|
WHERE database != 'system'
|
|
GROUP BY engine
|
|
"""
|
|
|
|
SELECT_DICTIONARIES = r"""SELECT
|
|
source,
|
|
type,
|
|
status,
|
|
count() "count"
|
|
FROM system.dictionaries
|
|
GROUP BY source, type, status
|
|
ORDER BY status DESC, source
|
|
"""
|
|
|
|
SELECT_ACCESS = "SHOW ACCESS"
|
|
|
|
SELECT_QUOTA_USAGE = "SHOW QUOTA"
|
|
|
|
SELECT_REPLICAS = r"""SELECT
|
|
database,
|
|
table,
|
|
is_leader,
|
|
is_readonly,
|
|
absolute_delay,
|
|
queue_size,
|
|
inserts_in_queue,
|
|
merges_in_queue
|
|
FROM system.replicas
|
|
ORDER BY absolute_delay DESC
|
|
LIMIT 10
|
|
"""
|
|
|
|
SELECT_REPLICATION_QUEUE = r"""SELECT
|
|
database,
|
|
table,
|
|
replica_name,
|
|
position,
|
|
node_name,
|
|
type,
|
|
source_replica,
|
|
parts_to_merge,
|
|
new_part_name,
|
|
create_time,
|
|
required_quorum,
|
|
is_detach,
|
|
is_currently_executing,
|
|
num_tries,
|
|
last_attempt_time,
|
|
last_exception,
|
|
concat('time: ', toString(last_postpone_time), ', number: ', toString(num_postponed), ', reason: ', postpone_reason) postpone
|
|
FROM system.replication_queue
|
|
ORDER BY create_time ASC
|
|
LIMIT 20
|
|
"""
|
|
|
|
SELECT_REPLICATED_FETCHES = r"""SELECT
|
|
database,
|
|
table,
|
|
round(elapsed, 1) "elapsed",
|
|
round(100 * progress, 1) "progress",
|
|
partition_id,
|
|
result_part_name,
|
|
result_part_path,
|
|
total_size_bytes_compressed,
|
|
bytes_read_compressed,
|
|
source_replica_path,
|
|
source_replica_hostname,
|
|
source_replica_port,
|
|
interserver_scheme,
|
|
to_detached,
|
|
thread_id
|
|
FROM system.replicated_fetches
|
|
"""
|
|
|
|
SELECT_PARTS_PER_TABLE = r"""SELECT
|
|
database,
|
|
table,
|
|
count() "partitions",
|
|
sum(part_count) "parts",
|
|
max(part_count) "max_parts_per_partition"
|
|
FROM
|
|
(
|
|
SELECT
|
|
database,
|
|
table,
|
|
partition,
|
|
count() "part_count"
|
|
FROM system.parts
|
|
WHERE active
|
|
GROUP BY database, table, partition
|
|
) partitions
|
|
GROUP BY database, table
|
|
ORDER BY max_parts_per_partition DESC
|
|
LIMIT 10
|
|
"""
|
|
|
|
SELECT_MERGES = r"""SELECT
|
|
database,
|
|
table,
|
|
round(elapsed, 1) "elapsed",
|
|
round(100 * progress, 1) "progress",
|
|
is_mutation,
|
|
partition_id,
|
|
{% if version_ge('20.3') -%}
|
|
result_part_path,
|
|
source_part_paths,
|
|
{% endif -%}
|
|
num_parts,
|
|
formatReadableSize(total_size_bytes_compressed) "total_size_compressed",
|
|
formatReadableSize(bytes_read_uncompressed) "read_uncompressed",
|
|
formatReadableSize(bytes_written_uncompressed) "written_uncompressed",
|
|
columns_written,
|
|
{% if version_ge('20.3') -%}
|
|
formatReadableSize(memory_usage) "memory_usage",
|
|
thread_id
|
|
{% else -%}
|
|
formatReadableSize(memory_usage) "memory_usage"
|
|
{% endif -%}
|
|
FROM system.merges
|
|
"""
|
|
|
|
SELECT_MUTATIONS = r"""SELECT
|
|
database,
|
|
table,
|
|
mutation_id,
|
|
command,
|
|
create_time,
|
|
{% if version_ge('20.3') -%}
|
|
parts_to_do_names,
|
|
{% endif -%}
|
|
parts_to_do,
|
|
is_done,
|
|
latest_failed_part,
|
|
latest_fail_time,
|
|
latest_fail_reason
|
|
FROM system.mutations
|
|
WHERE NOT is_done
|
|
ORDER BY create_time DESC
|
|
"""
|
|
|
|
SELECT_RECENT_DATA_PARTS = r"""SELECT
|
|
database,
|
|
table,
|
|
engine,
|
|
partition_id,
|
|
name,
|
|
{% if version_ge('20.3') -%}
|
|
part_type,
|
|
{% endif -%}
|
|
active,
|
|
level,
|
|
{% if version_ge('20.3') -%}
|
|
disk_name,
|
|
{% endif -%}
|
|
path,
|
|
marks,
|
|
rows,
|
|
bytes_on_disk,
|
|
data_compressed_bytes,
|
|
data_uncompressed_bytes,
|
|
marks_bytes,
|
|
modification_time,
|
|
remove_time,
|
|
refcount,
|
|
is_frozen,
|
|
min_date,
|
|
max_date,
|
|
min_time,
|
|
max_time,
|
|
min_block_number,
|
|
max_block_number
|
|
FROM system.parts
|
|
WHERE modification_time > now() - INTERVAL 3 MINUTE
|
|
ORDER BY modification_time DESC
|
|
"""
|
|
|
|
SELECT_DETACHED_DATA_PARTS = r"""SELECT
|
|
database,
|
|
table,
|
|
partition_id,
|
|
name,
|
|
disk,
|
|
reason,
|
|
min_block_number,
|
|
max_block_number,
|
|
level
|
|
FROM system.detached_parts
|
|
"""
|
|
|
|
SELECT_PROCESSES = r"""SELECT
|
|
elapsed,
|
|
query_id,
|
|
{% if normalize_queries -%}
|
|
normalizeQuery(query) AS normalized_query,
|
|
{% else -%}
|
|
query,
|
|
{% endif -%}
|
|
is_cancelled,
|
|
concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
|
|
concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
|
|
formatReadableSize(memory_usage) AS "memory usage",
|
|
user,
|
|
multiIf(empty(client_name), http_user_agent, concat(client_name, ' ', toString(client_version_major), '.', toString(client_version_minor), '.', toString(client_version_patch))) AS client,
|
|
{% if version_ge('21.3') -%}
|
|
thread_ids,
|
|
{% endif -%}
|
|
{% if version_ge('21.8') -%}
|
|
ProfileEvents,
|
|
Settings
|
|
{% else -%}
|
|
ProfileEvents.Names,
|
|
ProfileEvents.Values,
|
|
Settings.Names,
|
|
Settings.Values
|
|
{% endif -%}
|
|
FROM system.processes
|
|
ORDER BY elapsed DESC
|
|
"""
|
|
|
|
SELECT_TOP_QUERIES_BY_DURATION = r"""SELECT
|
|
type,
|
|
query_start_time,
|
|
query_duration_ms,
|
|
query_id,
|
|
query_kind,
|
|
is_initial_query,
|
|
{% if normalize_queries -%}
|
|
normalizeQuery(query) AS normalized_query,
|
|
{% else -%}
|
|
query,
|
|
{% endif -%}
|
|
concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
|
|
concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
|
|
concat(toString(result_rows), ' rows / ', formatReadableSize(result_bytes)) AS result,
|
|
formatReadableSize(memory_usage) AS "memory usage",
|
|
exception,
|
|
'\n' || stack_trace AS stack_trace,
|
|
user,
|
|
initial_user,
|
|
multiIf(empty(client_name), http_user_agent, concat(client_name, ' ', toString(client_version_major), '.', toString(client_version_minor), '.', toString(client_version_patch))) AS client,
|
|
client_hostname,
|
|
{% if version_ge('21.3') -%}
|
|
databases,
|
|
tables,
|
|
columns,
|
|
used_aggregate_functions,
|
|
used_aggregate_function_combinators,
|
|
used_database_engines,
|
|
used_data_type_families,
|
|
used_dictionaries,
|
|
used_formats,
|
|
used_functions,
|
|
used_storages,
|
|
used_table_functions,
|
|
thread_ids,
|
|
{% endif -%}
|
|
{% if version_ge('21.8') -%}
|
|
ProfileEvents,
|
|
Settings
|
|
{% else -%}
|
|
ProfileEvents.Names,
|
|
ProfileEvents.Values,
|
|
Settings.Names,
|
|
Settings.Values
|
|
{% endif -%}
|
|
FROM system.query_log
|
|
WHERE type != 'QueryStart'
|
|
AND event_date >= today() - 1
|
|
AND event_time >= now() - INTERVAL 1 DAY
|
|
ORDER BY query_duration_ms DESC
|
|
LIMIT 10
|
|
"""
|
|
|
|
SELECT_TOP_QUERIES_BY_MEMORY_USAGE = r"""SELECT
|
|
type,
|
|
query_start_time,
|
|
query_duration_ms,
|
|
query_id,
|
|
query_kind,
|
|
is_initial_query,
|
|
{% if normalize_queries -%}
|
|
normalizeQuery(query) AS normalized_query,
|
|
{% else -%}
|
|
query,
|
|
{% endif -%}
|
|
concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
|
|
concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
|
|
concat(toString(result_rows), ' rows / ', formatReadableSize(result_bytes)) AS result,
|
|
formatReadableSize(memory_usage) AS "memory usage",
|
|
exception,
|
|
'\n' || stack_trace AS stack_trace,
|
|
user,
|
|
initial_user,
|
|
multiIf(empty(client_name), http_user_agent, concat(client_name, ' ', toString(client_version_major), '.', toString(client_version_minor), '.', toString(client_version_patch))) AS client,
|
|
client_hostname,
|
|
{% if version_ge('21.3') -%}
|
|
databases,
|
|
tables,
|
|
columns,
|
|
used_aggregate_functions,
|
|
used_aggregate_function_combinators,
|
|
used_database_engines,
|
|
used_data_type_families,
|
|
used_dictionaries,
|
|
used_formats,
|
|
used_functions,
|
|
used_storages,
|
|
used_table_functions,
|
|
thread_ids,
|
|
{% endif -%}
|
|
{% if version_ge('21.8') -%}
|
|
ProfileEvents,
|
|
Settings
|
|
{% else -%}
|
|
ProfileEvents.Names,
|
|
ProfileEvents.Values,
|
|
Settings.Names,
|
|
Settings.Values
|
|
{% endif -%}
|
|
FROM system.query_log
|
|
WHERE type != 'QueryStart'
|
|
AND event_date >= today() - 1
|
|
AND event_time >= now() - INTERVAL 1 DAY
|
|
ORDER BY memory_usage DESC
|
|
LIMIT 10
|
|
"""
|
|
|
|
SELECT_FAILED_QUERIES = r"""SELECT
|
|
type,
|
|
query_start_time,
|
|
query_duration_ms,
|
|
query_id,
|
|
query_kind,
|
|
is_initial_query,
|
|
{% if normalize_queries -%}
|
|
normalizeQuery(query) AS normalized_query,
|
|
{% else -%}
|
|
query,
|
|
{% endif -%}
|
|
concat(toString(read_rows), ' rows / ', formatReadableSize(read_bytes)) AS read,
|
|
concat(toString(written_rows), ' rows / ', formatReadableSize(written_bytes)) AS written,
|
|
concat(toString(result_rows), ' rows / ', formatReadableSize(result_bytes)) AS result,
|
|
formatReadableSize(memory_usage) AS "memory usage",
|
|
exception,
|
|
'\n' || stack_trace AS stack_trace,
|
|
user,
|
|
initial_user,
|
|
multiIf(empty(client_name), http_user_agent, concat(client_name, ' ', toString(client_version_major), '.', toString(client_version_minor), '.', toString(client_version_patch))) AS client,
|
|
client_hostname,
|
|
{% if version_ge('21.3') -%}
|
|
databases,
|
|
tables,
|
|
columns,
|
|
used_aggregate_functions,
|
|
used_aggregate_function_combinators,
|
|
used_database_engines,
|
|
used_data_type_families,
|
|
used_dictionaries,
|
|
used_formats,
|
|
used_functions,
|
|
used_storages,
|
|
used_table_functions,
|
|
thread_ids,
|
|
{% endif -%}
|
|
{% if version_ge('21.8') -%}
|
|
ProfileEvents,
|
|
Settings
|
|
{% else -%}
|
|
ProfileEvents.Names,
|
|
ProfileEvents.Values,
|
|
Settings.Names,
|
|
Settings.Values
|
|
{% endif -%}
|
|
FROM system.query_log
|
|
WHERE type != 'QueryStart'
|
|
AND event_date >= today() - 1
|
|
AND event_time >= now() - INTERVAL 1 DAY
|
|
AND exception != ''
|
|
ORDER BY query_start_time DESC
|
|
LIMIT 10
|
|
"""
|
|
|
|
SELECT_STACK_TRACES = r"""SELECT
|
|
'\n' || arrayStringConcat(
|
|
arrayMap(
|
|
x,
|
|
y -> concat(x, ': ', y),
|
|
arrayMap(x -> addressToLine(x), trace),
|
|
arrayMap(x -> demangle(addressToSymbol(x)), trace)),
|
|
'\n') AS trace
|
|
FROM system.stack_trace
|
|
"""
|
|
|
|
SELECT_CRASH_LOG = r"""SELECT
|
|
event_time,
|
|
signal,
|
|
thread_id,
|
|
query_id,
|
|
'\n' || arrayStringConcat(trace_full, '\n') AS trace,
|
|
version
|
|
FROM system.crash_log
|
|
ORDER BY event_time DESC
|
|
"""
|
|
|
|
|
|
def retry(exception_types, max_attempts=5, max_interval=5):
|
|
"""
|
|
Function decorator that retries wrapped function on failures.
|
|
"""
|
|
return tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(exception_types),
|
|
wait=tenacity.wait_random_exponential(multiplier=0.5, max=max_interval),
|
|
stop=tenacity.stop_after_attempt(max_attempts),
|
|
reraise=True,
|
|
)
|
|
|
|
|
|
class ClickhouseError(Exception):
|
|
"""
|
|
ClickHouse interaction error.
|
|
"""
|
|
|
|
def __init__(self, response):
|
|
self.response = response
|
|
super().__init__(self.response.text.strip())
|
|
|
|
|
|
class ClickhouseClient:
|
|
"""
|
|
ClickHouse client.
|
|
"""
|
|
|
|
def __init__(self, *, host="localhost", port=8123, user="default", password):
|
|
self._session = requests.Session()
|
|
if user:
|
|
self._session.headers["X-ClickHouse-User"] = user
|
|
self._session.headers["X-ClickHouse-Key"] = password
|
|
self._url = f"http://{host}:{port}"
|
|
self._timeout = 60
|
|
self._ch_version = None
|
|
|
|
@property
|
|
def clickhouse_version(self):
|
|
if self._ch_version is None:
|
|
self._ch_version = self.query(SELECT_VERSION)
|
|
|
|
return self._ch_version
|
|
|
|
@retry(requests.exceptions.ConnectionError)
|
|
def query(
|
|
self,
|
|
query,
|
|
query_args=None,
|
|
format=None,
|
|
post_data=None,
|
|
timeout=None,
|
|
echo=False,
|
|
dry_run=False,
|
|
):
|
|
"""
|
|
Execute query.
|
|
"""
|
|
if query_args:
|
|
query = self.render_query(query, **query_args)
|
|
|
|
if format:
|
|
query += f" FORMAT {format}"
|
|
|
|
if timeout is None:
|
|
timeout = self._timeout
|
|
|
|
if echo:
|
|
print(sqlparse.format(query, reindent=True), "\n")
|
|
|
|
if dry_run:
|
|
return None
|
|
|
|
try:
|
|
response = self._session.post(
|
|
self._url,
|
|
params={
|
|
"query": query,
|
|
},
|
|
json=post_data,
|
|
timeout=timeout,
|
|
)
|
|
|
|
response.raise_for_status()
|
|
|
|
if format in ("JSON", "JSONCompact"):
|
|
return response.json()
|
|
|
|
return response.text.strip()
|
|
except requests.exceptions.HTTPError as e:
|
|
raise ClickhouseError(e.response) from None
|
|
|
|
def render_query(self, query, **kwargs):
|
|
env = jinja2.Environment()
|
|
|
|
env.globals["version_ge"] = lambda version: version_ge(
|
|
self.clickhouse_version, version
|
|
)
|
|
|
|
template = env.from_string(query)
|
|
return template.render(kwargs)
|
|
|
|
|
|
class ClickhouseConfig:
|
|
"""
|
|
ClickHouse server configuration.
|
|
"""
|
|
|
|
def __init__(self, config):
|
|
self._config = config
|
|
|
|
def dump(self, mask_secrets=True):
|
|
config = deepcopy(self._config)
|
|
if mask_secrets:
|
|
self._mask_secrets(config)
|
|
|
|
return xmltodict.unparse(config, pretty=True)
|
|
|
|
@classmethod
|
|
def load(cls):
|
|
return ClickhouseConfig(
|
|
cls._load_config("/var/lib/clickhouse/preprocessed_configs/config.xml")
|
|
)
|
|
|
|
@staticmethod
|
|
def _load_config(config_path):
|
|
with open(config_path, "r") as file:
|
|
return xmltodict.parse(file.read())
|
|
|
|
@classmethod
|
|
def _mask_secrets(cls, config):
|
|
if isinstance(config, MutableMapping):
|
|
for key, value in list(config.items()):
|
|
if isinstance(value, MutableMapping):
|
|
cls._mask_secrets(config[key])
|
|
elif key in ("password", "secret_access_key", "header", "identity"):
|
|
config[key] = "*****"
|
|
|
|
|
|
class DiagnosticsData:
|
|
"""
|
|
Diagnostics data.
|
|
"""
|
|
|
|
def __init__(self, args):
|
|
self.args = args
|
|
self.host = args.host
|
|
self._sections = [{"section": None, "data": {}}]
|
|
|
|
def add_string(self, name, value, section=None):
|
|
self._section(section)[name] = {
|
|
"type": "string",
|
|
"value": value,
|
|
}
|
|
|
|
def add_xml_document(self, name, document, section=None):
|
|
self._section(section)[name] = {
|
|
"type": "xml",
|
|
"value": document,
|
|
}
|
|
|
|
def add_query(self, name, query, result, section=None):
|
|
self._section(section)[name] = {
|
|
"type": "query",
|
|
"query": query,
|
|
"result": result,
|
|
}
|
|
|
|
def add_command(self, name, command, result, section=None):
|
|
self._section(section)[name] = {
|
|
"type": "command",
|
|
"command": command,
|
|
"result": result,
|
|
}
|
|
|
|
def dump(self, format):
|
|
if format.startswith("json"):
|
|
result = self._dump_json()
|
|
elif format.startswith("yaml"):
|
|
result = self._dump_yaml()
|
|
else:
|
|
result = self._dump_wiki()
|
|
|
|
if format.endswith(".gz"):
|
|
compressor = gzip.GzipFile(mode="wb", fileobj=sys.stdout.buffer)
|
|
compressor.write(result.encode())
|
|
else:
|
|
print(result)
|
|
|
|
def _section(self, name=None):
|
|
if self._sections[-1]["section"] != name:
|
|
self._sections.append({"section": name, "data": {}})
|
|
|
|
return self._sections[-1]["data"]
|
|
|
|
def _dump_json(self):
|
|
"""
|
|
Dump diagnostic data in JSON format.
|
|
"""
|
|
return json.dumps(self._sections, indent=2, ensure_ascii=False)
|
|
|
|
def _dump_yaml(self):
|
|
"""
|
|
Dump diagnostic data in YAML format.
|
|
"""
|
|
return yaml.dump(self._sections, default_flow_style=False, allow_unicode=True)
|
|
|
|
def _dump_wiki(self):
|
|
"""
|
|
Dump diagnostic data in Wiki format.
|
|
"""
|
|
|
|
def _write_title(buffer, value):
|
|
buffer.write(f"### {value}\n")
|
|
|
|
def _write_subtitle(buffer, value):
|
|
buffer.write(f"#### {value}\n")
|
|
|
|
def _write_string_item(buffer, name, item):
|
|
value = item["value"]
|
|
if value != "":
|
|
value = f"**{value}**"
|
|
buffer.write(f"{name}: {value}\n")
|
|
|
|
def _write_xml_item(buffer, section_name, name, item):
|
|
if section_name:
|
|
buffer.write(f"##### {name}\n")
|
|
else:
|
|
_write_subtitle(buffer, name)
|
|
|
|
_write_result(buffer, item["value"], format="XML")
|
|
|
|
def _write_query_item(buffer, section_name, name, item):
|
|
if section_name:
|
|
buffer.write(f"##### {name}\n")
|
|
else:
|
|
_write_subtitle(buffer, name)
|
|
|
|
_write_query(buffer, item["query"])
|
|
_write_result(buffer, item["result"])
|
|
|
|
def _write_command_item(buffer, section_name, name, item):
|
|
if section_name:
|
|
buffer.write(f"##### {name}\n")
|
|
else:
|
|
_write_subtitle(buffer, name)
|
|
|
|
_write_command(buffer, item["command"])
|
|
_write_result(buffer, item["result"])
|
|
|
|
def _write_unknown_item(buffer, section_name, name, item):
|
|
if section_name:
|
|
buffer.write(f"**{name}**\n")
|
|
else:
|
|
_write_subtitle(buffer, name)
|
|
|
|
json.dump(item, buffer, indent=2)
|
|
|
|
def _write_query(buffer, query):
|
|
buffer.write("**query**\n")
|
|
buffer.write("```sql\n")
|
|
buffer.write(query)
|
|
buffer.write("\n```\n")
|
|
|
|
def _write_command(buffer, command):
|
|
buffer.write("**command**\n")
|
|
buffer.write("```\n")
|
|
buffer.write(command)
|
|
buffer.write("\n```\n")
|
|
|
|
def _write_result(buffer, result, format=None):
|
|
buffer.write("**result**\n")
|
|
buffer.write(f"```{format}\n" if format else "```\n")
|
|
buffer.write(result)
|
|
buffer.write("\n```\n")
|
|
|
|
buffer = io.StringIO()
|
|
|
|
_write_title(buffer, f"Diagnostics data for host {self.host}")
|
|
for section in self._sections:
|
|
section_name = section["section"]
|
|
if section_name:
|
|
_write_subtitle(buffer, section_name)
|
|
|
|
for name, item in section["data"].items():
|
|
if item["type"] == "string":
|
|
_write_string_item(buffer, name, item)
|
|
elif item["type"] == "query":
|
|
_write_query_item(buffer, section_name, name, item)
|
|
elif item["type"] == "command":
|
|
_write_command_item(buffer, section_name, name, item)
|
|
elif item["type"] == "xml":
|
|
_write_xml_item(buffer, section_name, name, item)
|
|
else:
|
|
_write_unknown_item(buffer, section_name, name, item)
|
|
|
|
return buffer.getvalue()
|
|
|
|
|
|
def main():
|
|
"""
|
|
Program entry point.
|
|
"""
|
|
args = parse_args()
|
|
timestamp = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
|
|
client = ClickhouseClient(
|
|
host=args.host, port=args.port, user=args.user, password=args.password
|
|
)
|
|
ch_config = ClickhouseConfig.load()
|
|
version = client.clickhouse_version
|
|
system_tables = [
|
|
row[0]
|
|
for row in execute_query(client, SELECT_SYSTEM_TABLES, format="JSONCompact")[
|
|
"data"
|
|
]
|
|
]
|
|
|
|
diagnostics = DiagnosticsData(args)
|
|
diagnostics.add_string("Version", version)
|
|
diagnostics.add_string("Timestamp", timestamp)
|
|
diagnostics.add_string("Uptime", execute_query(client, SELECT_UPTIME))
|
|
|
|
diagnostics.add_xml_document("ClickHouse configuration", ch_config.dump())
|
|
|
|
if version_ge(version, "20.8"):
|
|
add_query(
|
|
diagnostics,
|
|
"Access configuration",
|
|
client=client,
|
|
query=SELECT_ACCESS,
|
|
format="TSVRaw",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Quotas",
|
|
client=client,
|
|
query=SELECT_QUOTA_USAGE,
|
|
format="Vertical",
|
|
)
|
|
|
|
add_query(
|
|
diagnostics,
|
|
"Database engines",
|
|
client=client,
|
|
query=SELECT_DATABASE_ENGINES,
|
|
format="PrettyCompactNoEscapes",
|
|
section="Schema",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Databases (top 10 by size)",
|
|
client=client,
|
|
query=SELECT_DATABASES,
|
|
format="PrettyCompactNoEscapes",
|
|
section="Schema",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Table engines",
|
|
client=client,
|
|
query=SELECT_TABLE_ENGINES,
|
|
format="PrettyCompactNoEscapes",
|
|
section="Schema",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Dictionaries",
|
|
client=client,
|
|
query=SELECT_DICTIONARIES,
|
|
format="PrettyCompactNoEscapes",
|
|
section="Schema",
|
|
)
|
|
|
|
add_query(
|
|
diagnostics,
|
|
"Replicated tables (top 10 by absolute delay)",
|
|
client=client,
|
|
query=SELECT_REPLICAS,
|
|
format="PrettyCompactNoEscapes",
|
|
section="Replication",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Replication queue (top 20 oldest tasks)",
|
|
client=client,
|
|
query=SELECT_REPLICATION_QUEUE,
|
|
format="Vertical",
|
|
section="Replication",
|
|
)
|
|
if version_ge(version, "21.3"):
|
|
add_query(
|
|
diagnostics,
|
|
"Replicated fetches",
|
|
client=client,
|
|
query=SELECT_REPLICATED_FETCHES,
|
|
format="Vertical",
|
|
section="Replication",
|
|
)
|
|
|
|
add_query(
|
|
diagnostics,
|
|
"Top 10 tables by max parts per partition",
|
|
client=client,
|
|
query=SELECT_PARTS_PER_TABLE,
|
|
format="PrettyCompactNoEscapes",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Merges in progress",
|
|
client=client,
|
|
query=SELECT_MERGES,
|
|
format="Vertical",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Mutations in progress",
|
|
client=client,
|
|
query=SELECT_MUTATIONS,
|
|
format="Vertical",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Recent data parts (modification time within last 3 minutes)",
|
|
client=client,
|
|
query=SELECT_RECENT_DATA_PARTS,
|
|
format="Vertical",
|
|
)
|
|
|
|
add_query(
|
|
diagnostics,
|
|
"system.detached_parts",
|
|
client=client,
|
|
query=SELECT_DETACHED_DATA_PARTS,
|
|
format="PrettyCompactNoEscapes",
|
|
section="Detached data",
|
|
)
|
|
add_command(
|
|
diagnostics,
|
|
"Disk space usage",
|
|
command="du -sh -L -c /var/lib/clickhouse/data/*/*/detached/* | sort -rsh",
|
|
section="Detached data",
|
|
)
|
|
|
|
add_query(
|
|
diagnostics,
|
|
"Queries in progress (process list)",
|
|
client=client,
|
|
query=SELECT_PROCESSES,
|
|
format="Vertical",
|
|
section="Queries",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Top 10 queries by duration",
|
|
client=client,
|
|
query=SELECT_TOP_QUERIES_BY_DURATION,
|
|
format="Vertical",
|
|
section="Queries",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Top 10 queries by memory usage",
|
|
client=client,
|
|
query=SELECT_TOP_QUERIES_BY_MEMORY_USAGE,
|
|
format="Vertical",
|
|
section="Queries",
|
|
)
|
|
add_query(
|
|
diagnostics,
|
|
"Last 10 failed queries",
|
|
client=client,
|
|
query=SELECT_FAILED_QUERIES,
|
|
format="Vertical",
|
|
section="Queries",
|
|
)
|
|
|
|
add_query(
|
|
diagnostics,
|
|
"Stack traces",
|
|
client=client,
|
|
query=SELECT_STACK_TRACES,
|
|
format="Vertical",
|
|
)
|
|
|
|
if "crash_log" in system_tables:
|
|
add_query(
|
|
diagnostics,
|
|
"Crash log",
|
|
client=client,
|
|
query=SELECT_CRASH_LOG,
|
|
format="Vertical",
|
|
)
|
|
|
|
add_command(diagnostics, "uname", "uname -a")
|
|
|
|
diagnostics.dump(args.format)
|
|
|
|
|
|
def parse_args():
|
|
"""
|
|
Parse command-line arguments.
|
|
"""
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"--format",
|
|
choices=["json", "yaml", "json.gz", "yaml.gz", "wiki", "wiki.gz"],
|
|
default="wiki",
|
|
)
|
|
parser.add_argument("--normalize-queries", action="store_true", default=False)
|
|
parser.add_argument("--host", dest="host", help="clickhouse host")
|
|
parser.add_argument(
|
|
"--port", dest="port", default=8123, help="clickhouse http port"
|
|
)
|
|
parser.add_argument(
|
|
"--user", dest="user", default="default", help="clickhouse user"
|
|
)
|
|
parser.add_argument("--password", dest="password", help="clickhouse password")
|
|
return parser.parse_args()
|
|
|
|
|
|
def add_query(diagnostics, name, client, query, format, section=None):
|
|
query_args = {
|
|
"normalize_queries": diagnostics.args.normalize_queries,
|
|
}
|
|
query = client.render_query(query, **query_args)
|
|
diagnostics.add_query(
|
|
name=name,
|
|
query=query,
|
|
result=execute_query(client, query, render_query=False, format=format),
|
|
section=section,
|
|
)
|
|
|
|
|
|
def execute_query(client, query, render_query=True, format=None):
|
|
if render_query:
|
|
query = client.render_query(query)
|
|
|
|
try:
|
|
return client.query(query, format=format)
|
|
except Exception as e:
|
|
return repr(e)
|
|
|
|
|
|
def add_command(diagnostics, name, command, section=None):
|
|
diagnostics.add_command(
|
|
name=name, command=command, result=execute_command(command), section=section
|
|
)
|
|
|
|
|
|
def execute_command(command, input=None):
|
|
proc = subprocess.Popen(
|
|
command,
|
|
shell=True,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
if isinstance(input, str):
|
|
input = input.encode()
|
|
|
|
stdout, stderr = proc.communicate(input=input)
|
|
|
|
if proc.returncode:
|
|
return f"failed with exit code {proc.returncode}\n{stderr.decode()}"
|
|
|
|
return stdout.decode()
|
|
|
|
|
|
def version_ge(version1, version2):
|
|
"""
|
|
Return True if version1 is greater or equal than version2.
|
|
"""
|
|
return parse_version(version1) >= parse_version(version2)
|
|
|
|
|
|
def parse_version(version):
|
|
"""
|
|
Parse version string.
|
|
"""
|
|
return [int(x) for x in version.strip().split(".") if x.isnumeric()]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|