Merge branch 'master' into remove-system-functions-is-deterministic

This commit is contained in:
Alexey Milovidov 2024-07-18 03:36:03 +02:00
commit dde2f10c37
33 changed files with 322 additions and 201 deletions

View File

@ -26,7 +26,10 @@ RUN apt-get update \
zstd \
--yes --no-install-recommends \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/*
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/* \
&& groupadd --system --gid 1000 clickhouse \
&& useradd --system --gid 1000 --uid 1000 -m clickhouse
# ^ For some reason, groupadd and useradd are needed for tests with 'expect', but I don't know, why.
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r /requirements.txt

View File

@ -9,7 +9,7 @@ trap 'kill $(jobs -pr) ||:' EXIT
stage=${stage:-}
# Compiler version, normally set by Dockerfile
export LLVM_VERSION=${LLVM_VERSION:-17}
export LLVM_VERSION=${LLVM_VERSION:-18}
# A variable to pass additional flags to CMake.
# Here we explicitly default it to nothing so that bash doesn't complain about

View File

@ -17,6 +17,7 @@ ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=18
RUN apt-get update \
&& apt-get install \
sudo \
apt-transport-https \
apt-utils \
ca-certificates \

View File

@ -185,6 +185,7 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--format, -f` Use the specified default format to output the result.
- `--vertical, -E` If specified, use the [Vertical format](../interfaces/formats.md#vertical) by default to output the result. This is the same as `format=Vertical`. In this format, each value is printed on a separate line, which is helpful when displaying wide tables.
- `--time, -t` If specified, print the query execution time to stderr in non-interactive mode.
- `--memory-usage` If specified, print memory usage to stderr in non-interactive mode]. Possible values: 'none' - do not print memory usage, 'default' - print number of bytes, 'readable' - print memory usage in human-readable format.
- `--stacktrace` If specified, also print the stack trace if an exception occurs.
- `--config-file` The name of the configuration file.
- `--secure` If specified, will connect to server over secure connection (TLS). You might need to configure your CA certificates in the [configuration file](#configuration_files). The available configuration settings are the same as for [server-side TLS configuration](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-openssl).
@ -339,7 +340,7 @@ clickhouse-client clickhouse://some_user%40some_mail.com@localhost:9000
Connect to one of provides hosts: `192.168.1.15`, `192.168.1.25`.
``` bash
clickhouse-client clickhouse://192.168.1.15,192.168.1.25
clickhouse-client clickhouse://192.168.1.15,192.168.1.25
```
### Configuration Files {#configuration_files}
@ -367,7 +368,7 @@ Example of a config file:
```
Or the same config in a YAML format:
```yaml
user: username
password: 'password'

View File

@ -76,7 +76,7 @@ WHERE macro = 'test';
└───────┴──────────────┘
```
## FQDN
## fqdn
Returns the fully qualified domain name of the ClickHouse server.
@ -86,7 +86,7 @@ Returns the fully qualified domain name of the ClickHouse server.
fqdn();
```
Aliases: `fullHostName`, 'FQDN'.
Aliases: `fullHostName`, `FQDN`.
**Returned value**

View File

@ -2070,9 +2070,18 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
progress_indication.writeFinalProgress();
output_stream << std::endl << std::endl;
}
else if (getClientConfiguration().getBool("print-time-to-stderr", false))
else
{
error_stream << progress_indication.elapsedSeconds() << "\n";
const auto & config = getClientConfiguration();
if (config.getBool("print-time-to-stderr", false))
error_stream << progress_indication.elapsedSeconds() << "\n";
const auto & print_memory_mode = config.getString("print-memory-to-stderr", "");
auto peak_memeory_usage = std::max<Int64>(progress_indication.getMemoryUsage().peak, 0);
if (print_memory_mode == "default")
error_stream << peak_memeory_usage << "\n";
else if (print_memory_mode == "readable")
error_stream << formatReadableSizeWithBinarySuffix(peak_memeory_usage) << "\n";
}
if (!is_interactive && getClientConfiguration().getBool("print-num-processed-rows", false))
@ -3036,6 +3045,7 @@ void ClientBase::init(int argc, char ** argv)
("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.")
("wait_for_suggestions_to_load", "Load suggestion data synchonously.")
("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
("memory-usage", po::value<std::string>()->implicit_value("default")->default_value("none"), "print memory usage to stderr in non-interactive mode (for benchmarks). Values: 'none', 'default', 'readable'")
("echo", "in batch mode, print query before execution")
@ -3121,6 +3131,14 @@ void ClientBase::init(int argc, char ** argv)
/// Output execution time to stderr in batch mode.
if (options.count("time"))
getClientConfiguration().setBool("print-time-to-stderr", true);
if (options.count("memory-usage"))
{
const auto & memory_usage_mode = options["memory-usage"].as<std::string>();
if (memory_usage_mode != "none" && memory_usage_mode != "default" && memory_usage_mode != "readable")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown memory-usage mode: {}", memory_usage_mode);
getClientConfiguration().setString("print-memory-to-stderr", memory_usage_mode);
}
if (options.count("query"))
queries = options["query"].as<std::vector<std::string>>();
if (options.count("query_id"))

View File

@ -54,8 +54,6 @@ public:
struct ReplicaInfo
{
bool collaborate_with_initiator{false};
size_t all_replicas_count{0};
size_t number_of_current_replica{0};
};

View File

@ -142,13 +142,12 @@ void MultiplexedConnections::sendQuery(
modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0;
}
}
if (replica_info)
{
client_info.collaborate_with_initiator = true;
client_info.count_participating_replicas = replica_info->all_replicas_count;
client_info.number_of_current_replica = replica_info->number_of_current_replica;
}
if (replica_info)
{
client_info.collaborate_with_initiator = true;
client_info.number_of_current_replica = replica_info->number_of_current_replica;
}
/// FIXME: Remove once we will make `allow_experimental_analyzer` obsolete setting.

View File

@ -72,11 +72,6 @@ public:
/// How much seconds passed since query execution start.
double elapsedSeconds() const { return getElapsedNanoseconds() / 1e9; }
void updateThreadEventData(HostToTimesMap & new_hosts_data);
private:
double getCPUUsage();
struct MemoryUsage
{
UInt64 total = 0;
@ -86,6 +81,11 @@ private:
MemoryUsage getMemoryUsage() const;
void updateThreadEventData(HostToTimesMap & new_hosts_data);
private:
double getCPUUsage();
UInt64 getElapsedNanoseconds() const;
/// This flag controls whether to show the progress bar. We start showing it after

View File

@ -1184,7 +1184,7 @@ private:
if (icolumn->size() != vec_to.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Argument column '{}' size {} doesn't match result column size {} of function {}",
icolumn->getName(), icolumn->size(), vec_to.size(), getName());
icolumn->getName(), icolumn->size(), vec_to.size(), getName());
if constexpr (Keyed)
if (key_cols.size() != vec_to.size() && key_cols.size() != 1)
@ -1223,6 +1223,9 @@ private:
else executeGeneric<first>(key_cols, icolumn, vec_to);
}
/// Return a fixed random-looking magic number when input is empty.
static constexpr auto filler = 0xe28dbde7fe22e41c;
void executeForArgument(const KeyColumnsType & key_cols, const IDataType * type, const IColumn * column, typename ColumnVector<ToType>::Container & vec_to, bool & is_first) const
{
/// Flattening of tuples.
@ -1231,6 +1234,11 @@ private:
const auto & tuple_columns = tuple->getColumns();
const DataTypes & tuple_types = typeid_cast<const DataTypeTuple &>(*type).getElements();
size_t tuple_size = tuple_columns.size();
if (0 == tuple_size && is_first)
for (auto & hash : vec_to)
hash = static_cast<ToType>(filler);
for (size_t i = 0; i < tuple_size; ++i)
executeForArgument(key_cols, tuple_types[i].get(), tuple_columns[i].get(), vec_to, is_first);
}
@ -1239,6 +1247,11 @@ private:
const auto & tuple_columns = tuple_const->getColumns();
const DataTypes & tuple_types = typeid_cast<const DataTypeTuple &>(*type).getElements();
size_t tuple_size = tuple_columns.size();
if (0 == tuple_size && is_first)
for (auto & hash : vec_to)
hash = static_cast<ToType>(filler);
for (size_t i = 0; i < tuple_size; ++i)
{
auto tmp = ColumnConst::create(tuple_columns[i], column->size());
@ -1300,10 +1313,7 @@ public:
constexpr size_t first_data_argument = Keyed;
if (arguments.size() <= first_data_argument)
{
/// Return a fixed random-looking magic number when input is empty
vec_to.assign(input_rows_count, static_cast<ToType>(0xe28dbde7fe22e41c));
}
vec_to.assign(input_rows_count, static_cast<ToType>(filler));
KeyColumnsType key_cols{};
if constexpr (Keyed)

View File

@ -114,6 +114,34 @@ namespace
else if (query.grantees)
user.grantees = *query.grantees;
}
time_t getValidUntilFromAST(ASTPtr valid_until, ContextPtr context)
{
if (context)
valid_until = evaluateConstantExpressionAsLiteral(valid_until, context);
const String valid_until_str = checkAndGetLiteralArgument<String>(valid_until, "valid_until");
if (valid_until_str == "infinity")
return 0;
time_t time = 0;
ReadBufferFromString in(valid_until_str);
if (context)
{
const auto & time_zone = DateLUT::instance("");
const auto & utc_time_zone = DateLUT::instance("UTC");
parseDateTimeBestEffort(time, in, time_zone, utc_time_zone);
}
else
{
readDateTimeText(time, in);
}
return time;
}
}
BlockIO InterpreterCreateUserQuery::execute()
@ -134,23 +162,7 @@ BlockIO InterpreterCreateUserQuery::execute()
std::optional<time_t> valid_until;
if (query.valid_until)
{
const ASTPtr valid_until_literal = evaluateConstantExpressionAsLiteral(query.valid_until, getContext());
const String valid_until_str = checkAndGetLiteralArgument<String>(valid_until_literal, "valid_until");
time_t time = 0;
if (valid_until_str != "infinity")
{
const auto & time_zone = DateLUT::instance("");
const auto & utc_time_zone = DateLUT::instance("UTC");
ReadBufferFromString in(valid_until_str);
parseDateTimeBestEffort(time, in, time_zone, utc_time_zone);
}
valid_until = time;
}
valid_until = getValidUntilFromAST(query.valid_until, getContext());
std::optional<RolesOrUsersSet> default_roles_from_query;
if (query.default_roles)
@ -259,7 +271,11 @@ void InterpreterCreateUserQuery::updateUserFromQuery(User & user, const ASTCreat
if (query.auth_data)
auth_data = AuthenticationData::fromAST(*query.auth_data, {}, !query.attach);
updateUserFromQueryImpl(user, query, auth_data, {}, {}, {}, {}, {}, allow_no_password, allow_plaintext_password, true);
std::optional<time_t> valid_until;
if (query.valid_until)
valid_until = getValidUntilFromAST(query.valid_until, {});
updateUserFromQueryImpl(user, query, auth_data, {}, {}, {}, {}, valid_until, allow_no_password, allow_plaintext_password, true);
}
void registerInterpreterCreateUserQuery(InterpreterFactory & factory)

View File

@ -95,7 +95,7 @@ void ClientInfo::write(WriteBuffer & out, UInt64 server_protocol_revision) const
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
{
writeVarUInt(static_cast<UInt64>(collaborate_with_initiator), out);
writeVarUInt(count_participating_replicas, out);
writeVarUInt(obsolete_count_participating_replicas, out);
writeVarUInt(number_of_current_replica, out);
}
}
@ -185,7 +185,7 @@ void ClientInfo::read(ReadBuffer & in, UInt64 client_protocol_revision)
UInt64 value;
readVarUInt(value, in);
collaborate_with_initiator = static_cast<bool>(value);
readVarUInt(count_participating_replicas, in);
readVarUInt(obsolete_count_participating_replicas, in);
readVarUInt(number_of_current_replica, in);
}
}

View File

@ -127,7 +127,7 @@ public:
/// For parallel processing on replicas
bool collaborate_with_initiator{false};
UInt64 count_participating_replicas{0};
UInt64 obsolete_count_participating_replicas{0};
UInt64 number_of_current_replica{0};
enum class BackgroundOperationType : uint8_t

View File

@ -5001,13 +5001,6 @@ void Context::setConnectionClientVersion(UInt64 client_version_major, UInt64 cli
client_info.connection_tcp_protocol_version = client_tcp_protocol_version;
}
void Context::setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica)
{
client_info.collaborate_with_initiator = collaborate_with_initiator;
client_info.count_participating_replicas = all_replicas_count;
client_info.number_of_current_replica = number_of_current_replica;
}
void Context::increaseDistributedDepth()
{
++client_info.distributed_depth;

View File

@ -699,7 +699,6 @@ public:
void setInitialQueryStartTime(std::chrono::time_point<std::chrono::system_clock> initial_query_start_time);
void setQuotaClientKey(const String & quota_key);
void setConnectionClientVersion(UInt64 client_version_major, UInt64 client_version_minor, UInt64 client_version_patch, unsigned client_tcp_protocol_version);
void setReplicaInfo(bool collaborate_with_initiator, size_t all_replicas_count, size_t number_of_current_replica);
void increaseDistributedDepth();
const OpenTelemetry::TracingContext & getClientTraceContext() const { return client_info.client_trace_context; }
OpenTelemetry::TracingContext & getClientTraceContext() { return client_info.client_trace_context; }

View File

@ -412,8 +412,8 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
const auto & shard = cluster->getShardsInfo().at(0);
size_t all_replicas_count = current_settings.max_parallel_replicas;
if (all_replicas_count > shard.getAllNodeCount())
size_t max_replicas_to_use = current_settings.max_parallel_replicas;
if (max_replicas_to_use > shard.getAllNodeCount())
{
LOG_INFO(
getLogger("ReadFromParallelRemoteReplicasStep"),
@ -421,14 +421,14 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
"Will use the latter number to execute the query.",
current_settings.max_parallel_replicas,
shard.getAllNodeCount());
all_replicas_count = shard.getAllNodeCount();
max_replicas_to_use = shard.getAllNodeCount();
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (all_replicas_count < shard.getAllNodeCount())
if (max_replicas_to_use < shard.getAllNodeCount())
{
shuffled_pool = shard.pool->getShuffledPools(current_settings);
shuffled_pool.resize(all_replicas_count);
shuffled_pool.resize(max_replicas_to_use);
}
else
{
@ -438,11 +438,10 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
}
for (size_t i=0; i < all_replicas_count; ++i)
for (size_t i=0; i < max_replicas_to_use; ++i)
{
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = all_replicas_count,
/// we should use this number specifically because efficiency of data distribution by consistent hash depends on it.
.number_of_current_replica = i,
};

View File

@ -622,8 +622,9 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
{
LOG_INFO(
log,
"Total time to process a request took too long ({}ms).\nRequest info: {}",
elapsed,
"Total time to process a request in session {} took too long ({}ms).\nRequest info: {}",
session_id,
elapsed_ms,
request->toString(/*short_format=*/true));
}

