Merge branch 'master' into vdimir/client-noninteractive-mem-usage

This commit is contained in:
vdimir 2024-07-16 12:14:30 +02:00 committed by GitHub
commit 949340f437
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 466 additions and 232 deletions

View File

@ -62,7 +62,7 @@ jobs:
BuildDockers:
needs: [RunConfig]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_docker.yml
uses: ./.github/workflows/docker_test_images.yml
with:
data: ${{ needs.RunConfig.outputs.data }}
CompatibilityCheckX86:

View File

@ -94,7 +94,7 @@ jobs:
echo "Generate Security"
python3 ./utils/security-generator/generate_security.py > SECURITY.md
git diff HEAD
- name: Generate ChangeLog
- name: Create ChangeLog PR
if: ${{ inputs.type == 'patch' && ! inputs.dry-run }}
uses: peter-evans/create-pull-request@v6
with:

View File

@ -58,7 +58,7 @@ jobs:
# BuildDockers:
# needs: [RunConfig]
# if: ${{ !failure() && !cancelled() }}
# uses: ./.github/workflows/reusable_docker.yml
# uses: ./.github/workflows/docker_test_images.yml
# with:
# data: ${{ needs.RunConfig.outputs.data }}
# StyleCheck:

View File

@ -51,7 +51,7 @@ jobs:
BuildDockers:
needs: [RunConfig]
if: ${{ !failure() && !cancelled() && toJson(fromJson(needs.RunConfig.outputs.data).docker_data.missing_multi) != '[]' }}
uses: ./.github/workflows/reusable_docker.yml
uses: ./.github/workflows/docker_test_images.yml
with:
data: ${{ needs.RunConfig.outputs.data }}
StyleCheck:

View File

@ -40,7 +40,7 @@ jobs:
} >> "$GITHUB_OUTPUT"
BuildDockers:
needs: [RunConfig]
uses: ./.github/workflows/reusable_docker.yml
uses: ./.github/workflows/docker_test_images.yml
with:
data: "${{ needs.RunConfig.outputs.data }}"
set_latest: true

View File

@ -72,7 +72,7 @@ jobs:
BuildDockers:
needs: [RunConfig]
if: ${{ !failure() && !cancelled() && toJson(fromJson(needs.RunConfig.outputs.data).docker_data.missing_multi) != '[]' }}
uses: ./.github/workflows/reusable_docker.yml
uses: ./.github/workflows/docker_test_images.yml
with:
data: ${{ needs.RunConfig.outputs.data }}
StyleCheck:

View File

@ -57,7 +57,7 @@ jobs:
BuildDockers:
needs: [RunConfig]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_docker.yml
uses: ./.github/workflows/docker_test_images.yml
with:
data: ${{ needs.RunConfig.outputs.data }}
CompatibilityCheckX86:

View File

@ -102,6 +102,8 @@ jobs:
--job-name '${{inputs.test_name}}' \
--run \
--run-command '''${{inputs.run_command}}'''
# shellcheck disable=SC2319
echo "JOB_EXIT_CODE=$?" >> "$GITHUB_ENV"
- name: Post run
if: ${{ !cancelled() }}
run: |

View File

