mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 18:02:24 +00:00
167 lines
5.0 KiB
Python
Executable File
167 lines
5.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import socket
|
|
import uuid
|
|
|
|
from kazoo.client import KazooClient
|
|
|
|
|
|
def parse_args():
|
|
"""
|
|
Parse command-line arguments.
|
|
"""
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"--hosts",
|
|
default=socket.getfqdn() + ":2181",
|
|
help="ZooKeeper hosts (host:port,host:port,...)",
|
|
)
|
|
parser.add_argument(
|
|
"-s",
|
|
"--secure",
|
|
default=False,
|
|
action="store_true",
|
|
help="Use secure connection",
|
|
)
|
|
parser.add_argument("--cert", default="", help="Client TLS certificate file")
|
|
parser.add_argument("--key", default="", help="Client TLS key file")
|
|
parser.add_argument("--ca", default="", help="Client TLS ca file")
|
|
parser.add_argument("-u", "--user", default="", help="ZooKeeper ACL user")
|
|
parser.add_argument("-p", "--password", default="", help="ZooKeeper ACL password")
|
|
parser.add_argument(
|
|
"-r", "--root", default="/clickhouse", help="ZooKeeper root path for ClickHouse"
|
|
)
|
|
parser.add_argument(
|
|
"-z",
|
|
"--zcroot",
|
|
default="clickhouse/zero_copy",
|
|
help="ZooKeeper node for new zero-copy data",
|
|
)
|
|
parser.add_argument(
|
|
"--dryrun",
|
|
default=False,
|
|
action="store_true",
|
|
help="Do not perform any actions",
|
|
)
|
|
parser.add_argument(
|
|
"--cleanup", default=False, action="store_true", help="Clean old nodes"
|
|
)
|
|
parser.add_argument(
|
|
"-v", "--verbose", action="store_true", default=False, help="Verbose mode"
|
|
)
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
# Several folders to heuristic that zookeepr node is folder node
|
|
# May be false positive when someone creates set of tables with same paths
|
|
table_nodes = [
|
|
"alter_partition_version",
|
|
"block_numbers",
|
|
"blocks",
|
|
"columns",
|
|
"leader_election",
|
|
]
|
|
zc_nodes = ["zero_copy_s3", "zero_copy_hdfs"]
|
|
|
|
|
|
def convert_node(client, args, path, zc_node):
|
|
base_path = f"{path}/{zc_node}/shared"
|
|
parts = client.get_children(base_path)
|
|
table_id_path = f"{path}/table_shared_id"
|
|
table_id = ""
|
|
if client.exists(table_id_path):
|
|
table_id = client.get(table_id_path)[0].decode("UTF-8")
|
|
else:
|
|
table_id = str(uuid.uuid4())
|
|
if args.verbose:
|
|
print(f'Make table_id "{table_id_path}" = "{table_id}"')
|
|
if not args.dryrun:
|
|
client.create(table_id_path, bytes(table_id, "UTF-8"))
|
|
for part in parts:
|
|
part_path = f"{base_path}/{part}"
|
|
uniq_ids = client.get_children(part_path)
|
|
for uniq_id in uniq_ids:
|
|
uniq_path = f"{part_path}/{uniq_id}"
|
|
replicas = client.get_children(uniq_path)
|
|
for replica in replicas:
|
|
replica_path = f"{uniq_path}/{replica}"
|
|
new_path = f"{args.root}/{args.zcroot}/{zc_node}/{table_id}/{part}/{uniq_id}/{replica}"
|
|
if not client.exists(new_path):
|
|
if args.verbose:
|
|
print(f'Make node "{new_path}"')
|
|
if not args.dryrun:
|
|
client.ensure_path(
|
|
f"{args.root}/{args.zcroot}/{zc_node}/{table_id}/{part}/{uniq_id}"
|
|
)
|
|
client.create(new_path, value=b"lock")
|
|
if args.cleanup:
|
|
if args.verbose:
|
|
print(f'Remove node "{replica_path}"')
|
|
if not args.dryrun:
|
|
client.delete(replica_path)
|
|
if args.cleanup and not args.dryrun:
|
|
client.delete(uniq_path)
|
|
if args.cleanup and not args.dryrun:
|
|
client.delete(part_path)
|
|
if args.cleanup and not args.dryrun:
|
|
client.delete(base_path)
|
|
client.delete(f"{path}/{zc_node}")
|
|
|
|
|
|
def convert_table(client, args, path, nodes):
|
|
print(f'Convert table nodes by path "{path}"')
|
|
for zc_node in zc_nodes:
|
|
if zc_node in nodes:
|
|
convert_node(client, args, path, zc_node)
|
|
|
|
|
|
def is_like_a_table(nodes):
|
|
for tn in table_nodes:
|
|
if tn not in nodes:
|
|
return False
|
|
return True
|
|
|
|
|
|
def scan_recursive(client, args, path):
|
|
nodes = client.get_children(path)
|
|
if is_like_a_table(nodes):
|
|
convert_table(client, args, path, nodes)
|
|
else:
|
|
for node in nodes:
|
|
scan_recursive(client, args, f"{path}/{node}")
|
|
|
|
|
|
def scan(client, args):
|
|
nodes = client.get_children(args.root)
|
|
for node in nodes:
|
|
if node != args.zcroot:
|
|
scan_recursive(client, args, f"{args.root}/{node}")
|
|
|
|
|
|
def get_client(args):
|
|
client = KazooClient(
|
|
connection_retry=3,
|
|
command_retry=3,
|
|
timeout=1,
|
|
hosts=args.hosts,
|
|
use_ssl=args.secure,
|
|
certfile=args.cert,
|
|
keyfile=args.key,
|
|
ca=args.ca,
|
|
)
|
|
client.start()
|
|
if args.user and args.password:
|
|
client.add_auth("digest", f"{args.user}:{args.password}")
|
|
return client
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
client = get_client(args)
|
|
scan(client, args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|