Merge branch 'ClickHouse:master' into interval_type_conversion

commit eaeebb4da1
Author: Yarik Briukhovetskyi
Date:   2024-08-19 18:25:45 +02:00 (committed by GitHub)
46 changed files with 562 additions and 220 deletions


@ -482,7 +482,7 @@ jobs:
if: ${{ !failure() }}
run: |
# update overall ci report
python3 finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
python3 ./tests/ci/finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
- name: Check Workflow results
if: ${{ !cancelled() }}
run: |
@ -490,5 +490,4 @@ jobs:
cat > "$WORKFLOW_RESULT_FILE" << 'EOF'
${{ toJson(needs) }}
EOF
python3 ./tests/ci/ci_buddy.py --check-wf-status


@ -256,22 +256,6 @@ function configure
rm -f "$FASTTEST_DATA/config.d/secure_ports.xml"
}
function timeout_with_logging() {
local exit_code=0
timeout -s TERM --preserve-status "${@}" || exit_code="${?}"
echo "Checking if it is a timeout. The code 124 will indicate a timeout."
if [[ "${exit_code}" -eq "124" ]]
then
echo "The command 'timeout ${*}' has been killed by timeout."
else
echo "No, it isn't a timeout."
fi
return $exit_code
}
function run_tests
{
clickhouse-server --version
@ -340,7 +324,7 @@ case "$stage" in
configure 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/install_log.txt"
;&
"run_tests")
timeout_with_logging 35m bash -c run_tests ||:
run_tests ||:
/process_functional_tests_result.py --in-results-dir "$FASTTEST_OUTPUT/" \
--out-results-file "$FASTTEST_OUTPUT/test_results.tsv" \
--out-status-file "$FASTTEST_OUTPUT/check_status.tsv" || echo -e "failure\tCannot parse results" > "$FASTTEST_OUTPUT/check_status.tsv"


@ -35,7 +35,6 @@ RUN mkdir -p /tmp/clickhouse-odbc-tmp \
ENV TZ=Europe/Amsterdam
ENV MAX_RUN_TIME=9000
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG sqllogic_test_repo="https://github.com/gregrahn/sqllogictest.git"


@ -94,7 +94,7 @@ function run_tests()
export -f run_tests
timeout "${MAX_RUN_TIME:-9000}" bash -c run_tests || echo "timeout reached" >&2
run_tests
#/process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv


@ -22,7 +22,6 @@ ARG sqltest_repo="https://github.com/elliotchance/sqltest/"
RUN git clone ${sqltest_repo}
ENV TZ=UTC
ENV MAX_RUN_TIME=900
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY run.sh /


@ -4,9 +4,6 @@
source /setup_export_logs.sh
set -e -x
MAX_RUN_TIME=${MAX_RUN_TIME:-3600}
MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 3600 : MAX_RUN_TIME))
# Choose random timezone for this test run
TZ="$(rg -v '#' /usr/share/zoneinfo/zone.tab | awk '{print $3}' | shuf | head -n1)"
echo "Choosen random timezone $TZ"
@ -123,9 +120,6 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
clickhouse-client --query "DROP TABLE datasets.hits_v1"
clickhouse-client --query "DROP TABLE datasets.visits_v1"
MAX_RUN_TIME=$((MAX_RUN_TIME < 9000 ? MAX_RUN_TIME : 9000)) # min(MAX_RUN_TIME, 2.5 hours)
MAX_RUN_TIME=$((MAX_RUN_TIME != 0 ? MAX_RUN_TIME : 9000)) # set to 2.5 hours if 0 (unlimited)
else
clickhouse-client --query "CREATE DATABASE test"
clickhouse-client --query "SHOW TABLES FROM test"
@ -258,24 +252,7 @@ function run_tests()
export -f run_tests
function timeout_with_logging() {
local exit_code=0
timeout -s TERM --preserve-status "${@}" || exit_code="${?}"
echo "Checking if it is a timeout. The code 124 will indicate a timeout."
if [[ "${exit_code}" -eq "124" ]]
then
echo "The command 'timeout ${*}' has been killed by timeout."
else
echo "No, it isn't a timeout."
fi
return $exit_code
}
TIMEOUT=$((MAX_RUN_TIME - 700))
timeout_with_logging "$TIMEOUT" bash -c run_tests ||:
run_tests ||:
echo "Files in current directory"
ls -la ./


@ -65,7 +65,6 @@ ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV NUM_TRIES=1
ENV MAX_RUN_TIME=0
# Unrelated to vars in setup_minio.sh, but should be the same there
# to have the same binaries for local running scenario


@ -12,9 +12,6 @@ dmesg --clear
# fail on errors, verbose and export all env variables
set -e -x -a
MAX_RUN_TIME=${MAX_RUN_TIME:-9000}
MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 9000 : MAX_RUN_TIME))
USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0}
USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0}
@ -308,8 +305,6 @@ function run_tests()
try_run_with_retry 10 clickhouse-client -q "insert into system.zookeeper (name, path, value) values ('auxiliary_zookeeper2', '/test/chroot/', '')"
TIMEOUT=$((MAX_RUN_TIME - 800 > 8400 ? 8400 : MAX_RUN_TIME - 800))
START_TIME=${SECONDS}
set +e
TEST_ARGS=(
@ -324,32 +319,22 @@ function run_tests()
--test-runs "$NUM_TRIES"
"${ADDITIONAL_OPTIONS[@]}"
)
timeout --preserve-status --signal TERM --kill-after 60m ${TIMEOUT}s clickhouse-test "${TEST_ARGS[@]}" 2>&1 \
clickhouse-test "${TEST_ARGS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt
set -e
DURATION=$((SECONDS - START_TIME))
echo "Elapsed ${DURATION} seconds."
if [[ $DURATION -ge $TIMEOUT ]]
then
echo "It looks like the command is terminated by the timeout, which is ${TIMEOUT} seconds."
fi
}
export -f run_tests
# This should be enough to setup job and collect artifacts
TIMEOUT=$((MAX_RUN_TIME - 700))
if [ "$NUM_TRIES" -gt "1" ]; then
# We don't run tests with Ordinary database in PRs, only in master.
# So run new/changed tests with Ordinary at least once in flaky check.
timeout_with_logging "$TIMEOUT" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests \
| sed 's/All tests have finished/Redacted: a message about tests finish is deleted/' | sed 's/No tests were run/Redacted: a message about no tests run is deleted/' ||:
fi
timeout_with_logging "$TIMEOUT" bash -c run_tests ||:
run_tests ||:
echo "Files in current directory"
ls -la ./


@ -40,22 +40,6 @@ function fn_exists() {
declare -F "$1" > /dev/null;
}
function timeout_with_logging() {
local exit_code=0
timeout -s TERM --preserve-status "${@}" || exit_code="${?}"
echo "Checking if it is a timeout. The code 124 will indicate a timeout."
if [[ "${exit_code}" -eq "124" ]]
then
echo "The command 'timeout ${*}' has been killed by timeout."
else
echo "No, it isn't a timeout."
fi
return $exit_code
}
function collect_core_dumps()
{
find . -type f -maxdepth 1 -name 'core.*' | while read -r core; do


@ -1381,7 +1381,7 @@ Default value: `2`.
Close connection before returning connection to the pool.
Default value: true.
Default value: false.
## odbc_bridge_connection_pool_size {#odbc-bridge-connection-pool-size}


@ -1769,6 +1769,8 @@ try
new_server_settings.http_connections_store_limit,
});
DNSResolver::instance().setFilterSettings(new_server_settings.dns_allow_resolve_names_to_ipv4, new_server_settings.dns_allow_resolve_names_to_ipv6);
if (global_context->isServerCompletelyStarted())
CannotAllocateThreadFaultInjector::setFaultProbability(new_server_settings.cannot_allocate_thread_fault_injection_probability);


@ -12,6 +12,7 @@
#include <atomic>
#include <optional>
#include <string_view>
#include "Common/MultiVersion.h"
#include <unordered_set>
#include "DNSPTRResolverProvider.h"
@ -139,12 +140,6 @@ DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
return addresses;
}
DNSResolver::IPAddresses resolveIPAddressWithCache(CacheBase<std::string, DNSResolver::CacheEntry> & cache, const std::string & host)
{
auto [result, _ ] = cache.getOrSet(host, [&host]() {return std::make_shared<DNSResolver::CacheEntry>(resolveIPAddressImpl(host), std::chrono::system_clock::now());});
return result->addresses;
}
std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress & address)
{
auto ptr_resolver = DB::DNSPTRResolverProvider::get();
@ -198,21 +193,89 @@ struct DNSResolver::Impl
std::atomic<bool> disable_cache{false};
};
struct DNSResolver::AddressFilter
{
struct DNSFilterSettings
{
bool dns_allow_resolve_names_to_ipv4{true};
bool dns_allow_resolve_names_to_ipv6{true};
};
DNSResolver::DNSResolver() : impl(std::make_unique<DNSResolver::Impl>()), log(getLogger("DNSResolver")) {}
AddressFilter() : settings(std::make_unique<DNSFilterSettings>()) {}
void performAddressFiltering(DNSResolver::IPAddresses & addresses) const
{
const auto current_settings = settings.get();
bool dns_resolve_ipv4 = current_settings->dns_allow_resolve_names_to_ipv4;
bool dns_resolve_ipv6 = current_settings->dns_allow_resolve_names_to_ipv6;
if (dns_resolve_ipv4 && dns_resolve_ipv6)
{
return;
}
if (!dns_resolve_ipv4 && !dns_resolve_ipv6)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "DNS can't resolve any address, because dns_resolve_ipv6_interfaces and dns_resolve_ipv4_interfaces both are disabled");
}
std::erase_if(addresses, [dns_resolve_ipv6, dns_resolve_ipv4](const Poco::Net::IPAddress& address)
{
return (address.family() == Poco::Net::IPAddress::IPv6 && !dns_resolve_ipv6)
|| (address.family() == Poco::Net::IPAddress::IPv4 && !dns_resolve_ipv4);
});
}
void setSettings(bool dns_allow_resolve_names_to_ipv4, bool dns_allow_resolve_names_to_ipv6)
{
settings.set(std::make_unique<DNSFilterSettings>(dns_allow_resolve_names_to_ipv4, dns_allow_resolve_names_to_ipv6));
}
MultiVersion<DNSFilterSettings> settings;
};
DNSResolver::DNSResolver()
: impl(std::make_unique<DNSResolver::Impl>())
, addressFilter(std::make_unique<DNSResolver::AddressFilter>())
, log(getLogger("DNSResolver")) {}
DNSResolver::IPAddresses DNSResolver::getResolvedIPAdressessWithFiltering(const std::string & host)
{
auto addresses = resolveIPAddressImpl(host);
addressFilter->performAddressFiltering(addresses);
if (addresses.empty())
{
ProfileEvents::increment(ProfileEvents::DNSError);
throw DB::NetException(ErrorCodes::DNS_ERROR, "After filtering there are no resolved address for host({}).", host);
}
return addresses;
}
DNSResolver::IPAddresses DNSResolver::resolveIPAddressWithCache(const std::string & host)
{
auto [result, _ ] = impl->cache_host.getOrSet(host, [&host, this]() {return std::make_shared<DNSResolver::CacheEntry>(getResolvedIPAdressessWithFiltering(host), std::chrono::system_clock::now());});
return result->addresses;
}
Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host)
{
return pickAddress(resolveHostAll(host)); // random order -> random pick
}
void DNSResolver::setFilterSettings(bool dns_allow_resolve_names_to_ipv4, bool dns_allow_resolve_names_to_ipv6)
{
addressFilter->setSettings(dns_allow_resolve_names_to_ipv4, dns_allow_resolve_names_to_ipv6);
}
DNSResolver::IPAddresses DNSResolver::resolveHostAllInOriginOrder(const std::string & host)
{
if (impl->disable_cache)
return resolveIPAddressImpl(host);
return getResolvedIPAdressessWithFiltering(host);
addToNewHosts(host);
return resolveIPAddressWithCache(impl->cache_host, host);
return resolveIPAddressWithCache(host);
}
DNSResolver::IPAddresses DNSResolver::resolveHostAll(const std::string & host)
@ -232,7 +295,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_an
splitHostAndPort(host_and_port, host, port);
addToNewHosts(host);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(host)), port);
}
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port)
@ -241,7 +304,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, U
return Poco::Net::SocketAddress(host, port);
addToNewHosts(host);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(host)), port);
}
std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std::string & host, UInt16 port)
@ -254,7 +317,7 @@ std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std:
if (!impl->disable_cache)
addToNewHosts(host);
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : resolveIPAddressWithCache(impl->cache_host, host);
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : resolveIPAddressWithCache(host);
auto ips_end = std::unique(ips.begin(), ips.end());
addresses.reserve(ips_end - ips.begin());
@ -419,8 +482,8 @@ bool DNSResolver::updateCache(UInt32 max_consecutive_failures)
bool DNSResolver::updateHost(const String & host)
{
const auto old_value = resolveIPAddressWithCache(impl->cache_host, host);
auto new_value = resolveIPAddressImpl(host);
const auto old_value = resolveIPAddressWithCache(host);
auto new_value = getResolvedIPAdressessWithFiltering(host);
const bool result = old_value != new_value;
impl->cache_host.set(host, std::make_shared<DNSResolver::CacheEntry>(std::move(new_value), std::chrono::system_clock::now()));
return result;
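
Not part of the diff: the filtering rule the new DNSResolver::AddressFilter applies above, restated as a standalone Python sketch (the function name and the ValueError are illustrative, not the server's C++ API).

import ipaddress

def filter_resolved_addresses(addresses, allow_ipv4=True, allow_ipv6=True):
    # Both families allowed: nothing to filter.
    if allow_ipv4 and allow_ipv6:
        return [ipaddress.ip_address(a) for a in addresses]
    # Both families disabled is treated as a configuration error, as in the C++ code.
    if not allow_ipv4 and not allow_ipv6:
        raise ValueError("both dns_allow_resolve_names_to_ipv4 and dns_allow_resolve_names_to_ipv6 are disabled")
    parsed = [ipaddress.ip_address(a) for a in addresses]
    # Keep only addresses whose family is still allowed (the std::erase_if above).
    return [ip for ip in parsed if (ip.version == 4 and allow_ipv4) or (ip.version == 6 and allow_ipv6)]

# filter_resolved_addresses(["10.5.95.17", "2001:3984:3989::1:1117"], allow_ipv4=False, allow_ipv6=True)
# keeps only the IPv6 address, which is what the integration test below asserts.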


@ -68,6 +68,8 @@ public:
/// Returns true if IP of any host has been changed or an element was dropped (too many failures)
bool updateCache(UInt32 max_consecutive_failures);
void setFilterSettings(bool dns_allow_resolve_names_to_ipv4, bool dns_allow_resolve_names_to_ipv6);
/// Returns a copy of cache entries
std::vector<std::pair<std::string, CacheEntry>> cacheEntries() const;
@ -86,6 +88,10 @@ private:
struct Impl;
std::unique_ptr<Impl> impl;
struct AddressFilter;
std::unique_ptr<AddressFilter> addressFilter;
LoggerPtr log;
/// Updates cached value and returns true it has been changed.
@ -94,6 +100,9 @@ private:
void addToNewHosts(const String & host);
void addToNewAddresses(const Poco::Net::IPAddress & address);
IPAddresses resolveIPAddressWithCache(const std::string & host);
IPAddresses getResolvedIPAdressessWithFiltering(const std::string & host);
};
}


@ -51,7 +51,7 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
std::string contents;
{
ReadBufferFromFile in(path, 1024);
LimitReadBuffer limit_in(in, 1024, /* trow_exception */ false, /* exact_limit */ {});
LimitReadBuffer limit_in(in, 1024, /* throw_exception */ false, /* exact_limit */ {});
readStringUntilEOF(contents, limit_in);
}


@ -106,6 +106,8 @@ namespace DB
M(UInt64, dns_cache_max_entries, 10000, "Internal DNS cache max entries.", 0) \
M(Int32, dns_cache_update_period, 15, "Internal DNS cache update period in seconds.", 0) \
M(UInt32, dns_max_consecutive_failures, 10, "Max DNS resolve failures of a hostname before dropping the hostname from ClickHouse DNS cache.", 0) \
M(Bool, dns_allow_resolve_names_to_ipv4, true, "Allows resolve names to ipv4 addresses.", 0) \
M(Bool, dns_allow_resolve_names_to_ipv6, true, "Allows resolve names to ipv6 addresses.", 0) \
\
M(UInt64, max_table_size_to_drop, 50000000000lu, "If size of a table is greater than this value (in bytes) than table could not be dropped with any DROP query.", 0) \
M(UInt64, max_partition_size_to_drop, 50000000000lu, "Same as max_table_size_to_drop, but for the partitions.", 0) \


@ -18,13 +18,15 @@
#include <Core/Settings.h>
#include <IO/Operators.h>
#include "config.h"
#if USE_SIMDJSON
#include <Common/JSONParsers/SimdJSONParser.h>
# include <Common/JSONParsers/SimdJSONParser.h>
#elif USE_RAPIDJSON
# include <Common/JSONParsers/RapidJSONParser.h>
#else
# include <Common/JSONParsers/DummyJSONParser.h>
#endif
#if USE_RAPIDJSON
#include <Common/JSONParsers/RapidJSONParser.h>
#endif
#include <Common/JSONParsers/DummyJSONParser.h>
namespace DB
{
@ -105,7 +107,7 @@ SerializationPtr DataTypeObject::doGetDefaultSerialization() const
switch (schema_format)
{
case SchemaFormat::JSON:
#ifdef USE_SIMDJSON
#if USE_SIMDJSON
return std::make_shared<SerializationJSON<SimdJSONParser>>(
std::move(typed_path_serializations),
paths_to_skip,


@ -80,20 +80,27 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
if (with_file_cache)
{
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
object_path,
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
/* read_until_position */std::nullopt,
cache_log);
if (settings.remote_fs_cache->isInitialized())
{
auto cache_key = settings.remote_fs_cache->createKeyForPath(object_path);
buf = std::make_unique<CachedOnDiskReadBufferFromFile>(
object_path,
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
/* read_until_position */std::nullopt,
cache_log);
}
else
{
settings.remote_fs_cache->throwInitExceptionIfNeeded();
}
}
/// Can't wrap CachedOnDiskReadBufferFromFile in CachedInMemoryReadBufferFromFile because the


@ -99,7 +99,7 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
/// Need to remove even if cache_on_write == false.
removeCacheIfExists(object.remote_path);
if (cache_on_write)
if (cache_on_write && cache->isInitialized())
{
auto key = getCacheKey(object.remote_path);
return std::make_unique<CachedOnDiskWriteBufferFromFile>(
@ -122,7 +122,8 @@ void CachedObjectStorage::removeCacheIfExists(const std::string & path_key_for_c
return;
/// Add try catch?
cache->removeKeyIfExists(getCacheKey(path_key_for_cache), FileCache::getCommonUser().user_id);
if (cache->isInitialized())
cache->removeKeyIfExists(getCacheKey(path_key_for_cache), FileCache::getCommonUser().user_id);
}
void CachedObjectStorage::removeObject(const StoredObject & object)


@ -11,11 +11,15 @@
#include <Interpreters/Cache/EvictionCandidates.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
#include <Common/callOnce.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Core/ServerUUID.h>
#include <exception>
#include <filesystem>
#include <mutex>
namespace fs = std::filesystem;
@ -88,6 +92,7 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
, boundary_alignment(settings.boundary_alignment)
, load_metadata_threads(settings.load_metadata_threads)
, load_metadata_asynchronously(settings.load_metadata_asynchronously)
, write_cache_per_user_directory(settings.write_cache_per_user_id_directory)
, keep_current_size_to_max_ratio(1 - settings.keep_free_space_size_ratio)
, keep_current_elements_to_max_ratio(1 - settings.keep_free_space_elements_ratio)
@ -136,7 +141,17 @@ const FileCache::UserInfo & FileCache::getInternalUser()
bool FileCache::isInitialized() const
{
return is_initialized.load(std::memory_order_seq_cst);
return is_initialized;
}
void FileCache::throwInitExceptionIfNeeded()
{
if (load_metadata_asynchronously)
return;
std::lock_guard lock(init_mutex);
if (init_exception)
std::rethrow_exception(init_exception);
}
const String & FileCache::getBasePath() const
@ -170,6 +185,35 @@ void FileCache::assertInitialized() const
}
void FileCache::initialize()
{
// Prevent initialize() from running twice. This may be caused by two cache disks being created with the same path (see integration/test_filesystem_cache).
callOnce(initialize_called, [&] {
bool need_to_load_metadata = fs::exists(getBasePath());
try
{
if (!need_to_load_metadata)
fs::create_directories(getBasePath());
status_file = make_unique<StatusFile>(fs::path(getBasePath()) / "status", StatusFile::write_full_info);
}
catch (...)
{
init_exception = std::current_exception();
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
if (load_metadata_asynchronously)
{
load_metadata_main_thread = ThreadFromGlobalPool([this, need_to_load_metadata] { initializeImpl(need_to_load_metadata); });
}
else
{
initializeImpl(need_to_load_metadata);
}
});
}
void FileCache::initializeImpl(bool load_metadata)
{
std::lock_guard lock(init_mutex);
@ -178,16 +222,10 @@ void FileCache::initialize()
try
{
if (fs::exists(getBasePath()))
{
if (load_metadata)
loadMetadata();
}
else
{
fs::create_directories(getBasePath());
}
status_file = make_unique<StatusFile>(fs::path(getBasePath()) / "status", StatusFile::write_full_info);
metadata.startup();
}
catch (...)
{
@ -196,8 +234,6 @@ void FileCache::initialize()
throw;
}
metadata.startup();
if (keep_current_size_to_max_ratio != 1 || keep_current_elements_to_max_ratio != 1)
{
keep_up_free_space_ratio_task = Context::getGlobalContextInstance()->getSchedulePool().createTask(log->name(), [this] { freeSpaceRatioKeepingThreadFunc(); });
@ -205,6 +241,7 @@ void FileCache::initialize()
}
is_initialized = true;
LOG_TEST(log, "Initialized cache from {}", metadata.getBaseDirectory());
}
CachePriorityGuard::Lock FileCache::lockCache() const
@ -1197,7 +1234,6 @@ void FileCache::loadMetadataImpl()
std::vector<ThreadFromGlobalPool> loading_threads;
std::exception_ptr first_exception;
std::mutex set_exception_mutex;
std::atomic<bool> stop_loading = false;
LOG_INFO(log, "Loading filesystem cache with {} threads from {}", load_metadata_threads, metadata.getBaseDirectory());
@ -1207,7 +1243,7 @@ void FileCache::loadMetadataImpl()
{
loading_threads.emplace_back([&]
{
while (!stop_loading)
while (!stop_loading_metadata)
{
try
{
@ -1224,7 +1260,7 @@ void FileCache::loadMetadataImpl()
if (!first_exception)
first_exception = std::current_exception();
}
stop_loading = true;
stop_loading_metadata = true;
return;
}
}
@ -1237,7 +1273,7 @@ void FileCache::loadMetadataImpl()
if (!first_exception)
first_exception = std::current_exception();
}
stop_loading = true;
stop_loading_metadata = true;
break;
}
}
@ -1424,6 +1460,11 @@ FileCache::~FileCache()
void FileCache::deactivateBackgroundOperations()
{
shutdown.store(true);
stop_loading_metadata = true;
if (load_metadata_main_thread.joinable())
load_metadata_main_thread.join();
metadata.shutdown();
if (keep_up_free_space_ratio_task)
keep_up_free_space_ratio_task->deactivate();
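
For orientation only: the initialization flow these FileCache hunks implement (run initialize() at most once, optionally load metadata on a background thread, and let callers gate on isInitialized()), restated as a minimal standalone Python sketch with hypothetical names.

import threading

class CacheInitSketch:
    def __init__(self, load_metadata_asynchronously: bool):
        self._load_async = load_metadata_asynchronously
        self._once = threading.Lock()
        self._called = False                 # callOnce-style guard
        self._initialized = threading.Event()
        self._thread = None

    def initialize(self) -> None:
        with self._once:
            if self._called:                 # prevent initialize() from running twice
                return
            self._called = True
        if self._load_async:
            # load_metadata_asynchronously: do the slow part off the caller's thread
            self._thread = threading.Thread(target=self._initialize_impl, daemon=True)
            self._thread.start()
        else:
            self._initialize_impl()

    def _initialize_impl(self) -> None:
        # ... create directories, load metadata, start background tasks ...
        self._initialized.set()

    def is_initialized(self) -> bool:
        # callers (write path, system tables, temporary data) check this before using the cache
        return self._initialized.is_set()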


@ -8,6 +8,7 @@
#include <IO/ReadSettings.h>
#include <Common/callOnce.h>
#include <Common/ThreadPool.h>
#include <Common/StatusFile.h>
#include <Interpreters/Cache/LRUFileCachePriority.h>
@ -82,6 +83,9 @@ public:
bool isInitialized() const;
/// Throws if `!load_metadata_asynchronously` and there is an exception in `init_exception`
void throwInitExceptionIfNeeded();
const String & getBasePath() const;
static Key createKeyForPath(const String & path);
@ -199,6 +203,9 @@ private:
const size_t bypass_cache_threshold;
const size_t boundary_alignment;
size_t load_metadata_threads;
const bool load_metadata_asynchronously;
std::atomic<bool> stop_loading_metadata = false;
ThreadFromGlobalPool load_metadata_main_thread;
const bool write_cache_per_user_directory;
BackgroundSchedulePool::TaskHolder keep_up_free_space_ratio_task;
@ -210,6 +217,7 @@ private:
std::exception_ptr init_exception;
std::atomic<bool> is_initialized = false;
OnceFlag initialize_called;
mutable std::mutex init_mutex;
std::unique_ptr<StatusFile> status_file;
std::atomic<bool> shutdown = false;
@ -247,6 +255,8 @@ private:
*/
FileCacheQueryLimitPtr query_limit;
void initializeImpl(bool load_metadata);
void assertInitialized() const;
void assertCacheCorrectness();


@ -65,6 +65,9 @@ void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetStrin
if (has("load_metadata_threads"))
load_metadata_threads = get_uint("load_metadata_threads");
if (has("load_metadata_asynchronously"))
load_metadata_asynchronously = get_uint("load_metadata_asynchronously");
if (boundary_alignment > max_file_segment_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `boundary_alignment` cannot exceed `max_file_segment_size`");


@ -32,6 +32,7 @@ struct FileCacheSettings
size_t background_download_queue_size_limit = FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_QUEUE_SIZE_LIMIT;
size_t load_metadata_threads = FILECACHE_DEFAULT_LOAD_METADATA_THREADS;
bool load_metadata_asynchronously = false;
bool write_cache_per_user_id_directory = false;


@ -20,6 +20,7 @@ static Block getSampleBlock()
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_size"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_elements"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_file_segment_size"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt8>(), "is_initialized"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "boundary_alignment"},
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "cache_on_write_operations"},
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "cache_hits_threshold"},
@ -50,6 +51,7 @@ BlockIO InterpreterDescribeCacheQuery::execute()
res_columns[i++]->insert(settings.max_size);
res_columns[i++]->insert(settings.max_elements);
res_columns[i++]->insert(settings.max_file_segment_size);
res_columns[i++]->insert(cache->isInitialized());
res_columns[i++]->insert(settings.boundary_alignment);
res_columns[i++]->insert(settings.cache_on_write_operations);
res_columns[i++]->insert(settings.cache_hits_threshold);


@ -65,7 +65,7 @@ TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, Cu
std::unique_ptr<WriteBufferFromFileBase> TemporaryDataOnDisk::createRawStream(size_t max_file_size)
{
if (file_cache)
if (file_cache && file_cache->isInitialized())
{
auto holder = createCacheFile(max_file_size);
return std::make_unique<WriteBufferToFileSegment>(std::move(holder));
@ -81,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> TemporaryDataOnDisk::createRawStream(si
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
{
if (file_cache)
if (file_cache && file_cache->isInitialized())
{
auto holder = createCacheFile(max_file_size);


@ -7,6 +7,7 @@
#include <algorithm>
#include <numeric>
#include <thread>
#include <chrono>
#include <Core/ServerUUID.h>
#include <Common/iota.h>
@ -42,6 +43,7 @@
#include <Interpreters/DatabaseCatalog.h>
#include <base/scope_guard.h>
using namespace std::chrono_literals;
namespace fs = std::filesystem;
using namespace DB;
@ -360,9 +362,11 @@ TEST_F(FileCacheTest, LRUPolicy)
settings.max_size = 30;
settings.max_elements = 5;
settings.boundary_alignment = 1;
settings.load_metadata_asynchronously = false;
const size_t file_size = INT_MAX; // the value doesn't really matter because boundary_alignment == 1.
const auto user = FileCache::getCommonUser();
{
std::cerr << "Step 1\n";
@ -817,6 +821,7 @@ TEST_F(FileCacheTest, writeBuffer)
settings.max_elements = 5;
settings.max_file_segment_size = 5;
settings.base_path = cache_base_path;
settings.load_metadata_asynchronously = false;
FileCache cache("6", settings);
cache.initialize();
@ -948,6 +953,7 @@ TEST_F(FileCacheTest, temporaryData)
settings.max_size = 10_KiB;
settings.max_file_segment_size = 1_KiB;
settings.base_path = cache_base_path;
settings.load_metadata_asynchronously = false;
DB::FileCache file_cache("7", settings);
file_cache.initialize();
@ -1076,6 +1082,7 @@ TEST_F(FileCacheTest, CachedReadBuffer)
settings.max_size = 30;
settings.max_elements = 10;
settings.boundary_alignment = 1;
settings.load_metadata_asynchronously = false;
ReadSettings read_settings;
read_settings.enable_filesystem_cache = true;
@ -1095,6 +1102,7 @@ TEST_F(FileCacheTest, CachedReadBuffer)
auto cache = std::make_shared<DB::FileCache>("8", settings);
cache->initialize();
auto key = cache->createKeyForPath(file_path);
const auto user = FileCache::getCommonUser();
@ -1135,6 +1143,7 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
settings.max_size = 10_KiB;
settings.max_file_segment_size = 1_KiB;
settings.base_path = cache_base_path;
settings.load_metadata_asynchronously = false;
DB::FileCache file_cache("cache", settings);
file_cache.initialize();
@ -1198,6 +1207,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
settings.max_size = 40;
settings.max_elements = 6;
settings.boundary_alignment = 1;
settings.load_metadata_asynchronously = false;
settings.cache_policy = "SLRU";
settings.slru_size_ratio = 0.5;
@ -1310,6 +1320,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
settings2.boundary_alignment = 1;
settings2.cache_policy = "SLRU";
settings2.slru_size_ratio = 0.5;
settings.load_metadata_asynchronously = false;
auto cache = std::make_shared<DB::FileCache>("slru_2", settings2);
cache->initialize();


@ -47,6 +47,9 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
for (const auto & [cache_name, cache_data] : caches)
{
const auto & cache = cache_data->cache;
if (!cache->isInitialized())
continue;
cache->iterate([&](const FileSegment::Info & file_segment)
{
size_t i = 0;


@ -21,6 +21,7 @@ ColumnsDescription StorageSystemFilesystemCacheSettings::getColumnsDescription()
{"path", std::make_shared<DataTypeString>(), "Cache directory"},
{"max_size", std::make_shared<DataTypeUInt64>(), "Cache size limit by the number of bytes"},
{"max_elements", std::make_shared<DataTypeUInt64>(), "Cache size limit by the number of elements"},
{"is_initialized", std::make_shared<DataTypeUInt8>(), "Whether the cache is initialized and ready to be used"},
{"current_size", std::make_shared<DataTypeUInt64>(), "Current cache size by the number of bytes"},
{"current_elements", std::make_shared<DataTypeUInt64>(), "Current cache size by the number of elements"},
{"max_file_segment_size", std::make_shared<DataTypeUInt64>(), "Maximum allowed file segment size"},
@ -56,6 +57,7 @@ void StorageSystemFilesystemCacheSettings::fillData(
res_columns[i++]->insert(settings.base_path);
res_columns[i++]->insert(settings.max_size);
res_columns[i++]->insert(settings.max_elements);
res_columns[i++]->insert(cache->isInitialized());
res_columns[i++]->insert(cache->getUsedCacheSize());
res_columns[i++]->insert(cache->getFileSegmentsNum());
res_columns[i++]->insert(settings.max_file_segment_size);


@ -50,7 +50,6 @@ from github_helper import GitHub
from pr_info import PRInfo
from report import (
ERROR,
FAILURE,
PENDING,
SUCCESS,
BuildResult,
@ -62,11 +61,11 @@ from report import (
FAIL,
)
from s3_helper import S3Helper
from stopwatch import Stopwatch
from tee_popen import TeePopen
from ci_cache import CiCache
from ci_settings import CiSettings
from ci_buddy import CIBuddy
from stopwatch import Stopwatch
from version_helper import get_version_from_repo
# pylint: disable=too-many-lines
@ -370,8 +369,8 @@ def _pre_action(s3, job_name, batch, indata, pr_info):
# skip_status = SUCCESS already there
GH.print_in_group("Commit Status Data", job_status)
# create pre report
jr = JobReport.create_pre_report(status=skip_status, job_skipped=to_be_skipped)
# create dummy report
jr = JobReport.create_dummy(status=skip_status, job_skipped=to_be_skipped)
jr.dump()
if not to_be_skipped:
@ -990,19 +989,21 @@ def _run_test(job_name: str, run_command: str) -> int:
stopwatch = Stopwatch()
job_log = Path(TEMP_PATH) / "job_log.txt"
with TeePopen(run_command, job_log, env, timeout) as process:
print(f"Job process started, pid [{process.process.pid}]")
retcode = process.wait()
if retcode != 0:
print(f"Run action failed for: [{job_name}] with exit code [{retcode}]")
if timeout and process.timeout_exceeded:
print(f"Timeout {timeout} exceeded, dumping the job report")
JobReport(
status=FAILURE,
description=f"Timeout {timeout} exceeded",
test_results=[TestResult.create_check_timeout_expired(timeout)],
start_time=stopwatch.start_time_str,
duration=stopwatch.duration_seconds,
additional_files=[job_log],
).dump()
if process.timeout_exceeded:
print(f"Job timed out: [{job_name}] exit code [{retcode}]")
assert JobReport.exist(), "JobReport real or dummy must be present"
jr = JobReport.load()
if jr.dummy:
print(
f"ERROR: Run action failed with timeout and did not generate JobReport - update dummy report with execution time"
)
jr.test_results = [TestResult.create_check_timeout_expired()]
jr.duration = stopwatch.duration_seconds
jr.additional_files += [job_log]
print(f"Run action done for: [{job_name}]")
return retcode
@ -1205,7 +1206,7 @@ def main() -> int:
job_report
), "BUG. There must be job report either real report, or pre-report if job was killed"
error_description = ""
if not job_report.pre_report:
if not job_report.dummy:
# it's a real job report
ch_helper = ClickHouseHelper()
check_url = ""
@ -1329,10 +1330,20 @@ def main() -> int:
if CI.is_test_job(args.job_name):
gh = GitHub(get_best_robot_token(), per_page=100)
commit = get_commit(gh, pr_info.sha)
check_url = ""
if job_report.test_results or job_report.additional_files:
check_url = upload_result_helper.upload_results(
s3,
pr_info.number,
pr_info.sha,
job_report.test_results,
job_report.additional_files,
job_report.check_name or _get_ext_check_name(args.job_name),
)
post_commit_status(
commit,
ERROR,
"",
check_url,
"Error: " + error_description,
_get_ext_check_name(args.job_name),
pr_info,


@ -316,6 +316,7 @@ class CI:
JobNames.STATEFUL_TEST_PARALLEL_REPL_TSAN: CommonJobConfigs.STATEFUL_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_TSAN],
random_bucket="parrepl_with_sanitizer",
timeout=3600,
),
JobNames.STATELESS_TEST_ASAN: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_ASAN], num_batches=2
@ -343,17 +344,17 @@ class CI:
runner_type=Runners.FUNC_TESTER_ARM,
),
JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_RELEASE], num_batches=4
required_builds=[BuildNames.PACKAGE_RELEASE], num_batches=2
),
JobNames.STATELESS_TEST_S3_DEBUG: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_DEBUG], num_batches=2
required_builds=[BuildNames.PACKAGE_DEBUG], num_batches=1
),
JobNames.STATELESS_TEST_AZURE_ASAN: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_ASAN], num_batches=3, release_only=True
),
JobNames.STATELESS_TEST_S3_TSAN: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_TSAN],
num_batches=4,
num_batches=3,
),
JobNames.STRESS_TEST_DEBUG: CommonJobConfigs.STRESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_DEBUG],
@ -401,7 +402,8 @@ class CI:
required_builds=[BuildNames.PACKAGE_ASAN], release_only=True, num_batches=4
),
JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER: CommonJobConfigs.INTEGRATION_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_ASAN], num_batches=6
required_builds=[BuildNames.PACKAGE_ASAN],
num_batches=6,
),
JobNames.INTEGRATION_TEST_TSAN: CommonJobConfigs.INTEGRATION_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_TSAN], num_batches=6


@ -332,7 +332,7 @@ class JobConfig:
# will be triggered for the job if omitted in CI workflow yml
run_command: str = ""
# job timeout, seconds
timeout: Optional[int] = None
timeout: int = 7200
# sets number of batches for a multi-batch job
num_batches: int = 1
# label that enables job in CI, if set digest isn't used
@ -421,7 +421,6 @@ class CommonJobConfigs:
),
run_command='functional_test_check.py "$CHECK_NAME"',
runner_type=Runners.FUNC_TESTER,
timeout=9000,
)
STATEFUL_TEST = JobConfig(
job_name_keyword="stateful",
@ -466,6 +465,7 @@ class CommonJobConfigs:
),
run_command="upgrade_check.py",
runner_type=Runners.STRESS_TESTER,
timeout=3600,
)
INTEGRATION_TEST = JobConfig(
job_name_keyword="integration",


@ -5,10 +5,11 @@ import csv
import logging
import os
import re
import signal
import subprocess
import sys
from pathlib import Path
from typing import List, Tuple
from typing import List, Tuple, Optional
from build_download_helper import download_all_deb_packages
from clickhouse_helper import CiLogsCredentials
@ -25,11 +26,12 @@ from report import (
TestResults,
read_test_results,
FAILURE,
TestResult,
)
from stopwatch import Stopwatch
from tee_popen import TeePopen
from ci_config import CI
from ci_utils import Utils
from ci_utils import Utils, Shell
NO_CHANGES_MSG = "Nothing to run"
@ -113,10 +115,6 @@ def get_run_command(
if flaky_check:
envs.append("-e NUM_TRIES=50")
envs.append("-e MAX_RUN_TIME=2800")
else:
max_run_time = os.getenv("MAX_RUN_TIME", "0")
envs.append(f"-e MAX_RUN_TIME={max_run_time}")
envs += [f"-e {e}" for e in additional_envs]
@ -128,7 +126,7 @@ def get_run_command(
)
return (
f"docker run --volume={builds_path}:/package_folder "
f"docker run --rm --name func-tester --volume={builds_path}:/package_folder "
# For dmesg and sysctl
"--privileged "
f"{ci_logs_args}"
@ -198,7 +196,7 @@ def process_results(
state, description = status[0][0], status[0][1]
if ret_code != 0:
state = ERROR
description += " (but script exited with an error)"
description = f"Job failed, exit code: {ret_code}. " + description
try:
results_path = result_directory / "test_results.tsv"
@ -240,7 +238,19 @@ def parse_args():
return parser.parse_args()
test_process = None # type: Optional[TeePopen]
timeout_expired = False
def handle_sigterm(signum, _frame):
print(f"WARNING: Received signal {signum}")
global timeout_expired
timeout_expired = True
Shell.check(f"docker exec func-tester pkill -f clickhouse-test", verbose=True)
def main():
signal.signal(signal.SIGTERM, handle_sigterm)
logging.basicConfig(level=logging.INFO)
for handler in logging.root.handlers:
# pylint: disable=protected-access
@ -328,11 +338,13 @@ def main():
logging.info("Going to run func tests: %s", run_command)
with TeePopen(run_command, run_log_path) as process:
global test_process
test_process = process
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
logging.info("Run failed, exit code %s", retcode)
try:
subprocess.check_call(
@ -348,6 +360,13 @@ def main():
state, description, test_results, additional_logs = process_results(
retcode, result_path, server_log_path
)
if timeout_expired:
description = "Timeout expired"
state = FAILURE
test_results.insert(
0, TestResult.create_check_timeout_expired(stopwatch.duration_seconds)
)
else:
print(
"This is validate bugfix or flaky check run, but no changes test to run - skip with success"


@ -9,6 +9,7 @@ import random
import re
import shlex
import shutil
import signal
import string
import subprocess
import sys
@ -16,11 +17,13 @@ import time
import zlib # for crc32
from collections import defaultdict
from itertools import chain
from typing import Any, Dict
from typing import Any, Dict, Optional
from env_helper import IS_CI
from integration_test_images import IMAGES
from tee_popen import TeePopen
from report import JOB_TIMEOUT_TEST_NAME
from stopwatch import Stopwatch
MAX_RETRY = 1
NUM_WORKERS = 5
@ -621,6 +624,9 @@ class ClickhouseIntegrationTestsRunner:
test_data_dirs = {}
for i in range(num_tries):
if timeout_expired:
print("Timeout expired - break test group execution")
break
logging.info("Running test group %s for the %s retry", test_group, i)
clear_ip_tables_and_restart_daemons()
@ -657,6 +663,8 @@ class ClickhouseIntegrationTestsRunner:
logging.info("Executing cmd: %s", cmd)
# ignore retcode, since it meaningful due to pipe to tee
with subprocess.Popen(cmd, shell=True, stderr=log, stdout=log) as proc:
global runner_subprocess
runner_subprocess = proc
proc.wait()
extra_logs_names = [log_basename]
@ -780,6 +788,9 @@ class ClickhouseIntegrationTestsRunner:
logs = []
tries_num = 1 if should_fail else FLAKY_TRIES_COUNT
for i in range(tries_num):
if timeout_expired:
print("Timeout expired - break flaky check execution")
break
final_retry += 1
logging.info("Running tests for the %s time", i)
counters, tests_times, log_paths = self.try_run_test_group(
@ -839,6 +850,7 @@ class ClickhouseIntegrationTestsRunner:
return result_state, status_text, test_result, logs
def run_impl(self, repo_path, build_path):
stopwatch = Stopwatch()
if self.flaky_check or self.bugfix_validate_check:
return self.run_flaky_check(
repo_path, build_path, should_fail=self.bugfix_validate_check
@ -921,6 +933,9 @@ class ClickhouseIntegrationTestsRunner:
random.shuffle(items_to_run)
for group, tests in items_to_run:
if timeout_expired:
print("Timeout expired - break tests execution")
break
logging.info("Running test group %s containing %s tests", group, len(tests))
group_counters, group_test_times, log_paths = self.try_run_test_group(
repo_path, group, tests, MAX_RETRY, NUM_WORKERS, 0
@ -981,6 +996,17 @@ class ClickhouseIntegrationTestsRunner:
status_text = "Timeout, " + status_text
result_state = "failure"
if timeout_expired:
logging.error(
"Job killed by external timeout signal - setting status to failure!"
)
status_text = "Job timeout expired, " + status_text
result_state = "failure"
# add mock test case to make timeout visible in job report and in ci db
test_result.insert(
0, (JOB_TIMEOUT_TEST_NAME, "FAIL", f"{stopwatch.duration_seconds}", "")
)
if not counters or sum(len(counter) for counter in counters.values()) == 0:
status_text = "No tests found for some reason! It's a bug"
result_state = "failure"
@ -1001,6 +1027,7 @@ def write_results(results_file, status_file, results, status):
def run():
signal.signal(signal.SIGTERM, handle_sigterm)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
repo_path = os.environ.get("CLICKHOUSE_TESTS_REPO_PATH")
@ -1035,5 +1062,17 @@ def run():
logging.info("Result written")
timeout_expired = False
runner_subprocess = None # type:Optional[subprocess.Popen]
def handle_sigterm(signum, _frame):
print(f"WARNING: Received signal {signum}")
global timeout_expired
timeout_expired = True
if runner_subprocess:
runner_subprocess.send_signal(signal.SIGTERM)
if __name__ == "__main__":
run()


@ -249,6 +249,7 @@ JOB_REPORT_FILE = Path(GITHUB_WORKSPACE) / "job_report.json"
JOB_STARTED_TEST_NAME = "STARTED"
JOB_FINISHED_TEST_NAME = "COMPLETED"
JOB_TIMEOUT_TEST_NAME = "Job Timeout Expired"
@dataclass
@ -277,8 +278,8 @@ class TestResult:
self.log_files.append(log_path)
@staticmethod
def create_check_timeout_expired(timeout: float) -> "TestResult":
return TestResult("Check timeout expired", "FAIL", timeout)
def create_check_timeout_expired(duration: Optional[float] = None) -> "TestResult":
return TestResult(JOB_TIMEOUT_TEST_NAME, "FAIL", time=duration)
TestResults = List[TestResult]
@ -303,7 +304,7 @@ class JobReport:
# indicates that this is not real job report but report for the job that was skipped by rerun check
job_skipped: bool = False
# indicates that report generated by CI script in order to check later if job was killed before real report is generated
pre_report: bool = False
dummy: bool = False
exit_code: int = -1
@staticmethod
@ -311,7 +312,7 @@ class JobReport:
return datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
@classmethod
def create_pre_report(cls, status: str, job_skipped: bool) -> "JobReport":
def create_dummy(cls, status: str, job_skipped: bool) -> "JobReport":
return JobReport(
status=status,
description="",
@ -320,7 +321,7 @@ class JobReport:
duration=0.0,
additional_files=[],
job_skipped=job_skipped,
pre_report=True,
dummy=True,
)
def update_duration(self):
@ -741,10 +742,21 @@ def create_test_html_report(
has_test_time = any(tr.time is not None for tr in test_results)
has_log_urls = False
# Display entires with logs at the top (they correspond to failed tests)
test_results.sort(
key=lambda result: result.raw_logs is None and result.log_files is None
)
def sort_key(status):
if "fail" in status.lower():
return 0
elif "error" in status.lower():
return 1
elif "not" in status.lower():
return 2
elif "ok" in status.lower():
return 10
elif "success" in status.lower():
return 9
else:
return 5
test_results.sort(key=lambda result: sort_key(result.status))
for test_result in test_results:
colspan = 0
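
To make the new ordering concrete, a self-contained restatement of sort_key above and the order it yields for a few sample statuses (the sample statuses are illustrative):

def sort_key(status):
    s = status.lower()
    if "fail" in s:
        return 0
    if "error" in s:
        return 1
    if "not" in s:
        return 2
    if "ok" in s:
        return 10
    if "success" in s:
        return 9
    return 5

print(sorted(["OK", "SKIPPED", "ERROR", "success", "FAIL"], key=sort_key))
# ['FAIL', 'ERROR', 'SKIPPED', 'success', 'OK'] -- failures first, passing entries last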


@ -2,6 +2,7 @@
import logging
import os
import signal
import sys
from io import TextIOWrapper
from pathlib import Path
@ -30,20 +31,34 @@ class TeePopen:
self._process = None # type: Optional[Popen]
self.timeout = timeout
self.timeout_exceeded = False
self.terminated_by_sigterm = False
self.terminated_by_sigkill = False
def _check_timeout(self) -> None:
if self.timeout is None:
return
sleep(self.timeout)
logging.warning(
"Timeout exceeded. Send SIGTERM to process %s, timeout %s",
self.process.pid,
self.timeout,
)
self.send_signal(signal.SIGTERM)
time_wait = 0
self.terminated_by_sigterm = True
self.timeout_exceeded = True
while self.process.poll() is None and time_wait < 100:
print("wait...")
wait = 5
sleep(wait)
time_wait += wait
while self.process.poll() is None:
logging.warning(
"Killing process %s, timeout %s exceeded",
self.process.pid,
self.timeout,
logging.error(
"Process is still running. Send SIGKILL",
)
os.killpg(self.process.pid, 9)
sleep(10)
self.send_signal(signal.SIGKILL)
self.terminated_by_sigkill = True
sleep(5)
def __enter__(self) -> "TeePopen":
self.process = Popen(
@ -57,6 +72,8 @@ class TeePopen:
bufsize=1,
errors="backslashreplace",
)
sleep(1)
print(f"Subprocess started, pid [{self.process.pid}]")
if self.timeout is not None and self.timeout > 0:
t = Thread(target=self._check_timeout)
t.daemon = True # does not block the program from exit
@ -85,6 +102,12 @@ class TeePopen:
return self.process.wait()
def poll(self):
return self.process.poll()
def send_signal(self, signal_num):
os.killpg(self.process.pid, signal_num)
@property
def process(self) -> Popen:
if self._process is not None:
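
A minimal sketch (not TeePopen itself) of the escalation this change adopts: SIGTERM the whole process group, allow a grace period, then SIGKILL. It assumes the child was started in its own session (start_new_session=True), so the child's pid doubles as its process-group id.

import os
import signal
import subprocess
import time

def terminate_process_group(proc: subprocess.Popen, grace_seconds: int = 100) -> None:
    os.killpg(proc.pid, signal.SIGTERM)      # ask the group to stop gracefully
    waited = 0
    while proc.poll() is None and waited < grace_seconds:
        time.sleep(5)                        # the "wait..." loop from the diff
        waited += 5
    while proc.poll() is None:               # still running: force-kill the group
        os.killpg(proc.pid, signal.SIGKILL)
        time.sleep(5)

# Usage sketch:
# proc = subprocess.Popen(["sleep", "600"], start_new_session=True)
# terminate_process_group(proc)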


@ -2572,12 +2572,12 @@ def do_run_tests(jobs, test_suite: TestSuite):
try:
clickhouse_execute(
args,
query="SELECT 1 /*hang up check*/",
max_http_retries=5,
timeout=20,
query="SELECT 1 /*hung check*/",
max_http_retries=20,
timeout=10,
)
except Exception:
print("Hang up check failed")
print("Hung check failed")
server_died.set()
if server_died.is_set():


@ -27,6 +27,7 @@
<slru_size_ratio>0.3</slru_size_ratio>
<keep_free_space_size_ratio>0.15</keep_free_space_size_ratio>
<keep_free_space_elements_ratio>0.15</keep_free_space_elements_ratio>
<load_metadata_asynchronously>0</load_metadata_asynchronously>
</s3_cache>
<s3_cache_02933>
<type>cache</type>
@ -37,6 +38,7 @@
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
<background_download_threads>0</background_download_threads>
<background_download_queue_size_limit>0</background_download_queue_size_limit>
<load_metadata_asynchronously>0</load_metadata_asynchronously>
</s3_cache_02933>
<!-- local disks -->
<local_disk>


@ -19,6 +19,7 @@
<boundary_alignment>10</boundary_alignment>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
<cache_on_write_operations>0</cache_on_write_operations>
<load_metadata_asynchronously>0</load_metadata_asynchronously>
</s3_cache_02944>
</disks>
</storage_configuration>


@ -1,6 +1,7 @@
[
"test_dns_cache/test.py::test_dns_cache_update",
"test_dns_cache/test.py::test_ip_change_drop_dns_cache",
"test_dns_cache/test.py::test_dns_resolver_filter",
"test_dns_cache/test.py::test_ip_change_update_dns_cache",
"test_dns_cache/test.py::test_user_access_ip_change[node0]",
"test_dns_cache/test.py::test_user_access_ip_change[node1]",


@ -317,3 +317,74 @@ def test_host_is_drop_from_cache_after_consecutive_failures(
assert node4.wait_for_log_line(
"Cached hosts dropped:.*InvalidHostThatDoesNotExist.*"
)
node7 = cluster.add_instance(
"node7",
main_configs=["configs/listen_host.xml", "configs/dns_update_long.xml"],
with_zookeeper=True,
ipv6_address="2001:3984:3989::1:1117",
ipv4_address="10.5.95.17",
)
def _render_filter_config(allow_ipv4, allow_ipv6):
config = f"""
<clickhouse>
<dns_allow_resolve_names_to_ipv4>{int(allow_ipv4)}</dns_allow_resolve_names_to_ipv4>
<dns_allow_resolve_names_to_ipv6>{int(allow_ipv6)}</dns_allow_resolve_names_to_ipv6>
</clickhouse>
"""
return config
@pytest.mark.parametrize(
"allow_ipv4, allow_ipv6",
[
(True, False),
(False, True),
(False, False),
],
)
def test_dns_resolver_filter(cluster_without_dns_cache_update, allow_ipv4, allow_ipv6):
node = node7
host_ipv6 = node.ipv6_address
host_ipv4 = node.ipv4_address
node.set_hosts(
[
(host_ipv6, "test_host"),
(host_ipv4, "test_host"),
]
)
node.replace_config(
"/etc/clickhouse-server/config.d/dns_filter.xml",
_render_filter_config(allow_ipv4, allow_ipv6),
)
node.query("SYSTEM RELOAD CONFIG")
node.query("SYSTEM DROP DNS CACHE")
node.query("SYSTEM DROP CONNECTIONS CACHE")
if not allow_ipv4 and not allow_ipv6:
with pytest.raises(QueryRuntimeException):
node.query("SELECT * FROM remote('lost_host', 'system', 'one')")
else:
node.query("SELECT * FROM remote('test_host', system, one)")
assert (
node.query(
"SELECT ip_address FROM system.dns_cache WHERE hostname='test_host'"
)
== f"{host_ipv4 if allow_ipv4 else host_ipv6}\n"
)
node.exec_in_container(
[
"bash",
"-c",
"rm /etc/clickhouse-server/config.d/dns_filter.xml",
],
privileged=True,
user="root",
)
node.query("SYSTEM RELOAD CONFIG")


@ -1,6 +1,7 @@
import logging
import time
import os
import random
import pytest
from helpers.cluster import ClickHouseCluster
@ -30,14 +31,6 @@ def cluster():
"config.d/storage_conf_2.xml",
],
)
cluster.add_instance(
"node_no_filesystem_caches_path",
main_configs=[
"config.d/storage_conf.xml",
"config.d/remove_filesystem_caches_path.xml",
],
stay_alive=True,
)
cluster.add_instance(
"node_force_read_through_cache_on_merge",
main_configs=[
@ -59,6 +52,51 @@ def cluster():
cluster.shutdown()
@pytest.fixture(scope="function")
def non_shared_cluster():
"""
For tests that cannot run in parallel against the same node/cluster (see test_custom_cached_disk, which relies on
changing server settings at runtime)
"""
try:
# Randomize the cluster name
cluster = ClickHouseCluster(f"{__file__}_non_shared_{random.randint(0, 10**7)}")
cluster.add_instance(
"node_no_filesystem_caches_path",
main_configs=[
"config.d/storage_conf.xml",
"config.d/remove_filesystem_caches_path.xml",
],
stay_alive=True,
)
logging.info("Starting test-exclusive cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def wait_for_cache_initialized(node, cache_path, max_attempts=50):
initialized = False
attempts = 0
while not initialized:
query_result = node.query(
"SELECT path FROM system.filesystem_cache_settings WHERE is_initialized"
)
initialized = cache_path in query_result
if initialized:
break
time.sleep(0.1)
attempts += 1
if attempts >= max_attempts:
raise "Stopped waiting for cache to be initialized"
@pytest.mark.parametrize("node_name", ["node"])
def test_parallel_cache_loading_on_startup(cluster, node_name):
node = cluster.instances[node_name]
@ -71,14 +109,21 @@ def test_parallel_cache_loading_on_startup(cluster, node_name):
ORDER BY value
SETTINGS disk = disk(
type = cache,
path = 'paralel_loading_test',
name = 'parallel_loading_test',
path = 'parallel_loading_test',
disk = 'hdd_blob',
max_file_segment_size = '1Ki',
boundary_alignment = '1Ki',
max_size = '1Gi',
max_elements = 10000000,
load_metadata_threads = 30);
"""
)
wait_for_cache_initialized(node, "parallel_loading_test")
node.query(
"""
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
SELECT * FROM test FORMAT Null;
@ -103,6 +148,7 @@ def test_parallel_cache_loading_on_startup(cluster, node_name):
)
node.restart_clickhouse()
wait_for_cache_initialized(node, "parallel_loading_test")
# < because of additional files loaded into cache on server startup.
assert cache_count <= int(node.query("SELECT count() FROM system.filesystem_cache"))
@ -131,7 +177,7 @@ def test_caches_with_the_same_configuration(cluster, node_name):
node = cluster.instances[node_name]
cache_path = "cache1"
node.query(f"SYSTEM DROP FILESYSTEM CACHE;")
node.query("SYSTEM DROP FILESYSTEM CACHE;")
for table in ["test", "test2"]:
node.query(
f"""
@ -142,14 +188,20 @@ def test_caches_with_the_same_configuration(cluster, node_name):
ORDER BY value
SETTINGS disk = disk(
type = cache,
name = {table},
name = '{table}',
path = '{cache_path}',
disk = 'hdd_blob',
max_file_segment_size = '1Ki',
boundary_alignment = '1Ki',
cache_on_write_operations=1,
max_size = '1Mi');
"""
)
wait_for_cache_initialized(node, cache_path)
node.query(
f"""
SET enable_filesystem_cache_on_write_operations=1;
INSERT INTO {table} SELECT * FROM generateRandom('a Int32, b String')
LIMIT 1000;
@ -195,9 +247,8 @@ def test_caches_with_the_same_configuration(cluster, node_name):
@pytest.mark.parametrize("node_name", ["node_caches_with_same_path"])
def test_caches_with_the_same_configuration_2(cluster, node_name):
node = cluster.instances[node_name]
cache_path = "cache1"
node.query(f"SYSTEM DROP FILESYSTEM CACHE;")
node.query("SYSTEM DROP FILESYSTEM CACHE;")
for table in ["cache1", "cache2"]:
node.query(
f"""
@ -207,7 +258,13 @@ def test_caches_with_the_same_configuration_2(cluster, node_name):
Engine=MergeTree()
ORDER BY value
SETTINGS disk = '{table}';
"""
)
wait_for_cache_initialized(node, "cache1")
node.query(
f"""
SET enable_filesystem_cache_on_write_operations=1;
INSERT INTO {table} SELECT * FROM generateRandom('a Int32, b String')
LIMIT 1000;
@ -227,8 +284,8 @@ def test_caches_with_the_same_configuration_2(cluster, node_name):
)
def test_custom_cached_disk(cluster):
node = cluster.instances["node_no_filesystem_caches_path"]
def test_custom_cached_disk(non_shared_cluster):
node = non_shared_cluster.instances["node_no_filesystem_caches_path"]
assert "Cannot create cached custom disk without" in node.query_and_get_error(
f"""
@ -377,6 +434,7 @@ def test_force_filesystem_cache_on_merges(cluster):
ORDER BY value
SETTINGS disk = disk(
type = cache,
name = 'force_cache_on_merges',
path = 'force_cache_on_merges',
disk = 'hdd_blob',
max_file_segment_size = '1Ki',
@ -385,7 +443,13 @@ def test_force_filesystem_cache_on_merges(cluster):
max_size = '10Gi',
max_elements = 10000000,
load_metadata_threads = 30);
"""
)
wait_for_cache_initialized(node, "force_cache_on_merges")
node.query(
"""
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
@ -441,7 +505,13 @@ SETTINGS disk = disk(type = cache,
path = "test_system_sync_filesystem_cache",
delayed_cleanup_interval_ms = 10000000, disk = hdd_blob),
min_bytes_for_wide_part = 10485760;
"""
)
wait_for_cache_initialized(node, "test_system_sync_filesystem_cache")
node.query(
"""
INSERT INTO test SELECT 1, 'test';
"""
)
@ -525,7 +595,13 @@ SETTINGS disk = disk(type = cache,
keep_free_space_elements_ratio = {elements_ratio},
disk = hdd_blob),
min_bytes_for_wide_part = 10485760;
"""
)
wait_for_cache_initialized(node, "test_keep_up_size_ratio")
node.query(
"""
INSERT INTO test SELECT randomString(200);
"""
)


@ -1,2 +1,2 @@
1
102400 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/02344_describe_cache_test 0 5000 0 16
102400 10000000 33554432 1 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/02344_describe_cache_test 0 5000 0 16


@ -11,7 +11,7 @@ $CLICKHOUSE_CLIENT -nm --query """
DROP TABLE IF EXISTS test;
CREATE TABLE test (a Int32, b String)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(name = '$disk_name', type = cache, max_size = '100Ki', path = '$disk_name', disk = 's3_disk');
SETTINGS disk = disk(name = '$disk_name', type = cache, max_size = '100Ki', path = '$disk_name', disk = 's3_disk', load_metadata_asynchronously = 0);
"""
$CLICKHOUSE_CLIENT -nm --query """


@ -1,2 +1,2 @@
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection_sql 0 5000 0 16
1048576 10000000 33554432 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection 0 5000 0 16
1048576 10000000 33554432 1 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection_sql 0 5000 0 16
1048576 10000000 33554432 1 4194304 0 0 0 0 /var/lib/clickhouse/filesystem_caches/collection 0 5000 0 16


@ -3,8 +3,8 @@
CREATE NAMED COLLECTION IF NOT EXISTS cache_collection_sql AS path = 'collection_sql', max_size = '1Mi';
DROP TABLE IF EXISTS test;
CREATE TABLE test (a Int32, b String)
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME', cache_name='cache_collection_sql');
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME', cache_name='cache_collection_sql', load_metadata_asynchronously = 0);
DESCRIBE FILESYSTEM CACHE '$CLICHOUSE_TEST_UNIQUE_NAME';
CREATE TABLE test2 (a Int32, b String)
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME_2', cache_name='cache_collection');
ENGINE = MergeTree() ORDER BY a SETTINGS disk = disk(type = cache, disk = 'local_disk', name = '$CLICHOUSE_TEST_UNIQUE_NAME_2', cache_name='cache_collection', load_metadata_asynchronously = 0);
DESCRIBE FILESYSTEM CACHE '$CLICHOUSE_TEST_UNIQUE_NAME_2';


@ -1,7 +1,7 @@
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 10 1000 0 16
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 5 1000 0 16
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 15 1000 0 16
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 2 1000 0 16
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 1000 0 16
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 10 1000 0 16
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 5 1000 0 16
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 15 1000 0 16
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 2 1000 0 16
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 1000 0 16
134217728 10000000 33554432 1 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 16


@ -1,20 +1,20 @@
100 10 10 10 0 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
100 10 10 1 10 0 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
0
10
98
set max_size from 100 to 10
10 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
10 10 10 1 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
1
8
set max_size from 10 to 100
100 10 10 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
100 10 10 1 10 0 0 8 1 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
10
98
set max_elements from 10 to 2
100 2 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
100 2 10 1 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
2
18
set max_elements from 2 to 10
100 10 10 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
100 10 10 1 10 0 0 18 2 /var/lib/clickhouse/filesystem_caches/s3_cache_02944/ 0 5000 0 16
10
98