@ -3,8 +3,9 @@
#include <base/defines.h>
#include <fstream>
#include <sstream>
#include <string>
namespace fs = std::filesystem;
bool cgroupsV2Enabled()
{
@ -13,11 +14,11 @@ bool cgroupsV2Enabled()
{
/// This file exists iff the host has cgroups v2 enabled.
auto controllers_file = default_cgroups_mount / "cgroup.controllers";
if (!std::filesystem::exists(controllers_file))
if (!fs::exists(controllers_file))
return false;
return true;
}
catch (const std::filesystem::filesystem_error &) /// all "underlying OS API errors", typically: permission denied
catch (const fs::filesystem_error &) /// all "underlying OS API errors", typically: permission denied
{
return false; /// not logging the exception as most callers fall back to cgroups v1
}
@ -33,8 +34,9 @@ bool cgroupsV2MemoryControllerEnabled()
/// According to https://docs.kernel.org/admin-guide/cgroup-v2.html, file "cgroup.controllers" defines which controllers are available
/// for the current + child cgroups. The set of available controllers can be restricted from level to level using file
/// "cgroups.subtree_control". It is therefore sufficient to check the bottom-most nested "cgroup.controllers" file.
std::string cgroup = cgroupV2OfProcess();
auto cgroup_dir = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
fs::path cgroup_dir = cgroupV2PathOfProcess();
if (cgroup_dir.empty())
return false;
std::ifstream controllers_file(cgroup_dir / "cgroup.controllers");
if (!controllers_file.is_open())
return false;
@ -46,7 +48,7 @@ bool cgroupsV2MemoryControllerEnabled()
#endif
}
std::string cgroupV2OfProcess()
fs::path cgroupV2PathOfProcess()
{
#if defined(OS_LINUX)
chassert(cgroupsV2Enabled());
@ -54,17 +56,18 @@ std::string cgroupV2OfProcess()
/// A simpler way to get the membership is:
std::ifstream cgroup_name_file("/proc/self/cgroup");
if (!cgroup_name_file.is_open())
return "";
return {};
/// With cgroups v2, there will be a *single* line with prefix "0::/"
/// (see https://docs.kernel.org/admin-guide/cgroup-v2.html)
std::string cgroup;
std::getline(cgroup_name_file, cgroup);
static const std::string v2_prefix = "0::/";
if (!cgroup.starts_with(v2_prefix))
return "";
return {};
cgroup = cgroup.substr(v2_prefix.length());
return cgroup;
/// Note: The 'root' cgroup can have an empty cgroup name, this is valid
return default_cgroups_mount / cgroup;
#else
return "";
return {};
#endif
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <filesystem>
#include <string>
#if defined(OS_LINUX)
/// I think it is possible to mount the cgroups hierarchy somewhere else (e.g. when in containers).
@ -16,7 +15,7 @@ bool cgroupsV2Enabled();
/// Assumes that cgroupsV2Enabled() is enabled.
bool cgroupsV2MemoryControllerEnabled();
/// Which cgroup does the process belong to?
/// Returns an empty string if the cgroup cannot be determined.
/// Detects which cgroup v2 the process belongs to and returns the filesystem path to the cgroup.
/// Returns an empty path the cgroup cannot be determined.
/// Assumes that cgroupsV2Enabled() is enabled.
std::string cgroupV2OfProcess();
std::filesystem::path cgroupV2PathOfProcess();

View File

@ -23,8 +23,9 @@ std::optional<uint64_t> getCgroupsV2MemoryLimit()
if (!cgroupsV2MemoryControllerEnabled())
return {};
std::string cgroup = cgroupV2OfProcess();
auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
std::filesystem::path current_cgroup = cgroupV2PathOfProcess();
if (current_cgroup.empty())
return {};
/// Open the bottom-most nested memory limit setting file. If there is no such file at the current
/// level, try again at the parent level as memory settings are inherited.

View File

@ -33,13 +33,9 @@ RUN apt-get update \
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r /requirements.txt
COPY * /
ENV FUZZER_ARGS="-max_total_time=60"
SHELL ["/bin/bash", "-c"]
CMD set -o pipefail \
&& timeout -s 9 1h /run_libfuzzer.py 2>&1 | ts "$(printf '%%Y-%%m-%%d %%H:%%M:%%S\t')" | tee main.log
# docker run --network=host --volume <workspace>:/workspace -e PR_TO_TEST=<> -e SHA_TO_TEST=<> clickhouse/libfuzzer

View File

@ -25,7 +25,7 @@ source /utils.lib
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --silent --inMemoryPersistence &
./setup_minio.sh stateful
./mc admin trace clickminio > /test_output/rubbish.log &
./mc admin trace clickminio > /test_output/minio.log &
MC_ADMIN_PID=$!
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml

View File

@ -54,7 +54,7 @@ source /utils.lib
/usr/share/clickhouse-test/config/install.sh
./setup_minio.sh stateless
m./c admin trace clickminio > /test_output/rubbish.log &
./mc admin trace clickminio > /test_output/minio.log &
MC_ADMIN_PID=$!
./setup_hdfs_minicluster.sh

View File

@ -10,7 +10,7 @@ cd hadoop-3.3.1
export JAVA_HOME=/usr
mkdir -p target/test/data
chown clickhouse ./target/test/data
sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 >> /test_output/garbage.log 2>&1 &
sudo -E -u clickhouse bin/mapred minicluster -format -nomr -nnport 12222 >> /test_output/hdfs_minicluster.log 2>&1 &
while ! nc -z localhost 12222; do
sleep 1

View File

@ -6,7 +6,20 @@ sidebar_label: MySQL Interface
# MySQL Interface
ClickHouse supports the MySQL wire protocol. This allow tools that are MySQL-compatible to interact with ClickHouse seamlessly (e.g. [Looker Studio](../integrations/data-visualization/looker-studio-and-clickhouse.md)).
ClickHouse supports the MySQL wire protocol. This allows certain clients that do not have native ClickHouse connectors leverage the MySQL protocol instead, and it has been validated with the following BI tools:
- [Looker Studio](../integrations/data-visualization/looker-studio-and-clickhouse.md)
- [Tableau Online](../integrations/tableau-online)
- [QuickSight](../integrations/quicksight)
If you are trying other untested clients or integrations, keep in mind that there could be the following limitations:
- SSL implementation might not be fully compatible; there could be potential [TLS SNI](https://www.cloudflare.com/learning/ssl/what-is-sni/) issues.
- A particular tool might require dialect features (e.g., MySQL-specific functions or settings) that are not implemented yet.
If there is a native driver available (e.g., [DBeaver](../integrations/dbeaver)), it is always preferred to use it instead of the MySQL interface. Additionally, while most of the MySQL language clients should work fine, MySQL interface is not guaranteed to be a drop-in replacement for a codebase with existing MySQL queries.
If your use case involves a particular tool that does not have a native ClickHouse driver, and you would like to use it via the MySQL interface and you found certain incompatibilities - please [create an issue](https://github.com/ClickHouse/ClickHouse/issues) in the ClickHouse repository.
## Enabling the MySQL Interface On ClickHouse Cloud

View File

@ -1,6 +1,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeObject.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
@ -680,9 +681,33 @@ QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromStorage(
bool match_full_identifier = false;
const auto & identifier_full_name = identifier_without_column_qualifier.getFullName();
auto it = table_expression_data.column_name_to_column_node.find(identifier_full_name);
bool can_resolve_directly_from_storage = it != table_expression_data.column_name_to_column_node.end();
if (can_resolve_directly_from_storage && table_expression_data.subcolumn_names.contains(identifier_full_name))
ColumnNodePtr result_column_node;
bool can_resolve_directly_from_storage = false;
bool is_subcolumn = false;
if (auto it = table_expression_data.column_name_to_column_node.find(identifier_full_name); it != table_expression_data.column_name_to_column_node.end())
{
can_resolve_directly_from_storage = true;
is_subcolumn = table_expression_data.subcolumn_names.contains(identifier_full_name);
result_column_node = it->second;
}
/// Check if it's a dynamic subcolumn
else
{
auto [column_name, dynamic_subcolumn_name] = Nested::splitName(identifier_full_name);
auto jt = table_expression_data.column_name_to_column_node.find(column_name);
if (jt != table_expression_data.column_name_to_column_node.end() && jt->second->getColumnType()->hasDynamicSubcolumns())
{
if (auto dynamic_subcolumn_type = jt->second->getColumnType()->tryGetSubcolumnType(dynamic_subcolumn_name))
{
result_column_node = std::make_shared<ColumnNode>(NameAndTypePair{identifier_full_name, dynamic_subcolumn_type}, jt->second->getColumnSource());
can_resolve_directly_from_storage = true;
is_subcolumn = true;
}
}
}
if (can_resolve_directly_from_storage && is_subcolumn)
{
/** In the case when we have an ARRAY JOIN, we should not resolve subcolumns directly from storage.
* For example, consider the following SQL query:
@ -698,11 +723,11 @@ QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromStorage(
if (can_resolve_directly_from_storage)
{
match_full_identifier = true;
result_expression = it->second;
result_expression = result_column_node;
}
else
{
it = table_expression_data.column_name_to_column_node.find(identifier_without_column_qualifier.at(0));
auto it = table_expression_data.column_name_to_column_node.find(identifier_without_column_qualifier.at(0));
if (it != table_expression_data.column_name_to_column_node.end())
result_expression = it->second;
}

View File

@ -3416,14 +3416,14 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
function_base = function->build(argument_columns);
/// Do not constant fold get scalar functions
bool disable_constant_folding = function_name == "__getScalar" || function_name == "shardNum" ||
function_name == "shardCount" || function_name == "hostName" || function_name == "tcpPort";
// bool disable_constant_folding = function_name == "__getScalar" || function_name == "shardNum" ||
// function_name == "shardCount" || function_name == "hostName" || function_name == "tcpPort";
/** If function is suitable for constant folding try to convert it to constant.
* Example: SELECT plus(1, 1);
* Result: SELECT 2;
*/
if (function_base->isSuitableForConstantFolding() && !disable_constant_folding)
if (function_base->isSuitableForConstantFolding()) // && !disable_constant_folding)
{
auto result_type = function_base->getResultType();
auto executable_function = function_base->prepare(argument_columns);

View File

@ -103,7 +103,7 @@ public:
Entry get(const ConnectionTimeouts & timeouts, /// NOLINT
const Settings & settings,
bool force_connected = true) override;
bool force_connected) override;
std::string getDescription() const
{

View File

@ -49,6 +49,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init)
: name(init.name)
, priority(init.priority)
, max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores())
, thread_pool(std::make_unique<ThreadPool>(
init.metric_threads,
init.metric_active_threads,
@ -56,17 +57,16 @@ AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init)
/* max_threads = */ std::numeric_limits<size_t>::max(), // Unlimited number of threads, we do worker management ourselves
/* max_free_threads = */ 0, // We do not require free threads
/* queue_size = */0)) // Unlimited queue to avoid blocking during worker spawning
, max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores())
{}
AsyncLoader::Pool::Pool(Pool&& o) noexcept
: name(o.name)
, priority(o.priority)
, thread_pool(std::move(o.thread_pool))
, ready_queue(std::move(o.ready_queue))
, max_threads(o.max_threads)
, workers(o.workers)
, suspended_workers(o.suspended_workers.load()) // All these constructors are needed because std::atomic is neither copy-constructible, nor move-constructible. We never move pools after init, so it is safe.
, thread_pool(std::move(o.thread_pool))
{}
void cancelOnDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & dependency, std::exception_ptr & cancel)

View File

@ -365,11 +365,11 @@ private:
{
const String name;
const Priority priority;
std::unique_ptr<ThreadPool> thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools.
std::map<UInt64, LoadJobPtr> ready_queue; // FIFO queue of jobs to be executed in this pool. Map is used for faster erasing. Key is `ready_seqno`
size_t max_threads; // Max number of workers to be spawn
size_t workers = 0; // Number of currently executing workers
std::atomic<size_t> suspended_workers{0}; // Number of workers that are blocked by `wait()` call on a job executing in the same pool (for deadlock resolution)
std::unique_ptr<ThreadPool> thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools.
explicit Pool(const PoolInitializer & init);
Pool(Pool&& o) noexcept;

View File

@ -25,6 +25,7 @@
#endif
using namespace DB;
namespace fs = std::filesystem;
namespace DB
{
@ -69,7 +70,7 @@ uint64_t readMetricFromStatFile(ReadBufferFromFile & buf, const std::string & ke
struct CgroupsV1Reader : ICgroupsReader
{
explicit CgroupsV1Reader(const std::filesystem::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { }
explicit CgroupsV1Reader(const fs::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { }
uint64_t readMemoryUsage() override
{
@ -85,7 +86,7 @@ private:
struct CgroupsV2Reader : ICgroupsReader
{
explicit CgroupsV2Reader(const std::filesystem::path & stat_file_dir)
explicit CgroupsV2Reader(const fs::path & stat_file_dir)
: current_buf(stat_file_dir / "memory.current"), stat_buf(stat_file_dir / "memory.stat")
{
}
@ -129,8 +130,9 @@ std::optional<std::string> getCgroupsV2Path()
if (!cgroupsV2MemoryControllerEnabled())
return {};
String cgroup = cgroupV2OfProcess();
auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
fs::path current_cgroup = cgroupV2PathOfProcess();
if (current_cgroup.empty())
return {};
/// Return the bottom-most nested current memory file. If there is no such file at the current
/// level, try again at the parent level as memory settings are inherited.
@ -138,7 +140,7 @@ std::optional<std::string> getCgroupsV2Path()
{
const auto current_path = current_cgroup / "memory.current";
const auto stat_path = current_cgroup / "memory.stat";
if (std::filesystem::exists(current_path) && std::filesystem::exists(stat_path))
if (fs::exists(current_path) && fs::exists(stat_path))
return {current_cgroup};
current_cgroup = current_cgroup.parent_path();
}
@ -148,7 +150,7 @@ std::optional<std::string> getCgroupsV2Path()
std::optional<std::string> getCgroupsV1Path()
{
auto path = default_cgroups_mount / "memory/memory.stat";
if (!std::filesystem::exists(path))
if (!fs::exists(path))
return {};
return {default_cgroups_mount / "memory"};
}

View File

@ -37,12 +37,12 @@ uint32_t getCGroupLimitedCPUCores(unsigned default_cpu_count)
/// cgroupsv2
if (cgroupsV2Enabled())
{
/// First, we identify the cgroup the process belongs
std::string cgroup = cgroupV2OfProcess();
if (cgroup.empty())
/// First, we identify the path of the cgroup the process belongs
std::filesystem::path cgroup_path = cgroupV2PathOfProcess();
if (cgroup_path.empty())
return default_cpu_count;
auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
auto current_cgroup = cgroup_path;
// Looking for cpu.max in directories from the current cgroup to the top level
// It does not stop on the first time since the child could have a greater value than parent
@ -62,7 +62,7 @@ uint32_t getCGroupLimitedCPUCores(unsigned default_cpu_count)
}
current_cgroup = current_cgroup.parent_path();
}
current_cgroup = default_cgroups_mount / cgroup;
current_cgroup = cgroup_path;
// Looking for cpuset.cpus.effective in directories from the current cgroup to the top level
while (current_cgroup != default_cgroups_mount.parent_path())
{

View File

@ -230,6 +230,17 @@ public:
virtual bool isDeterministicInScopeOfQuery() const { return true; }
/** This is a special flags for functions which return constant value for the server,
* but the result could be different for different servers in distributed query.
*
* This functions can't support constant folding on the initiator, but can on the follower.
* We can't apply some optimizations as well (e.g. can't remove constant result from GROUP BY key).
* So, it is convenient to have a special flag for them.
*
* Examples are: "__getScalar" and every function from serverConstants.cpp
*/
virtual bool isServerConstant() const { return false; }
/** Lets you know if the function is monotonic in a range of values.
* This is used to work with the index in a sorted chunk of data.
* And allows to use the index not only when it is written, for example `date >= const`, but also, for example, `toMonth(date) >= 11`.
@ -488,6 +499,7 @@ public:
virtual bool isInjective(const ColumnsWithTypeAndName & /*sample_columns*/) const { return false; }
virtual bool isDeterministic() const { return true; }
virtual bool isDeterministicInScopeOfQuery() const { return true; }
virtual bool isServerConstant() const { return false; }
virtual bool isStateful() const { return false; }
using ShortCircuitSettings = IFunctionBase::ShortCircuitSettings;

View File

@ -86,6 +86,8 @@ public:
bool isDeterministicInScopeOfQuery() const override { return function->isDeterministicInScopeOfQuery(); }
bool isServerConstant() const override { return function->isServerConstant(); }
bool isShortCircuit(ShortCircuitSettings & settings, size_t number_of_arguments) const override { return function->isShortCircuit(settings, number_of_arguments); }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & args) const override { return function->isSuitableForShortCircuitArgumentsExecution(args); }

View File

@ -53,6 +53,8 @@ public:
/// getMacro may return different values on different shards/replicas, so it's not constant for distributed query
bool isSuitableForConstantFolding() const override { return !is_distributed; }
bool isServerConstant() const override { return true; }
size_t getNumberOfArguments() const override
{
return 1;

View File

@ -49,6 +49,8 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
bool isServerConstant() const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1 || !isString(arguments[0].type) || !arguments[0].column || !isColumnConst(*arguments[0].column))
@ -105,6 +107,8 @@ public:
bool isDeterministic() const override { return false; }
bool isServerConstant() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

View File

@ -21,117 +21,125 @@ namespace DB
namespace
{
template<typename Derived, typename T, typename ColumnT>
class FunctionServerConstantBase : public FunctionConstantBase<Derived, T, ColumnT>
{
public:
using FunctionConstantBase<Derived, T, ColumnT>::FunctionConstantBase;
bool isServerConstant() const override { return true; }
};
#if defined(__ELF__) && !defined(OS_FREEBSD)
/// buildId() - returns the compiler build id of the running binary.
class FunctionBuildId : public FunctionConstantBase<FunctionBuildId, String, DataTypeString>
class FunctionBuildId : public FunctionServerConstantBase<FunctionBuildId, String, DataTypeString>
{
public:
static constexpr auto name = "buildId";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionBuildId>(context); }
explicit FunctionBuildId(ContextPtr context) : FunctionConstantBase(SymbolIndex::instance().getBuildIDHex(), context->isDistributed()) {}
explicit FunctionBuildId(ContextPtr context) : FunctionServerConstantBase(SymbolIndex::instance().getBuildIDHex(), context->isDistributed()) {}
};
#endif
/// Get the host name. It is constant on single server, but is not constant in distributed queries.
class FunctionHostName : public FunctionConstantBase<FunctionHostName, String, DataTypeString>
class FunctionHostName : public FunctionServerConstantBase<FunctionHostName, String, DataTypeString>
{
public:
static constexpr auto name = "hostName";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionHostName>(context); }
explicit FunctionHostName(ContextPtr context) : FunctionConstantBase(DNSResolver::instance().getHostName(), context->isDistributed()) {}
explicit FunctionHostName(ContextPtr context) : FunctionServerConstantBase(DNSResolver::instance().getHostName(), context->isDistributed()) {}
};
class FunctionServerUUID : public FunctionConstantBase<FunctionServerUUID, UUID, DataTypeUUID>
class FunctionServerUUID : public FunctionServerConstantBase<FunctionServerUUID, UUID, DataTypeUUID>
{
public:
static constexpr auto name = "serverUUID";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionServerUUID>(context); }
explicit FunctionServerUUID(ContextPtr context) : FunctionConstantBase(ServerUUID::get(), context->isDistributed()) {}
explicit FunctionServerUUID(ContextPtr context) : FunctionServerConstantBase(ServerUUID::get(), context->isDistributed()) {}
};
class FunctionTCPPort : public FunctionConstantBase<FunctionTCPPort, UInt16, DataTypeUInt16>
class FunctionTCPPort : public FunctionServerConstantBase<FunctionTCPPort, UInt16, DataTypeUInt16>
{
public:
static constexpr auto name = "tcpPort";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTCPPort>(context); }
explicit FunctionTCPPort(ContextPtr context) : FunctionConstantBase(context->getTCPPort(), context->isDistributed()) {}
explicit FunctionTCPPort(ContextPtr context) : FunctionServerConstantBase(context->getTCPPort(), context->isDistributed()) {}
};
/// Returns timezone for current session.
class FunctionTimezone : public FunctionConstantBase<FunctionTimezone, String, DataTypeString>
class FunctionTimezone : public FunctionServerConstantBase<FunctionTimezone, String, DataTypeString>
{
public:
static constexpr auto name = "timezone";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTimezone>(context); }
explicit FunctionTimezone(ContextPtr context) : FunctionConstantBase(DateLUT::instance().getTimeZone(), context->isDistributed()) {}
explicit FunctionTimezone(ContextPtr context) : FunctionServerConstantBase(DateLUT::instance().getTimeZone(), context->isDistributed()) {}
};
/// Returns the server time zone (timezone in which server runs).
class FunctionServerTimezone : public FunctionConstantBase<FunctionServerTimezone, String, DataTypeString>
class FunctionServerTimezone : public FunctionServerConstantBase<FunctionServerTimezone, String, DataTypeString>
{
public:
static constexpr auto name = "serverTimezone";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionServerTimezone>(context); }
explicit FunctionServerTimezone(ContextPtr context) : FunctionConstantBase(DateLUT::serverTimezoneInstance().getTimeZone(), context->isDistributed()) {}
explicit FunctionServerTimezone(ContextPtr context) : FunctionServerConstantBase(DateLUT::serverTimezoneInstance().getTimeZone(), context->isDistributed()) {}
};
/// Returns server uptime in seconds.
class FunctionUptime : public FunctionConstantBase<FunctionUptime, UInt32, DataTypeUInt32>
class FunctionUptime : public FunctionServerConstantBase<FunctionUptime, UInt32, DataTypeUInt32>
{
public:
static constexpr auto name = "uptime";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionUptime>(context); }
explicit FunctionUptime(ContextPtr context) : FunctionConstantBase(context->getUptimeSeconds(), context->isDistributed()) {}
explicit FunctionUptime(ContextPtr context) : FunctionServerConstantBase(context->getUptimeSeconds(), context->isDistributed()) {}
};
/// version() - returns the current version as a string.
class FunctionVersion : public FunctionConstantBase<FunctionVersion, String, DataTypeString>
class FunctionVersion : public FunctionServerConstantBase<FunctionVersion, String, DataTypeString>
{
public:
static constexpr auto name = "version";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionVersion>(context); }
explicit FunctionVersion(ContextPtr context) : FunctionConstantBase(VERSION_STRING, context->isDistributed()) {}
explicit FunctionVersion(ContextPtr context) : FunctionServerConstantBase(VERSION_STRING, context->isDistributed()) {}
};
/// revision() - returns the current revision.
class FunctionRevision : public FunctionConstantBase<FunctionRevision, UInt32, DataTypeUInt32>
class FunctionRevision : public FunctionServerConstantBase<FunctionRevision, UInt32, DataTypeUInt32>
{
public:
static constexpr auto name = "revision";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionRevision>(context); }
explicit FunctionRevision(ContextPtr context) : FunctionConstantBase(ClickHouseRevision::getVersionRevision(), context->isDistributed()) {}
explicit FunctionRevision(ContextPtr context) : FunctionServerConstantBase(ClickHouseRevision::getVersionRevision(), context->isDistributed()) {}
};
class FunctionZooKeeperSessionUptime : public FunctionConstantBase<FunctionZooKeeperSessionUptime, UInt32, DataTypeUInt32>
class FunctionZooKeeperSessionUptime : public FunctionServerConstantBase<FunctionZooKeeperSessionUptime, UInt32, DataTypeUInt32>
{
public:
static constexpr auto name = "zookeeperSessionUptime";
explicit FunctionZooKeeperSessionUptime(ContextPtr context)
: FunctionConstantBase(context->getZooKeeperSessionUptime(), context->isDistributed())
: FunctionServerConstantBase(context->getZooKeeperSessionUptime(), context->isDistributed())
{
}
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionZooKeeperSessionUptime>(context); }
};
class FunctionGetOSKernelVersion : public FunctionConstantBase<FunctionGetOSKernelVersion, String, DataTypeString>
class FunctionGetOSKernelVersion : public FunctionServerConstantBase<FunctionGetOSKernelVersion, String, DataTypeString>
{
public:
static constexpr auto name = "getOSKernelVersion";
explicit FunctionGetOSKernelVersion(ContextPtr context) : FunctionConstantBase(Poco::Environment::osName() + " " + Poco::Environment::osVersion(), context->isDistributed()) {}
explicit FunctionGetOSKernelVersion(ContextPtr context) : FunctionServerConstantBase(Poco::Environment::osName() + " " + Poco::Environment::osVersion(), context->isDistributed()) {}
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionGetOSKernelVersion>(context); }
};
class FunctionDisplayName : public FunctionConstantBase<FunctionDisplayName, String, DataTypeString>
class FunctionDisplayName : public FunctionServerConstantBase<FunctionDisplayName, String, DataTypeString>
{
public:
static constexpr auto name = "displayName";
explicit FunctionDisplayName(ContextPtr context) : FunctionConstantBase(context->getConfigRef().getString("display_name", getFQDNOrHostName()), context->isDistributed()) {}
explicit FunctionDisplayName(ContextPtr context) : FunctionServerConstantBase(context->getConfigRef().getString("display_name", getFQDNOrHostName()), context->isDistributed()) {}
static FunctionPtr create(ContextPtr context) {return std::make_shared<FunctionDisplayName>(context); }
};
}

