diff --git a/docker/test/stateless/Dockerfile b/docker/test/stateless/Dockerfile index 5a655a3fd2b..a0e5513a3a2 100644 --- a/docker/test/stateless/Dockerfile +++ b/docker/test/stateless/Dockerfile @@ -86,6 +86,7 @@ RUN curl -L --no-verbose -O 'https://archive.apache.org/dist/hadoop/common/hadoo ENV MINIO_ROOT_USER="clickhouse" ENV MINIO_ROOT_PASSWORD="clickhouse" ENV EXPORT_S3_STORAGE_POLICIES=1 +ENV CLICKHOUSE_GRPC_CLIENT="/usr/share/clickhouse-utils/grpc-client/clickhouse-grpc-client.py" RUN npm install -g azurite@3.30.0 \ && npm install -g tslib && npm install -g node diff --git a/docker/test/stateless/requirements.txt b/docker/test/stateless/requirements.txt index 3284107e24e..74860d5fec3 100644 --- a/docker/test/stateless/requirements.txt +++ b/docker/test/stateless/requirements.txt @@ -8,6 +8,7 @@ cryptography==3.4.8 dbus-python==1.2.18 distro==1.7.0 docutils==0.17.1 +grpcio==1.47.0 gyp==0.1 httplib2==0.20.2 idna==3.3 @@ -28,6 +29,7 @@ packaging==24.1 pandas==1.5.3 pip==24.1.1 pipdeptree==2.23.0 +protobuf==4.25.3 pyarrow==15.0.0 pyasn1==0.4.8 PyJWT==2.3.0 diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 8653af51308..2728f953bea 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -996,6 +996,10 @@ void ZooKeeper::receiveEvent() if (request_info.callback) request_info.callback(*response); + + /// Finalize current session if we receive a hardware error from ZooKeeper + if (err != Error::ZOK && isHardwareError(err)) + finalize(/*error_send*/ false, /*error_receive*/ true, fmt::format("Hardware error: {}", err)); } diff --git a/tests/ci/ci.py b/tests/ci/ci.py index fac50d30022..32b87698395 100644 --- a/tests/ci/ci.py +++ b/tests/ci/ci.py @@ -995,6 +995,10 @@ def main() -> int: ci_settings, args.skip_jobs, ) + + if IS_CI and pr_info.is_pr: + ci_cache.filter_out_not_affected_jobs() + ci_cache.print_status() if IS_CI and not pr_info.is_merge_queue: diff --git 
a/tests/ci/ci_cache.py b/tests/ci/ci_cache.py
index 8ee0ae54385..291ed56aeea 100644
--- a/tests/ci/ci_cache.py
+++ b/tests/ci/ci_cache.py
@@ -674,6 +674,89 @@ class CiCache:
             bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path
         )
 
