mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge branch 'master' into limit_linker_jobs_on_aarch
This commit is contained in:
commit
5289a3dc22
@ -86,6 +86,7 @@ RUN curl -L --no-verbose -O 'https://archive.apache.org/dist/hadoop/common/hadoo
|
||||
ENV MINIO_ROOT_USER="clickhouse"
|
||||
ENV MINIO_ROOT_PASSWORD="clickhouse"
|
||||
ENV EXPORT_S3_STORAGE_POLICIES=1
|
||||
ENV CLICKHOUSE_GRPC_CLIENT="/usr/share/clickhouse-utils/grpc-client/clickhouse-grpc-client.py"
|
||||
|
||||
RUN npm install -g azurite@3.30.0 \
|
||||
&& npm install -g tslib && npm install -g node
|
||||
|
@ -8,6 +8,7 @@ cryptography==3.4.8
|
||||
dbus-python==1.2.18
|
||||
distro==1.7.0
|
||||
docutils==0.17.1
|
||||
grpcio==1.47.0
|
||||
gyp==0.1
|
||||
httplib2==0.20.2
|
||||
idna==3.3
|
||||
@ -28,6 +29,7 @@ packaging==24.1
|
||||
pandas==1.5.3
|
||||
pip==24.1.1
|
||||
pipdeptree==2.23.0
|
||||
protobuf==4.25.3
|
||||
pyarrow==15.0.0
|
||||
pyasn1==0.4.8
|
||||
PyJWT==2.3.0
|
||||
|
@ -6,8 +6,8 @@ source /setup_export_logs.sh
|
||||
# fail on errors, verbose and export all env variables
|
||||
set -e -x -a
|
||||
|
||||
MAX_RUN_TIME=${MAX_RUN_TIME:-10800}
|
||||
MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 10800 : MAX_RUN_TIME))
|
||||
MAX_RUN_TIME=${MAX_RUN_TIME:-7200}
|
||||
MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 7200 : MAX_RUN_TIME))
|
||||
|
||||
USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0}
|
||||
USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0}
|
||||
@ -320,7 +320,7 @@ export -f run_tests
|
||||
|
||||
|
||||
# This should be enough to setup job and collect artifacts
|
||||
TIMEOUT=$((MAX_RUN_TIME - 600))
|
||||
TIMEOUT=$((MAX_RUN_TIME - 700))
|
||||
if [ "$NUM_TRIES" -gt "1" ]; then
|
||||
# We don't run tests with Ordinary database in PRs, only in master.
|
||||
# So run new/changed tests with Ordinary at least once in flaky check.
|
||||
|
@ -11,6 +11,7 @@ TIMEOUT_SIGN = "[ Timeout! "
|
||||
UNKNOWN_SIGN = "[ UNKNOWN "
|
||||
SKIPPED_SIGN = "[ SKIPPED "
|
||||
HUNG_SIGN = "Found hung queries in processlist"
|
||||
SERVER_DIED_SIGN = "Server died, terminating all processes"
|
||||
DATABASE_SIGN = "Database: "
|
||||
|
||||
SUCCESS_FINISH_SIGNS = ["All tests have finished", "No tests were run"]
|
||||
@ -25,6 +26,7 @@ def process_test_log(log_path, broken_tests):
|
||||
failed = 0
|
||||
success = 0
|
||||
hung = False
|
||||
server_died = False
|
||||
retries = False
|
||||
success_finish = False
|
||||
test_results = []
|
||||
@ -41,6 +43,8 @@ def process_test_log(log_path, broken_tests):
|
||||
if HUNG_SIGN in line:
|
||||
hung = True
|
||||
break
|
||||
if SERVER_DIED_SIGN in line:
|
||||
server_died = True
|
||||
if RETRIES_SIGN in line:
|
||||
retries = True
|
||||
if any(
|
||||
@ -123,6 +127,7 @@ def process_test_log(log_path, broken_tests):
|
||||
failed,
|
||||
success,
|
||||
hung,
|
||||
server_died,
|
||||
success_finish,
|
||||
retries,
|
||||
test_results,
|
||||
@ -150,6 +155,7 @@ def process_result(result_path, broken_tests):
|
||||
failed,
|
||||
success,
|
||||
hung,
|
||||
server_died,
|
||||
success_finish,
|
||||
retries,
|
||||
test_results,
|
||||
@ -165,6 +171,10 @@ def process_result(result_path, broken_tests):
|
||||
description = "Some queries hung, "
|
||||
state = "failure"
|
||||
test_results.append(("Some queries hung", "FAIL", "0", ""))
|
||||
elif server_died:
|
||||
description = "Server died, "
|
||||
state = "failure"
|
||||
test_results.append(("Server died", "FAIL", "0", ""))
|
||||
elif not success_finish:
|
||||
description = "Tests are not finished, "
|
||||
state = "failure"
|
||||
@ -218,5 +228,20 @@ if __name__ == "__main__":
|
||||
state, description, test_results = process_result(args.in_results_dir, broken_tests)
|
||||
logging.info("Result parsed")
|
||||
status = (state, description)
|
||||
|
||||
def test_result_comparator(item):
|
||||
# sort by status then by check name
|
||||
order = {
|
||||
"FAIL": 0,
|
||||
"Timeout": 1,
|
||||
"NOT_FAILED": 2,
|
||||
"BROKEN": 3,
|
||||
"OK": 4,
|
||||
"SKIPPED": 5,
|
||||
}
|
||||
return order.get(item[1], 10), str(item[0]), item[1]
|
||||
|
||||
test_results.sort(key=test_result_comparator)
|
||||
|
||||
write_results(args.out_results_file, args.out_status_file, test_results, status)
|
||||
logging.info("Result written")
|
||||
|
@ -996,6 +996,10 @@ void ZooKeeper::receiveEvent()
|
||||
|
||||
if (request_info.callback)
|
||||
request_info.callback(*response);
|
||||
|
||||
/// Finalize current session if we receive a hardware error from ZooKeeper
|
||||
if (err != Error::ZOK && isHardwareError(err))
|
||||
finalize(/*error_send*/ false, /*error_receive*/ true, fmt::format("Hardware error: {}", err));
|
||||
}
|
||||
|
||||
|
||||
|
@ -25,7 +25,7 @@ namespace DB
|
||||
template <typename To, typename From>
|
||||
inline To assert_cast(From && from)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
#ifdef ABORT_ON_LOGICAL_ERROR
|
||||
try
|
||||
{
|
||||
if constexpr (std::is_pointer_v<To>)
|
||||
|
@ -49,7 +49,7 @@ namespace
|
||||
const String & dest_blob_,
|
||||
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_,
|
||||
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
|
||||
const Poco::Logger * log_)
|
||||
LoggerPtr log_)
|
||||
: create_read_buffer(create_read_buffer_)
|
||||
, client(client_)
|
||||
, offset (offset_)
|
||||
@ -74,7 +74,7 @@ namespace
|
||||
const String & dest_blob;
|
||||
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings;
|
||||
ThreadPoolCallbackRunnerUnsafe<void> schedule;
|
||||
const Poco::Logger * log;
|
||||
const LoggerPtr log;
|
||||
size_t max_single_part_upload_size;
|
||||
|
||||
struct UploadPartTask
|
||||
@ -83,7 +83,6 @@ namespace
|
||||
size_t part_size;
|
||||
std::vector<std::string> block_ids;
|
||||
bool is_finished = false;
|
||||
std::exception_ptr exception;
|
||||
};
|
||||
|
||||
size_t normal_part_size;
|
||||
@ -92,6 +91,7 @@ namespace
|
||||
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
|
||||
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
|
||||
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
|
||||
std::exception_ptr bg_exception TSA_GUARDED_BY(bg_tasks_mutex);
|
||||
std::mutex bg_tasks_mutex;
|
||||
std::condition_variable bg_tasks_condvar;
|
||||
|
||||
@ -186,7 +186,7 @@ namespace
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
tryLogCurrentException(log, fmt::format("While performing multipart upload of blob {} in container {}", dest_blob, dest_container_for_logging));
|
||||
waitForAllBackgroundTasks();
|
||||
throw;
|
||||
}
|
||||
@ -242,7 +242,12 @@ namespace
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
task->exception = std::current_exception();
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
if (!bg_exception)
|
||||
{
|
||||
tryLogCurrentException(log, "While writing part");
|
||||
bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working.
|
||||
}
|
||||
}
|
||||
task_finish_notify();
|
||||
}, Priority{});
|
||||
@ -299,13 +304,13 @@ namespace
|
||||
/// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock
|
||||
bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
|
||||
|
||||
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks);
|
||||
for (auto & task : tasks)
|
||||
{
|
||||
if (task.exception)
|
||||
std::rethrow_exception(task.exception);
|
||||
auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception);
|
||||
if (exception)
|
||||
std::rethrow_exception(exception);
|
||||
|
||||
const auto & tasks = TSA_SUPPRESS_WARNING_FOR_READ(bg_tasks);
|
||||
for (const auto & task : tasks)
|
||||
block_ids.insert(block_ids.end(),task.block_ids.begin(), task.block_ids.end());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -321,7 +326,8 @@ void copyDataToAzureBlobStorageFile(
|
||||
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings,
|
||||
ThreadPoolCallbackRunnerUnsafe<void> schedule)
|
||||
{
|
||||
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
|
||||
auto log = getLogger("copyDataToAzureBlobStorageFile");
|
||||
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log};
|
||||
helper.performCopy();
|
||||
}
|
||||
|
||||
@ -339,9 +345,11 @@ void copyAzureBlobStorageFile(
|
||||
const ReadSettings & read_settings,
|
||||
ThreadPoolCallbackRunnerUnsafe<void> schedule)
|
||||
{
|
||||
auto log = getLogger("copyAzureBlobStorageFile");
|
||||
|
||||
if (settings->use_native_copy)
|
||||
{
|
||||
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
|
||||
LOG_TRACE(log, "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
|
||||
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
|
||||
if (dest_client->GetClickhouseOptions().IsClientForDisk)
|
||||
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
|
||||
@ -352,7 +360,7 @@ void copyAzureBlobStorageFile(
|
||||
|
||||
if (size < settings->max_single_part_copy_size)
|
||||
{
|
||||
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy blob sync {} -> {}", src_blob, dest_blob);
|
||||
LOG_TRACE(log, "Copy blob sync {} -> {}", src_blob, dest_blob);
|
||||
block_blob_client_dest.CopyFromUri(source_uri);
|
||||
}
|
||||
else
|
||||
@ -368,7 +376,7 @@ void copyAzureBlobStorageFile(
|
||||
|
||||
if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success)
|
||||
{
|
||||
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob);
|
||||
LOG_TRACE(log, "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -382,14 +390,14 @@ void copyAzureBlobStorageFile(
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
|
||||
LOG_TRACE(log, "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
|
||||
auto create_read_buffer = [&]
|
||||
{
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(
|
||||
src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries);
|
||||
};
|
||||
|
||||
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")};
|
||||
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log};
|
||||
helper.performCopy();
|
||||
}
|
||||
}
|
||||
|
@ -98,7 +98,6 @@ namespace
|
||||
size_t part_size;
|
||||
String tag;
|
||||
bool is_finished = false;
|
||||
std::exception_ptr exception;
|
||||
};
|
||||
|
||||
size_t num_parts;
|
||||
@ -111,6 +110,7 @@ namespace
|
||||
size_t num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
|
||||
size_t num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
|
||||
size_t num_finished_parts TSA_GUARDED_BY(bg_tasks_mutex) = 0;
|
||||
std::exception_ptr bg_exception TSA_GUARDED_BY(bg_tasks_mutex);
|
||||
std::mutex bg_tasks_mutex;
|
||||
std::condition_variable bg_tasks_condvar;
|
||||
|
||||
@ -273,7 +273,7 @@ namespace
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
tryLogCurrentException(log, fmt::format("While performing multipart upload of {}", dest_key));
|
||||
// Multipart upload failed because it wasn't possible to schedule all the tasks.
|
||||
// To avoid execution of already scheduled tasks we abort MultipartUpload.
|
||||
abortMultipartUpload();
|
||||
@ -385,7 +385,12 @@ namespace
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
task->exception = std::current_exception();
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
if (!bg_exception)
|
||||
{
|
||||
tryLogCurrentException(log, fmt::format("While writing part #{}", task->part_number));
|
||||
bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working.
|
||||
}
|
||||
}
|
||||
task_finish_notify();
|
||||
}, Priority{});
|
||||
@ -435,22 +440,21 @@ namespace
|
||||
/// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock
|
||||
bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
|
||||
|
||||
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks);
|
||||
for (auto & task : tasks)
|
||||
auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception);
|
||||
if (exception)
|
||||
{
|
||||
if (task.exception)
|
||||
{
|
||||
/// abortMultipartUpload() might be called already, see processUploadPartRequest().
|
||||
/// However if there were concurrent uploads at that time, those part uploads might or might not succeed.
|
||||
/// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free
|
||||
/// all storage consumed by all parts.
|
||||
abortMultipartUpload();
|
||||
/// abortMultipartUpload() might be called already, see processUploadPartRequest().
|
||||
/// However if there were concurrent uploads at that time, those part uploads might or might not succeed.
|
||||
/// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free
|
||||
/// all storage consumed by all parts.
|
||||
abortMultipartUpload();
|
||||
|
||||
std::rethrow_exception(task.exception);
|
||||
}
|
||||
|
||||
part_tags.push_back(task.tag);
|
||||
std::rethrow_exception(exception);
|
||||
}
|
||||
|
||||
const auto & tasks = TSA_SUPPRESS_WARNING_FOR_READ(bg_tasks);
|
||||
for (const auto & task : tasks)
|
||||
part_tags.push_back(task.tag);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -995,6 +995,10 @@ def main() -> int:
|
||||
ci_settings,
|
||||
args.skip_jobs,
|
||||
)
|
||||
|
||||
if IS_CI and pr_info.is_pr:
|
||||
ci_cache.filter_out_not_affected_jobs()
|
||||
|
||||
ci_cache.print_status()
|
||||
|
||||
if IS_CI and not pr_info.is_merge_queue:
|
||||
|
@ -674,6 +674,78 @@ class CiCache:
|
||||
bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path
|
||||
)
|
||||
|
||||
def filter_out_not_affected_jobs(self):
|
||||
"""
|
||||
Filter is to be applied in PRs to remove jobs that are not affected by the change
|
||||
It removes jobs from @jobs_to_do if it is a:
|
||||
1. test job and it is in @jobs_to_wait (no need to wait not affected jobs in PRs)
|
||||
2. test job and it has finished on release branch (even if failed)
|
||||
2. build job which is not required by any test job that is left in @jobs_to_do
|
||||
|
||||
:return:
|
||||
"""
|
||||
# 1.
|
||||
remove_from_await_list = []
|
||||
for job_name, job_config in self.jobs_to_wait.items():
|
||||
if CI.is_test_job(job_name) and job_name != CI.JobNames.BUILD_CHECK:
|
||||
remove_from_await_list.append(job_name)
|
||||
for job in remove_from_await_list:
|
||||
print(f"Filter job [{job}] - test job and not affected by the change")
|
||||
del self.jobs_to_wait[job]
|
||||
del self.jobs_to_do[job]
|
||||
|
||||
# 2.
|
||||
remove_from_to_do = []
|
||||
for job_name, job_config in self.jobs_to_do.items():
|
||||
if CI.is_test_job(job_name) and job_name != CI.JobNames.BUILD_CHECK:
|
||||
batches_to_remove = []
|
||||
assert job_config.batches is not None
|
||||
for batch in job_config.batches:
|
||||
if self.is_failed(
|
||||
job_name, batch, job_config.num_batches, release_branch=True
|
||||
):
|
||||
print(
|
||||
f"Filter [{job_name}/{batch}] - not affected by the change (failed on release branch)"
|
||||
)
|
||||
batches_to_remove.append(batch)
|
||||
for batch in batches_to_remove:
|
||||
job_config.batches.remove(batch)
|
||||
if not job_config.batches:
|
||||
print(
|
||||
f"Filter [{job_name}] - not affected by the change (failed on release branch)"
|
||||
)
|
||||
remove_from_to_do.append(job_name)
|
||||
for job in remove_from_to_do:
|
||||
del self.jobs_to_do[job]
|
||||
|
||||
# 3.
|
||||
required_builds = [] # type: List[str]
|
||||
for job_name, job_config in self.jobs_to_do.items():
|
||||
if CI.is_test_job(job_name) and job_config.required_builds:
|
||||
required_builds += job_config.required_builds
|
||||
required_builds = list(set(required_builds))
|
||||
|
||||
remove_builds = [] # type: List[str]
|
||||
has_builds_to_do = False
|
||||
for job_name, job_config in self.jobs_to_do.items():
|
||||
if CI.is_build_job(job_name):
|
||||
if job_name not in required_builds:
|
||||
remove_builds.append(job_name)
|
||||
else:
|
||||
has_builds_to_do = True
|
||||
|
||||
for build_job in remove_builds:
|
||||
print(
|
||||
f"Filter build job [{build_job}] - not affected and not required by test jobs"
|
||||
)
|
||||
del self.jobs_to_do[build_job]
|
||||
if build_job in self.jobs_to_wait:
|
||||
del self.jobs_to_wait[build_job]
|
||||
|
||||
if not has_builds_to_do and CI.JobNames.BUILD_CHECK in self.jobs_to_do:
|
||||
print(f"Filter job [{CI.JobNames.BUILD_CHECK}] - no builds to do")
|
||||
del self.jobs_to_do[CI.JobNames.BUILD_CHECK]
|
||||
|
||||
def await_pending_jobs(self, is_release: bool, dry_run: bool = False) -> None:
|
||||
"""
|
||||
await pending jobs to be finished
|
||||
|
@ -378,7 +378,7 @@ class CommonJobConfigs:
|
||||
),
|
||||
run_command='functional_test_check.py "$CHECK_NAME"',
|
||||
runner_type=Runners.FUNC_TESTER,
|
||||
timeout=10800,
|
||||
timeout=7200,
|
||||
)
|
||||
STATEFUL_TEST = JobConfig(
|
||||
job_name_keyword="stateful",
|
||||
|
@ -108,6 +108,7 @@ def get_run_command(
|
||||
"--privileged "
|
||||
f"{ci_logs_args}"
|
||||
f"--volume={repo_path}/tests:/usr/share/clickhouse-test "
|
||||
f"--volume={repo_path}/utils/grpc-client:/usr/share/clickhouse-utils/grpc-client "
|
||||
f"{volume_with_broken_test}"
|
||||
f"--volume={result_path}:/test_output "
|
||||
f"--volume={server_log_path}:/var/log/clickhouse-server "
|
||||
|
@ -1,4 +1,5 @@
|
||||
"""Module to get the token for GitHub"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
import json
|
||||
import time
|
||||
|
@ -417,7 +417,7 @@ class TestCIConfig(unittest.TestCase):
|
||||
assert not ci_cache.jobs_to_skip
|
||||
assert not ci_cache.jobs_to_wait
|
||||
|
||||
# pretend there are pending jobs that we neet to wait
|
||||
# pretend there are pending jobs that we need to wait
|
||||
ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do)
|
||||
for job, config in ci_cache.jobs_to_wait.items():
|
||||
assert not config.pending_batches
|
||||
@ -489,3 +489,87 @@ class TestCIConfig(unittest.TestCase):
|
||||
self.assertCountEqual(
|
||||
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip, all_jobs_in_wf
|
||||
)
|
||||
|
||||
def test_ci_py_filters_not_affected_jobs_in_prs(self):
|
||||
"""
|
||||
checks ci.py filters not affected jobs in PRs
|
||||
"""
|
||||
settings = CiSettings()
|
||||
settings.no_ci_cache = True
|
||||
pr_info = PRInfo(github_event=_TEST_EVENT_JSON)
|
||||
pr_info.event_type = EventType.PUSH
|
||||
pr_info.number = 0
|
||||
assert pr_info.is_release and not pr_info.is_merge_queue
|
||||
ci_cache = CIPY._configure_jobs(
|
||||
S3Helper(), pr_info, settings, skip_jobs=False, dry_run=True
|
||||
)
|
||||
self.assertTrue(not ci_cache.jobs_to_skip, "Must be no jobs in skip list")
|
||||
all_jobs_in_wf = list(ci_cache.jobs_to_do)
|
||||
assert not ci_cache.jobs_to_wait
|
||||
assert not ci_cache.jobs_to_skip
|
||||
|
||||
# pretend there are pending jobs that we need to wait
|
||||
for job, job_config in ci_cache.jobs_to_do.items():
|
||||
ci_cache.jobs_to_wait[job] = job_config
|
||||
|
||||
# remove couple tests from to_wait and
|
||||
# expect they are preserved in @jobs_to_to along with required package_asan
|
||||
del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_ASAN]
|
||||
del ci_cache.jobs_to_wait[CI.JobNames.INTEGRATION_TEST_TSAN]
|
||||
del ci_cache.jobs_to_wait[CI.JobNames.STATELESS_TEST_MSAN]
|
||||
|
||||
# pretend we have some batches failed for one of the job from the to_do list
|
||||
failed_job = CI.JobNames.INTEGRATION_TEST_TSAN
|
||||
failed_job_config = ci_cache.jobs_to_do[failed_job]
|
||||
FAILED_BATCHES = [0, 3]
|
||||
for batch in FAILED_BATCHES:
|
||||
assert batch < failed_job_config.num_batches
|
||||
record = CiCache.Record(
|
||||
record_type=CiCache.RecordType.FAILED,
|
||||
job_name=failed_job,
|
||||
job_digest=ci_cache.job_digests[failed_job],
|
||||
batch=batch,
|
||||
num_batches=failed_job_config.num_batches,
|
||||
release_branch=True,
|
||||
)
|
||||
for record_t_, records_ in ci_cache.records.items():
|
||||
if record_t_.value == CiCache.RecordType.FAILED.value:
|
||||
records_[record.to_str_key()] = record
|
||||
|
||||
# pretend we have all batches failed for one of the job from the to_do list
|
||||
failed_job = CI.JobNames.STATELESS_TEST_MSAN
|
||||
failed_job_config = ci_cache.jobs_to_do[failed_job]
|
||||
assert failed_job_config.num_batches > 1
|
||||
for batch in range(failed_job_config.num_batches):
|
||||
record = CiCache.Record(
|
||||
record_type=CiCache.RecordType.FAILED,
|
||||
job_name=failed_job,
|
||||
job_digest=ci_cache.job_digests[failed_job],
|
||||
batch=batch,
|
||||
num_batches=failed_job_config.num_batches,
|
||||
release_branch=True,
|
||||
)
|
||||
for record_t_, records_ in ci_cache.records.items():
|
||||
if record_t_.value == CiCache.RecordType.FAILED.value:
|
||||
records_[record.to_str_key()] = record
|
||||
|
||||
ci_cache.filter_out_not_affected_jobs()
|
||||
expected_to_do = [
|
||||
CI.JobNames.STATELESS_TEST_ASAN,
|
||||
CI.BuildNames.PACKAGE_ASAN,
|
||||
CI.JobNames.INTEGRATION_TEST_TSAN,
|
||||
CI.BuildNames.PACKAGE_TSAN,
|
||||
CI.JobNames.BUILD_CHECK,
|
||||
]
|
||||
self.assertCountEqual(
|
||||
list(ci_cache.jobs_to_wait),
|
||||
[
|
||||
CI.BuildNames.PACKAGE_ASAN,
|
||||
CI.BuildNames.PACKAGE_TSAN,
|
||||
CI.JobNames.BUILD_CHECK,
|
||||
],
|
||||
)
|
||||
self.assertCountEqual(list(ci_cache.jobs_to_do), expected_to_do)
|
||||
self.assertTrue(ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches)
|
||||
for batch in ci_cache.jobs_to_do[CI.JobNames.INTEGRATION_TEST_TSAN].batches:
|
||||
self.assertTrue(batch not in FAILED_BATCHES)
|
||||
|
@ -1750,7 +1750,7 @@ class TestCase:
|
||||
return TestResult(
|
||||
self.name,
|
||||
TestStatus.FAIL,
|
||||
FailureReason.INTERNAL_QUERY_FAIL,
|
||||
FailureReason.TIMEOUT,
|
||||
total_time,
|
||||
self.add_info_about_settings(
|
||||
self.get_description_from_exception_info(sys.exc_info())
|
||||
@ -2189,11 +2189,26 @@ def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool
|
||||
sys.stdout.flush()
|
||||
|
||||
while True:
|
||||
test_result = test_case.run(
|
||||
args, test_suite, client_options, server_logs_level
|
||||
)
|
||||
test_result = test_case.process_result(test_result, MESSAGES)
|
||||
if not test_result.need_retry:
|
||||
# This is the upper level timeout
|
||||
# It helps with completely frozen processes, like in case of gdb errors
|
||||
def timeout_handler(signum, frame):
|
||||
raise TimeoutError("Test execution timed out")
|
||||
|
||||
signal.signal(signal.SIGALRM, timeout_handler)
|
||||
signal.alarm(int(args.timeout * 1.1))
|
||||
test_result = None
|
||||
try:
|
||||
test_result = test_case.run(
|
||||
args, test_suite, client_options, server_logs_level
|
||||
)
|
||||
test_result = test_case.process_result(test_result, MESSAGES)
|
||||
break
|
||||
except TimeoutError:
|
||||
break
|
||||
finally:
|
||||
signal.alarm(0)
|
||||
|
||||
if not test_result or not test_result.need_retry:
|
||||
break
|
||||
restarted_tests.append(test_result)
|
||||
|
||||
@ -2452,6 +2467,10 @@ def override_envs(*args_, **kwargs):
|
||||
run_tests_array(*args_, **kwargs)
|
||||
|
||||
|
||||
def run_tests_process(*args, **kwargs):
|
||||
return run_tests_array(*args, **kwargs)
|
||||
|
||||
|
||||
def do_run_tests(jobs, test_suite: TestSuite):
|
||||
if jobs > 1 and len(test_suite.parallel_tests) > 0:
|
||||
print(
|
||||
@ -2475,39 +2494,70 @@ def do_run_tests(jobs, test_suite: TestSuite):
|
||||
# of failures will be nearly the same for all tests from the group.
|
||||
random.shuffle(test_suite.parallel_tests)
|
||||
|
||||
batch_size = max(1, len(test_suite.parallel_tests) // jobs)
|
||||
batch_size = max(1, (len(test_suite.parallel_tests) // jobs) + 1)
|
||||
parallel_tests_array = []
|
||||
for job in range(jobs):
|
||||
range_ = job * batch_size, job * batch_size + batch_size
|
||||
batch = test_suite.parallel_tests[range_[0] : range_[1]]
|
||||
parallel_tests_array.append((batch, batch_size, test_suite, True))
|
||||
|
||||
try:
|
||||
with multiprocessing.Pool(processes=jobs + 1) as pool:
|
||||
future = pool.map_async(run_tests_array, parallel_tests_array)
|
||||
processes = []
|
||||
|
||||
if args.run_sequential_tests_in_parallel:
|
||||
# Run parallel tests and sequential tests at the same time
|
||||
# Sequential tests will use different ClickHouse instance
|
||||
# In this process we can safely override values in `args` and `os.environ`
|
||||
future_seq = pool.map_async(
|
||||
override_envs,
|
||||
[
|
||||
(
|
||||
test_suite.sequential_tests,
|
||||
len(test_suite.sequential_tests),
|
||||
test_suite,
|
||||
False,
|
||||
)
|
||||
],
|
||||
)
|
||||
future_seq.wait()
|
||||
for test_batch in parallel_tests_array:
|
||||
process = multiprocessing.Process(
|
||||
target=run_tests_process, args=(test_batch,)
|
||||
)
|
||||
processes.append(process)
|
||||
process.start()
|
||||
|
||||
future.wait()
|
||||
finally:
|
||||
pool.terminate()
|
||||
pool.close()
|
||||
pool.join()
|
||||
if args.run_sequential_tests_in_parallel:
|
||||
# Run parallel tests and sequential tests at the same time
|
||||
# Sequential tests will use different ClickHouse instance
|
||||
# In this process we can safely override values in `args` and `os.environ`
|
||||
process = multiprocessing.Process(
|
||||
target=override_envs,
|
||||
args=(
|
||||
(
|
||||
test_suite.sequential_tests,
|
||||
len(test_suite.sequential_tests),
|
||||
test_suite,
|
||||
False,
|
||||
),
|
||||
),
|
||||
)
|
||||
processes.append(process)
|
||||
process.start()
|
||||
|
||||
while processes:
|
||||
sys.stdout.flush()
|
||||
# Periodically check the server for hangs
|
||||
# and stop all processes in this case
|
||||
try:
|
||||
clickhouse_execute(
|
||||
args,
|
||||
query="SELECT 1 /*hang up check*/",
|
||||
max_http_retries=5,
|
||||
timeout=20,
|
||||
)
|
||||
except Exception:
|
||||
print("Hang up check failed")
|
||||
server_died.set()
|
||||
|
||||
if server_died.is_set():
|
||||
print("Server died, terminating all processes...")
|
||||
kill_gdb_if_any()
|
||||
# Wait for test results
|
||||
sleep(args.timeout)
|
||||
for p in processes:
|
||||
if p.is_alive():
|
||||
p.terminate()
|
||||
break
|
||||
|
||||
for p in processes[:]:
|
||||
if not p.is_alive():
|
||||
processes.remove(p)
|
||||
|
||||
sleep(5)
|
||||
|
||||
if not args.run_sequential_tests_in_parallel:
|
||||
run_tests_array(
|
||||
@ -3358,6 +3408,14 @@ def parse_args():
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
class Terminated(KeyboardInterrupt):
|
||||
pass
|
||||
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
raise Terminated(f"Terminated with {sig} signal")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
stop_time = None
|
||||
exit_code = multiprocessing.Value("i", 0)
|
||||
@ -3369,6 +3427,9 @@ if __name__ == "__main__":
|
||||
# infinite tests processes left
|
||||
# (new process group is required to avoid killing some parent processes)
|
||||
os.setpgid(0, 0)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGHUP, signal_handler)
|
||||
|
||||
try:
|
||||
args = parse_args()
|
||||
|
3
tests/config/config.d/grpc_protocol.xml
Normal file
3
tests/config/config.d/grpc_protocol.xml
Normal file
@ -0,0 +1,3 @@
|
||||
<clickhouse>
|
||||
<grpc_port>9100</grpc_port>
|
||||
</clickhouse>
|
@ -27,6 +27,7 @@ ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/grpc_protocol.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
|
||||
|
1
tests/queries/0_stateless/03203_grpc_protocol.reference
Normal file
1
tests/queries/0_stateless/03203_grpc_protocol.reference
Normal file
@ -0,0 +1 @@
|
||||
ok
|
14
tests/queries/0_stateless/03203_grpc_protocol.sh
Executable file
14
tests/queries/0_stateless/03203_grpc_protocol.sh
Executable file
@ -0,0 +1,14 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-fasttest
|
||||
# Tag no-fasttest: In fasttest, ENABLE_LIBRARIES=0, so the grpc library is not built
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
if [[ -z "$CLICKHOUSE_GRPC_CLIENT" ]]; then
|
||||
CLICKHOUSE_GRPC_CLIENT="$CURDIR/../../../utils/grpc-client/clickhouse-grpc-client.py"
|
||||
fi
|
||||
|
||||
# Simple test.
|
||||
$CLICKHOUSE_GRPC_CLIENT --query "SELECT 'ok'"
|
52
utils/grpc-client/generate_pb2.py
Executable file
52
utils/grpc-client/generate_pb2.py
Executable file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# This is a helper utility.
|
||||
# It generates files in the "pb2" folder using the protocol buffer compiler.
|
||||
# This script must be called manually after any change pf "clickhouse_grpc.proto"
|
||||
|
||||
import grpc_tools # pip3 install grpcio-tools
|
||||
|
||||
import os, shutil, subprocess
|
||||
|
||||
|
||||
# Settings.
|
||||
script_path = os.path.realpath(__file__)
|
||||
script_name = os.path.basename(script_path)
|
||||
script_dir = os.path.dirname(script_path)
|
||||
root_dir = os.path.abspath(os.path.join(script_dir, "../.."))
|
||||
|
||||
grpc_proto_dir = os.path.abspath(os.path.join(root_dir, "src/Server/grpc_protos"))
|
||||
grpc_proto_filename = "clickhouse_grpc.proto"
|
||||
|
||||
# Files in the "pb2" folder which will be generated by this script.
|
||||
pb2_filenames = ["clickhouse_grpc_pb2.py", "clickhouse_grpc_pb2_grpc.py"]
|
||||
pb2_dir = os.path.join(script_dir, "pb2")
|
||||
|
||||
|
||||
# Processes the protobuf schema with the protocol buffer compiler and generates the "pb2" folder.
|
||||
def generate_pb2():
|
||||
print(f"Generating files:")
|
||||
for pb2_filename in pb2_filenames:
|
||||
print(os.path.join(pb2_dir, pb2_filename))
|
||||
|
||||
os.makedirs(pb2_dir, exist_ok=True)
|
||||
|
||||
cmd = [
|
||||
"python3",
|
||||
"-m",
|
||||
"grpc_tools.protoc",
|
||||
"-I" + grpc_proto_dir,
|
||||
"--python_out=" + pb2_dir,
|
||||
"--grpc_python_out=" + pb2_dir,
|
||||
os.path.join(grpc_proto_dir, grpc_proto_filename),
|
||||
]
|
||||
subprocess.run(cmd)
|
||||
|
||||
for pb2_filename in pb2_filenames:
|
||||
assert os.path.exists(os.path.join(pb2_dir, pb2_filename))
|
||||
print("Done! (generate_pb2)")
|
||||
|
||||
|
||||
# MAIN
|
||||
if __name__ == "__main__":
|
||||
generate_pb2()
|
@ -1,29 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import grpc_tools # pip3 install grpcio-tools
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
|
||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
dest_dir = script_dir
|
||||
src_dir = os.path.abspath(os.path.join(script_dir, "../../../src/Server/grpc_protos"))
|
||||
src_filename = "clickhouse_grpc.proto"
|
||||
|
||||
|
||||
def generate():
|
||||
cmd = [
|
||||
"python3",
|
||||
"-m",
|
||||
"grpc_tools.protoc",
|
||||
"-I" + src_dir,
|
||||
"--python_out=" + dest_dir,
|
||||
"--grpc_python_out=" + dest_dir,
|
||||
os.path.join(src_dir, src_filename),
|
||||
]
|
||||
subprocess.run(cmd)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
generate()
|
Loading…
Reference in New Issue
Block a user