Merge remote-tracking branch 'origin/master' into pr-local-plan

This commit is contained in:
Igor Nikonov 2024-07-12 21:21:04 +00:00
commit f42fe31396
20 changed files with 407 additions and 98 deletions

View File

@ -86,6 +86,7 @@ RUN curl -L --no-verbose -O 'https://archive.apache.org/dist/hadoop/common/hadoo
ENV MINIO_ROOT_USER="clickhouse" ENV MINIO_ROOT_USER="clickhouse"
ENV MINIO_ROOT_PASSWORD="clickhouse" ENV MINIO_ROOT_PASSWORD="clickhouse"
ENV EXPORT_S3_STORAGE_POLICIES=1 ENV EXPORT_S3_STORAGE_POLICIES=1
ENV CLICKHOUSE_GRPC_CLIENT="/usr/share/clickhouse-utils/grpc-client/clickhouse-grpc-client.py"
RUN npm install -g azurite@3.30.0 \ RUN npm install -g azurite@3.30.0 \
&& npm install -g tslib && npm install -g node && npm install -g tslib && npm install -g node

View File

@ -8,6 +8,7 @@ cryptography==3.4.8
dbus-python==1.2.18 dbus-python==1.2.18
distro==1.7.0 distro==1.7.0
docutils==0.17.1 docutils==0.17.1
grpcio==1.47.0
gyp==0.1 gyp==0.1
httplib2==0.20.2 httplib2==0.20.2
idna==3.3 idna==3.3
@ -28,6 +29,7 @@ packaging==24.1
pandas==1.5.3 pandas==1.5.3
pip==24.1.1 pip==24.1.1
pipdeptree==2.23.0 pipdeptree==2.23.0
protobuf==4.25.3
pyarrow==15.0.0 pyarrow==15.0.0
pyasn1==0.4.8 pyasn1==0.4.8
PyJWT==2.3.0 PyJWT==2.3.0

View File

@ -6,8 +6,8 @@ source /setup_export_logs.sh
# fail on errors, verbose and export all env variables # fail on errors, verbose and export all env variables
set -e -x -a set -e -x -a
MAX_RUN_TIME=${MAX_RUN_TIME:-10800} MAX_RUN_TIME=${MAX_RUN_TIME:-7200}
MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 10800 : MAX_RUN_TIME)) MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 7200 : MAX_RUN_TIME))
USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0} USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0}
USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0} USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0}
@ -320,7 +320,7 @@ export -f run_tests
# This should be enough to setup job and collect artifacts # This should be enough to setup job and collect artifacts
TIMEOUT=$((MAX_RUN_TIME - 600)) TIMEOUT=$((MAX_RUN_TIME - 700))
if [ "$NUM_TRIES" -gt "1" ]; then if [ "$NUM_TRIES" -gt "1" ]; then
# We don't run tests with Ordinary database in PRs, only in master. # We don't run tests with Ordinary database in PRs, only in master.
# So run new/changed tests with Ordinary at least once in flaky check. # So run new/changed tests with Ordinary at least once in flaky check.

View File