+    def filter_out_not_affected_jobs(self):
+        """
+        Filter is to be applied in PRs to remove jobs that are not affected by the change
+        It removes jobs from @jobs_to_do if it is a:
+         1. test job and it is in @jobs_to_wait (no need to wait not affected jobs in PRs)
+         2. test job and all its batches are recorded as failed on release branch
+            (a batch recorded as failed there is removed from the job's batches)
+         3. build job which is not required by any test job that is left in @jobs_to_do
+        If no build job is left, CI.JobNames.BUILD_CHECK is removed as well.
+        A job removed from @jobs_to_do is also removed from @jobs_to_wait.
+
+        :return:
+        """
+
+        def is_regular_test_job(job_name):
+            # BUILD_CHECK is handled in step 3 together with the build jobs
+            return CI.is_test_job(job_name) and job_name != CI.JobNames.BUILD_CHECK
+
+        # 1.
+        remove_from_await_list = [
+            job_name for job_name in self.jobs_to_wait if is_regular_test_job(job_name)
+        ]
+        for job in remove_from_await_list:
+            print(f"Filter job [{job}] - test job and not affected by the change")
+            del self.jobs_to_wait[job]
+            # pop() instead of del: do not crash if the awaited job is not in @jobs_to_do
+            self.jobs_to_do.pop(job, None)
+
+        # 2.
+        remove_from_to_do = []
+        for job_name, job_config in self.jobs_to_do.items():
+            if not is_regular_test_job(job_name):
+                continue
+            assert job_config.batches is not None
+            # collect first, remove afterwards: the list must not change while iterated
+            batches_to_remove = []
+            for batch in job_config.batches:
+                if self.is_failed(
+                    job_name, batch, job_config.num_batches, release_branch=True
+                ):
+                    print(
+                        f"Filter [{job_name}/{batch}] - not affected by the change (failed on release branch)"
+                    )
+                    batches_to_remove.append(batch)
+            for batch in batches_to_remove:
+                job_config.batches.remove(batch)
+            if not job_config.batches:
+                print(
+                    f"Filter [{job_name}] - not affected by the change (failed on release branch)"
+                )
+                remove_from_to_do.append(job_name)
+        for job in remove_from_to_do:
+            del self.jobs_to_do[job]
+
+        # 3.
+        required_builds = []  # type: List[str]
+        for job_name, job_config in self.jobs_to_do.items():
+            if CI.is_test_job(job_name) and job_config.required_builds:
+                required_builds += job_config.required_builds
+        required_builds = list(set(required_builds))
+
+        remove_builds = []  # type: List[str]
+        has_builds_to_do = False
+        for job_name in self.jobs_to_do:
+            if CI.is_build_job(job_name):
+                if job_name not in required_builds:
+                    remove_builds.append(job_name)
+                else:
+                    has_builds_to_do = True
+
+        for build_job in remove_builds:
+            print(
+                f"Filter build job [{build_job}] - not affected and not required by test jobs"
+            )
+            del self.jobs_to_do[build_job]
+            self.jobs_to_wait.pop(build_job, None)
+
+        if not has_builds_to_do and CI.JobNames.BUILD_CHECK in self.jobs_to_do:
+            print(f"Filter job [{CI.JobNames.BUILD_CHECK}] - no builds to do")
+            del self.jobs_to_do[CI.JobNames.BUILD_CHECK]
+            # the report job must not be awaited either once it is dropped
+            self.jobs_to_wait.pop(CI.JobNames.BUILD_CHECK, None)
+
     def await_pending_jobs(self, is_release: bool, dry_run: bool = False) -> None:
         """
         await pending jobs to be finished
diff --git a/tests/ci/functional_test_check.py b/tests/ci/functional_test_check.py
index 4440d0d332c..ef9f4dc016e 100644
--- a/tests/ci/functional_test_check.py
+++ b/tests/ci/functional_test_check.py
@@ -108,6 +108,7 @@ def get_run_command(
         "--privileged "
         f"{ci_logs_args}"
         f"--volume={repo_path}/tests:/usr/share/clickhouse-test "
+        f"--volume={repo_path}/utils/grpc-client:/usr/share/clickhouse-utils/grpc-client "
         f"{volume_with_broken_test}"
         f"--volume={result_path}:/test_output "
         f"--volume={server_log_path}:/var/log/clickhouse-server "
diff --git a/tests/ci/lambda_shared_package/lambda_shared/token.py b/tests/ci/lambda_shared_package/lambda_shared/token.py
index 9749122bd39..3fb8f10c0e2 100644
--- a/tests/ci/lambda_shared_package/lambda_shared/token.py
+++ b/tests/ci/lambda_shared_package/lambda_shared/token.py
@@ -1,4 +1,5 @@
 """Module to get the token for GitHub"""
+
 from dataclasses import dataclass
 import json
 import
time
diff --git a/tests/ci/test_ci_config.py b/tests/ci/test_ci_config.py
index 47247b91858..558faca915e 100644
--- a/tests/ci/test_ci_config.py
+++ b/tests/ci/test_ci_config.py
@@ -417,7 +417,7 @@ class TestCIConfig(unittest.TestCase):
         assert not ci_cache.jobs_to_skip
         assert not ci_cache.jobs_to_wait
 
-        # pretend there are pending jobs that we neet to wait
+        # pretend there are pending jobs that we need to wait
         ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do)
         for job, config in ci_cache.jobs_to_wait.items():
             assert not config.pending_batches
@@ -489,3 +489,81 @@ class TestCIConfig(unittest.TestCase):
         self.assertCountEqual(
             list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip, all_jobs_in_wf
         )
+
+    def test_ci_py_filters_not_affected_jobs_in_prs(self):
+        """
+        checks ci.py filters not affected jobs in PRs
+
+        Jobs are configured for a push to a release branch with CI cache disabled, then
+        pending jobs and failed-on-release records are faked and the filter is applied.
+        """
+        settings = CiSettings()
+        settings.no_ci_cache = True
+        pr_info = PRInfo(github_event=_TEST_EVENT_JSON)
+        pr_info.event_type = EventType.PUSH
+        pr_info.number = 0
+        assert pr_info.is_release and not pr_info.is_merge_queue
+        ci_cache = CIPY._configure_jobs(
+            S3Helper(), pr_info, settings, skip_jobs=False, dry_run=True
+        )
+        self.assertTrue(not ci_cache.jobs_to_skip, "Must be no jobs in skip list")
+        assert not ci_cache.jobs_to_wait
+
+        def add_failed_on_release_record(job_name, batch):
+            # fakes a FAILED cache record on release branch for @batch of @job_name
+            record = CiCache.Record(
+                record_type=CiCache.RecordType.FAILED,
+                job_name=job_name,
+                job_digest=ci_cache.job_digests[job_name],
+                batch=batch,
+                num_batches=ci_cache.jobs_to_do[job_name].num_batches,
+                release_branch=True,
+            )
+            for record_t_, records_ in ci_cache.records.items():
+                if record_t_.value == CiCache.RecordType.FAILED.value:
+                    records_[record.to_str_key()] = record
+
+        # pretend there are pending jobs that we need to wait
+        for job, job_config in ci_cache.jobs_to_do.items():
+            ci_cache.jobs_to_wait[job] = job_config
+
+        # remove couple tests from to_wait and
+        # expect they are preserved in @jobs_to_do along with required package_asan
+        del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_ASAN]
+        del ci_cache.jobs_to_wait[CI.JobNames.INTEGRATION_TEST_TSAN]
+        del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_MSAN]
+
+        # pretend we have some batches failed for one of the job from the to_do list
+        failed_job = CI.JobNames.INTEGRATION_TEST_TSAN
+        FAILED_BATCHES = [0, 3]
+        for batch in FAILED_BATCHES:
+            assert batch < ci_cache.jobs_to_do[failed_job].num_batches
+            add_failed_on_release_record(failed_job, batch)
+
+        # pretend we have all batches failed for one of the job from the to_do list
+        failed_job = CI.JobNames.STATELESS_TEST_MSAN
+        num_batches = ci_cache.jobs_to_do[failed_job].num_batches
+        assert num_batches > 1
+        for batch in range(num_batches):
+            add_failed_on_release_record(failed_job, batch)
+
+        ci_cache.filter_out_not_affected_jobs()
+        expected_to_do = [
+            CI.JobNames.STATELESS_TEST_ASAN,
+            CI.BuildNames.PACKAGE_ASAN,
+            CI.JobNames.INTEGRATION_TEST_TSAN,
+            CI.BuildNames.PACKAGE_TSAN,
+            CI.JobNames.BUILD_CHECK,
+        ]
+        self.assertCountEqual(
+            list(ci_cache.jobs_to_wait),
+            [
+                CI.BuildNames.PACKAGE_ASAN,
+                CI.BuildNames.PACKAGE_TSAN,
+                CI.JobNames.BUILD_CHECK,
+            ],
+        )
+        self.assertCountEqual(list(ci_cache.jobs_to_do), expected_to_do)
+        self.assertTrue(ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches)
+        for batch in ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches:
+            self.assertNotIn(batch, FAILED_BATCHES)
diff --git a/tests/config/config.d/grpc_protocol.xml b/tests/config/config.d/grpc_protocol.xml
new file mode 100644
index 00000000000..b957618120d
--- /dev/null
+++ b/tests/config/config.d/grpc_protocol.xml
@@ -0,0 +1,3 @@
+<clickhouse> + <grpc_port>9100</grpc_port> +</clickhouse> diff --git a/tests/config/install.sh b/tests/config/install.sh index 8b58a519bc9..1b0edc5fc16 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -27,6 +27,7 @@ ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/grpc_protocol.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/ diff --git a/tests/queries/0_stateless/03203_grpc_protocol.reference b/tests/queries/0_stateless/03203_grpc_protocol.reference new file mode 100644 index 00000000000..9766475a418 --- /dev/null +++ b/tests/queries/0_stateless/03203_grpc_protocol.reference @@ -0,0 +1 @@ +ok diff --git a/tests/queries/0_stateless/03203_grpc_protocol.sh b/tests/queries/0_stateless/03203_grpc_protocol.sh new file mode 100755 index 00000000000..d51d6382f67 --- /dev/null +++ b/tests/queries/0_stateless/03203_grpc_protocol.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# Tags: no-fasttest +# Tag no-fasttest: In fasttest, ENABLE_LIBRARIES=0, so the grpc library is not built + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +if [[ -z "$CLICKHOUSE_GRPC_CLIENT" ]]; then + CLICKHOUSE_GRPC_CLIENT="$CURDIR/../../../utils/grpc-client/clickhouse-grpc-client.py" +fi + +# Simple test. 
+$CLICKHOUSE_GRPC_CLIENT --query "SELECT 'ok'"
diff --git a/utils/grpc-client/generate_pb2.py b/utils/grpc-client/generate_pb2.py
new file mode 100755
index 00000000000..95a39023ed7
--- /dev/null
+++ b/utils/grpc-client/generate_pb2.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+
+# This is a helper utility.
+# It generates files in the "pb2" folder using the protocol buffer compiler.
+# This script must be called manually after any change of "clickhouse_grpc.proto"
+
+import grpc_tools  # pip3 install grpcio-tools
+
+import os
+import shutil
+import subprocess
+import sys
+
+
+# Settings.
+script_path = os.path.realpath(__file__)
+script_name = os.path.basename(script_path)
+script_dir = os.path.dirname(script_path)
+root_dir = os.path.abspath(os.path.join(script_dir, "../.."))
+
+grpc_proto_dir = os.path.abspath(os.path.join(root_dir, "src/Server/grpc_protos"))
+grpc_proto_filename = "clickhouse_grpc.proto"
+
+# Files in the "pb2" folder which will be generated by this script.
+pb2_filenames = ["clickhouse_grpc_pb2.py", "clickhouse_grpc_pb2_grpc.py"]
+pb2_dir = os.path.join(script_dir, "pb2")
+
+
+# Processes the protobuf schema with the protocol buffer compiler and generates the "pb2" folder.
+# Raises subprocess.CalledProcessError if the compiler fails and
+# FileNotFoundError if an expected output file is missing afterwards.
+def generate_pb2():
+    print("Generating files:")
+    for pb2_filename in pb2_filenames:
+        print(os.path.join(pb2_dir, pb2_filename))
+
+    os.makedirs(pb2_dir, exist_ok=True)
+
+    cmd = [
+        # the interpreter running this script is the one "grpc_tools" was imported in
+        sys.executable,
+        "-m",
+        "grpc_tools.protoc",
+        "-I" + grpc_proto_dir,
+        "--python_out=" + pb2_dir,
+        "--grpc_python_out=" + pb2_dir,
+        os.path.join(grpc_proto_dir, grpc_proto_filename),
+    ]
+    # check=True: files left from a previous run must not hide a compiler failure
+    subprocess.run(cmd, check=True)
+
+    for pb2_filename in pb2_filenames:
+        pb2_path = os.path.join(pb2_dir, pb2_filename)
+        # not an assert: asserts are skipped when python runs with -O
+        if not os.path.exists(pb2_path):
+            raise FileNotFoundError(pb2_path)
+    print("Done! (generate_pb2)")
+
+
+# MAIN
+if __name__ == "__main__":
+    generate_pb2()
diff --git a/utils/grpc-client/pb2/generate.py b/utils/grpc-client/pb2/generate.py
deleted file mode 100755
index 2f4b3bf5af7..00000000000
--- a/utils/grpc-client/pb2/generate.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/usr/bin/env python3
-
-import grpc_tools  # pip3 install grpcio-tools
-
-import os
-import subprocess
-
-
-script_dir = os.path.dirname(os.path.realpath(__file__))
-dest_dir = script_dir
-src_dir = os.path.abspath(os.path.join(script_dir, "../../../src/Server/grpc_protos"))
-src_filename = "clickhouse_grpc.proto"
-
-
-def generate():
-    cmd = [
-        "python3",
-        "-m",
-        "grpc_tools.protoc",
-        "-I" + src_dir,
-        "--python_out=" + dest_dir,
-        "--grpc_python_out=" + dest_dir,
-        os.path.join(src_dir, src_filename),
-    ]
-    subprocess.run(cmd)
-
-
-if __name__ == "__main__":
-    generate()