View File

@ -51,7 +51,6 @@
#include <Interpreters/SessionTracker.h>
#include <Core/ServerSettings.h>
#include <Interpreters/PreparedSets.h>
#include <Core/Settings.h>
#include <Core/SettingsQuirks.h>
#include <Access/AccessControl.h>
#include <Access/ContextAccess.h>

View File

@ -40,7 +40,7 @@ ColumnPtr tryGetSubcolumnFromBlock(const Block & block, const DataTypePtr & requ
auto subcolumn_name = requested_subcolumn.getSubcolumnName();
/// If requested subcolumn is dynamic, we should first perform cast and then
/// extract the subcolumn, because the data of dynamic subcolumn can change after cast.
if (elem->type->hasDynamicSubcolumns() && !elem->type->equals(*requested_column_type))
if ((elem->type->hasDynamicSubcolumns() || requested_column_type->hasDynamicSubcolumns()) && !elem->type->equals(*requested_column_type))
{
auto casted_column = castColumn({elem->column, elem->type, ""}, requested_column_type);
auto elem_column = requested_column_type->tryGetSubcolumn(subcolumn_name, casted_column);

View File

@ -87,14 +87,14 @@ bool canRemoveConstantFromGroupByKey(const ConstantNode & root)
else if (function_node)
{
/// Do not allow removing constants like `hostName()`
if (!function_node->getFunctionOrThrow()->isDeterministic())
if (function_node->getFunctionOrThrow()->isServerConstant())
return false;
for (const auto & child : function_node->getArguments())
nodes.push(child.get());
}
else
return false;
// else
// return false;
}
return true;

View File

@ -701,7 +701,6 @@ std::optional<NameAndTypePair> ColumnsDescription::tryGetColumn(const GetColumns
auto jt = subcolumns.get<0>().find(column_name);
if (jt != subcolumns.get<0>().end())
return *jt;
}
/// Check for dynamic subcolumns.
auto [ordinary_column_name, dynamic_subcolumn_name] = Nested::splitName(column_name);
@ -711,6 +710,7 @@ std::optional<NameAndTypePair> ColumnsDescription::tryGetColumn(const GetColumns
if (auto dynamic_subcolumn_type = it->type->tryGetSubcolumnType(dynamic_subcolumn_name))
return NameAndTypePair(ordinary_column_name, dynamic_subcolumn_name, it->type, dynamic_subcolumn_type);
}
}
return {};
}