@ -11,6 +11,7 @@ TIMEOUT_SIGN = "[ Timeout! "
UNKNOWN_SIGN = "[ UNKNOWN " UNKNOWN_SIGN = "[ UNKNOWN "
SKIPPED_SIGN = "[ SKIPPED " SKIPPED_SIGN = "[ SKIPPED "
HUNG_SIGN = "Found hung queries in processlist" HUNG_SIGN = "Found hung queries in processlist"
SERVER_DIED_SIGN = "Server died, terminating all processes"
DATABASE_SIGN = "Database: " DATABASE_SIGN = "Database: "
SUCCESS_FINISH_SIGNS = ["All tests have finished", "No tests were run"] SUCCESS_FINISH_SIGNS = ["All tests have finished", "No tests were run"]
@ -25,6 +26,7 @@ def process_test_log(log_path, broken_tests):
failed = 0 failed = 0
success = 0 success = 0
hung = False hung = False
server_died = False
retries = False retries = False
success_finish = False success_finish = False
test_results = [] test_results = []
@ -41,6 +43,8 @@ def process_test_log(log_path, broken_tests):
if HUNG_SIGN in line: if HUNG_SIGN in line:
hung = True hung = True
break break
if SERVER_DIED_SIGN in line:
server_died = True
if RETRIES_SIGN in line: if RETRIES_SIGN in line:
retries = True retries = True
if any( if any(
@ -123,6 +127,7 @@ def process_test_log(log_path, broken_tests):
failed, failed,
success, success,
hung, hung,
server_died,
success_finish, success_finish,
retries, retries,
test_results, test_results,
@ -150,6 +155,7 @@ def process_result(result_path, broken_tests):
failed, failed,
success, success,
hung, hung,
server_died,
success_finish, success_finish,
retries, retries,
test_results, test_results,
@ -165,6 +171,10 @@ def process_result(result_path, broken_tests):
description = "Some queries hung, " description = "Some queries hung, "
state = "failure" state = "failure"
test_results.append(("Some queries hung", "FAIL", "0", "")) test_results.append(("Some queries hung", "FAIL", "0", ""))
elif server_died:
description = "Server died, "
state = "failure"
test_results.append(("Server died", "FAIL", "0", ""))
elif not success_finish: elif not success_finish:
description = "Tests are not finished, " description = "Tests are not finished, "
state = "failure" state = "failure"
@ -218,5 +228,20 @@ if __name__ == "__main__":
state, description, test_results = process_result(args.in_results_dir, broken_tests) state, description, test_results = process_result(args.in_results_dir, broken_tests)
logging.info("Result parsed") logging.info("Result parsed")
status = (state, description) status = (state, description)
def test_result_comparator(item):
# sort by status then by check name
order = {
"FAIL": 0,
"Timeout": 1,
"NOT_FAILED": 2,
"BROKEN": 3,
"OK": 4,
"SKIPPED": 5,
}
return order.get(item[1], 10), str(item[0]), item[1]
test_results.sort(key=test_result_comparator)
write_results(args.out_results_file, args.out_status_file, test_results, status) write_results(args.out_results_file, args.out_status_file, test_results, status)
logging.info("Result written") logging.info("Result written")

View File

@ -996,6 +996,10 @@ void ZooKeeper::receiveEvent()
if (request_info.callback) if (request_info.callback)
request_info.callback(*response); request_info.callback(*response);
/// Finalize current session if we receive a hardware error from ZooKeeper
if (err != Error::ZOK && isHardwareError(err))
finalize(/*error_send*/ false, /*error_receive*/ true, fmt::format("Hardware error: {}", err));
} }

View File

