Merge branch 'master' into fix-group-by-use-null-subquery-scope

This commit is contained in:
Nikolai Kochetov 2024-07-15 19:44:35 +02:00
commit 746b6f4b58
14 changed files with 209 additions and 62 deletions

View File

@ -102,6 +102,8 @@ jobs:
--job-name '${{inputs.test_name}}' \ --job-name '${{inputs.test_name}}' \
--run \ --run \
--run-command '''${{inputs.run_command}}''' --run-command '''${{inputs.run_command}}'''
# shellcheck disable=SC2319
echo "JOB_EXIT_CODE=$?" >> "$GITHUB_ENV"
- name: Post run - name: Post run
if: ${{ !cancelled() }} if: ${{ !cancelled() }}
run: | run: |

View File

@ -25,7 +25,7 @@ source /utils.lib
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --silent --inMemoryPersistence & azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --silent --inMemoryPersistence &
./setup_minio.sh stateful ./setup_minio.sh stateful
./mc admin trace clickminio > /test_output/rubbish.log & ./mc admin trace clickminio > /test_output/minio.log &
MC_ADMIN_PID=$! MC_ADMIN_PID=$!
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml

View File

@ -54,7 +54,7 @@ source /utils.lib
/usr/share/clickhouse-test/config/install.sh /usr/share/clickhouse-test/config/install.sh
./setup_minio.sh stateless ./setup_minio.sh stateless
m./c admin trace clickminio > /test_output/rubbish.log & ./mc admin trace clickminio > /test_output/minio.log &
MC_ADMIN_PID=$! MC_ADMIN_PID=$!
./setup_hdfs_minicluster.sh ./setup_hdfs_minicluster.sh

View File

@ -10,7 +10,7 @@ cd hadoop-3.3.1
export JAVA_HOME=/usr export JAVA_HOME=/usr
mkdir -p target/test/data mkdir -p target/test/data
chown clickhouse ./target/test/data chown clickhouse ./target/test/data
sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 >> /test_output/garbage.log 2>&1 & sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 >> /test_output/hdfs_minicluster.log 2>&1 &
while ! nc -z localhost 12222; do while ! nc -z localhost 12222; do
sleep 1 sleep 1

View File

@ -1068,6 +1068,15 @@ def main() -> int:
if build_result: if build_result:
if build_result.status == SUCCESS: if build_result.status == SUCCESS:
previous_status = build_result.status previous_status = build_result.status
JobReport(
status=SUCCESS,
description="",
test_results=[],
start_time="",
duration=0.0,
additional_files=[],
job_skipped=True,
).dump()
else: else:
# FIXME: Consider reusing failures for build jobs. # FIXME: Consider reusing failures for build jobs.
# Just remove this if/else - that makes build job starting and failing immediately # Just remove this if/else - that makes build job starting and failing immediately
@ -1265,12 +1274,17 @@ def main() -> int:
elif job_report.pre_report: elif job_report.pre_report:
print(f"ERROR: Job was killed - generate evidence") print(f"ERROR: Job was killed - generate evidence")
job_report.update_duration() job_report.update_duration()
# Job was killed! ret_code = os.getenv("JOB_EXIT_CODE", "")
if ret_code:
try:
job_report.exit_code = int(ret_code)
except ValueError:
pass
if Utils.is_killed_with_oom(): if Utils.is_killed_with_oom():
print("WARNING: OOM while job execution") print("WARNING: OOM while job execution")
error = f"Out Of Memory, exit_code {job_report.exit_code}, after {job_report.duration}s" error = f"Out Of Memory, exit_code {job_report.exit_code}, after {int(job_report.duration)}s"
else: else:
error = f"Unknown, exit_code {job_report.exit_code}, after {job_report.duration}s" error = f"Unknown, exit_code {job_report.exit_code}, after {int(job_report.duration)}s"
CIBuddy().post_error(error, job_name=_get_ext_check_name(args.job_name)) CIBuddy().post_error(error, job_name=_get_ext_check_name(args.job_name))
if CI.is_test_job(args.job_name): if CI.is_test_job(args.job_name):
gh = GitHub(get_best_robot_token(), per_page=100) gh = GitHub(get_best_robot_token(), per_page=100)

View File