View File

@ -1068,6 +1068,15 @@ def main() -> int:
if build_result:
if build_result.status == SUCCESS:
previous_status = build_result.status
JobReport(
status=SUCCESS,
description="",
test_results=[],
start_time="",
duration=0.0,
additional_files=[],
job_skipped=True,
).dump()
else:
# FIXME: Consider reusing failures for build jobs.
# Just remove this if/else - that makes build job starting and failing immediately
@ -1265,12 +1274,17 @@ def main() -> int:
elif job_report.pre_report:
print(f"ERROR: Job was killed - generate evidence")
job_report.update_duration()
# Job was killed!
ret_code = os.getenv("JOB_EXIT_CODE", "")
if ret_code:
try:
job_report.exit_code = int(ret_code)
except ValueError:
pass
if Utils.is_killed_with_oom():
print("WARNING: OOM while job execution")
error = f"Out Of Memory, exit_code {job_report.exit_code}, after {job_report.duration}s"
error = f"Out Of Memory, exit_code {job_report.exit_code}, after {int(job_report.duration)}s"
else:
error = f"Unknown, exit_code {job_report.exit_code}, after {job_report.duration}s"
error = f"Unknown, exit_code {job_report.exit_code}, after {int(job_report.duration)}s"
CIBuddy().post_error(error, job_name=_get_ext_check_name(args.job_name))
if CI.is_test_job(args.job_name):
gh = GitHub(get_best_robot_token(), per_page=100)

