ClickHouse/ci/praktika/__main__.py
2024-11-15 12:55:30 +01:00

95 lines
3.0 KiB
Python

import argparse
import sys
from praktika.html_prepare import Html
from praktika.utils import Utils
from praktika.validator import Validator
from praktika.yaml_generator import YamlGenerator
def create_parser():
    """Build the command-line parser for ``python3 -m praktika``.

    The selected subcommand is stored in ``args.command``:

    * ``run``  -- job runner; requires ``--job`` and accepts ``--workflow``,
      ``--no-docker``, ``--docker``, ``--param`` and ``--ci``.
    * ``yaml`` -- generates Yaml workflows (takes no options).
    * ``html`` -- uploads the HTML page for reports (takes no options).

    Returns:
        argparse.ArgumentParser: the configured top-level parser.
    """
    parser = argparse.ArgumentParser(prog="python3 -m praktika")
    subparsers = parser.add_subparsers(dest="command", help="Available subcommands")

    run_parser = subparsers.add_parser("run", help="Job Runner")
    run_parser.add_argument("--job", help="Job Name", type=str, required=True)
    run_parser.add_argument(
        "--workflow",
        help="Workflow Name (required if job name is not uniq per config)",
        type=str,
        default="",
    )
    run_parser.add_argument(
        "--no-docker",
        help="Do not run job in docker even if job config says so, for local test",
        action="store_true",
    )
    run_parser.add_argument(
        "--docker",
        help="Custom docker image for job run, for local test",
        type=str,
        default="",
    )
    run_parser.add_argument(
        "--param",
        help="Custom parameter to pass into a job script, it's up to job script how to use it, for local test",
        type=str,
        default=None,
    )
    # A store_true flag must default to a bool; relying on the implicit
    # ``False`` default keeps ``args.ci`` typed consistently with
    # ``--no-docker`` instead of being an empty string when the flag is absent.
    run_parser.add_argument(
        "--ci",
        help="When not set - dummy env will be generated, for local test",
        action="store_true",
    )

    # These subcommands take no arguments; the returned parsers are unused.
    _yaml_parser = subparsers.add_parser("yaml", help="Generates Yaml Workflows")
    _html_parser = subparsers.add_parser("html", help="Uploads HTML page for reports")
    return parser
if __name__ == "__main__":
    # Script entry point: parse the command line and dispatch on the subcommand.
    parser = create_parser()
    args = parser.parse_args()
    command = args.command

    if command == "yaml":
        # Validate first, then generate the workflows.
        Validator().validate()
        YamlGenerator().generate()
    elif command == "html":
        Html.prepare()
    elif command == "run":
        # These imports are performed only when the "run" subcommand is chosen.
        from praktika.mangle import _get_workflows
        from praktika.runner import Runner

        # Collect every (job, workflow) pair whose workflow contains the job.
        # An empty --workflow value is passed on as None.
        matches = []
        for candidate in _get_workflows(name=args.workflow or None):
            found = candidate.find_job(args.job, lazy=True)
            if found:
                matches.append((found, candidate))

        match_count = len(matches)
        if match_count == 1:
            job, workflow = matches[0]
            print(f"Going to run job [{job.name}], workflow [{workflow.name}]")
            Runner().run(
                workflow=workflow,
                job=job,
                docker=args.docker,
                # Without --ci the runner is asked to use a dummy environment.
                dummy_env=not args.ci,
                no_docker=args.no_docker,
                param=args.param,
            )
        elif match_count == 0:
            Utils.raise_with_error(
                f"Failed to find job [{args.job}] workflow [{args.workflow}]"
            )
        else:
            # The job name is ambiguous across the loaded workflows.
            Utils.raise_with_error(
                f"More than one job [{args.job}] found - try specifying workflow name with --workflow"
            )
    else:
        # No subcommand (or an unhandled one): show usage and exit non-zero.
        parser.print_help()
        sys.exit(1)