@ -26,6 +26,7 @@ class CIBuddy:
self.pr_number = pr_info.number self.pr_number = pr_info.number
self.head_ref = pr_info.head_ref self.head_ref = pr_info.head_ref
self.commit_url = pr_info.commit_html_url self.commit_url = pr_info.commit_html_url
self.sha = pr_info.sha[:10]
@staticmethod @staticmethod
def _get_webhooks(): def _get_webhooks():
@ -69,8 +70,10 @@ class CIBuddy:
line_err = f":red_circle: *Error: {error_description}*\n\n" line_err = f":red_circle: *Error: {error_description}*\n\n"
line_ghr = f" *Runner:* `{instance_type}`, `{instance_id}`\n" line_ghr = f" *Runner:* `{instance_type}`, `{instance_id}`\n"
line_job = f" *Job:* `{job_name}`\n" line_job = f" *Job:* `{job_name}`\n"
line_pr_ = f" *PR:* <https://github.com/{self.repo}/pull/{self.pr_number}|#{self.pr_number}>\n" line_pr_ = f" *PR:* <https://github.com/{self.repo}/pull/{self.pr_number}|#{self.pr_number}>, <{self.commit_url}|{self.sha}>\n"
line_br_ = f" *Branch:* `{self.head_ref}`, <{self.commit_url}|commit>\n" line_br_ = (
f" *Branch:* `{self.head_ref}`, <{self.commit_url}|{self.sha}>\n"
)
message = line_err message = line_err
message += line_job message += line_job
if with_instance_info: if with_instance_info:
@ -85,4 +88,4 @@ class CIBuddy:
if __name__ == "__main__": if __name__ == "__main__":
# test # test
buddy = CIBuddy(dry_run=True) buddy = CIBuddy(dry_run=True)
buddy.post_error("Out of memory") buddy.post_error("TEst")

View File