View File

@ -26,6 +26,7 @@ class CIBuddy:
self.pr_number = pr_info.number
self.head_ref = pr_info.head_ref
self.commit_url = pr_info.commit_html_url
self.sha = pr_info.sha[:10]
@staticmethod
def _get_webhooks():
@ -69,8 +70,10 @@ class CIBuddy:
line_err = f":red_circle: *Error: {error_description}*\n\n"
line_ghr = f" *Runner:* `{instance_type}`, `{instance_id}`\n"
line_job = f" *Job:* `{job_name}`\n"
line_pr_ = f" *PR:* <https://github.com/{self.repo}/pull/{self.pr_number}|#{self.pr_number}>\n"
line_br_ = f" *Branch:* `{self.head_ref}`, <{self.commit_url}|commit>\n"
line_pr_ = f" *PR:* <https://github.com/{self.repo}/pull/{self.pr_number}|#{self.pr_number}>, <{self.commit_url}|{self.sha}>\n"
line_br_ = (
f" *Branch:* `{self.head_ref}`, <{self.commit_url}|{self.sha}>\n"
)
message = line_err
message += line_job
if with_instance_info:
@ -85,4 +88,4 @@ class CIBuddy:
if __name__ == "__main__":
# test
buddy = CIBuddy(dry_run=True)
buddy.post_error("Out of memory")
buddy.post_error("TEst")

View File

@ -763,22 +763,13 @@ class CiCache:
# TIMEOUT * MAX_ROUNDS_TO_WAIT must be less than 6h (GH job timeout) with a room for rest RunConfig work
TIMEOUT = 3000 # 50 min
MAX_ROUNDS_TO_WAIT = 6
MAX_JOB_NUM_TO_WAIT = 3
round_cnt = 0
def _has_build_job():
for job in self.jobs_to_wait:
if CI.is_build_job(job):
return True
return False
if not is_release:
# in PRs we can wait only for builds, TIMEOUT*MAX_ROUNDS_TO_WAIT=100min is enough
MAX_ROUNDS_TO_WAIT = 2
while (
len(self.jobs_to_wait) > MAX_JOB_NUM_TO_WAIT or _has_build_job()
) and round_cnt < MAX_ROUNDS_TO_WAIT:
while round_cnt < MAX_ROUNDS_TO_WAIT:
round_cnt += 1
GHActions.print_in_group(
f"Wait pending jobs, round [{round_cnt}/{MAX_ROUNDS_TO_WAIT}]:",
@ -820,6 +811,10 @@ class CiCache:
f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore"
)
job_config.batches.remove(batch)
if not job_config.batches:
print(f"Remove job [{job_name}] from jobs_to_do")
self.jobs_to_skip.append(job_name)
del self.jobs_to_do[job_name]
else:
print(
f"NOTE: Job [{job_name}:{batch}] finished failed - do not add to ready"
@ -830,9 +825,7 @@ class CiCache:
await_finished.add(job_name)
for job in await_finished:
self.jobs_to_skip.append(job)
del self.jobs_to_wait[job]
del self.jobs_to_do[job]
if not dry_run:
expired_sec = int(time.time()) - start_at

View File

@ -49,14 +49,15 @@ class GHActions:
class Shell:
@classmethod
def run_strict(cls, command):
subprocess.run(
command + " 2>&1",
res = subprocess.run(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
)
return res.stdout.strip()
@classmethod
def run(cls, command):

View File

@ -14,6 +14,7 @@ from ssh import SSHAgent
from env_helper import GITHUB_REPOSITORY, S3_BUILDS_BUCKET
from s3_helper import S3Helper
from autoscale_runners_lambda.lambda_shared.pr import Labels
from ci_utils import Shell
from version_helper import (
FILE_WITH_VERSION_PATH,
GENERATED_CONTRIBUTORS,
@ -65,6 +66,8 @@ class ReleaseInfo:
commit_sha: str
# lts or stable
codename: str
previous_release_tag: str
previous_release_sha: str
@staticmethod
def from_file(file_path: str) -> "ReleaseInfo":
@ -79,6 +82,8 @@ class ReleaseInfo:
version = None
release_branch = None
release_tag = None
previous_release_tag = None
previous_release_sha = None
codename = None
assert release_type in ("patch", "new")
if release_type == "new":
@ -101,6 +106,11 @@ class ReleaseInfo:
codename = (
VersionType.STABLE
) # dummy value (artifactory won't be updated for new release)
previous_release_tag = expected_prev_tag
previous_release_sha = Shell.run_strict(
f"git rev-parse {previous_release_tag}"
)
assert previous_release_sha
if release_type == "patch":
with checkout(commit_ref):
_, commit_sha = ShellRunner.run(f"git rev-parse {commit_ref}")
@ -118,9 +128,10 @@ class ReleaseInfo:
)
if version.patch == 1:
expected_version = copy(version)
previous_release_tag = f"v{version.major}.{version.minor}.1.1-new"
expected_version.bump()
expected_tag_prefix = (
f"v{expected_version.major}.{expected_version.minor}-"
f"v{expected_version.major}.{expected_version.minor}."
)
expected_tag_suffix = "-new"
else:
@ -128,6 +139,7 @@ class ReleaseInfo:
f"v{version.major}.{version.minor}.{version.patch-1}."
)
expected_tag_suffix = f"-{version.get_stable_release_type()}"
previous_release_tag = git.latest_tag
if git.latest_tag.startswith(
expected_tag_prefix
) and git.latest_tag.endswith(expected_tag_suffix):
@ -137,8 +149,15 @@ class ReleaseInfo:
False
), f"BUG: Unexpected latest tag [{git.latest_tag}] expected [{expected_tag_prefix}*{expected_tag_suffix}]"
previous_release_sha = Shell.run_strict(
f"git rev-parse {previous_release_tag}"
)
assert previous_release_sha
assert (
release_branch
and previous_release_tag
and previous_release_sha
and commit_sha
and release_tag
and version
@ -150,6 +169,8 @@ class ReleaseInfo:
release_tag=release_tag,
version=version.string,
codename=codename,
previous_release_tag=previous_release_tag,
previous_release_sha=previous_release_sha,
)
with open(outfile, "w", encoding="utf-8") as f:
print(json.dumps(dataclasses.asdict(res), indent=2), file=f)
@ -618,6 +639,8 @@ sudo apt install --yes --no-install-recommends python3-dev python3-pip gh unzip
sudo apt install --yes python3-boto3
sudo apt install --yes python3-github
sudo apt install --yes python3-unidiff
sudo apt install --yes python3-tqdm # cloud changelog
sudo apt install --yes python3-thefuzz # cloud changelog
sudo apt install --yes s3fs
### INSTALL AWS CLI

