ClickHouse/utils/zero_copy/zero_copy_schema_converter.py

130 lines
4.8 KiB
Python
Raw Normal View History

2021-12-20 17:24:33 +00:00
#!/usr/bin/env python3
import argparse
import socket
import uuid
from kazoo.client import KazooClient
def parse_args():
"""
Parse command-line arguments.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--hosts', default=socket.getfqdn() + ':2181', help='ZooKeeper hosts (host:port,host:port,...)')
parser.add_argument('-s', '--secure', default=False, action='store_true', help='Use secure connection')
parser.add_argument('--cert', default='', help='Client TLS certificate file')
parser.add_argument('--key', default='', help='Client TLS key file')
parser.add_argument('--ca', default='', help='Client TLS ca file')
parser.add_argument('-u', '--user', default='', help='ZooKeeper ACL user')
parser.add_argument('-p', '--password', default='', help='ZooKeeper ACL password')
parser.add_argument('-r', '--root', default='/clickhouse', help='ZooKeeper root path for ClickHouse')
parser.add_argument('-z', '--zcroot', default='zero_copy', help='ZooKeeper node for new zero-copy data')
2021-12-21 14:55:20 +00:00
parser.add_argument('--dryrun', default=False, action='store_true', help='Do not perform any actions')
2021-12-20 17:24:33 +00:00
parser.add_argument('--cleanup', default=False, action='store_true', help='Clean old nodes')
parser.add_argument('-v', '--verbose', action='store_true', default=False, help='Verbose mode')
return parser.parse_args()
2021-12-21 14:55:20 +00:00
# Several folders to heuristic that zookeepr node is folder node
2021-12-20 17:24:33 +00:00
# May be false positive when someone creates set of tables with same paths
table_nodes = ['alter_partition_version', 'block_numbers', 'blocks', 'columns', 'leader_election']
zc_nodes = ['zero_copy_s3', 'zero_copy_hdfs']
def convert_node(client, args, path, zc_node):
base_path = f'{path}/{zc_node}/shared'
parts = client.get_children(base_path)
table_id_path = f'{path}/table_id'
table_id = ''
if client.exists(table_id_path):
table_id = client.get(table_id_path)[0].decode('UTF-8')
else:
table_id = str(uuid.uuid4())
if args.verbose:
print(f'Make table_id "{table_id_path}" = "{table_id}"')
if not args.dryrun:
client.create(table_id_path, bytes(table_id, 'UTF-8'))
for part in parts:
part_path = f'{base_path}/{part}'
uniq_ids = client.get_children(part_path)
for uniq_id in uniq_ids:
uniq_path = f'{part_path}/{uniq_id}'
replicas = client.get_children(uniq_path)
for replica in replicas:
replica_path = f'{uniq_path}/{replica}'
new_path = f'{args.root}/{args.zcroot}/{zc_node}/{table_id}/{part}/{uniq_id}/{replica}'
if not client.exists(new_path):
if args.verbose:
print(f'Make node "{new_path}"')
if not args.dryrun:
client.ensure_path(f'{args.root}/{args.zcroot}/{zc_node}/{table_id}/{part}/{uniq_id}')
client.create(new_path, value=b'lock')
if args.cleanup:
if args.verbose:
print(f'Remove node "{replica_path}"')
if not args.dryrun:
client.delete(replica_path)
if args.cleanup and not args.dryrun:
client.delete(uniq_path)
if args.cleanup and not args.dryrun:
client.delete(part_path)
if args.cleanup and not args.dryrun:
client.delete(base_path)
client.delete(f'{path}/{zc_node}')
def convert_table(client, args, path, nodes):
print(f'Convert table nodes by path "{path}"')
for zc_node in zc_nodes:
if zc_node in nodes:
convert_node(client, args, path, zc_node)
def is_like_a_table(nodes):
for tn in table_nodes:
if tn not in nodes:
return False
return True
def scan_recursive(client, args, path):
nodes = client.get_children(path)
if is_like_a_table(nodes):
convert_table(client, args, path, nodes)
else:
for node in nodes:
scan_recursive(client, args, f'{path}/{node}')
def scan(client, args):
nodes = client.get_children(args.root)
for node in nodes:
if node != args.zcroot:
scan_recursive(client, args, f'{args.root}/{node}')
def get_client(args):
client = KazooClient(connection_retry=3,
command_retry=3,
timeout=1,
hosts=args.hosts,
use_ssl=args.secure,
certfile=args.cert,
keyfile=args.key,
ca=args.ca
)
client.start()
if (args.user and args.password):
client.add_auth('digest', f'{args.user}:{args.password}')
return client
def main():
args = parse_args()
client = get_client(args)
scan(client, args)
if __name__ == '__main__':
main()