@ -49,7 +49,7 @@ namespace
const String & dest_blob_, const String & dest_blob_,
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_, std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_, ThreadPoolCallbackRunnerUnsafe<void> schedule_,
const Poco::Logger * log_) LoggerPtr log_)
: create_read_buffer(create_read_buffer_) : create_read_buffer(create_read_buffer_)
, client(client_) , client(client_)
, offset (offset_) , offset (offset_)
@ -74,7 +74,7 @@ namespace
const String & dest_blob; const String & dest_blob;
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings; std::shared_ptr<const AzureBlobStorage::RequestSettings> settings;
ThreadPoolCallbackRunnerUnsafe<void> schedule; ThreadPoolCallbackRunnerUnsafe<void> schedule;
const Poco::Logger * log; const LoggerPtr log;
size_t max_single_part_upload_size; size_t max_single_part_upload_size;
struct UploadPartTask struct UploadPartTask
@ -83,7 +83,6 @@ namespace
size_t part_size; size_t part_size;
std::vector<std::string> block_ids; std::vector<std::string> block_ids;
bool is_finished = false; bool is_finished = false;
std::exception_ptr exception;
}; };
size_t normal_part_size; size_t normal_part_size;
@ -92,6 +91,7 @@ namespace
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks; std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
std::exception_ptr bg_exception TSA_GUARDED_BY(bg_tasks_mutex);
std::mutex bg_tasks_mutex; std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar; std::condition_variable bg_tasks_condvar;
@ -186,7 +186,7 @@ namespace
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(log, fmt::format("While performing multipart upload of blob {} in container {}", dest_blob, dest_container_for_logging));
waitForAllBackgroundTasks(); waitForAllBackgroundTasks();
throw; throw;
} }
@ -242,7 +242,12 @@ namespace
} }
catch (...) catch (...)
{ {
task->exception = std::current_exception(); std::lock_guard lock(bg_tasks_mutex);
if (!bg_exception)
{
tryLogCurrentException(log, "While writing part");
bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working.
}
} }
task_finish_notify(); task_finish_notify();
}, Priority{}); }, Priority{});
@ -299,13 +304,13 @@ namespace
/// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock
bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); }); bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks); auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception);
for (auto & task : tasks) if (exception)
{ std::rethrow_exception(exception);
if (task.exception)
std::rethrow_exception(task.exception); const auto & tasks = TSA_SUPPRESS_WARNING_FOR_READ(bg_tasks);
for (const auto & task : tasks)
block_ids.insert(block_ids.end(),task.block_ids.begin(), task.block_ids.end()); block_ids.insert(block_ids.end(),task.block_ids.begin(), task.block_ids.end());
}
} }
}; };
} }
@ -321,7 +326,8 @@ void copyDataToAzureBlobStorageFile(
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings, std::shared_ptr<const AzureBlobStorage::RequestSettings> settings,
ThreadPoolCallbackRunnerUnsafe<void> schedule) ThreadPoolCallbackRunnerUnsafe<void> schedule)
{ {
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyDataToAzureBlobStorageFile")}; auto log = getLogger("copyDataToAzureBlobStorageFile");
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log};
helper.performCopy(); helper.performCopy();
} }
@ -339,9 +345,11 @@ void copyAzureBlobStorageFile(
const ReadSettings & read_settings, const ReadSettings & read_settings,
ThreadPoolCallbackRunnerUnsafe<void> schedule) ThreadPoolCallbackRunnerUnsafe<void> schedule)
{ {
auto log = getLogger("copyAzureBlobStorageFile");
if (settings->use_native_copy) if (settings->use_native_copy)
{ {
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob); LOG_TRACE(log, "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
ProfileEvents::increment(ProfileEvents::AzureCopyObject); ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (dest_client->GetClickhouseOptions().IsClientForDisk) if (dest_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject); ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
@ -352,7 +360,7 @@ void copyAzureBlobStorageFile(
if (size < settings->max_single_part_copy_size) if (size < settings->max_single_part_copy_size)
{ {
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy blob sync {} -> {}", src_blob, dest_blob); LOG_TRACE(log, "Copy blob sync {} -> {}", src_blob, dest_blob);
block_blob_client_dest.CopyFromUri(source_uri); block_blob_client_dest.CopyFromUri(source_uri);
} }
else else
@ -368,7 +376,7 @@ void copyAzureBlobStorageFile(
if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success) if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success)
{ {
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob); LOG_TRACE(log, "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob);
} }
else else
{ {
@ -382,14 +390,14 @@ void copyAzureBlobStorageFile(
} }
else else
{ {
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob); LOG_TRACE(log, "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
auto create_read_buffer = [&] auto create_read_buffer = [&]
{ {
return std::make_unique<ReadBufferFromAzureBlobStorage>( return std::make_unique<ReadBufferFromAzureBlobStorage>(
src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries); src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries);
}; };
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")}; UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log};
helper.performCopy(); helper.performCopy();
} }
} }

View File

@ -98,7 +98,6 @@ namespace
size_t part_size; size_t part_size;
String tag; String tag;
bool is_finished = false; bool is_finished = false;
std::exception_ptr exception;
}; };
size_t num_parts; size_t num_parts;
@ -111,6 +110,7 @@ namespace
size_t num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; size_t num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
size_t num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; size_t num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
size_t num_finished_parts TSA_GUARDED_BY(bg_tasks_mutex) = 0; size_t num_finished_parts TSA_GUARDED_BY(bg_tasks_mutex) = 0;
std::exception_ptr bg_exception TSA_GUARDED_BY(bg_tasks_mutex);
std::mutex bg_tasks_mutex; std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar; std::condition_variable bg_tasks_condvar;
@ -273,7 +273,7 @@ namespace
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(log, fmt::format("While performing multipart upload of {}", dest_key));
// Multipart upload failed because it wasn't possible to schedule all the tasks. // Multipart upload failed because it wasn't possible to schedule all the tasks.
// To avoid execution of already scheduled tasks we abort MultipartUpload. // To avoid execution of already scheduled tasks we abort MultipartUpload.
abortMultipartUpload(); abortMultipartUpload();
@ -385,7 +385,12 @@ namespace
} }
catch (...) catch (...)
{ {
task->exception = std::current_exception(); std::lock_guard lock(bg_tasks_mutex);
if (!bg_exception)
{
tryLogCurrentException(log, fmt::format("While writing part #{}", task->part_number));
bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working.
}
} }
task_finish_notify(); task_finish_notify();
}, Priority{}); }, Priority{});
@ -435,22 +440,21 @@ namespace
/// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock
bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); }); bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks); auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception);
for (auto & task : tasks) if (exception)
{ {
if (task.exception) /// abortMultipartUpload() might be called already, see processUploadPartRequest().
{ /// However if there were concurrent uploads at that time, those part uploads might or might not succeed.
/// abortMultipartUpload() might be called already, see processUploadPartRequest(). /// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free
/// However if there were concurrent uploads at that time, those part uploads might or might not succeed. /// all storage consumed by all parts.
/// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free abortMultipartUpload();
/// all storage consumed by all parts.
abortMultipartUpload();
std::rethrow_exception(task.exception); std::rethrow_exception(exception);
}
part_tags.push_back(task.tag);
} }
const auto & tasks = TSA_SUPPRESS_WARNING_FOR_READ(bg_tasks);
for (const auto & task : tasks)
part_tags.push_back(task.tag);
} }
}; };