View File

@ -24,6 +24,18 @@ from tee_popen import TeePopen
NO_CHANGES_MSG = "Nothing to run"
class SensitiveFormatter(logging.Formatter):
@staticmethod
def _filter(s):
return re.sub(
r"(.*)(AZURE_CONNECTION_STRING.*\')(.*)", r"\1AZURE_CONNECTION_STRING\3", s
)
def format(self, record):
original = logging.Formatter.format(self, record)
return self._filter(original)
def get_additional_envs(
check_name: str, run_by_hash_num: int, run_by_hash_total: int
) -> List[str]:
@ -213,6 +225,9 @@ def parse_args():
def main():
logging.basicConfig(level=logging.INFO)
for handler in logging.root.handlers:
# pylint: disable=protected-access
handler.setFormatter(SensitiveFormatter(handler.formatter._fmt)) # type: ignore
stopwatch = Stopwatch()

View File

@ -75,6 +75,7 @@ def get_run_command(
f"--volume={result_path}:/test_output "
"--security-opt seccomp=unconfined " # required to issue io_uring sys-calls
f"--cap-add=SYS_PTRACE {env_str} {additional_options_str} {image} "
"python3 ./utils/runner.py"
)

View File

@ -260,18 +260,29 @@ def main():
failed_to_get_info = False
has_failed_statuses = False
for status in statuses:
if not CI.is_required(status.context):
if not CI.is_required(status.context) or status.context in (
CI.StatusNames.SYNC,
CI.StatusNames.PR_CHECK,
):
# CI.StatusNames.SYNC or CI.StatusNames.PR_CHECK should not be checked
continue
print(f"Check status [{status.context}], [{status.state}]")
if status.state == FAILURE:
has_failed_statuses = True
failed_cnt = Utils.get_failed_tests_number(status.description)
if failed_cnt is None:
failed_to_get_info = True
print(
f"WARNING: failed to get number of failed tests from [{status.description}]"
)
else:
if failed_cnt > max_failed_tests_per_job:
job_name_with_max_failures = status.context
max_failed_tests_per_job = failed_cnt
total_failed_tests += failed_cnt
print(
f"Failed test cases in [{status.context}] is [{failed_cnt}], total failures [{total_failed_tests}]"
)
elif status.state != SUCCESS and status.context not in (
CI.StatusNames.SYNC,
CI.StatusNames.PR_CHECK,

View File

@ -3,6 +3,7 @@
import csv
import logging
import os
import re
import subprocess
import sys
from pathlib import Path
@ -19,6 +20,18 @@ from stopwatch import Stopwatch
from tee_popen import TeePopen
class SensitiveFormatter(logging.Formatter):
@staticmethod
def _filter(s):
return re.sub(
r"(.*)(AZURE_CONNECTION_STRING.*\')(.*)", r"\1AZURE_CONNECTION_STRING\3", s
)
def format(self, record):
original = logging.Formatter.format(self, record)
return self._filter(original)
def get_additional_envs(check_name: str) -> List[str]:
result = []
azure_connection_string = get_parameter_from_ssm("azure_connection_string")
@ -117,6 +130,9 @@ def process_results(
def run_stress_test(docker_image_name: str) -> None:
logging.basicConfig(level=logging.INFO)
for handler in logging.root.handlers:
# pylint: disable=protected-access
handler.setFormatter(SensitiveFormatter(handler.formatter._fmt)) # type: ignore
stopwatch = Stopwatch()
temp_path = Path(TEMP_PATH)

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3
import copy
import unittest
import random
@ -416,6 +416,30 @@ class TestCIConfig(unittest.TestCase):
"""
checks ci.py job configuration
"""
def _reset_ci_cache_to_wait_all_jobs(ci_cache):
# pretend there are pending jobs that we need to wait
ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do)
for job, config in ci_cache.jobs_to_wait.items():
assert config.batches
config.pending_batches = list(config.batches)
for batch in range(config.num_batches):
record = CiCache.Record(
record_type=CiCache.RecordType.PENDING,
job_name=job,
job_digest=ci_cache.job_digests[job],
batch=batch,
num_batches=config.num_batches,
release_branch=True,
)
for record_t_, records_ in ci_cache.records.items():
if record_t_.value == CiCache.RecordType.PENDING.value:
records_[record.to_str_key()] = record
assert not ci_cache.jobs_to_skip
assert ci_cache.jobs_to_wait
ci_cache.jobs_to_skip = []
settings = CiSettings()
settings.no_ci_cache = True
pr_info = PRInfo(github_event=_TEST_EVENT_JSON)
@ -432,26 +456,6 @@ class TestCIConfig(unittest.TestCase):
assert not ci_cache.jobs_to_skip
assert not ci_cache.jobs_to_wait
# pretend there are pending jobs that we need to wait
ci_cache.jobs_to_wait = dict(ci_cache.jobs_to_do)
for job, config in ci_cache.jobs_to_wait.items():
assert not config.pending_batches
assert config.batches
config.pending_batches = list(config.batches)
for job, config in ci_cache.jobs_to_wait.items():
for batch in range(config.num_batches):
record = CiCache.Record(
record_type=CiCache.RecordType.PENDING,
job_name=job,
job_digest=ci_cache.job_digests[job],
batch=batch,
num_batches=config.num_batches,
release_branch=True,
)
for record_t_, records_ in ci_cache.records.items():
if record_t_.value == CiCache.RecordType.PENDING.value:
records_[record.to_str_key()] = record
def _test_await_for_batch(
ci_cache: CiCache, record_type: CiCache.RecordType, batch: int
) -> None:
@ -477,32 +481,76 @@ class TestCIConfig(unittest.TestCase):
and batch < config_.num_batches
):
assert batch not in config_.pending_batches
else:
assert batch in config_.pending_batches
for _, config_ in ci_cache.jobs_to_do.items():
# jobs to do must have batches to run before/after await
# if it's an empty list after await - apparently job has not been removed after await
assert config_.batches
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 0)
# check all one-batch jobs are in jobs_to_skip
for job in all_jobs_in_wf:
config = CI.JOB_CONFIGS[job]
if config.num_batches == 1:
self.assertTrue(job in ci_cache.jobs_to_skip)
self.assertTrue(job not in ci_cache.jobs_to_do)
else:
self.assertTrue(job not in ci_cache.jobs_to_skip)
self.assertTrue(job in ci_cache.jobs_to_do)
_test_await_for_batch(ci_cache, CiCache.RecordType.FAILED, 1)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 2)
self.assertTrue(len(ci_cache.jobs_to_skip) > 0)
self.assertTrue(len(ci_cache.jobs_to_do) > 0)
_reset_ci_cache_to_wait_all_jobs(ci_cache)
_test_await_for_batch(ci_cache, CiCache.RecordType.FAILED, 0)
tested = False
for job, config in ci_cache.jobs_to_do.items():
if config.batches == [0]:
tested = True
self.assertTrue(
job not in ci_cache.jobs_to_wait,
"Job must be removed from @jobs_to_wait, because its only batch has FAILED cache record",
)
self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip, all_jobs_in_wf
ci_cache.jobs_to_skip,
[],
"No jobs must be skipped, since all cache records are of type FAILED",
)
assert tested
# reset jobs_to_wait after previous test
_reset_ci_cache_to_wait_all_jobs(ci_cache)
assert not ci_cache.jobs_to_skip
# set batch 0 as SUCCESSFUL in ci cache
jobs_to_do_prev = list(ci_cache.jobs_to_do)
jobs_to_skip_prev = []
jobs_to_wait_prev = list(ci_cache.jobs_to_wait)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 0)
self.assertTrue(len(jobs_to_skip_prev) != len(ci_cache.jobs_to_skip))
self.assertTrue(len(jobs_to_wait_prev) > len(ci_cache.jobs_to_wait))
self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip,
jobs_to_do_prev + jobs_to_skip_prev,
)
# set batch 1 as SUCCESSFUL in ci cache
jobs_to_do_prev = list(ci_cache.jobs_to_do)
jobs_to_skip_prev = list(ci_cache.jobs_to_skip)
jobs_to_wait_prev = list(ci_cache.jobs_to_wait)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 1)
self.assertTrue(len(jobs_to_skip_prev) != len(ci_cache.jobs_to_skip))
self.assertTrue(len(jobs_to_wait_prev) > len(ci_cache.jobs_to_wait))
self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip,
jobs_to_do_prev + jobs_to_skip_prev,
)
# set batch 3, 4, 5, 6 as SUCCESSFUL in ci cache
jobs_to_do_prev = list(ci_cache.jobs_to_do)
jobs_to_skip_prev = list(ci_cache.jobs_to_skip)
jobs_to_wait_prev = list(ci_cache.jobs_to_wait)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 2)
self.assertTrue(ci_cache.jobs_to_do)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 3)
self.assertTrue(ci_cache.jobs_to_do)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 4)
self.assertTrue(ci_cache.jobs_to_do)
_test_await_for_batch(ci_cache, CiCache.RecordType.SUCCESSFUL, 5)
self.assertTrue(
not ci_cache.jobs_to_do
) # by this moment there must be no jobs left as batch 5 is currently the maximum
self.assertTrue(len(jobs_to_skip_prev) != len(ci_cache.jobs_to_skip))
self.assertTrue(len(jobs_to_wait_prev) > len(ci_cache.jobs_to_wait))
self.assertCountEqual(
list(ci_cache.jobs_to_do) + ci_cache.jobs_to_skip,
jobs_to_do_prev + jobs_to_skip_prev,
)
def test_ci_py_filters_not_affected_jobs_in_prs(self):

