#!/usr/bin/env python3
"""Copy per-table zero-copy nodes in ZooKeeper to a shared zero-copy root.

The script walks the tree under ``--root`` and treats every node that has all
of ``table_nodes`` among its children as a table.  For each such table that
has a ``zero_copy_s3`` / ``zero_copy_hdfs`` child, every node

    <table>/<zc_node>/shared/<part>/<uniq_id>/<replica>

is re-created as

    <root>/<zcroot>/<zc_node>/<table_id>/<part>/<uniq_id>/<replica>

With ``--cleanup`` the old nodes are deleted as they are converted; with
``--dryrun`` nothing is written to ZooKeeper.
"""

import argparse
import socket
import uuid
from typing import Iterable

from kazoo.client import KazooClient


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--hosts', default=socket.getfqdn() + ':2181',
                        help='ZooKeeper hosts (host:port,host:port,...)')
    parser.add_argument('-s', '--secure', default=False, action='store_true',
                        help='Use secure connection')
    parser.add_argument('--cert', default='', help='Client TLS certificate file')
    parser.add_argument('--key', default='', help='Client TLS key file')
    parser.add_argument('--ca', default='', help='Client TLS ca file')
    parser.add_argument('-u', '--user', default='', help='ZooKeeper ACL user')
    parser.add_argument('-p', '--password', default='', help='ZooKeeper ACL password')
    parser.add_argument('-r', '--root', default='/clickhouse',
                        help='ZooKeeper root path for ClickHouse')
    parser.add_argument('-z', '--zcroot', default='clickhouse/zero_copy',
                        help='ZooKeeper node for new zero-copy data')
    parser.add_argument('--dryrun', default=False, action='store_true',
                        help='Do not perform any actions')
    parser.add_argument('--cleanup', default=False, action='store_true',
                        help='Clean old nodes')
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='Verbose mode')
    return parser.parse_args()


# Child names used as a heuristic: a ZooKeeper node that has all of them
# among its children is treated as a table node.
# May give a false positive when someone creates a set of tables with the
# same paths.
table_nodes = ['alter_partition_version', 'block_numbers', 'blocks', 'columns', 'leader_election']

# Children of a table node that hold zero-copy data to convert.
zc_nodes = ['zero_copy_s3', 'zero_copy_hdfs']


def _zero_copy_root(args: argparse.Namespace) -> str:
    """Return the full ZooKeeper path under which converted nodes are created."""
    return f'{args.root}/{args.zcroot}'


def convert_node(client: KazooClient, args: argparse.Namespace, path: str, zc_node: str) -> None:
    """Convert one zero-copy subtree of a table to the new layout.

    Args:
        client: Started ZooKeeper client.
        args: Parsed command-line options (``root``, ``zcroot``, ``verbose``,
            ``dryrun`` and ``cleanup`` are read).
        path: ZooKeeper path of the table node.
        zc_node: Name of the zero-copy child of the table (one of ``zc_nodes``).
    """
    base_path = f'{path}/{zc_node}/shared'
    parts = client.get_children(base_path)

    # Reuse the table's shared id when it is already stored; otherwise
    # generate one and (unless this is a dry run) persist it.
    table_id_path = f'{path}/table_shared_id'
    if client.exists(table_id_path):
        table_id = client.get(table_id_path)[0].decode('UTF-8')
    else:
        table_id = str(uuid.uuid4())
        if args.verbose:
            print(f'Make table_id "{table_id_path}" = "{table_id}"')
        if not args.dryrun:
            client.create(table_id_path, bytes(table_id, 'UTF-8'))

    remove_old = args.cleanup and not args.dryrun
    new_table_path = f'{_zero_copy_root(args)}/{zc_node}/{table_id}'

    for part in parts:
        part_path = f'{base_path}/{part}'
        for uniq_id in client.get_children(part_path):
            uniq_path = f'{part_path}/{uniq_id}'
            new_uniq_path = f'{new_table_path}/{part}/{uniq_id}'
            for replica in client.get_children(uniq_path):
                replica_path = f'{uniq_path}/{replica}'
                new_path = f'{new_uniq_path}/{replica}'
                if not client.exists(new_path):
                    if args.verbose:
                        print(f'Make node "{new_path}"')
                    if not args.dryrun:
                        client.ensure_path(new_uniq_path)
                        client.create(new_path, value=b'lock')
                if args.cleanup:
                    if args.verbose:
                        print(f'Remove node "{replica_path}"')
                    if not args.dryrun:
                        client.delete(replica_path)
            # Old nodes are removed bottom-up, after their children are gone.
            if remove_old:
                client.delete(uniq_path)
        if remove_old:
            client.delete(part_path)
    if remove_old:
        client.delete(base_path)
        client.delete(f'{path}/{zc_node}')


def convert_table(client: KazooClient, args: argparse.Namespace, path: str, nodes: Iterable[str]) -> None:
    """Convert every known zero-copy child present in table node *path*.

    Args:
        client: Started ZooKeeper client.
        args: Parsed command-line options.
        path: ZooKeeper path of the table node.
        nodes: Names of the children of *path*.
    """
    # Printed regardless of --verbose.
    print(f'Convert table nodes by path "{path}"')
    for zc_node in zc_nodes:
        if zc_node in nodes:
            convert_node(client, args, path, zc_node)


def is_like_a_table(nodes: Iterable[str]) -> bool:
    """Return True when *nodes* contains every name listed in ``table_nodes``."""
    return all(tn in nodes for tn in table_nodes)


def scan_recursive(client: KazooClient, args: argparse.Namespace, path: str) -> None:
    """Walk the tree below *path* and convert every table-like node found.

    The destination subtree for converted nodes is not descended into.
    """
    # Compare full paths: --zcroot may consist of several components (the
    # default does), so it cannot be matched against a single child name.
    if path == _zero_copy_root(args):
        return
    nodes = client.get_children(path)
    if is_like_a_table(nodes):
        convert_table(client, args, path, nodes)
    else:
        for node in nodes:
            scan_recursive(client, args, f'{path}/{node}')


def scan(client: KazooClient, args: argparse.Namespace) -> None:
    """Scan every child of ``args.root`` for tables to convert."""
    for node in client.get_children(args.root):
        # The zero-copy destination itself is skipped inside scan_recursive.
        scan_recursive(client, args, f'{args.root}/{node}')


def get_client(args: argparse.Namespace) -> KazooClient:
    """Create and start a ZooKeeper client from the command-line options.

    Digest authentication is added only when both ``--user`` and
    ``--password`` are non-empty.  The caller is responsible for stopping
    and closing the returned client.
    """
    client = KazooClient(connection_retry=3,
                         command_retry=3,
                         timeout=1,
                         hosts=args.hosts,
                         use_ssl=args.secure,
                         certfile=args.cert,
                         keyfile=args.key,
                         ca=args.ca)
    client.start()
    if args.user and args.password:
        client.add_auth('digest', f'{args.user}:{args.password}')
    return client


def main() -> None:
    """Script entry point: parse options, connect and run the conversion."""
    args = parse_args()
    client = get_client(args)
    try:
        scan(client, args)
    finally:
        # Always end the session and release client resources, even when
        # the scan fails half-way.
        client.stop()
        client.close()


if __name__ == '__main__':
    main()