View File

@ -995,6 +995,10 @@ def main() -> int:
ci_settings, ci_settings,
args.skip_jobs, args.skip_jobs,
) )
if IS_CI and pr_info.is_pr:
ci_cache.filter_out_not_affected_jobs()
ci_cache.print_status() ci_cache.print_status()
if IS_CI and not pr_info.is_merge_queue: if IS_CI and not pr_info.is_merge_queue:

View File

@ -674,6 +674,78 @@ class CiCache:
bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path
) )
def filter_out_not_affected_jobs(self):
"""
Filter is to be applied in PRs to remove jobs that are not affected by the change
It removes jobs from @jobs_to_do if it is a:
1. test job and it is in @jobs_to_wait (no need to wait not affected jobs in PRs)
2. test job and it has finished on release branch (even if failed)
2. build job which is not required by any test job that is left in @jobs_to_do
:return:
"""
# 1.
remove_from_await_list = []
for job_name, job_config in self.jobs_to_wait.items():
if CI.is_test_job(job_name) and job_name != CI.JobNames.BUILD_CHECK:
remove_from_await_list.append(job_name)
for job in remove_from_await_list:
print(f"Filter job [{job}] - test job and not affected by the change")
del self.jobs_to_wait[job]
del self.jobs_to_do[job]
# 2.
remove_from_to_do = []
for job_name, job_config in self.jobs_to_do.items():
if CI.is_test_job(job_name) and job_name != CI.JobNames.BUILD_CHECK:
batches_to_remove = []
assert job_config.batches is not None
for batch in job_config.batches:
if self.is_failed(
job_name, batch, job_config.num_batches, release_branch=True
):
print(
f"Filter [{job_name}/{batch}] - not affected by the change (failed on release branch)"
)
batches_to_remove.append(batch)
for batch in batches_to_remove:
job_config.batches.remove(batch)
if not job_config.batches:
print(
f"Filter [{job_name}] - not affected by the change (failed on release branch)"
)
remove_from_to_do.append(job_name)
for job in remove_from_to_do:
del self.jobs_to_do[job]
# 3.
required_builds = [] # type: List[str]
for job_name, job_config in self.jobs_to_do.items():
if CI.is_test_job(job_name) and job_config.required_builds:
required_builds += job_config.required_builds
required_builds = list(set(required_builds))
remove_builds = [] # type: List[str]
has_builds_to_do = False
for job_name, job_config in self.jobs_to_do.items():
if CI.is_build_job(job_name):
if job_name not in required_builds:
remove_builds.append(job_name)
else:
has_builds_to_do = True
for build_job in remove_builds:
print(
f"Filter build job [{build_job}] - not affected and not required by test jobs"
)
del self.jobs_to_do[build_job]
if build_job in self.jobs_to_wait:
del self.jobs_to_wait[build_job]
if not has_builds_to_do and CI.JobNames.BUILD_CHECK in self.jobs_to_do:
print(f"Filter job [{CI.JobNames.BUILD_CHECK}] - no builds to do")
del self.jobs_to_do[CI.JobNames.BUILD_CHECK]
def await_pending_jobs(self, is_release: bool, dry_run: bool = False) -> None: def await_pending_jobs(self, is_release: bool, dry_run: bool = False) -> None:
""" """
await pending jobs to be finished await pending jobs to be finished