View File

@ -1,102 +1,42 @@
flat
-- { echoOn }
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());
SELECT key, value FROM dict_flat ORDER BY key ASC;
1 First
INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT key, value FROM dict_flat ORDER BY key ASC;
1 First
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_flat ORDER BY key ASC;
1 First
2 SecondUpdated
3 Third
flat/custom
-- { echoOn }
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());
SELECT key, value FROM dict_flat_custom ORDER BY key ASC;
1 First
INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT key, value FROM dict_flat_custom ORDER BY key ASC;
1 First
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_flat_custom ORDER BY key ASC;
1 First
2 SecondUpdated
3 Third
hashed
-- { echoOn }
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());
SELECT key, value FROM dict_hashed ORDER BY key ASC;
1 First
INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT key, value FROM dict_hashed ORDER BY key ASC;
1 First
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_hashed ORDER BY key ASC;
1 First
2 SecondUpdated
3 Third
hashed/custom
-- { echoOn }
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());
SELECT key, value FROM dict_hashed_custom ORDER BY key ASC;
1 First
INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT key, value FROM dict_hashed_custom ORDER BY key ASC;
1 First
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_hashed_custom ORDER BY key ASC;
1 First
2 SecondUpdated
3 Third
complex_key_hashed
-- { echoOn }
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());
SELECT key, value FROM dict_complex_key_hashed ORDER BY key ASC;
1 First
INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT key, value FROM dict_complex_key_hashed ORDER BY key ASC;
1 First
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_complex_key_hashed ORDER BY key ASC;
1 First
2 SecondUpdated
3 Third
complex_key_hashed/custom
-- { echoOn }
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());
SELECT key, value FROM dict_complex_key_hashed_custom ORDER BY key ASC;
1 First
INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
SELECT key, value FROM dict_complex_key_hashed_custom ORDER BY key ASC;
1 First
2 Second
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
SELECT key, value FROM dict_complex_key_hashed_custom ORDER BY key ASC;
1 First
2 SecondUpdated
3 Third

View File

