ClickHouse/utils/zero_copy/zero_copy_schema_converter.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

167 lines
5.0 KiB
Python
Raw Normal View History

2021-12-20 17:24:33 +00:00
#!/usr/bin/env python3
import argparse
import socket
import uuid
2024-09-27 10:19:39 +00:00
2021-12-20 17:24:33 +00:00
from kazoo.client import KazooClient
2021-12-20 17:24:33 +00:00
def parse_args():
"""
Parse command-line arguments.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--hosts",
default=socket.getfqdn() + ":2181",
help="ZooKeeper hosts (host:port,host:port,...)",
)
parser.add_argument(
"-s",
"--secure",
default=False,
action="store_true",
help="Use secure connection",
)
2021-12-20 17:24:33 +00:00
parser.add_argument("--cert", default="", help="Client TLS certificate file")
parser.add_argument("--key", default="", help="Client TLS key file")
parser.add_argument("--ca", default="", help="Client TLS ca file")
parser.add_argument("-u", "--user", default="", help="ZooKeeper ACL user")
parser.add_argument("-p", "--password", default="", help="ZooKeeper ACL password")
parser.add_argument(
"-r", "--root", default="/clickhouse", help="ZooKeeper root path for ClickHouse"
)
parser.add_argument(
"-z",
"--zcroot",
default="clickhouse/zero_copy",
help="ZooKeeper node for new zero-copy data",
)
2021-12-21 14:55:20 +00:00
parser.add_argument(
"--dryrun",
default=False,
action="store_true",
help="Do not perform any actions",
)
2021-12-20 17:24:33 +00:00
parser.add_argument(
"--cleanup", default=False, action="store_true", help="Clean old nodes"
)
parser.add_argument(
"-v", "--verbose", action="store_true", default=False, help="Verbose mode"
)
return parser.parse_args()
2021-12-21 14:55:20 +00:00
# Several folders to heuristic that zookeepr node is folder node
2021-12-20 17:24:33 +00:00
# May be false positive when someone creates set of tables with same paths
table_nodes = [
"alter_partition_version",
"block_numbers",
"blocks",
"columns",
"leader_election",
]
zc_nodes = ["zero_copy_s3", "zero_copy_hdfs"]
def convert_node(client, args, path, zc_node):
base_path = f"{path}/{zc_node}/shared"
parts = client.get_children(base_path)
table_id_path = f"{path}/table_shared_id"
2021-12-20 17:24:33 +00:00
table_id = ""
if client.exists(table_id_path):
table_id = client.get(table_id_path)[0].decode("UTF-8")
else:
table_id = str(uuid.uuid4())
if args.verbose:
print(f'Make table_id "{table_id_path}" = "{table_id}"')
if not args.dryrun:
client.create(table_id_path, bytes(table_id, "UTF-8"))
for part in parts:
part_path = f"{base_path}/{part}"
uniq_ids = client.get_children(part_path)
for uniq_id in uniq_ids:
uniq_path = f"{part_path}/{uniq_id}"
replicas = client.get_children(uniq_path)
for replica in replicas:
replica_path = f"{uniq_path}/{replica}"
new_path = f"{args.root}/{args.zcroot}/{zc_node}/{table_id}/{part}/{uniq_id}/{replica}"
if not client.exists(new_path):
if args.verbose:
print(f'Make node "{new_path}"')
if not args.dryrun:
client.ensure_path(
f"{args.root}/{args.zcroot}/{zc_node}/{table_id}/{part}/{uniq_id}"
)
client.create(new_path, value=b"lock")
if args.cleanup:
if args.verbose:
print(f'Remove node "{replica_path}"')
if not args.dryrun:
client.delete(replica_path)
if args.cleanup and not args.dryrun:
client.delete(uniq_path)
if args.cleanup and not args.dryrun:
client.delete(part_path)
if args.cleanup and not args.dryrun:
client.delete(base_path)
client.delete(f"{path}/{zc_node}")
def convert_table(client, args, path, nodes):
print(f'Convert table nodes by path "{path}"')
for zc_node in zc_nodes:
if zc_node in nodes:
convert_node(client, args, path, zc_node)
def is_like_a_table(nodes):
for tn in table_nodes:
if tn not in nodes:
return False
return True
def scan_recursive(client, args, path):
nodes = client.get_children(path)
if is_like_a_table(nodes):
convert_table(client, args, path, nodes)
else:
for node in nodes:
scan_recursive(client, args, f"{path}/{node}")
def scan(client, args):
nodes = client.get_children(args.root)
for node in nodes:
if node != args.zcroot:
scan_recursive(client, args, f"{args.root}/{node}")
def get_client(args):
client = KazooClient(
connection_retry=3,
command_retry=3,
timeout=1,
hosts=args.hosts,
use_ssl=args.secure,
certfile=args.cert,
keyfile=args.key,
ca=args.ca,
)
client.start()
if args.user and args.password:
client.add_auth("digest", f"{args.user}:{args.password}")
return client
def main():
args = parse_args()
client = get_client(args)
scan(client, args)
if __name__ == "__main__":
main()