View File

@ -378,7 +378,7 @@ class CommonJobConfigs:
), ),
run_command='functional_test_check.py "$CHECK_NAME"', run_command='functional_test_check.py "$CHECK_NAME"',
runner_type=Runners.FUNC_TESTER, runner_type=Runners.FUNC_TESTER,
timeout=10800, timeout=7200,
) )
STATEFUL_TEST = JobConfig( STATEFUL_TEST = JobConfig(
job_name_keyword="stateful", job_name_keyword="stateful",

View File

@ -108,6 +108,7 @@ def get_run_command(
"--privileged " "--privileged "
f"{ci_logs_args}" f"{ci_logs_args}"
f"--volume={repo_path}/tests:/usr/share/clickhouse-test " f"--volume={repo_path}/tests:/usr/share/clickhouse-test "
f"--volume={repo_path}/utils/grpc-client:/usr/share/clickhouse-utils/grpc-client "
f"{volume_with_broken_test}" f"{volume_with_broken_test}"
f"--volume={result_path}:/test_output " f"--volume={result_path}:/test_output "
f"--volume={server_log_path}:/var/log/clickhouse-server " f"--volume={server_log_path}:/var/log/clickhouse-server "

View File

@ -1,4 +1,5 @@
"""Module to get the token for GitHub""" """Module to get the token for GitHub"""
from dataclasses import dataclass from dataclasses import dataclass
import json import json
import time import time

View File

@ -417,7 +417,7 @@ class TestCIConfig(unittest.TestCase):
assert not ci_cache.jobs_to_skip assert not ci_cache.jobs_to_skip
assert not ci_cache.jobs_to_wait assert not ci_cache.jobs_to_wait
# pretend there are pending jobs that we neet to wait # pretend there are pending jobs that we need to wait
ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do) ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do)
for job, config in ci_cache.jobs_to_wait.items(): for job, config in ci_cache.jobs_to_wait.items():
assert not config.pending_batches assert not config.pending_batches
@ -489,3 +489,87 @@ class TestCIConfig(unittest.TestCase):
self.assertCountEqual( self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip, all_jobs_in_wf list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip, all_jobs_in_wf
) )
def test_ci_py_filters_not_affected_jobs_in_prs(self):
"""
checks ci.py filters not affected jobs in PRs
"""
settings = CiSettings()
settings.no_ci_cache = True
pr_info = PRInfo(github_event=_TEST_EVENT_JSON)
pr_info.event_type = EventType.PUSH
pr_info.number = 0
assert pr_info.is_release and not pr_info.is_merge_queue
ci_cache = CIPY._configure_jobs(
S3Helper(), pr_info, settings, skip_jobs=False, dry_run=True
)
self.assertTrue(not ci_cache.jobs_to_skip, "Must be no jobs in skip list")
all_jobs_in_wf = list(ci_cache.jobs_to_do)
assert not ci_cache.jobs_to_wait
assert not ci_cache.jobs_to_skip
# pretend there are pending jobs that we need to wait
for job, job_config in ci_cache.jobs_to_do.items():
ci_cache.jobs_to_wait[job] = job_config
# remove couple tests from to_wait and
# expect they are preserved in @jobs_to_to along with required package_asan
del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_ASAN]
del ci_cache.jobs_to_wait[CI.JobNames.INTEGRATION_TEST_TSAN]
del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_MSAN]
# pretend we have some batches failed for one of the job from the to_do list
failed_job = CI.JobNames.INTEGRATION_TEST_TSAN
failed_job_config = ci_cache.jobs_to_do[failed_job]
FAILED_BATCHES = [0, 3]
for batch in FAILED_BATCHES:
assert batch < failed_job_config.num_batches
record = CiCache.Record(
record_type=CiCache.RecordType.FAILED,
job_name=failed_job,
job_digest=ci_cache.job_digests[failed_job],
batch=batch,
num_batches=failed_job_config.num_batches,
release_branch=True,
)
for record_t_, records_ in ci_cache.records.items():
if record_t_.value == CiCache.RecordType.FAILED.value:
records_[record.to_str_key()] = record
# pretend we have all batches failed for one of the job from the to_do list
failed_job = CI.JobNames.STATELESS_TEST_MSAN
failed_job_config = ci_cache.jobs_to_do[failed_job]
assert failed_job_config.num_batches > 1
for batch in range(failed_job_config.num_batches):
record = CiCache.Record(
record_type=CiCache.RecordType.FAILED,
job_name=failed_job,
job_digest=ci_cache.job_digests[failed_job],
batch=batch,
num_batches=failed_job_config.num_batches,
release_branch=True,
)
for record_t_, records_ in ci_cache.records.items():
if record_t_.value == CiCache.RecordType.FAILED.value:
records_[record.to_str_key()] = record
ci_cache.filter_out_not_affected_jobs()
expected_to_do = [
CI.JobNames.STATELESS_TEST_ASAN,
CI.BuildNames.PACKAGE_ASAN,
CI.JobNames.INTEGRATION_TEST_TSAN,
CI.BuildNames.PACKAGE_TSAN,
CI.JobNames.BUILD_CHECK,
]
self.assertCountEqual(
list(ci_cache.jobs_to_wait),
[
CI.BuildNames.PACKAGE_ASAN,
CI.BuildNames.PACKAGE_TSAN,
CI.JobNames.BUILD_CHECK,
],
)
self.assertCountEqual(list(ci_cache.jobs_to_do), expected_to_do)
self.assertTrue(ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches)
for batch in ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches:
self.assertTrue(batch not in FAILED_BATCHES)