@ -763,22 +763,13 @@ class CiCache:
# TIMEOUT * MAX_ROUNDS_TO_WAIT must be less than 6h (GH job timeout) with a room for rest RunConfig work # TIMEOUT * MAX_ROUNDS_TO_WAIT must be less than 6h (GH job timeout) with a room for rest RunConfig work
TIMEOUT = 3000 # 50 min TIMEOUT = 3000 # 50 min
MAX_ROUNDS_TO_WAIT = 6 MAX_ROUNDS_TO_WAIT = 6
MAX_JOB_NUM_TO_WAIT = 3
round_cnt = 0 round_cnt = 0
def _has_build_job():
for job in self.jobs_to_wait:
if CI.is_build_job(job):
return True
return False
if not is_release: if not is_release:
# in PRs we can wait only for builds, TIMEOUT*MAX_ROUNDS_TO_WAIT=100min is enough # in PRs we can wait only for builds, TIMEOUT*MAX_ROUNDS_TO_WAIT=100min is enough
MAX_ROUNDS_TO_WAIT = 2 MAX_ROUNDS_TO_WAIT = 2
while ( while round_cnt < MAX_ROUNDS_TO_WAIT:
len(self.jobs_to_wait) > MAX_JOB_NUM_TO_WAIT or _has_build_job()
) and round_cnt < MAX_ROUNDS_TO_WAIT:
round_cnt += 1 round_cnt += 1
GHActions.print_in_group( GHActions.print_in_group(
f"Wait pending jobs, round [{round_cnt}/{MAX_ROUNDS_TO_WAIT}]:", f"Wait pending jobs, round [{round_cnt}/{MAX_ROUNDS_TO_WAIT}]:",
@ -820,6 +811,10 @@ class CiCache:
f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore" f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore"
) )
job_config.batches.remove(batch) job_config.batches.remove(batch)
if not job_config.batches:
print(f"Remove job [{job_name}] from jobs_to_do")
self.jobs_to_skip.append(job_name)
del self.jobs_to_do[job_name]
else: else:
print( print(
f"NOTE: Job [{job_name}:{batch}] finished failed - do not add to ready" f"NOTE: Job [{job_name}:{batch}] finished failed - do not add to ready"
@ -830,9 +825,7 @@ class CiCache:
await_finished.add(job_name) await_finished.add(job_name)
for job in await_finished: for job in await_finished:
self.jobs_to_skip.append(job)
del self.jobs_to_wait[job] del self.jobs_to_wait[job]
del self.jobs_to_do[job]
if not dry_run: if not dry_run:
expired_sec = int(time.time()) - start_at expired_sec = int(time.time()) - start_at

View File

@ -24,6 +24,18 @@ from tee_popen import TeePopen
NO_CHANGES_MSG = "Nothing to run" NO_CHANGES_MSG = "Nothing to run"
class SensitiveFormatter(logging.Formatter):
@staticmethod
def _filter(s):
return re.sub(
r"(.*)(AZURE_CONNECTION_STRING.*\')(.*)", r"\1AZURE_CONNECTION_STRING\3", s
)
def format(self, record):
original = logging.Formatter.format(self, record)
return self._filter(original)
def get_additional_envs( def get_additional_envs(
check_name: str, run_by_hash_num: int, run_by_hash_total: int check_name: str, run_by_hash_num: int, run_by_hash_total: int
) -> List[str]: ) -> List[str]:
@ -213,6 +225,9 @@ def parse_args():
def main(): def main():
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
for handler in logging.root.handlers:
# pylint: disable=protected-access
handler.setFormatter(SensitiveFormatter(handler.formatter._fmt)) # type: ignore
stopwatch = Stopwatch() stopwatch = Stopwatch()

View File

@ -260,18 +260,29 @@ def main():
failed_to_get_info = False failed_to_get_info = False
has_failed_statuses = False has_failed_statuses = False
for status in statuses: for status in statuses:
if not CI.is_required(status.context): if not CI.is_required(status.context) or status.context in (
CI.StatusNames.SYNC,
CI.StatusNames.PR_CHECK,
):
# CI.StatusNames.SYNC or CI.StatusNames.PR_CHECK should not be checked
continue continue
print(f"Check status [{status.context}], [{status.state}]")
if status.state == FAILURE: if status.state == FAILURE:
has_failed_statuses = True has_failed_statuses = True
failed_cnt = Utils.get_failed_tests_number(status.description) failed_cnt = Utils.get_failed_tests_number(status.description)
if failed_cnt is None: if failed_cnt is None:
failed_to_get_info = True failed_to_get_info = True
print(
f"WARNING: failed to get number of failed tests from [{status.description}]"
)
else: else:
if failed_cnt > max_failed_tests_per_job: if failed_cnt > max_failed_tests_per_job:
job_name_with_max_failures = status.context job_name_with_max_failures = status.context
max_failed_tests_per_job = failed_cnt max_failed_tests_per_job = failed_cnt
total_failed_tests += failed_cnt total_failed_tests += failed_cnt
print(
f"Failed test cases in [{status.context}] is [{failed_cnt}], total failures [{total_failed_tests}]"
)
elif status.state != SUCCESS and status.context not in ( elif status.state != SUCCESS and status.context not in (
CI.StatusNames.SYNC, CI.StatusNames.SYNC,
CI.StatusNames.PR_CHECK, CI.StatusNames.PR_CHECK,

View File

@ -3,6 +3,7 @@
import csv import csv
import logging import logging
import os import os
import re
import subprocess import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
@ -19,6 +20,18 @@ from stopwatch import Stopwatch
from tee_popen import TeePopen from tee_popen import TeePopen
class SensitiveFormatter(logging.Formatter):
@staticmethod
def _filter(s):
return re.sub(
r"(.*)(AZURE_CONNECTION_STRING.*\')(.*)", r"\1AZURE_CONNECTION_STRING\3", s
)
def format(self, record):
original = logging.Formatter.format(self, record)
return self._filter(original)
def get_additional_envs(check_name: str) -> List[str]: def get_additional_envs(check_name: str) -> List[str]:
result = [] result = []
azure_connection_string = get_parameter_from_ssm("azure_connection_string") azure_connection_string = get_parameter_from_ssm("azure_connection_string")
@ -117,6 +130,9 @@ def process_results(
def run_stress_test(docker_image_name: str) -> None: def run_stress_test(docker_image_name: str) -> None:
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
for handler in logging.root.handlers:
# pylint: disable=protected-access
handler.setFormatter(SensitiveFormatter(handler.formatter._fmt)) # type: ignore
stopwatch = Stopwatch() stopwatch = Stopwatch()
temp_path = Path(TEMP_PATH) temp_path = Path(TEMP_PATH)

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import copy
import unittest import unittest
import random import random
@ -416,6 +416,30 @@ class TestCIConfig(unittest.TestCase):
""" """
checks ci.py job configuration checks ci.py job configuration
""" """
def _reset_ci_cache_to_wait_all_jobs(ci_cache):
# pretend there are pending jobs that we need to wait
ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do)
for job, config in ci_cache.jobs_to_wait.items():
assert config.batches
config.pending_batches = list(config.batches)
for batch in range(config.num_batches):
record = CiCache.Record(
record_type=CiCache.RecordType.PENDING,
job_name=job,
job_digest=ci_cache.job_digests[job],
batch=batch,
num_batches=config.num_batches,
release_branch=True,
)
for record_t_, records_ in ci_cache.records.items():
if record_t_.value == CiCache.RecordType.PENDING.value:
records_[record.to_str_key()] = record
assert not ci_cache.jobs_to_skip
assert ci_cache.jobs_to_wait
ci_cache.jobs_to_skip = []
settings = CiSettings() settings = CiSettings()
settings.no_ci_cache = True settings.no_ci_cache = True
pr_info = PRInfo(github_event=_TEST_EVENT_JSON) pr_info = PRInfo(github_event=_TEST_EVENT_JSON)
@ -432,26 +456,6 @@ class TestCIConfig(unittest.TestCase):
assert not ci_cache.jobs_to_skip assert not ci_cache.jobs_to_skip
assert not ci_cache.jobs_to_wait assert not ci_cache.jobs_to_wait
# pretend there are pending jobs that we need to wait
ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do)
for job, config in ci_cache.jobs_to_wait.items():
assert not config.pending_batches
assert config.batches
config.pending_batches = list(config.batches)
for job, config in ci_cache.jobs_to_wait.items():
for batch in range(config.num_batches):
record = CiCache.Record(
record_type=CiCache.RecordType.PENDING,
job_name=job,
job_digest=ci_cache.job_digests[job],
batch=batch,
num_batches=config.num_batches,
release_branch=True,
)
for record_t_, records_ in ci_cache.records.items():
if record_t_.value == CiCache.RecordType.PENDING.value:
records_[record.to_str_key()] = record
def _test_await_for_batch( def _test_await_for_batch(
ci_cache: CiCache, record_type: CiCache.RecordType, batch: int ci_cache: CiCache, record_type: CiCache.RecordType, batch: int
) -> None: ) -> None:
@ -477,32 +481,76 @@ class TestCIConfig(unittest.TestCase):
and batch < config_.num_batches and batch < config_.num_batches
): ):
assert batch not in config_.pending_batches assert batch not in config_.pending_batches
else:
assert batch in config_.pending_batches
for _, config_ in ci_cache.jobs_to_do.items(): for _, config_ in ci_cache.jobs_to_do.items():
# jobs to do must have batches to run before/after await # jobs to do must have batches to run before/after await
# if it's an empty list after await - apparently job has not been removed after await # if it's an empty list after await - apparently job has not been removed after await
assert config_.batches assert config_.batches
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 0) _reset_ci_cache_to_wait_all_jobs(ci_cache)
# check all one-batch jobs are in jobs_to_skip _test_await_for_batch(ci_cache, CiCache.RecordType.FAILED, 0)
for job in all_jobs_in_wf: tested = False
config = CI.JOB_CONFIGS[job] for job, config in ci_cache.jobs_to_do.items():
if config.num_batches == 1: if config.batches == [0]:
self.assertTrue(job in ci_cache.jobs_to_skip) tested = True
self.assertTrue(job not in ci_cache.jobs_to_do) self.assertTrue(
else: job not in ci_cache.jobs_to_wait,
self.assertTrue(job not in ci_cache.jobs_to_skip) "Job must be removed from @jobs_to_wait, because its only batch has FAILED cache record",
self.assertTrue(job in ci_cache.jobs_to_do) )
_test_await_for_batch(ci_cache, CiCache.RecordType.FAILED, 1)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 2)
self.assertTrue(len(ci_cache.jobs_to_skip) > 0)
self.assertTrue(len(ci_cache.jobs_to_do) > 0)
self.assertCountEqual( self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip, all_jobs_in_wf ci_cache.jobs_to_skip,
[],
"No jobs must be skipped, since all cache records are of type FAILED",
)
assert tested
# reset jobs_to_wait after previous test
_reset_ci_cache_to_wait_all_jobs(ci_cache)
assert not ci_cache.jobs_to_skip
# set batch 0 as SUCCESSFUL in ci cache
jobs_to_do_prev = list(ci_cache.jobs_to_do)
jobs_to_skip_prev = []
jobs_to_wait_prev = list(ci_cache.jobs_to_wait)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 0)
self.assertTrue(len(jobs_to_skip_prev) != len(ci_cache.jobs_to_skip))
self.assertTrue(len(jobs_to_wait_prev) > len(ci_cache.jobs_to_wait))
self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip,
jobs_to_do_prev + jobs_to_skip_prev,
)
# set batch 1 as SUCCESSFUL in ci cache
jobs_to_do_prev = list(ci_cache.jobs_to_do)
jobs_to_skip_prev = list(ci_cache.jobs_to_skip)
jobs_to_wait_prev = list(ci_cache.jobs_to_wait)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 1)
self.assertTrue(len(jobs_to_skip_prev) != len(ci_cache.jobs_to_skip))
self.assertTrue(len(jobs_to_wait_prev) > len(ci_cache.jobs_to_wait))
self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip,
jobs_to_do_prev + jobs_to_skip_prev,
)
# set batch 3, 4, 5, 6 as SUCCESSFUL in ci cache
jobs_to_do_prev = list(ci_cache.jobs_to_do)
jobs_to_skip_prev = list(ci_cache.jobs_to_skip)
jobs_to_wait_prev = list(ci_cache.jobs_to_wait)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 2)
self.assertTrue(ci_cache.jobs_to_do)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 3)
self.assertTrue(ci_cache.jobs_to_do)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 4)
self.assertTrue(ci_cache.jobs_to_do)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 5)
self.assertTrue(
not ci_cache.jobs_to_do
) # by this moment there must be no jobs left as batch 5 is currently the maximum
self.assertTrue(len(jobs_to_skip_prev) != len(ci_cache.jobs_to_skip))
self.assertTrue(len(jobs_to_wait_prev) > len(ci_cache.jobs_to_wait))
self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip,
jobs_to_do_prev + jobs_to_skip_prev,
) )
def test_ci_py_filters_not_affected_jobs_in_prs(self): def test_ci_py_filters_not_affected_jobs_in_prs(self):