View File

@ -99,7 +99,8 @@ def set_capacity(
continue
raise ValueError("Queue status is not in ['in_progress', 'queued']")
scale_down, scale_up = get_scales(runner_type)
# scale_down, scale_up = get_scales(runner_type)
_, scale_up = get_scales(runner_type)
# With lyfecycle hooks some instances are actually free because some of
# them are in 'Terminating:Wait' state
effective_capacity = max(
@ -110,7 +111,7 @@ def set_capacity(
# How much nodes are free (positive) or need to be added (negative)
capacity_reserve = effective_capacity - running - queued
stop = False
if capacity_reserve < 0:
if capacity_reserve <= 0:
# This part is about scaling up
capacity_deficit = -capacity_reserve
# It looks that we are still OK, since no queued jobs exist
@ -158,41 +159,43 @@ def set_capacity(
)
return
# Now we will calculate if we need to scale down
stop = stop or asg["DesiredCapacity"] == asg["MinSize"]
new_capacity = asg["DesiredCapacity"] - (capacity_reserve // scale_down)
new_capacity = max(new_capacity, asg["MinSize"])
new_capacity = min(new_capacity, asg["MaxSize"])
stop = stop or asg["DesiredCapacity"] == new_capacity
if stop:
logging.info(
"Do not decrease ASG %s capacity, current capacity=%s, effective "
"capacity=%s, minimum capacity=%s, running jobs=%s, queue size=%s",
asg["AutoScalingGroupName"],
asg["DesiredCapacity"],
effective_capacity,
asg["MinSize"],
running,
queued,
)
return
logging.info(
"The ASG %s capacity will be decreased to %s, current capacity=%s, effective "
"capacity=%s, minimum capacity=%s, running jobs=%s, queue size=%s",
asg["AutoScalingGroupName"],
new_capacity,
asg["DesiredCapacity"],
effective_capacity,
asg["MinSize"],
running,
queued,
)
if not dry_run:
client.set_desired_capacity(
AutoScalingGroupName=asg["AutoScalingGroupName"],
DesiredCapacity=new_capacity,
)
# FIXME: try decreasing capacity from runners that finished their jobs and have no job assigned
# IMPORTANT: Runner init script must be of version that supports ASG decrease
# # Now we will calculate if we need to scale down
# stop = stop or asg["DesiredCapacity"] == asg["MinSize"]
# new_capacity = asg["DesiredCapacity"] - (capacity_reserve // scale_down)
# new_capacity = max(new_capacity, asg["MinSize"])
# new_capacity = min(new_capacity, asg["MaxSize"])
# stop = stop or asg["DesiredCapacity"] == new_capacity
# if stop:
# logging.info(
# "Do not decrease ASG %s capacity, current capacity=%s, effective "
# "capacity=%s, minimum capacity=%s, running jobs=%s, queue size=%s",
# asg["AutoScalingGroupName"],
# asg["DesiredCapacity"],
# effective_capacity,
# asg["MinSize"],
# running,
# queued,
# )
# return
#
# logging.info(
# "The ASG %s capacity will be decreased to %s, current capacity=%s, effective "
# "capacity=%s, minimum capacity=%s, running jobs=%s, queue size=%s",
# asg["AutoScalingGroupName"],
# new_capacity,
# asg["DesiredCapacity"],
# effective_capacity,
# asg["MinSize"],
# running,
# queued,
# )
# if not dry_run:
# client.set_desired_capacity(
# AutoScalingGroupName=asg["AutoScalingGroupName"],
# DesiredCapacity=new_capacity,
# )
def main(dry_run: bool = True) -> None:

View File

@ -97,19 +97,34 @@ class TestSetCapacity(unittest.TestCase):
),
TestCase("lower-min", 10, 5, 20, [Queue("queued", 5, "lower-min")], 10),
# Decrease capacity
TestCase("w/reserve", 1, 13, 20, [Queue("queued", 5, "w/reserve")], 5),
# FIXME: Tests changed for lambda that can only scale up
# TestCase("w/reserve", 1, 13, 20, [Queue("queued", 5, "w/reserve")], 5),
TestCase("w/reserve", 1, 13, 20, [Queue("queued", 5, "w/reserve")], -1),
# TestCase(
# "style-checker", 1, 13, 20, [Queue("queued", 5, "style-checker")], 5
# ),
TestCase(
"style-checker", 1, 13, 20, [Queue("queued", 5, "style-checker")], 5
"style-checker", 1, 13, 20, [Queue("queued", 5, "style-checker")], -1
),
TestCase("w/reserve", 1, 23, 20, [Queue("queued", 17, "w/reserve")], 17),
TestCase("decrease", 1, 13, 20, [Queue("in_progress", 3, "decrease")], 3),
# TestCase("w/reserve", 1, 23, 20, [Queue("queued", 17, "w/reserve")], 17),
TestCase("w/reserve", 1, 23, 20, [Queue("queued", 17, "w/reserve")], -1),
# TestCase("decrease", 1, 13, 20, [Queue("in_progress", 3, "decrease")], 3),
TestCase("decrease", 1, 13, 20, [Queue("in_progress", 3, "decrease")], -1),
# TestCase(
# "style-checker",
# 1,
# 13,
# 20,
# [Queue("in_progress", 5, "style-checker")],
# 5,
# ),
TestCase(
"style-checker",
1,
13,
20,
[Queue("in_progress", 5, "style-checker")],
5,
-1,
),
)
for t in test_cases:

View File

@ -415,7 +415,8 @@ class CI:
JobNames.INTEGRATION_TEST_FLAKY: CommonJobConfigs.INTEGRATION_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_ASAN],
pr_only=True,
reference_job_name=JobNames.INTEGRATION_TEST_TSAN,
# TODO: approach with reference job names does not work because digest may not be calculated if job skipped in wf
# reference_job_name=JobNames.INTEGRATION_TEST_TSAN,
),
JobNames.COMPATIBILITY_TEST: CommonJobConfigs.COMPATIBILITY_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_RELEASE],
@ -460,7 +461,8 @@ class CI:
required_builds=[BuildNames.PACKAGE_ASAN],
pr_only=True,
timeout=3600,
reference_job_name=JobNames.STATELESS_TEST_RELEASE,
# TODO: approach with reference job names does not work because digest may not be calculated if job skipped in wf
# reference_job_name=JobNames.STATELESS_TEST_RELEASE,
),
JobNames.JEPSEN_KEEPER: JobConfig(
required_builds=[BuildNames.BINARY_RELEASE],

View File

@ -17,9 +17,19 @@ from download_release_packages import download_last_release
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import ERROR, SUCCESS, JobReport, StatusType, TestResults, read_test_results
from report import (
ERROR,
SUCCESS,
JobReport,
StatusType,
TestResults,
read_test_results,
FAILURE,
)
from stopwatch import Stopwatch
from tee_popen import TeePopen
from ci_config import CI
from ci_utils import Utils
NO_CHANGES_MSG = "Nothing to run"
@ -351,7 +361,23 @@ def main():
additional_files=additional_logs,
).dump(to_file=args.report_to_file if args.report_to_file else None)
should_block_ci = False
if state != SUCCESS:
should_block_ci = True
if state == FAILURE and CI.is_required(check_name):
failed_cnt = Utils.get_failed_tests_number(description)
print(
f"Job status is [{state}] with [{failed_cnt}] failed test cases. status description [{description}]"
)
if (
failed_cnt
and failed_cnt <= CI.MAX_TOTAL_FAILURES_PER_JOB_BEFORE_BLOCKING_CI
):
print(f"Won't block the CI workflow")
should_block_ci = False
if should_block_ci:
sys.exit(1)