@ -35,7 +35,7 @@ for layout in "${layouts[@]}"; do
echo "$layout"
fi
$CLICKHOUSE_CLIENT -nm -q "
$CLICKHOUSE_CLIENT --multiquery "
TRUNCATE TABLE table_for_update_field_dictionary;
CREATE DICTIONARY $dictionary_name
@ -49,24 +49,31 @@ for layout in "${layouts[@]}"; do
LAYOUT($layout())
LIFETIME(1);
-- { echoOn }
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());
SELECT key, value FROM $dictionary_name ORDER BY key ASC;
INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());"
INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());
SELECT sleepEachRow(1) FROM numbers(10) SETTINGS function_sleep_max_microseconds_per_block = 10000000 FORMAT Null;
while true
do
$CLICKHOUSE_CLIENT --query "SELECT key, value FROM $dictionary_name ORDER BY key ASC" | grep -A10 -B10 'First' && break;
sleep .1;
done
SELECT key, value FROM $dictionary_name ORDER BY key ASC;
$CLICKHOUSE_CLIENT --query "INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());"
INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());
INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());
SELECT sleepEachRow(1) FROM numbers(20) SETTINGS function_sleep_max_microseconds_per_block = 20000000 FORMAT Null;
while true
do
$CLICKHOUSE_CLIENT --query "SELECT key, value FROM $dictionary_name ORDER BY key ASC" | grep -A10 -B10 'Second' && break;
sleep .1;
done
SELECT key, value FROM $dictionary_name ORDER BY key ASC;
-- { echoOff }
$CLICKHOUSE_CLIENT --query "INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now()), (3, 'Third', now())"
DROP DICTIONARY $dictionary_name;
"
while true
do
$CLICKHOUSE_CLIENT --query "SELECT key, value FROM $dictionary_name ORDER BY key ASC" | grep -A10 -B10 'SecondUpdated' && break;
sleep .1;
done
$CLICKHOUSE_CLIENT --query "DROP DICTIONARY $dictionary_name"
done
done

View File

@ -6,6 +6,9 @@ INSERT INTO 02918_parallel_replicas SELECT toString(number), number % 4 FROM num
SET prefer_localhost_replica=0;
--- if we try to query unavaialble replica, connection will be retried
--- but a warning log message will be printed out
SET send_logs_level='error';
-- { echoOn }
SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
@ -26,5 +29,6 @@ GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default';
-- { echoOff }
SET send_logs_level='warning';
DROP TABLE 02918_parallel_replicas;

View File

@ -30,3 +30,46 @@ SELECT
min(dummy)
FROM remote('127.0.0.{2,3}', system.one)
GROUP BY y;
CREATE TABLE ttt (hr DateTime, ts DateTime) ENGINE=Memory
as select '2000-01-01' d, d;
SELECT
count(),
now() AS c1
FROM remote('127.0.0.{1,2}', currentDatabase(), ttt)
GROUP BY c1 FORMAT Null;
SELECT
count(),
now() AS c1
FROM remote('127.0.0.{3,2}', currentDatabase(), ttt)
GROUP BY c1 FORMAT Null;
SELECT
count(),
now() AS c1
FROM remote('127.0.0.{1,2}', currentDatabase(), ttt)
GROUP BY c1 + 1 FORMAT Null;
SELECT
count(),
now() AS c1
FROM remote('127.0.0.{3,2}', currentDatabase(), ttt)
GROUP BY c1 + 1 FORMAT Null;
SELECT
count(),
tuple(nullIf(toDateTime(formatDateTime(hr, '%F %T', 'America/Los_Angeles'), 'America/Los_Angeles'), toDateTime(0))) as c1,
defaultValueOfArgumentType(toTimeZone(ts, 'America/Los_Angeles')) as c2,
formatDateTime(hr, '%F %T', 'America/Los_Angeles') as c3
FROM remote('127.0.0.{1,2}', currentDatabase(), ttt)
GROUP BY c1, c2, c3 FORMAT Null;
SELECT
count(),
tuple(nullIf(toDateTime(formatDateTime(hr, '%F %T', 'America/Los_Angeles'), 'America/Los_Angeles'), toDateTime(0))) as c1,
defaultValueOfArgumentType(toTimeZone(ts, 'America/Los_Angeles')) as c2,
formatDateTime(hr, '%F %T', 'America/Los_Angeles') as c3
FROM remote('127.0.0.{3,2}', currentDatabase(), ttt)
GROUP BY c1, c2, c3 FORMAT Null;

View File

@ -7,7 +7,7 @@ CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_analyzer=1"
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_dynamic_type=1 --allow_experimental_variant_type=1 --use_variant_as_common_type=1"
function run()
{

View File

@ -0,0 +1,17 @@
QUERY id: 0
PROJECTION COLUMNS
d.String Nullable(String)
PROJECTION
LIST id: 1, nodes: 1
COLUMN id: 2, column_name: d.String, result_type: Nullable(String), source_id: 3
JOIN TREE
TABLE id: 3, alias: __table1, table_name: default.test_dynamic
SETTINGS allow_experimental_analyzer=1
foo
\N
\N
foo
\N
\N
6
6

View File

@ -0,0 +1,21 @@
-- Tags: no-random-settings, no-s3-storage
SET allow_experimental_dynamic_type = 1;
DROP TABLE IF EXISTS test_dynamic;
CREATE TABLE test_dynamic (id UInt64, d Dynamic) ENGINE = MergeTree ORDER BY id SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO test_dynamic VALUES (1, 'foo'), (2, 1111), (3, [1, 2, 3]);
EXPLAIN QUERY TREE SELECT d.String FROM test_dynamic SETTINGS allow_experimental_analyzer = 1;
SYSTEM DROP MARK CACHE;
SELECT d.String FROM test_dynamic SETTINGS allow_experimental_analyzer = 1;
SYSTEM DROP MARK CACHE;
SELECT d.String FROM test_dynamic SETTINGS allow_experimental_analyzer = 0;
SYSTEM FLUSH LOGS;
SELECT
ProfileEvents['FileOpen']
FROM system.query_log
WHERE (type = 2) AND (query LIKE 'SELECT d.String %test_dynamic%') AND (current_database = currentDatabase())
ORDER BY event_time_microseconds DESC
LIMIT 2;
DROP TABLE test_dynamic;

View File

@ -7,7 +7,7 @@ CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1 --optimize_functions_to_subcolumns=0"
function test()

View File

@ -761,6 +761,7 @@ QueryCacheMisses
QueryPreempted
QueryThread
QuickAssist
QuickSight
QuoteMeta
RBAC
RClickHouse

View File

@ -9,6 +9,7 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Disks/DiskLocal.h>
#include <Core/Settings.h>
#include <Formats/ReadSchemaUtils.h>
#include <Formats/registerFormats.h>
#include <IO/ReadBuffer.h>

View File

@ -20,6 +20,7 @@ def run_fuzzer(fuzzer: str):
options_file = f"{fuzzer}.options"
custom_libfuzzer_options = ""
fuzzer_arguments = ""
with Path(options_file) as path:
if path.exists() and path.is_file():
@ -47,9 +48,17 @@ def run_fuzzer(fuzzer: str):
for key, value in parser["libfuzzer"].items()
)
if parser.has_section("fuzzer_arguments"):
fuzzer_arguments = " ".join(
("%s" % key) if value == "" else ("%s=%s" % (key, value))
for key, value in parser["fuzzer_arguments"].items()
)
cmd_line = f"{DEBUGGER} ./{fuzzer} {FUZZER_ARGS} {corpus_dir}"
if custom_libfuzzer_options:
cmd_line += f" {custom_libfuzzer_options}"
if fuzzer_arguments:
cmd_line += f" {fuzzer_arguments}"
if not "-dict=" in cmd_line and Path(f"{fuzzer}.dict").exists():
cmd_line += f" -dict={fuzzer}.dict"
@ -70,8 +79,6 @@ def main():
if (current / fuzzer).is_file() and os.access(current / fuzzer, os.X_OK):
run_fuzzer(fuzzer)
exit(0)
if __name__ == "__main__":
main()