diff --git a/docker/test/stateless/Dockerfile b/docker/test/stateless/Dockerfile index 5a655a3fd2b..a0e5513a3a2 100644 --- a/docker/test/stateless/Dockerfile +++ b/docker/test/stateless/Dockerfile @@ -86,6 +86,7 @@ RUN curl -L --no-verbose -O 'https://archive.apache.org/dist/hadoop/common/hadoo ENV MINIO_ROOT_USER="clickhouse" ENV MINIO_ROOT_PASSWORD="clickhouse" ENV EXPORT_S3_STORAGE_POLICIES=1 +ENV CLICKHOUSE_GRPC_CLIENT="/usr/share/clickhouse-utils/grpc-client/clickhouse-grpc-client.py" RUN npm install -g azurite@3.30.0 \ && npm install -g tslib && npm install -g node diff --git a/docker/test/stateless/requirements.txt b/docker/test/stateless/requirements.txt index 3284107e24e..74860d5fec3 100644 --- a/docker/test/stateless/requirements.txt +++ b/docker/test/stateless/requirements.txt @@ -8,6 +8,7 @@ cryptography==3.4.8 dbus-python==1.2.18 distro==1.7.0 docutils==0.17.1 +grpcio==1.47.0 gyp==0.1 httplib2==0.20.2 idna==3.3 @@ -28,6 +29,7 @@ packaging==24.1 pandas==1.5.3 pip==24.1.1 pipdeptree==2.23.0 +protobuf==4.25.3 pyarrow==15.0.0 pyasn1==0.4.8 PyJWT==2.3.0 diff --git a/docker/test/stateless/run.sh b/docker/test/stateless/run.sh index 5f2cb95de75..2d32d188561 100755 --- a/docker/test/stateless/run.sh +++ b/docker/test/stateless/run.sh @@ -6,8 +6,8 @@ source /setup_export_logs.sh # fail on errors, verbose and export all env variables set -e -x -a -MAX_RUN_TIME=${MAX_RUN_TIME:-10800} -MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 10800 : MAX_RUN_TIME)) +MAX_RUN_TIME=${MAX_RUN_TIME:-7200} +MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 7200 : MAX_RUN_TIME)) USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0} USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0} @@ -320,7 +320,7 @@ export -f run_tests # This should be enough to setup job and collect artifacts -TIMEOUT=$((MAX_RUN_TIME - 600)) +TIMEOUT=$((MAX_RUN_TIME - 700)) if [ "$NUM_TRIES" -gt "1" ]; then # We don't run tests with Ordinary database in PRs, only in master. # So run new/changed tests with Ordinary at least once in flaky check. diff --git a/docker/test/util/process_functional_tests_result.py b/docker/test/util/process_functional_tests_result.py index fd4cc9f4bf7..4442c9d7d9e 100755 --- a/docker/test/util/process_functional_tests_result.py +++ b/docker/test/util/process_functional_tests_result.py @@ -11,6 +11,7 @@ TIMEOUT_SIGN = "[ Timeout! 
" UNKNOWN_SIGN = "[ UNKNOWN " SKIPPED_SIGN = "[ SKIPPED " HUNG_SIGN = "Found hung queries in processlist" +SERVER_DIED_SIGN = "Server died, terminating all processes" DATABASE_SIGN = "Database: " SUCCESS_FINISH_SIGNS = ["All tests have finished", "No tests were run"] @@ -25,6 +26,7 @@ def process_test_log(log_path, broken_tests): failed = 0 success = 0 hung = False + server_died = False retries = False success_finish = False test_results = [] @@ -41,6 +43,8 @@ def process_test_log(log_path, broken_tests): if HUNG_SIGN in line: hung = True break + if SERVER_DIED_SIGN in line: + server_died = True if RETRIES_SIGN in line: retries = True if any( @@ -123,6 +127,7 @@ def process_test_log(log_path, broken_tests): failed, success, hung, + server_died, success_finish, retries, test_results, @@ -150,6 +155,7 @@ def process_result(result_path, broken_tests): failed, success, hung, + server_died, success_finish, retries, test_results, @@ -165,6 +171,10 @@ def process_result(result_path, broken_tests): description = "Some queries hung, " state = "failure" test_results.append(("Some queries hung", "FAIL", "0", "")) + elif server_died: + description = "Server died, " + state = "failure" + test_results.append(("Server died", "FAIL", "0", "")) elif not success_finish: description = "Tests are not finished, " state = "failure" @@ -218,5 +228,20 @@ if __name__ == "__main__": state, description, test_results = process_result(args.in_results_dir, broken_tests) logging.info("Result parsed") status = (state, description) + + def test_result_comparator(item): + # sort by status then by check name + order = { + "FAIL": 0, + "Timeout": 1, + "NOT_FAILED": 2, + "BROKEN": 3, + "OK": 4, + "SKIPPED": 5, + } + return order.get(item[1], 10), str(item[0]), item[1] + + test_results.sort(key=test_result_comparator) + write_results(args.out_results_file, args.out_status_file, test_results, status) logging.info("Result written") diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 8653af51308..2728f953bea 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -996,6 +996,10 @@ void ZooKeeper::receiveEvent() if (request_info.callback) request_info.callback(*response); + + /// Finalize current session if we receive a hardware error from ZooKeeper + if (err != Error::ZOK && isHardwareError(err)) + finalize(/*error_send*/ false, /*error_receive*/ true, fmt::format("Hardware error: {}", err)); } diff --git a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp index 128df415197..c10a7cd017a 100644 --- a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp +++ b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp @@ -49,7 +49,7 @@ namespace const String & dest_blob_, std::shared_ptr settings_, ThreadPoolCallbackRunnerUnsafe schedule_, - const Poco::Logger * log_) + LoggerPtr log_) : create_read_buffer(create_read_buffer_) , client(client_) , offset (offset_) @@ -74,7 +74,7 @@ namespace const String & dest_blob; std::shared_ptr settings; ThreadPoolCallbackRunnerUnsafe schedule; - const Poco::Logger * log; + const LoggerPtr log; size_t max_single_part_upload_size; struct UploadPartTask @@ -83,7 +83,6 @@ namespace size_t part_size; std::vector block_ids; bool is_finished = false; - std::exception_ptr exception; }; size_t normal_part_size; @@ -92,6 +91,7 @@ namespace std::list TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks; int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; int 
num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; + std::exception_ptr bg_exception TSA_GUARDED_BY(bg_tasks_mutex); std::mutex bg_tasks_mutex; std::condition_variable bg_tasks_condvar; @@ -186,7 +186,7 @@ namespace } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, fmt::format("While performing multipart upload of blob {} in container {}", dest_blob, dest_container_for_logging)); waitForAllBackgroundTasks(); throw; } @@ -242,7 +242,12 @@ namespace } catch (...) { - task->exception = std::current_exception(); + std::lock_guard lock(bg_tasks_mutex); + if (!bg_exception) + { + tryLogCurrentException(log, "While writing part"); + bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working. + } } task_finish_notify(); }, Priority{}); @@ -299,13 +304,13 @@ namespace /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); }); - auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks); - for (auto & task : tasks) - { - if (task.exception) - std::rethrow_exception(task.exception); + auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception); + if (exception) + std::rethrow_exception(exception); + + const auto & tasks = TSA_SUPPRESS_WARNING_FOR_READ(bg_tasks); + for (const auto & task : tasks) block_ids.insert(block_ids.end(),task.block_ids.begin(), task.block_ids.end()); - } } }; } @@ -321,7 +326,8 @@ void copyDataToAzureBlobStorageFile( std::shared_ptr settings, ThreadPoolCallbackRunnerUnsafe schedule) { - UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyDataToAzureBlobStorageFile")}; + auto log = getLogger("copyDataToAzureBlobStorageFile"); + UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log}; helper.performCopy(); } @@ -339,9 +345,11 @@ void copyAzureBlobStorageFile( const ReadSettings & read_settings, ThreadPoolCallbackRunnerUnsafe schedule) { + auto log = getLogger("copyAzureBlobStorageFile"); + if (settings->use_native_copy) { - LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob); + LOG_TRACE(log, "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob); ProfileEvents::increment(ProfileEvents::AzureCopyObject); if (dest_client->GetClickhouseOptions().IsClientForDisk) ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject); @@ -352,7 +360,7 @@ void copyAzureBlobStorageFile( if (size < settings->max_single_part_copy_size) { - LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy blob sync {} -> {}", src_blob, dest_blob); + LOG_TRACE(log, "Copy blob sync {} -> {}", src_blob, dest_blob); block_blob_client_dest.CopyFromUri(source_uri); } else @@ -368,7 +376,7 @@ void copyAzureBlobStorageFile( if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success) { - LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob); + LOG_TRACE(log, "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob); } else { @@ -382,14 +390,14 @@ void 
copyAzureBlobStorageFile( } else { - LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob); + LOG_TRACE(log, "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob); auto create_read_buffer = [&] { return std::make_unique( src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries); }; - UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")}; + UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log}; helper.performCopy(); } } diff --git a/src/IO/S3/copyS3File.cpp b/src/IO/S3/copyS3File.cpp index bb654c3f5c9..0b3e5e50f3d 100644 --- a/src/IO/S3/copyS3File.cpp +++ b/src/IO/S3/copyS3File.cpp @@ -98,7 +98,6 @@ namespace size_t part_size; String tag; bool is_finished = false; - std::exception_ptr exception; }; size_t num_parts; @@ -111,6 +110,7 @@ namespace size_t num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; size_t num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; size_t num_finished_parts TSA_GUARDED_BY(bg_tasks_mutex) = 0; + std::exception_ptr bg_exception TSA_GUARDED_BY(bg_tasks_mutex); std::mutex bg_tasks_mutex; std::condition_variable bg_tasks_condvar; @@ -273,7 +273,7 @@ namespace } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, fmt::format("While performing multipart upload of {}", dest_key)); // Multipart upload failed because it wasn't possible to schedule all the tasks. // To avoid execution of already scheduled tasks we abort MultipartUpload. abortMultipartUpload(); @@ -385,7 +385,12 @@ namespace } catch (...) { - task->exception = std::current_exception(); + std::lock_guard lock(bg_tasks_mutex); + if (!bg_exception) + { + tryLogCurrentException(log, fmt::format("While writing part #{}", task->part_number)); + bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working. + } } task_finish_notify(); }, Priority{}); @@ -435,22 +440,21 @@ namespace /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); }); - auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks); - for (auto & task : tasks) + auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception); + if (exception) { - if (task.exception) - { - /// abortMultipartUpload() might be called already, see processUploadPartRequest(). - /// However if there were concurrent uploads at that time, those part uploads might or might not succeed. - /// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free - /// all storage consumed by all parts. - abortMultipartUpload(); + /// abortMultipartUpload() might be called already, see processUploadPartRequest(). + /// However if there were concurrent uploads at that time, those part uploads might or might not succeed. + /// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free + /// all storage consumed by all parts. 
+ abortMultipartUpload(); - std::rethrow_exception(task.exception); - } - - part_tags.push_back(task.tag); + std::rethrow_exception(exception); } + + const auto & tasks = TSA_SUPPRESS_WARNING_FOR_READ(bg_tasks); + for (const auto & task : tasks) + part_tags.push_back(task.tag); } }; diff --git a/tests/ci/ci.py b/tests/ci/ci.py index fac50d30022..32b87698395 100644 --- a/tests/ci/ci.py +++ b/tests/ci/ci.py @@ -995,6 +995,10 @@ def main() -> int: ci_settings, args.skip_jobs, ) + + if IS_CI and pr_info.is_pr: + ci_cache.filter_out_not_affected_jobs() + ci_cache.print_status() if IS_CI and not pr_info.is_merge_queue: diff --git a/tests/ci/ci_cache.py b/tests/ci/ci_cache.py index 8ee0ae54385..291ed56aeea 100644 --- a/tests/ci/ci_cache.py +++ b/tests/ci/ci_cache.py @@ -674,6 +674,78 @@ class CiCache: bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path ) + def filter_out_not_affected_jobs(self): + """ + The filter is applied in PRs to remove jobs that are not affected by the change. + It removes a job from @jobs_to_do if it is: + 1. a test job that is in @jobs_to_wait (no need to wait for unaffected jobs in PRs) + 2. a test job that has finished on the release branch (even if it failed) + 3. a build job which is not required by any test job left in @jobs_to_do + + :return: + """ + # 1. + remove_from_await_list = [] + for job_name, job_config in self.jobs_to_wait.items(): + if CI.is_test_job(job_name) and job_name != CI.JobNames.BUILD_CHECK: + remove_from_await_list.append(job_name) + for job in remove_from_await_list: + print(f"Filter job [{job}] - test job and not affected by the change") + del self.jobs_to_wait[job] + del self.jobs_to_do[job] + + # 2. + remove_from_to_do = [] + for job_name, job_config in self.jobs_to_do.items(): + if CI.is_test_job(job_name) and job_name != CI.JobNames.BUILD_CHECK: + batches_to_remove = [] + assert job_config.batches is not None + for batch in job_config.batches: + if self.is_failed( + job_name, batch, job_config.num_batches, release_branch=True + ): + print( + f"Filter [{job_name}/{batch}] - not affected by the change (failed on release branch)" + ) + batches_to_remove.append(batch) + for batch in batches_to_remove: + job_config.batches.remove(batch) + if not job_config.batches: + print( + f"Filter [{job_name}] - not affected by the change (failed on release branch)" + ) + remove_from_to_do.append(job_name) + for job in remove_from_to_do: + del self.jobs_to_do[job] + + # 3.
+ required_builds = [] # type: List[str] + for job_name, job_config in self.jobs_to_do.items(): + if CI.is_test_job(job_name) and job_config.required_builds: + required_builds += job_config.required_builds + required_builds = list(set(required_builds)) + + remove_builds = [] # type: List[str] + has_builds_to_do = False + for job_name, job_config in self.jobs_to_do.items(): + if CI.is_build_job(job_name): + if job_name not in required_builds: + remove_builds.append(job_name) + else: + has_builds_to_do = True + + for build_job in remove_builds: + print( + f"Filter build job [{build_job}] - not affected and not required by test jobs" + ) + del self.jobs_to_do[build_job] + if build_job in self.jobs_to_wait: + del self.jobs_to_wait[build_job] + + if not has_builds_to_do and CI.JobNames.BUILD_CHECK in self.jobs_to_do: + print(f"Filter job [{CI.JobNames.BUILD_CHECK}] - no builds to do") + del self.jobs_to_do[CI.JobNames.BUILD_CHECK] + def await_pending_jobs(self, is_release: bool, dry_run: bool = False) -> None: """ await pending jobs to be finished diff --git a/tests/ci/ci_definitions.py b/tests/ci/ci_definitions.py index 48e1280d939..4ae252560e9 100644 --- a/tests/ci/ci_definitions.py +++ b/tests/ci/ci_definitions.py @@ -378,7 +378,7 @@ class CommonJobConfigs: ), run_command='functional_test_check.py "$CHECK_NAME"', runner_type=Runners.FUNC_TESTER, - timeout=10800, + timeout=7200, ) STATEFUL_TEST = JobConfig( job_name_keyword="stateful", diff --git a/tests/ci/functional_test_check.py b/tests/ci/functional_test_check.py index 4440d0d332c..ef9f4dc016e 100644 --- a/tests/ci/functional_test_check.py +++ b/tests/ci/functional_test_check.py @@ -108,6 +108,7 @@ def get_run_command( "--privileged " f"{ci_logs_args}" f"--volume={repo_path}/tests:/usr/share/clickhouse-test " + f"--volume={repo_path}/utils/grpc-client:/usr/share/clickhouse-utils/grpc-client " f"{volume_with_broken_test}" f"--volume={result_path}:/test_output " f"--volume={server_log_path}:/var/log/clickhouse-server " diff --git a/tests/ci/lambda_shared_package/lambda_shared/token.py b/tests/ci/lambda_shared_package/lambda_shared/token.py index 9749122bd39..3fb8f10c0e2 100644 --- a/tests/ci/lambda_shared_package/lambda_shared/token.py +++ b/tests/ci/lambda_shared_package/lambda_shared/token.py @@ -1,4 +1,5 @@ """Module to get the token for GitHub""" + from dataclasses import dataclass import json import time diff --git a/tests/ci/test_ci_config.py b/tests/ci/test_ci_config.py index 47247b91858..558faca915e 100644 --- a/tests/ci/test_ci_config.py +++ b/tests/ci/test_ci_config.py @@ -417,7 +417,7 @@ class TestCIConfig(unittest.TestCase): assert not ci_cache.jobs_to_skip assert not ci_cache.jobs_to_wait - # pretend there are pending jobs that we neet to wait + # pretend there are pending jobs that we need to wait ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do) for job, config in ci_cache.jobs_to_wait.items(): assert not config.pending_batches @@ -489,3 +489,87 @@ class TestCIConfig(unittest.TestCase): self.assertCountEqual( list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip, all_jobs_in_wf ) + + def test_ci_py_filters_not_affected_jobs_in_prs(self): + """ + checks ci.py filters not affected jobs in PRs + """ + settings = CiSettings() + settings.no_ci_cache = True + pr_info = PRInfo(github_event=_TEST_EVENT_JSON) + pr_info.event_type = EventType.PUSH + pr_info.number = 0 + assert pr_info.is_release and not pr_info.is_merge_queue + ci_cache = CIPY._configure_jobs( + S3Helper(), pr_info, settings, skip_jobs=False, dry_run=True + ) + 
self.assertTrue(not ci_cache.jobs_to_skip, "Must be no jobs in skip list") + all_jobs_in_wf = list(ci_cache.jobs_to_do) + assert not ci_cache.jobs_to_wait + assert not ci_cache.jobs_to_skip + + # pretend there are pending jobs that we need to wait + for job, job_config in ci_cache.jobs_to_do.items(): + ci_cache.jobs_to_wait[job] = job_config + + # remove a couple of tests from to_wait and + # expect they are preserved in @jobs_to_do along with the required package_asan + del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_ASAN] + del ci_cache.jobs_to_wait[CI.JobNames.INTEGRATION_TEST_TSAN] + del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_MSAN] + + # pretend we have some batches failed for one of the jobs from the to_do list + failed_job = CI.JobNames.INTEGRATION_TEST_TSAN + failed_job_config = ci_cache.jobs_to_do[failed_job] + FAILED_BATCHES = [0, 3] + for batch in FAILED_BATCHES: + assert batch < failed_job_config.num_batches + record = CiCache.Record( + record_type=CiCache.RecordType.FAILED, + job_name=failed_job, + job_digest=ci_cache.job_digests[failed_job], + batch=batch, + num_batches=failed_job_config.num_batches, + release_branch=True, + ) + for record_t_, records_ in ci_cache.records.items(): + if record_t_.value == CiCache.RecordType.FAILED.value: + records_[record.to_str_key()] = record + + # pretend we have all batches failed for one of the jobs from the to_do list + failed_job = CI.JobNames.STATELESS_TEST_MSAN + failed_job_config = ci_cache.jobs_to_do[failed_job] + assert failed_job_config.num_batches > 1 + for batch in range(failed_job_config.num_batches): + record = CiCache.Record( + record_type=CiCache.RecordType.FAILED, + job_name=failed_job, + job_digest=ci_cache.job_digests[failed_job], + batch=batch, + num_batches=failed_job_config.num_batches, + release_branch=True, + ) + for record_t_, records_ in ci_cache.records.items(): + if record_t_.value == CiCache.RecordType.FAILED.value: + records_[record.to_str_key()] = record + + ci_cache.filter_out_not_affected_jobs() + expected_to_do = [ + CI.JobNames.STATELESS_TEST_ASAN, + CI.BuildNames.PACKAGE_ASAN, + CI.JobNames.INTEGRATION_TEST_TSAN, + CI.BuildNames.PACKAGE_TSAN, + CI.JobNames.BUILD_CHECK, + ] + self.assertCountEqual( + list(ci_cache.jobs_to_wait), + [ + CI.BuildNames.PACKAGE_ASAN, + CI.BuildNames.PACKAGE_TSAN, + CI.JobNames.BUILD_CHECK, + ], + ) + self.assertCountEqual(list(ci_cache.jobs_to_do), expected_to_do) + self.assertTrue(ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches) + for batch in ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches: + self.assertTrue(batch not in FAILED_BATCHES) diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 77e984aa960..3f136478271 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -1751,7 +1751,7 @@ class TestCase: return TestResult( self.name, TestStatus.FAIL, - FailureReason.INTERNAL_QUERY_FAIL, + FailureReason.TIMEOUT, total_time, self.add_info_about_settings( self.get_description_from_exception_info(sys.exc_info()) @@ -2190,11 +2190,26 @@ def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool sys.stdout.flush() while True: - test_result = test_case.run( - args, test_suite, client_options, server_logs_level - ) - test_result = test_case.process_result(test_result, MESSAGES) - if not test_result.need_retry: + # This is the upper-level timeout. + # It helps with completely frozen processes, e.g. in case of gdb errors. + def timeout_handler(signum, frame): + raise TimeoutError("Test execution timed out") + +
signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(int(args.timeout * 1.1)) + test_result = None + try: + test_result = test_case.run( + args, test_suite, client_options, server_logs_level + ) + test_result = test_case.process_result(test_result, MESSAGES) + break + except TimeoutError: + break + finally: + signal.alarm(0) + + if not test_result or not test_result.need_retry: break restarted_tests.append(test_result) @@ -2453,6 +2468,10 @@ def override_envs(*args_, **kwargs): run_tests_array(*args_, **kwargs) +def run_tests_process(*args, **kwargs): + return run_tests_array(*args, **kwargs) + + def do_run_tests(jobs, test_suite: TestSuite): if jobs > 1 and len(test_suite.parallel_tests) > 0: print( @@ -2476,39 +2495,70 @@ def do_run_tests(jobs, test_suite: TestSuite): # of failures will be nearly the same for all tests from the group. random.shuffle(test_suite.parallel_tests) - batch_size = max(1, len(test_suite.parallel_tests) // jobs) + batch_size = max(1, (len(test_suite.parallel_tests) // jobs) + 1) parallel_tests_array = [] for job in range(jobs): range_ = job * batch_size, job * batch_size + batch_size batch = test_suite.parallel_tests[range_[0] : range_[1]] parallel_tests_array.append((batch, batch_size, test_suite, True)) - try: - with multiprocessing.Pool(processes=jobs + 1) as pool: - future = pool.map_async(run_tests_array, parallel_tests_array) + processes = [] - if args.run_sequential_tests_in_parallel: - # Run parallel tests and sequential tests at the same time - # Sequential tests will use different ClickHouse instance - # In this process we can safely override values in `args` and `os.environ` - future_seq = pool.map_async( - override_envs, - [ - ( - test_suite.sequential_tests, - len(test_suite.sequential_tests), - test_suite, - False, - ) - ], - ) - future_seq.wait() + for test_batch in parallel_tests_array: + process = multiprocessing.Process( + target=run_tests_process, args=(test_batch,) + ) + processes.append(process) + process.start() - future.wait() - finally: - pool.terminate() - pool.close() - pool.join() + if args.run_sequential_tests_in_parallel: + # Run parallel tests and sequential tests at the same time + # Sequential tests will use different ClickHouse instance + # In this process we can safely override values in `args` and `os.environ` + process = multiprocessing.Process( + target=override_envs, + args=( + ( + test_suite.sequential_tests, + len(test_suite.sequential_tests), + test_suite, + False, + ), + ), + ) + processes.append(process) + process.start() + + while processes: + sys.stdout.flush() + # Periodically check the server for hangs + # and stop all processes in this case + try: + clickhouse_execute( + args, + query="SELECT 1 /*hang up check*/", + max_http_retries=5, + timeout=20, + ) + except Exception: + print("Hang up check failed") + server_died.set() + + if server_died.is_set(): + print("Server died, terminating all processes...") + kill_gdb_if_any() + # Wait for test results + sleep(args.timeout) + for p in processes: + if p.is_alive(): + p.terminate() + break + + for p in processes[:]: + if not p.is_alive(): + processes.remove(p) + + sleep(5) if not args.run_sequential_tests_in_parallel: run_tests_array( @@ -3359,6 +3409,14 @@ def parse_args(): return parser.parse_args() +class Terminated(KeyboardInterrupt): + pass + + +def signal_handler(sig, frame): + raise Terminated(f"Terminated with {sig} signal") + + if __name__ == "__main__": stop_time = None exit_code = multiprocessing.Value("i", 0) @@ -3370,6 +3428,9 @@ if __name__ == 
"__main__": # infinite tests processes left # (new process group is required to avoid killing some parent processes) os.setpgid(0, 0) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGHUP, signal_handler) try: args = parse_args() diff --git a/tests/config/config.d/grpc_protocol.xml b/tests/config/config.d/grpc_protocol.xml new file mode 100644 index 00000000000..b957618120d --- /dev/null +++ b/tests/config/config.d/grpc_protocol.xml @@ -0,0 +1,3 @@ + + 9100 + diff --git a/tests/config/install.sh b/tests/config/install.sh index 8b58a519bc9..1b0edc5fc16 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -27,6 +27,7 @@ ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/grpc_protocol.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/ diff --git a/tests/queries/0_stateless/03203_grpc_protocol.reference b/tests/queries/0_stateless/03203_grpc_protocol.reference new file mode 100644 index 00000000000..9766475a418 --- /dev/null +++ b/tests/queries/0_stateless/03203_grpc_protocol.reference @@ -0,0 +1 @@ +ok diff --git a/tests/queries/0_stateless/03203_grpc_protocol.sh b/tests/queries/0_stateless/03203_grpc_protocol.sh new file mode 100755 index 00000000000..d51d6382f67 --- /dev/null +++ b/tests/queries/0_stateless/03203_grpc_protocol.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# Tags: no-fasttest +# Tag no-fasttest: In fasttest, ENABLE_LIBRARIES=0, so the grpc library is not built + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +if [[ -z "$CLICKHOUSE_GRPC_CLIENT" ]]; then + CLICKHOUSE_GRPC_CLIENT="$CURDIR/../../../utils/grpc-client/clickhouse-grpc-client.py" +fi + +# Simple test. +$CLICKHOUSE_GRPC_CLIENT --query "SELECT 'ok'" diff --git a/utils/grpc-client/generate_pb2.py b/utils/grpc-client/generate_pb2.py new file mode 100755 index 00000000000..95a39023ed7 --- /dev/null +++ b/utils/grpc-client/generate_pb2.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +# This is a helper utility. +# It generates files in the "pb2" folder using the protocol buffer compiler. +# This script must be called manually after any change pf "clickhouse_grpc.proto" + +import grpc_tools # pip3 install grpcio-tools + +import os, shutil, subprocess + + +# Settings. +script_path = os.path.realpath(__file__) +script_name = os.path.basename(script_path) +script_dir = os.path.dirname(script_path) +root_dir = os.path.abspath(os.path.join(script_dir, "../..")) + +grpc_proto_dir = os.path.abspath(os.path.join(root_dir, "src/Server/grpc_protos")) +grpc_proto_filename = "clickhouse_grpc.proto" + +# Files in the "pb2" folder which will be generated by this script. +pb2_filenames = ["clickhouse_grpc_pb2.py", "clickhouse_grpc_pb2_grpc.py"] +pb2_dir = os.path.join(script_dir, "pb2") + + +# Processes the protobuf schema with the protocol buffer compiler and generates the "pb2" folder. 
+def generate_pb2(): + print(f"Generating files:") + for pb2_filename in pb2_filenames: + print(os.path.join(pb2_dir, pb2_filename)) + + os.makedirs(pb2_dir, exist_ok=True) + + cmd = [ + "python3", + "-m", + "grpc_tools.protoc", + "-I" + grpc_proto_dir, + "--python_out=" + pb2_dir, + "--grpc_python_out=" + pb2_dir, + os.path.join(grpc_proto_dir, grpc_proto_filename), + ] + subprocess.run(cmd) + + for pb2_filename in pb2_filenames: + assert os.path.exists(os.path.join(pb2_dir, pb2_filename)) + print("Done! (generate_pb2)") + + +# MAIN +if __name__ == "__main__": + generate_pb2() diff --git a/utils/grpc-client/pb2/generate.py b/utils/grpc-client/pb2/generate.py deleted file mode 100755 index 2f4b3bf5af7..00000000000 --- a/utils/grpc-client/pb2/generate.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 - -import grpc_tools # pip3 install grpcio-tools - -import os -import subprocess - - -script_dir = os.path.dirname(os.path.realpath(__file__)) -dest_dir = script_dir -src_dir = os.path.abspath(os.path.join(script_dir, "../../../src/Server/grpc_protos")) -src_filename = "clickhouse_grpc.proto" - - -def generate(): - cmd = [ - "python3", - "-m", - "grpc_tools.protoc", - "-I" + src_dir, - "--python_out=" + dest_dir, - "--grpc_python_out=" + dest_dir, - os.path.join(src_dir, src_filename), - ] - subprocess.run(cmd) - - -if __name__ == "__main__": - generate()
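For reference, the SIGALRM guard that this patch wraps around test_case.run() in run_tests_array can be read as the stand-alone sketch below. It is illustrative only: the helper name run_with_watchdog is hypothetical and not part of the patch, and the approach assumes a Unix system with the call made from the main thread (signal.alarm does not work elsewhere) and a timeout of at least one second.

import signal


def run_with_watchdog(func, timeout_seconds):
    # Hypothetical helper mirroring the guard added around test_case.run():
    # SIGALRM interrupts a call that is completely frozen (e.g. stuck under gdb).
    def timeout_handler(signum, frame):
        raise TimeoutError("Test execution timed out")

    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(int(timeout_seconds))  # arm the watchdog; assumes timeout_seconds >= 1
    try:
        return func()
    finally:
        signal.alarm(0)  # disarm on success, timeout or any other error
        signal.signal(signal.SIGALRM, old_handler)  # restore the previous handler

As in the patch, the alarm is armed slightly above the per-test timeout (args.timeout * 1.1) so that the regular per-query timeout gets a chance to fire first.

The copyS3File.cpp and copyAzureBlobStorageFile.cpp changes replace the per-task exception slot with a single bg_exception guarded by bg_tasks_mutex: only the first failure is stored (later ones are only logged) and it is rethrown once all background tasks have finished. A minimal sketch of that first-error-wins idea, written in Python for brevity (the class and method names are illustrative, not from the patch):

import threading


class FirstErrorCollector:
    # Records only the first exception reported by background workers and
    # re-raises it after all of them have stopped, like the bg_exception field.
    def __init__(self):
        self._lock = threading.Lock()
        self._first_error = None

    def record(self, exc):
        with self._lock:
            if self._first_error is None:
                self._first_error = exc  # later failures are only logged by the caller

    def rethrow_if_failed(self):
        with self._lock:
            if self._first_error is not None:
                raise self._first_error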