View File

@ -23,10 +23,13 @@ from report import (
TestResult,
TestResults,
read_test_results,
FAILURE,
)
from stopwatch import Stopwatch
import integration_tests_runner as runner
from ci_config import CI
from ci_utils import Utils
def get_json_params_dict(
@ -233,7 +236,23 @@ def main():
additional_files=additional_logs,
).dump(to_file=args.report_to_file if args.report_to_file else None)
should_block_ci = False
if state != SUCCESS:
should_block_ci = True
if state == FAILURE and CI.is_required(check_name):
failed_cnt = Utils.get_failed_tests_number(description)
print(
f"Job status is [{state}] with [{failed_cnt}] failed test cases. status description [{description}]"
)
if (
failed_cnt
and failed_cnt <= CI.MAX_TOTAL_FAILURES_PER_JOB_BEFORE_BLOCKING_CI
):
print(f"Won't block the CI workflow")
should_block_ci = False
if should_block_ci:
sys.exit(1)

View File

@ -3,48 +3,27 @@
import re
from typing import Tuple
# Individual trusted contirbutors who are not in any trusted organization.
# Individual trusted contributors who are not in any trusted organization.
# Can be changed in runtime: we will append users that we learned to be in
# a trusted org, to save GitHub API calls.
TRUSTED_CONTRIBUTORS = {
e.lower()
for e in [
"achimbab", # Kakao corp
"Algunenano", # Raúl Marín, ClickHouse, Inc
"amosbird",
"azat", # SEMRush
"bharatnc", # Many contributions.
"bobrik", # Seasoned contributor, CloudFlare
"cwurm", # ClickHouse, Inc
"den-crane", # Documentation contributor
"hagen1778", # Roman Khavronenko, seasoned contributor
"hczhcz",
"hexiaoting", # Seasoned contributor
"ildus", # adjust, ex-pgpro
"javisantana", # a Spanish ClickHouse enthusiast, ex-Carto
"kreuzerkrieg",
"nikvas0",
"nvartolomei", # Seasoned contributor, CloudFlare
"spongedu", # Seasoned contributor
"taiyang-li",
"ucasFL", # Amos Bird's friend
"vdimir", # ClickHouse, Inc
"YiuRULE",
"zlobober", # Developer of YT
"ilejn", # Arenadata, responsible for Kerberized Kafka
"thomoco", # ClickHouse, Inc
"BoloniniD", # Seasoned contributor, HSE
"tonickkozlov", # Cloudflare
"tylerhannan", # ClickHouse, Inc
"myrrc", # Mike Kot, DoubleCloud
"thevar1able", # ClickHouse, Inc
"aalexfvk",
"MikhailBurdukov",
"tsolodov", # ClickHouse, Inc
"kitaisreal",
"k-morozov", # Konstantin Morozov, Yandex Cloud
"justindeguzman", # ClickHouse, Inc
"jrdi", # ClickHouse contributor, TinyBird
"XuJia0210", # ClickHouse, Inc
]
}