View File

@ -1751,7 +1751,7 @@ class TestCase:
return TestResult( return TestResult(
self.name, self.name,
TestStatus.FAIL, TestStatus.FAIL,
FailureReason.INTERNAL_QUERY_FAIL, FailureReason.TIMEOUT,
total_time, total_time,
self.add_info_about_settings( self.add_info_about_settings(
self.get_description_from_exception_info(sys.exc_info()) self.get_description_from_exception_info(sys.exc_info())
@ -2190,11 +2190,26 @@ def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool
sys.stdout.flush() sys.stdout.flush()
while True: while True:
test_result = test_case.run( # This is the upper level timeout
args, test_suite, client_options, server_logs_level # It helps with completely frozen processes, like in case of gdb errors
) def timeout_handler(signum, frame):
test_result = test_case.process_result(test_result, MESSAGES) raise TimeoutError("Test execution timed out")
if not test_result.need_retry:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(int(args.timeout * 1.1))
test_result = None
try:
test_result = test_case.run(
args, test_suite, client_options, server_logs_level
)
test_result = test_case.process_result(test_result, MESSAGES)
break
except TimeoutError:
break
finally:
signal.alarm(0)
if not test_result or not test_result.need_retry:
break break
restarted_tests.append(test_result) restarted_tests.append(test_result)
@ -2453,6 +2468,10 @@ def override_envs(*args_, **kwargs):
run_tests_array(*args_, **kwargs) run_tests_array(*args_, **kwargs)
def run_tests_process(*args, **kwargs):
return run_tests_array(*args, **kwargs)
def do_run_tests(jobs, test_suite: TestSuite): def do_run_tests(jobs, test_suite: TestSuite):
if jobs > 1 and len(test_suite.parallel_tests) > 0: if jobs > 1 and len(test_suite.parallel_tests) > 0:
print( print(
@ -2476,39 +2495,70 @@ def do_run_tests(jobs, test_suite: TestSuite):
# of failures will be nearly the same for all tests from the group. # of failures will be nearly the same for all tests from the group.
random.shuffle(test_suite.parallel_tests) random.shuffle(test_suite.parallel_tests)
batch_size = max(1, len(test_suite.parallel_tests) // jobs) batch_size = max(1, (len(test_suite.parallel_tests) // jobs) + 1)
parallel_tests_array = [] parallel_tests_array = []
for job in range(jobs): for job in range(jobs):
range_ = job * batch_size, job * batch_size + batch_size range_ = job * batch_size, job * batch_size + batch_size
batch = test_suite.parallel_tests[range_[0] : range_[1]] batch = test_suite.parallel_tests[range_[0] : range_[1]]
parallel_tests_array.append((batch, batch_size, test_suite, True)) parallel_tests_array.append((batch, batch_size, test_suite, True))
try: processes = []
with multiprocessing.Pool(processes=jobs + 1) as pool:
future = pool.map_async(run_tests_array, parallel_tests_array)
if args.run_sequential_tests_in_parallel: for test_batch in parallel_tests_array:
# Run parallel tests and sequential tests at the same time process = multiprocessing.Process(
# Sequential tests will use different ClickHouse instance target=run_tests_process, args=(test_batch,)
# In this process we can safely override values in `args` and `os.environ` )
future_seq = pool.map_async( processes.append(process)
override_envs, process.start()
[
(
test_suite.sequential_tests,
len(test_suite.sequential_tests),
test_suite,
False,
)
],
)
future_seq.wait()
future.wait() if args.run_sequential_tests_in_parallel:
finally: # Run parallel tests and sequential tests at the same time
pool.terminate() # Sequential tests will use different ClickHouse instance
pool.close() # In this process we can safely override values in `args` and `os.environ`
pool.join() process = multiprocessing.Process(
target=override_envs,
args=(
(
test_suite.sequential_tests,
len(test_suite.sequential_tests),
test_suite,
False,
),
),
)
processes.append(process)
process.start()
while processes:
sys.stdout.flush()
# Periodically check the server for hangs
# and stop all processes in this case
try:
clickhouse_execute(
args,
query="SELECT 1 /*hang up check*/",
max_http_retries=5,
timeout=20,
)
except Exception:
print("Hang up check failed")
server_died.set()
if server_died.is_set():
print("Server died, terminating all processes...")
kill_gdb_if_any()
# Wait for test results
sleep(args.timeout)
for p in processes:
if p.is_alive():
p.terminate()
break
for p in processes[:]:
if not p.is_alive():
processes.remove(p)
sleep(5)
if not args.run_sequential_tests_in_parallel: if not args.run_sequential_tests_in_parallel:
run_tests_array( run_tests_array(
@ -3359,6 +3409,14 @@ def parse_args():
return parser.parse_args() return parser.parse_args()
class Terminated(KeyboardInterrupt):
pass
def signal_handler(sig, frame):
raise Terminated(f"Terminated with {sig} signal")
if __name__ == "__main__": if __name__ == "__main__":
stop_time = None stop_time = None
exit_code = multiprocessing.Value("i", 0) exit_code = multiprocessing.Value("i", 0)
@ -3370,6 +3428,9 @@ if __name__ == "__main__":
# infinite tests processes left # infinite tests processes left
# (new process group is required to avoid killing some parent processes) # (new process group is required to avoid killing some parent processes)
os.setpgid(0, 0) os.setpgid(0, 0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
try: try:
args = parse_args() args = parse_args()

View File

@ -0,0 +1,3 @@
<clickhouse>
<grpc_port>9100</grpc_port>
</clickhouse>

View File

@ -27,6 +27,7 @@ ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/grpc_protocol.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/

View File

@ -0,0 +1 @@
ok

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: In fasttest, ENABLE_LIBRARIES=0, so the grpc library is not built
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
if [[ -z "$CLICKHOUSE_GRPC_CLIENT" ]]; then
CLICKHOUSE_GRPC_CLIENT="$CURDIR/../../../utils/grpc-client/clickhouse-grpc-client.py"
fi
# Simple test.
$CLICKHOUSE_GRPC_CLIENT --query "SELECT 'ok'"

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
# This is a helper utility.
# It generates files in the "pb2" folder using the protocol buffer compiler.
# This script must be called manually after any change pf "clickhouse_grpc.proto"
import grpc_tools # pip3 install grpcio-tools
import os, shutil, subprocess
# Settings.
script_path = os.path.realpath(__file__)
script_name = os.path.basename(script_path)
script_dir = os.path.dirname(script_path)
root_dir = os.path.abspath(os.path.join(script_dir, "../.."))
grpc_proto_dir = os.path.abspath(os.path.join(root_dir, "src/Server/grpc_protos"))
grpc_proto_filename = "clickhouse_grpc.proto"
# Files in the "pb2" folder which will be generated by this script.
pb2_filenames = ["clickhouse_grpc_pb2.py", "clickhouse_grpc_pb2_grpc.py"]
pb2_dir = os.path.join(script_dir, "pb2")
# Processes the protobuf schema with the protocol buffer compiler and generates the "pb2" folder.
def generate_pb2():
print(f"Generating files:")
for pb2_filename in pb2_filenames:
print(os.path.join(pb2_dir, pb2_filename))
os.makedirs(pb2_dir, exist_ok=True)
cmd = [
"python3",
"-m",
"grpc_tools.protoc",
"-I" + grpc_proto_dir,
"--python_out=" + pb2_dir,
"--grpc_python_out=" + pb2_dir,
os.path.join(grpc_proto_dir, grpc_proto_filename),
]
subprocess.run(cmd)
for pb2_filename in pb2_filenames:
assert os.path.exists(os.path.join(pb2_dir, pb2_filename))
print("Done! (generate_pb2)")
# MAIN
if __name__ == "__main__":
generate_pb2()

View File

@ -1,29 +0,0 @@
#!/usr/bin/env python3
import grpc_tools # pip3 install grpcio-tools
import os
import subprocess
script_dir = os.path.dirname(os.path.realpath(__file__))
dest_dir = script_dir
src_dir = os.path.abspath(os.path.join(script_dir, "../../../src/Server/grpc_protos"))
src_filename = "clickhouse_grpc.proto"
def generate():
cmd = [
"python3",
"-m",
"grpc_tools.protoc",
"-I" + src_dir,
"--python_out=" + dest_dir,
"--grpc_python_out=" + dest_dir,
os.path.join(src_dir, src_filename),
]
subprocess.run(cmd)
if __name__ == "__main__":
generate()