View File

@ -6,6 +6,9 @@ INSERT INTO 02918_parallel_replicas SELECT toString(number), number % 4 FROM num
SET prefer_localhost_replica=0; SET prefer_localhost_replica=0;
--- if we try to query unavaialble replica, connection will be retried
--- but a warning log message will be printed out
SET send_logs_level='error';
-- { echoOn } -- { echoOn }
SELECT y, count() SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas) FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
@ -26,5 +29,6 @@ GROUP BY y
ORDER BY y ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default'; SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default';
-- { echoOff } -- { echoOff }
SET send_logs_level='warning';
DROP TABLE 02918_parallel_replicas; DROP TABLE 02918_parallel_replicas;

View File

@ -0,0 +1,6 @@
this query used to be broken in old analyser:
c1 c2 yes true
this query worked:
yes true
experimental analyzer:
c1 c2 yes true

View File

@ -0,0 +1,35 @@
DROP TABLE IF EXISTS bugcheck1;
CREATE TABLE bugcheck1
ENGINE = MergeTree
ORDER BY tuple()
AS SELECT
'c1' as column_a,
'c2' as column_b;
select 'this query used to be broken in old analyser:';
SELECT *,
multiIf(column_b IN (SELECT 'c2' as someproduct), 'yes', 'no') AS condition_1,
multiIf(column_b = 'c2', 'true', 'false') AS condition_2
FROM (SELECT column_a, column_b FROM bugcheck1)
WHERE (condition_1 IN ('yes')) AND (condition_2 in ('true'))
settings allow_experimental_analyzer=0;
select 'this query worked:';
SELECT
multiIf(column_b IN (SELECT 'c2' as someproduct), 'yes', 'no') AS condition_1,
multiIf(column_b = 'c2', 'true', 'false') AS condition_2
FROM (SELECT column_a, column_b FROM bugcheck1)
WHERE (condition_1 IN ('yes')) AND (condition_2 in ('true'))
settings allow_experimental_analyzer=0;
select 'experimental analyzer:';
SELECT *,
multiIf(column_b IN (SELECT 'c2' as someproduct), 'yes', 'no') AS condition_1,
multiIf(column_b = 'c2', 'true', 'false') AS condition_2
FROM (SELECT column_a, column_b FROM bugcheck1)
WHERE (condition_1 IN ('yes')) AND (condition_2 in ('true'))
settings allow_experimental_analyzer=1;
DROP TABLE bugcheck1;