View File

@ -27,7 +27,6 @@ from report import SUCCESS, FAILURE
from env_helper import GITHUB_UPSTREAM_REPOSITORY, GITHUB_REPOSITORY
from synchronizer_utils import SYNC_BRANCH_PREFIX
from ci_config import CI
from ci_utils import Utils
# The team name for accepted approvals
TEAM_NAME = getenv("GITHUB_TEAM_NAME", "core")
@ -249,74 +248,22 @@ def main():
repo = gh.get_repo(args.repo)
if args.set_ci_status:
# set Mergeable check status and exit
assert args.wf_status in (FAILURE, SUCCESS)
# set mergeable check status and exit
commit = get_commit(gh, args.pr_info.sha)
statuses = get_commit_filtered_statuses(commit)
max_failed_tests_per_job = 0
job_name_with_max_failures = None
total_failed_tests = 0
failed_to_get_info = False
has_failed_statuses = False
for status in statuses:
if not CI.is_required(status.context) or status.context in (
CI.StatusNames.SYNC,
CI.StatusNames.PR_CHECK,
):
# CI.StatusNames.SYNC or CI.StatusNames.PR_CHECK should not be checked
continue
print(f"Check status [{status.context}], [{status.state}]")
if status.state == FAILURE:
if CI.is_required(status.context) and status.state != SUCCESS:
print(f"WARNING: Failed status [{status.context}], [{status.state}]")
has_failed_statuses = True
failed_cnt = Utils.get_failed_tests_number(status.description)
if failed_cnt is None:
failed_to_get_info = True
print(
f"WARNING: failed to get number of failed tests from [{status.description}]"
)
else:
if failed_cnt > max_failed_tests_per_job:
job_name_with_max_failures = status.context
max_failed_tests_per_job = failed_cnt
total_failed_tests += failed_cnt
print(
f"Failed test cases in [{status.context}] is [{failed_cnt}], total failures [{total_failed_tests}]"
)
elif status.state != SUCCESS and status.context not in (
CI.StatusNames.SYNC,
CI.StatusNames.PR_CHECK,
):
# do not block CI on failures in (CI.StatusNames.SYNC, CI.StatusNames.PR_CHECK)
has_failed_statuses = True
print(
f"Unexpected status for [{status.context}]: [{status.state}] - block further testing"
)
failed_to_get_info = True
can_continue = True
if total_failed_tests > CI.MAX_TOTAL_FAILURES_BEFORE_BLOCKING_CI:
print(
f"Required check has [{total_failed_tests}] failed - block further testing"
)
can_continue = False
if max_failed_tests_per_job > CI.MAX_TOTAL_FAILURES_PER_JOB_BEFORE_BLOCKING_CI:
print(
f"Job [{job_name_with_max_failures}] has [{max_failed_tests_per_job}] failures - block further testing"
)
can_continue = False
if failed_to_get_info:
print("Unexpected commit status state - block further testing")
can_continue = False
if args.wf_status != SUCCESS and not has_failed_statuses:
# workflow failed but reason is unknown as no failed statuses present
can_continue = False
print(
"WARNING: Either the runner is faulty or the operating status is unknown. The first is self-healing, the second requires investigation."
)
if args.wf_status == SUCCESS or has_failed_statuses:
# do not set mergeable check status if args.wf_status == failure, apparently it has died runners and is to be restarted
# set Mergeable check if workflow is successful (green)
# or if we have GH statuses with failures (red)
# to avoid false-green on a died runner
state = trigger_mergeable_check(
commit,
statuses,
@ -333,10 +280,10 @@ def main():
print(
"Workflow failed but no failed statuses found (died runner?) - cannot set Mergeable Check status"
)
if not can_continue:
if args.wf_status == SUCCESS and not has_failed_statuses:
sys.exit(0)
else:
sys.exit(1)
sys.exit(0)
# An ugly and not nice fix to patch the wrong organization URL,
# see https://github.com/PyGithub/PyGithub/issues/2395#issuecomment-1378629710

View File

@ -1,5 +1,7 @@
#!/usr/bin/env bash
set -e
usage() {
echo "Usage: $0 ENVIRONMENT" >&2
echo "Valid values for ENVIRONMENT: staging, production" >&2
@ -55,7 +57,7 @@ EOF
body() {
local first_line
first_line=$(sed -n '/^# THE SCRIPT START$/{=;q}' "$SOURCE_SCRIPT")
first_line=$(sed -n '/^# THE SCRIPT START$/{=;q;}' "$SOURCE_SCRIPT")
if [ -z "$first_line" ]; then
echo "The pattern '# THE SCRIPT START' is not found in $SOURCE_SCRIPT" >&2
exit 1

View File

@ -50,7 +50,7 @@ set -uo pipefail
# set accordingly to a runner role #
####################################
echo "Running init script"
echo "Running init v1"
export DEBIAN_FRONTEND=noninteractive
export RUNNER_HOME=/home/ubuntu/actions-runner
@ -90,7 +90,6 @@ terminate_delayed() {
# IF `sleep` IS CHANGED, CHANGE ANOTHER VALUE IN `pgrep`
sleep=13.14159265358979323846
echo "Going to terminate the runner's instance in $sleep seconds"
INSTANCE_ID=$(ec2metadata --instance-id)
# We execute it with `at` to not have it as an orphan process, but launched independently
# GH Runners kill all remain processes
echo "sleep '$sleep'; aws ec2 terminate-instances --instance-ids $INSTANCE_ID" | at now || \
@ -111,11 +110,17 @@ declare -f terminate_delayed >> /tmp/actions-hooks/common.sh
terminate_and_exit() {
# Terminate instance and exit from the script instantly
echo "Going to terminate the runner's instance"
INSTANCE_ID=$(ec2metadata --instance-id)
aws ec2 terminate-instances --instance-ids "$INSTANCE_ID"
exit 0
}
terminate_decrease_and_exit() {
# Terminate instance and exit from the script instantly
echo "Going to terminate the runner's instance and decrease asg capacity"
aws autoscaling terminate-instance-in-auto-scaling-group --instance-id "$INSTANCE_ID" --should-decrement-desired-capacity
exit 0
}
declare -f terminate_and_exit >> /tmp/actions-hooks/common.sh
check_spot_instance_is_old() {
@ -324,7 +329,7 @@ while true; do
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)" \
|| continue
echo "Runner didn't launch or have assigned jobs after ${RUNNER_AGE} seconds, shutting down"
terminate_and_exit
terminate_decrease_and_exit
fi
fi
else

View File

@ -5,7 +5,7 @@ from time import sleep
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node")
node = cluster.add_instance("node", stay_alive=True)
@pytest.fixture(scope="module")
@ -84,3 +84,22 @@ def test_details(started_cluster):
node.query("SHOW CREATE USER user_details_time_only")
== f"CREATE USER user_details_time_only VALID UNTIL \\'{until_year}-01-01 22:03:40\\'\n"
)
def test_restart(started_cluster):
node.query("CREATE USER user_restart VALID UNTIL '06/11/2010 08:03:20 Z+3'")
assert (
node.query("SHOW CREATE USER user_restart")
== "CREATE USER user_restart VALID UNTIL \\'2010-11-06 05:03:20\\'\n"
)
node.restart_clickhouse()
assert (
node.query("SHOW CREATE USER user_restart")
== "CREATE USER user_restart VALID UNTIL \\'2010-11-06 05:03:20\\'\n"
)
error = "Authentication failed"
assert error in node.query_and_get_error("SELECT 1", user="user_restart")

View File

@ -1,4 +1,5 @@
-- Tags: no-random-settings, no-s3-storage
-- Tags: no-random-settings, no-object-storage
-- Tag no-object-storage: this test relies on the number of opened files in MergeTree that can differ in object storages
SET allow_experimental_dynamic_type = 1;
DROP TABLE IF EXISTS test_dynamic;

View File

@ -0,0 +1,6 @@
Ok
Ok
Ok
Ok
Ok
Ok

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -t -q "SELECT sleepEachRow(2) FORMAT Null" 2>&1 | grep -q "^2\." && echo "Ok" || echo "Fail"
${CLICKHOUSE_CLIENT} --time -q "SELECT sleepEachRow(2) FORMAT Null" 2>&1 | grep -q "^2\." && echo "Ok" || echo "Fail"
${CLICKHOUSE_CLIENT} --memory-usage -q "SELECT sum(number) FROM numbers(10_000) FORMAT Null" 2>&1 | grep -q "^[0-9]\+$" && echo "Ok" || echo "Fail"
${CLICKHOUSE_CLIENT} --memory-usage=none -q "SELECT sum(number) FROM numbers(10_000) FORMAT Null" # expected no output
${CLICKHOUSE_CLIENT} --memory-usage=default -q "SELECT sum(number) FROM numbers(10_000) FORMAT Null" 2>&1 | grep -q "^[0-9]\+$" && echo "Ok" || echo "Fail"
${CLICKHOUSE_CLIENT} --memory-usage=readable -q "SELECT sum(number) FROM numbers(10_000) FORMAT Null" 2>&1 | grep -q "^[0-9].*B$" && echo "Ok" || echo "Fail"
${CLICKHOUSE_CLIENT} --memory-usage=unknown -q "SELECT sum(number) FROM numbers(10_000) FORMAT Null" 2>&1 | grep -q "BAD_ARGUMENTS" && echo "Ok" || echo "Fail"

View File

@ -0,0 +1,22 @@
16324913028386710556
16324913028386710556
5049034479224883533
7385293435322750976
12248912094175844631
5049034479224883533
5887129541803688833
5887129541803688833
13747979201178469747
5887129541803688833
15520217392480966957
16324913028386710556
16324913028386710556
5049034479224883533
7385293435322750976
12248912094175844631
5049034479224883533
5887129541803688833
5887129541803688833
13747979201178469747
5887129541803688833
15520217392480966957

View File

@ -0,0 +1,23 @@
SELECT sipHash64(());
SELECT sipHash64((), ());
SELECT sipHash64((), 1);
SELECT sipHash64(1, ());
SELECT sipHash64(1, (), 1);
SELECT sipHash64((), 1, ());
SELECT sipHash64((), (1, 2));
SELECT sipHash64((), (1, 2));
SELECT sipHash64((1, 2), ());
SELECT sipHash64((), (1, 2), ());
SELECT sipHash64((1, 2), (), (3, 4));
SELECT sipHash64(materialize(()));
SELECT sipHash64(materialize(()), materialize(()));
SELECT sipHash64(materialize(()), 1);
SELECT sipHash64(1, materialize(()));
SELECT sipHash64(1, materialize(()), 1);
SELECT sipHash64((), 1, materialize(()));
SELECT sipHash64(materialize(()), (1, 2));
SELECT sipHash64(materialize(()), (1, 2));
SELECT sipHash64((1, 2), materialize(()));
SELECT sipHash64(materialize(()), (1, 2), ());
SELECT sipHash64((1, 2), materialize(()), (3, 4));

View File

@ -1653,6 +1653,7 @@ formated
formatschema
formatter
formatters
fqdn
frac
freezed
fromDaysSinceYearZero