Merge branch 'ClickHouse:master' into fix_functionSQLJSON

Yarik Briukhovetskyi 2024-08-19 23:49:27 +02:00 committed by GitHub
commit 9d46bb43bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 579 additions and 605 deletions

View File

@ -482,7 +482,7 @@ jobs:
if: ${{ !failure() }}
run: |
# update overall ci report
python3 finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
python3 ./tests/ci/finish_check.py --wf-status ${{ contains(needs.*.result, 'failure') && 'failure' || 'success' }}
- name: Check Workflow results
if: ${{ !cancelled() }}
run: |
@ -490,5 +490,4 @@ jobs:
cat > "$WORKFLOW_RESULT_FILE" << 'EOF'
${{ toJson(needs) }}
EOF
python3 ./tests/ci/ci_buddy.py --check-wf-status
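
In the first hunk above, `contains(needs.*.result, 'failure') && 'failure' || 'success'` collapses the results of all prerequisite jobs into a single workflow status. A rough Python equivalent of that selection logic, assuming `needs` is a mapping of job name to its result object, would be:

def overall_wf_status(needs: dict) -> str:
    # Mirrors: contains(needs.*.result, 'failure') && 'failure' || 'success'
    if any(job.get("result") == "failure" for job in needs.values()):
        return "failure"
    return "success"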

View File

@ -256,22 +256,6 @@ function configure
rm -f "$FASTTEST_DATA/config.d/secure_ports.xml"
}
function timeout_with_logging() {
local exit_code=0
timeout -s TERM --preserve-status "${@}" || exit_code="${?}"
echo "Checking if it is a timeout. The code 124 will indicate a timeout."
if [[ "${exit_code}" -eq "124" ]]
then
echo "The command 'timeout ${*}' has been killed by timeout."
else
echo "No, it isn't a timeout."
fi
return $exit_code
}
function run_tests
{
clickhouse-server --version
@ -340,7 +324,7 @@ case "$stage" in
configure 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/install_log.txt"
;&
"run_tests")
timeout_with_logging 35m bash -c run_tests ||:
run_tests ||:
/process_functional_tests_result.py --in-results-dir "$FASTTEST_OUTPUT/" \
--out-results-file "$FASTTEST_OUTPUT/test_results.tsv" \
--out-status-file "$FASTTEST_OUTPUT/check_status.tsv" || echo -e "failure\tCannot parse results" > "$FASTTEST_OUTPUT/check_status.tsv"
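
The timeout_with_logging wrapper removed here ran a command under GNU timeout and reported whether the resulting exit code was 124, the conventional timeout status. A rough Python sketch of the same behaviour (the duration argument, e.g. "35m", and the use of subprocess are assumptions of this illustration, not part of the change):

import subprocess

def timeout_with_logging(duration, *cmd):
    # Send SIGTERM after `duration`, keep the command's own exit status
    exit_code = subprocess.run(
        ["timeout", "-s", "TERM", "--preserve-status", str(duration), *cmd]
    ).returncode
    print("Checking if it is a timeout. The code 124 will indicate a timeout.")
    if exit_code == 124:
        print(f"The command 'timeout {duration} {' '.join(cmd)}' has been killed by timeout.")
    else:
        print("No, it isn't a timeout.")
    return exit_code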

View File

@ -35,7 +35,6 @@ RUN mkdir -p /tmp/clickhouse-odbc-tmp \
ENV TZ=Europe/Amsterdam
ENV MAX_RUN_TIME=9000
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG sqllogic_test_repo="https://github.com/gregrahn/sqllogictest.git"

View File

@ -94,7 +94,7 @@ function run_tests()
export -f run_tests
timeout "${MAX_RUN_TIME:-9000}" bash -c run_tests || echo "timeout reached" >&2
run_tests
#/process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv

View File

@ -22,7 +22,6 @@ ARG sqltest_repo="https://github.com/elliotchance/sqltest/"
RUN git clone ${sqltest_repo}
ENV TZ=UTC
ENV MAX_RUN_TIME=900
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY run.sh /

View File

@ -4,9 +4,6 @@
source /setup_export_logs.sh
set -e -x
MAX_RUN_TIME=${MAX_RUN_TIME:-3600}
MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 3600 : MAX_RUN_TIME))
# Choose random timezone for this test run
TZ="$(rg -v '#' /usr/share/zoneinfo/zone.tab | awk '{print $3}' | shuf | head -n1)"
echo "Choosen random timezone $TZ"
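
The shell pipeline above picks a random timezone by taking the third column of every non-comment line in zone.tab. A small Python sketch of the same selection, assuming the standard zone.tab location:

import random

def choose_random_timezone(zone_tab="/usr/share/zoneinfo/zone.tab"):
    # Third column of non-comment lines is the timezone name (e.g. Europe/Amsterdam)
    with open(zone_tab) as f:
        zones = [line.split()[2] for line in f
                 if line.strip() and not line.startswith("#")]
    return random.choice(zones)
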
@ -123,9 +120,6 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
clickhouse-client --query "DROP TABLE datasets.hits_v1"
clickhouse-client --query "DROP TABLE datasets.visits_v1"
MAX_RUN_TIME=$((MAX_RUN_TIME < 9000 ? MAX_RUN_TIME : 9000)) # min(MAX_RUN_TIME, 2.5 hours)
MAX_RUN_TIME=$((MAX_RUN_TIME != 0 ? MAX_RUN_TIME : 9000)) # set to 2.5 hours if 0 (unlimited)
else
clickhouse-client --query "CREATE DATABASE test"
clickhouse-client --query "SHOW TABLES FROM test"
@ -258,24 +252,7 @@ function run_tests()
export -f run_tests
function timeout_with_logging() {
local exit_code=0
timeout -s TERM --preserve-status "${@}" || exit_code="${?}"
echo "Checking if it is a timeout. The code 124 will indicate a timeout."
if [[ "${exit_code}" -eq "124" ]]
then
echo "The command 'timeout ${*}' has been killed by timeout."
else
echo "No, it isn't a timeout."
fi
return $exit_code
}
TIMEOUT=$((MAX_RUN_TIME - 700))
timeout_with_logging "$TIMEOUT" bash -c run_tests ||:
run_tests ||:
echo "Files in current directory"
ls -la ./
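
The arithmetic removed in this hunk capped MAX_RUN_TIME at 9000 seconds (2.5 hours) and treated 0 as "unlimited", which then fell back to the same cap. The same normalization in a short Python sketch:

def normalize_max_run_time(value: int, cap: int = 9000) -> int:
    # 0 means "unlimited" in these scripts, so fall back to the cap
    if value == 0:
        return cap
    return min(value, cap)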

View File

@ -65,7 +65,6 @@ ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV NUM_TRIES=1
ENV MAX_RUN_TIME=0
# Unrelated to vars in setup_minio.sh, but should be the same there
# to have the same binaries for local running scenario

View File

@ -12,9 +12,6 @@ dmesg --clear
# fail on errors, verbose and export all env variables
set -e -x -a
MAX_RUN_TIME=${MAX_RUN_TIME:-9000}
MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 9000 : MAX_RUN_TIME))
USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0}
USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0}
@ -308,8 +305,6 @@ function run_tests()
try_run_with_retry 10 clickhouse-client -q "insert into system.zookeeper (name, path, value) values ('auxiliary_zookeeper2', '/test/chroot/', '')"
TIMEOUT=$((MAX_RUN_TIME - 800 > 8400 ? 8400 : MAX_RUN_TIME - 800))
START_TIME=${SECONDS}
set +e
TEST_ARGS=(
@ -324,32 +319,22 @@ function run_tests()
--test-runs "$NUM_TRIES"
"${ADDITIONAL_OPTIONS[@]}"
)
timeout --preserve-status --signal TERM --kill-after 60m ${TIMEOUT}s clickhouse-test "${TEST_ARGS[@]}" 2>&1 \
clickhouse-test "${TEST_ARGS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt
set -e
DURATION=$((SECONDS - START_TIME))
echo "Elapsed ${DURATION} seconds."
if [[ $DURATION -ge $TIMEOUT ]]
then
echo "It looks like the command is terminated by the timeout, which is ${TIMEOUT} seconds."
fi
}
export -f run_tests
# This should be enough to setup job and collect artifacts
TIMEOUT=$((MAX_RUN_TIME - 700))
if [ "$NUM_TRIES" -gt "1" ]; then
# We don't run tests with Ordinary database in PRs, only in master.
# So run new/changed tests with Ordinary at least once in flaky check.
timeout_with_logging "$TIMEOUT" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests \
| sed 's/All tests have finished/Redacted: a message about tests finish is deleted/' | sed 's/No tests were run/Redacted: a message about no tests run is deleted/' ||:
fi
timeout_with_logging "$TIMEOUT" bash -c run_tests ||:
run_tests ||:
echo "Files in current directory"
ls -la ./

View File

@ -40,22 +40,6 @@ function fn_exists() {
declare -F "$1" > /dev/null;
}
function timeout_with_logging() {
local exit_code=0
timeout -s TERM --preserve-status "${@}" || exit_code="${?}"
echo "Checking if it is a timeout. The code 124 will indicate a timeout."
if [[ "${exit_code}" -eq "124" ]]
then
echo "The command 'timeout ${*}' has been killed by timeout."
else
echo "No, it isn't a timeout."
fi
return $exit_code
}
function collect_core_dumps()
{
find . -type f -maxdepth 1 -name 'core.*' | while read -r core; do

View File

@ -1381,7 +1381,7 @@ Default value: `2`.
Close connection before returning connection to the pool.
Default value: true.
Default value: false.
## odbc_bridge_connection_pool_size {#odbc-bridge-connection-pool-size}

View File

@ -1769,6 +1769,8 @@ try
new_server_settings.http_connections_store_limit,
});
DNSResolver::instance().setFilterSettings(new_server_settings.dns_allow_resolve_names_to_ipv4, new_server_settings.dns_allow_resolve_names_to_ipv6);
if (global_context->isServerCompletelyStarted())
CannotAllocateThreadFaultInjector::setFaultProbability(new_server_settings.cannot_allocate_thread_fault_injection_probability);
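
Here the fault-injection probability is only re-applied on configuration reload once the server has completely started. As a toy illustration only (this is not the actual CannotAllocateThreadFaultInjector implementation), a probability-gated fault injector can be sketched as:

import random

class ThreadFaultInjector:
    def __init__(self, probability: float = 0.0):
        self.probability = probability  # refreshed on config reload

    def should_fail(self) -> bool:
        # Inject a "cannot allocate thread" failure with the configured probability
        return self.probability > 0.0 and random.random() < self.probability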

View File

@ -114,7 +114,7 @@ private:
{
if (ind < first.size())
return first[ind];
return second[ind % first.size()];
return second[ind - first.size()];
}
size_t size() const
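
This container presents `first` and `second` as one concatenated sequence, so an index past the end of `first` must be shifted by `first.size()` before indexing into `second`; the previous `ind % first.size()` only happens to land on the right element while `second` is no longer than `first`. The corrected lookup, sketched in Python:

def element_at(first, second, ind):
    # Access over the virtual concatenation first + second
    if ind < len(first):
        return first[ind]
    return second[ind - len(first)]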

View File

@ -12,6 +12,7 @@
#include <atomic>
#include <optional>
#include <string_view>
#include "Common/MultiVersion.h"
#include <unordered_set>
#include "DNSPTRResolverProvider.h"
@ -139,12 +140,6 @@ DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
return addresses;
}
DNSResolver::IPAddresses resolveIPAddressWithCache(CacheBase<std::string, DNSResolver::CacheEntry> & cache, const std::string & host)
{
auto [result, _ ] = cache.getOrSet(host, [&host]() {return std::make_shared<DNSResolver::CacheEntry>(resolveIPAddressImpl(host), std::chrono::system_clock::now());});
return result->addresses;
}
std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress & address)
{
auto ptr_resolver = DB::DNSPTRResolverProvider::get();
@ -198,21 +193,89 @@ struct DNSResolver::Impl
std::atomic<bool> disable_cache{false};
};
struct DNSResolver::AddressFilter
{
struct DNSFilterSettings
{
bool dns_allow_resolve_names_to_ipv4{true};
bool dns_allow_resolve_names_to_ipv6{true};
};
DNSResolver::DNSResolver() : impl(std::make_unique<DNSResolver::Impl>()), log(getLogger("DNSResolver")) {}
AddressFilter() : settings(std::make_unique<DNSFilterSettings>()) {}
void performAddressFiltering(DNSResolver::IPAddresses & addresses) const
{
const auto current_settings = settings.get();
bool dns_resolve_ipv4 = current_settings->dns_allow_resolve_names_to_ipv4;
bool dns_resolve_ipv6 = current_settings->dns_allow_resolve_names_to_ipv6;
if (dns_resolve_ipv4 && dns_resolve_ipv6)
{
return;
}
if (!dns_resolve_ipv4 && !dns_resolve_ipv6)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "DNS can't resolve any address, because dns_resolve_ipv6_interfaces and dns_resolve_ipv4_interfaces both are disabled");
}
std::erase_if(addresses, [dns_resolve_ipv6, dns_resolve_ipv4](const Poco::Net::IPAddress& address)
{
return (address.family() == Poco::Net::IPAddress::IPv6 && !dns_resolve_ipv6)
|| (address.family() == Poco::Net::IPAddress::IPv4 && !dns_resolve_ipv4);
});
}
void setSettings(bool dns_allow_resolve_names_to_ipv4, bool dns_allow_resolve_names_to_ipv6)
{
settings.set(std::make_unique<DNSFilterSettings>(dns_allow_resolve_names_to_ipv4, dns_allow_resolve_names_to_ipv6));
}
MultiVersion<DNSFilterSettings> settings;
};
DNSResolver::DNSResolver()
: impl(std::make_unique<DNSResolver::Impl>())
, addressFilter(std::make_unique<DNSResolver::AddressFilter>())
, log(getLogger("DNSResolver")) {}
DNSResolver::IPAddresses DNSResolver::getResolvedIPAdressessWithFiltering(const std::string & host)
{
auto addresses = resolveIPAddressImpl(host);
addressFilter->performAddressFiltering(addresses);
if (addresses.empty())
{
ProfileEvents::increment(ProfileEvents::DNSError);
throw DB::NetException(ErrorCodes::DNS_ERROR, "After filtering there are no resolved address for host({}).", host);
}
return addresses;
}
DNSResolver::IPAddresses DNSResolver::resolveIPAddressWithCache(const std::string & host)
{
auto [result, _ ] = impl->cache_host.getOrSet(host, [&host, this]() {return std::make_shared<DNSResolver::CacheEntry>(getResolvedIPAdressessWithFiltering(host), std::chrono::system_clock::now());});
return result->addresses;
}
Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host)
{
return pickAddress(resolveHostAll(host)); // random order -> random pick
}
void DNSResolver::setFilterSettings(bool dns_allow_resolve_names_to_ipv4, bool dns_allow_resolve_names_to_ipv6)
{
addressFilter->setSettings(dns_allow_resolve_names_to_ipv4, dns_allow_resolve_names_to_ipv6);
}
DNSResolver::IPAddresses DNSResolver::resolveHostAllInOriginOrder(const std::string & host)
{
if (impl->disable_cache)
return resolveIPAddressImpl(host);
return getResolvedIPAdressessWithFiltering(host);
addToNewHosts(host);
return resolveIPAddressWithCache(impl->cache_host, host);
return resolveIPAddressWithCache(host);
}
DNSResolver::IPAddresses DNSResolver::resolveHostAll(const std::string & host)
@ -232,7 +295,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_an
splitHostAndPort(host_and_port, host, port);
addToNewHosts(host);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(host)), port);
}
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port)
@ -241,7 +304,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, U
return Poco::Net::SocketAddress(host, port);
addToNewHosts(host);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(host)), port);
}
std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std::string & host, UInt16 port)
@ -254,7 +317,7 @@ std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std:
if (!impl->disable_cache)
addToNewHosts(host);
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : resolveIPAddressWithCache(impl->cache_host, host);
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : resolveIPAddressWithCache(host);
auto ips_end = std::unique(ips.begin(), ips.end());
addresses.reserve(ips_end - ips.begin());
@ -419,8 +482,8 @@ bool DNSResolver::updateCache(UInt32 max_consecutive_failures)
bool DNSResolver::updateHost(const String & host)
{
const auto old_value = resolveIPAddressWithCache(impl->cache_host, host);
auto new_value = resolveIPAddressImpl(host);
const auto old_value = resolveIPAddressWithCache(host);
auto new_value = getResolvedIPAdressessWithFiltering(host);
const bool result = old_value != new_value;
impl->cache_host.set(host, std::make_shared<DNSResolver::CacheEntry>(std::move(new_value), std::chrono::system_clock::now()));
return result;
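
The new AddressFilter drops resolved addresses whose family is disabled by dns_allow_resolve_names_to_ipv4/ipv6, treats "both families disabled" as a configuration error, and the resolver then fails if nothing survives filtering. A condensed Python sketch of that logic (the real code additionally keeps the settings behind a MultiVersion pointer so reloads need no locking):

import ipaddress

def filter_addresses(addresses, allow_ipv4=True, allow_ipv6=True):
    # Both families disabled is a configuration error, not just an empty result
    if not allow_ipv4 and not allow_ipv6:
        raise ValueError("DNS can't resolve any address: IPv4 and IPv6 resolution are both disabled")
    kept = [a for a in addresses
            if (ipaddress.ip_address(a).version == 4 and allow_ipv4)
            or (ipaddress.ip_address(a).version == 6 and allow_ipv6)]
    if not kept:
        raise RuntimeError("After filtering there are no resolved addresses for the host")
    return kept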

View File

@ -68,6 +68,8 @@ public:
/// Returns true if IP of any host has been changed or an element was dropped (too many failures)
bool updateCache(UInt32 max_consecutive_failures);
void setFilterSettings(bool dns_allow_resolve_names_to_ipv4, bool dns_allow_resolve_names_to_ipv6);
/// Returns a copy of cache entries
std::vector<std::pair<std::string, CacheEntry>> cacheEntries() const;
@ -86,6 +88,10 @@ private:
struct Impl;
std::unique_ptr<Impl> impl;
struct AddressFilter;
std::unique_ptr<AddressFilter> addressFilter;
LoggerPtr log;
/// Updates cached value and returns true it has been changed.
@ -94,6 +100,9 @@ private:
void addToNewHosts(const String & host);
void addToNewAddresses(const Poco::Net::IPAddress & address);
IPAddresses resolveIPAddressWithCache(const std::string & host);
IPAddresses getResolvedIPAdressessWithFiltering(const std::string & host);
};
}

View File

@ -23,7 +23,7 @@ namespace postgres
{
PoolWithFailover::PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
const ReplicasConfigurationByPriority & configurations_by_priority,
size_t pool_size,
size_t pool_wait_timeout_,
size_t max_tries_,

View File

@ -8,7 +8,6 @@
#include "ConnectionHolder.h"
#include <mutex>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StoragePostgreSQL.h>
@ -20,12 +19,12 @@ namespace postgres
class PoolWithFailover
{
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
public:
using ReplicasConfigurationByPriority = std::map<size_t, std::vector<DB::StoragePostgreSQL::Configuration>>;
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
const ReplicasConfigurationByPriority & configurations_by_priority,
size_t pool_size,
size_t pool_wait_timeout,
size_t max_tries_,

View File

@ -106,6 +106,8 @@ namespace DB
M(UInt64, dns_cache_max_entries, 10000, "Internal DNS cache max entries.", 0) \
M(Int32, dns_cache_update_period, 15, "Internal DNS cache update period in seconds.", 0) \
M(UInt32, dns_max_consecutive_failures, 10, "Max DNS resolve failures of a hostname before dropping the hostname from ClickHouse DNS cache.", 0) \
M(Bool, dns_allow_resolve_names_to_ipv4, true, "Allows resolve names to ipv4 addresses.", 0) \
M(Bool, dns_allow_resolve_names_to_ipv6, true, "Allows resolve names to ipv6 addresses.", 0) \
\
M(UInt64, max_table_size_to_drop, 50000000000lu, "If size of a table is greater than this value (in bytes) than table could not be dropped with any DROP query.", 0) \
M(UInt64, max_partition_size_to_drop, 50000000000lu, "Same as max_table_size_to_drop, but for the partitions.", 0) \

View File

@ -18,13 +18,15 @@
#include <Core/Settings.h>
#include <IO/Operators.h>
#include "config.h"
#if USE_SIMDJSON
#include <Common/JSONParsers/SimdJSONParser.h>
# include <Common/JSONParsers/SimdJSONParser.h>
#elif USE_RAPIDJSON
# include <Common/JSONParsers/RapidJSONParser.h>
#else
# include <Common/JSONParsers/DummyJSONParser.h>
#endif
#if USE_RAPIDJSON
#include <Common/JSONParsers/RapidJSONParser.h>
#endif
#include <Common/JSONParsers/DummyJSONParser.h>
namespace DB
{
@ -105,7 +107,7 @@ SerializationPtr DataTypeObject::doGetDefaultSerialization() const
switch (schema_format)
{
case SchemaFormat::JSON:
#ifdef USE_SIMDJSON
#if USE_SIMDJSON
return std::make_shared<SerializationJSON<SimdJSONParser>>(
std::move(typed_path_serializations),
paths_to_skip,

View File

@ -8,12 +8,12 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/IInputFormat.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Poco/Net/HTTPRequest.h>
#include <Common/logger_useful.h>
#include "DictionarySourceFactory.h"
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
#include <Storages/NamedCollectionsHelpers.h>
#include "registerDictionaries.h"
@ -223,21 +223,23 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
String endpoint;
String format;
auto named_collection = created_from_ddl
? getURLBasedDataSourceConfiguration(config, settings_config_prefix, global_context)
: std::nullopt;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
if (named_collection)
{
url = named_collection->configuration.url;
endpoint = named_collection->configuration.endpoint;
format = named_collection->configuration.format;
validateNamedCollection(
*named_collection,
/* required_keys */{},
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
"url", "endpoint", "user", "credentials.user", "password", "credentials.password", "format", "compression_method", "structure", "name"});
credentials.setUsername(named_collection->configuration.user);
credentials.setPassword(named_collection->configuration.password);
url = named_collection->getOrDefault<String>("url", "");
endpoint = named_collection->getOrDefault<String>("endpoint", "");
format = named_collection->getOrDefault<String>("format", "");
header_entries.reserve(named_collection->configuration.headers.size());
for (const auto & [key, value] : named_collection->configuration.headers)
header_entries.emplace_back(key, value);
credentials.setUsername(named_collection->getAnyOrDefault<String>({"user", "credentials.user"}, ""));
credentials.setPassword(named_collection->getAnyOrDefault<String>({"password", "credentials.password"}, ""));
header_entries = getHeadersFromNamedCollection(*named_collection);
}
else
{
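
validateNamedCollection, as used above, checks that every required key is present and that every key of the collection is either required or listed as optional. Roughly, in Python terms (ignoring the key aliasing that ExternalDatabaseEqualKeysSet adds, e.g. user vs. credentials.user):

def validate_named_collection(collection: dict, required: set, optional: set):
    missing = required - collection.keys()
    if missing:
        raise ValueError(f"Missing required keys in named collection: {sorted(missing)}")
    unexpected = collection.keys() - required - optional
    if unexpected:
        raise ValueError(f"Unexpected keys in named collection: {sorted(unexpected)}")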

View File

@ -1,15 +1,12 @@
#include "MongoDBDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageMongoDBSocketFactory.h>
#include <Storages/NamedCollectionsHelpers.h>
namespace DB
{
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "uri", "collection", "name", "method", "options"};
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
{
auto create_mongo_db_dictionary = [](
@ -22,35 +19,53 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
bool created_from_ddl)
{
const auto config_prefix = root_config_prefix + ".mongodb";
ExternalDataSourceConfiguration configuration;
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key); };
auto named_collection = getExternalDataSourceConfiguration(config, config_prefix, context, has_config_key);
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, config_prefix, context) : nullptr;
String host, username, password, database, method, options, collection;
UInt16 port;
if (named_collection)
{
configuration = named_collection->configuration;
validateNamedCollection(
*named_collection,
/* required_keys */{"collection"},
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
"host", "port", "user", "password", "db", "database", "uri", "name", "method", "options"});
host = named_collection->getOrDefault<String>("host", "");
port = static_cast<UInt16>(named_collection->getOrDefault<UInt64>("port", 0));
username = named_collection->getOrDefault<String>("user", "");
password = named_collection->getOrDefault<String>("password", "");
database = named_collection->getAnyOrDefault<String>({"db", "database"}, "");
method = named_collection->getOrDefault<String>("method", "");
collection = named_collection->getOrDefault<String>("collection", "");
options = named_collection->getOrDefault<String>("options", "");
}
else
{
configuration.host = config.getString(config_prefix + ".host", "");
configuration.port = config.getUInt(config_prefix + ".port", 0);
configuration.username = config.getString(config_prefix + ".user", "");
configuration.password = config.getString(config_prefix + ".password", "");
configuration.database = config.getString(config_prefix + ".db", "");
host = config.getString(config_prefix + ".host", "");
port = config.getUInt(config_prefix + ".port", 0);
username = config.getString(config_prefix + ".user", "");
password = config.getString(config_prefix + ".password", "");
database = config.getString(config_prefix + ".db", "");
method = config.getString(config_prefix + ".method", "");
collection = config.getString(config_prefix + ".collection");
options = config.getString(config_prefix + ".options", "");
}
if (created_from_ddl)
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
context->getRemoteHostFilter().checkHostAndPort(host, toString(port));
return std::make_unique<MongoDBDictionarySource>(dict_struct,
return std::make_unique<MongoDBDictionarySource>(
dict_struct,
config.getString(config_prefix + ".uri", ""),
configuration.host,
configuration.port,
configuration.username,
configuration.password,
config.getString(config_prefix + ".method", ""),
configuration.database,
config.getString(config_prefix + ".collection"),
config.getString(config_prefix + ".options", ""),
host,
port,
username,
password,
method,
database,
collection,
options,
sample_block);
};

View File

@ -4,6 +4,7 @@
#include <Core/QualifiedTableName.h>
#include <Core/Settings.h>
#include "DictionarySourceFactory.h"
#include <Storages/NamedCollectionsHelpers.h>
#include "registerDictionaries.h"
#if USE_LIBPQXX
@ -13,7 +14,6 @@
#include "readInvalidateQuery.h"
#include <Interpreters/Context.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Common/logger_useful.h>
#endif
@ -24,16 +24,17 @@ namespace DB
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int BAD_ARGUMENTS;
}
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table", "schema",
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
#if USE_LIBPQXX
static const UInt64 max_block_size = 8192;
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table", "schema",
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
namespace
{
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
@ -177,6 +178,19 @@ std::string PostgreSQLDictionarySource::toString() const
return "PostgreSQL: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where);
}
static void validateConfigKeys(
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (dictionary_allowed_keys.contains(config_key) || startsWith(config_key, "replica"))
continue;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
}
}
#endif
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
@ -191,38 +205,117 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
{
#if USE_LIBPQXX
const auto settings_config_prefix = config_prefix + ".postgresql";
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context, has_config_key);
const auto & settings = context->getSettingsRef();
std::optional<PostgreSQLDictionarySource::Configuration> dictionary_configuration;
postgres::PoolWithFailover::ReplicasConfigurationByPriority replicas_by_priority;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, context) : nullptr;
if (named_collection)
{
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(*named_collection, {}, dictionary_allowed_keys);
StoragePostgreSQL::Configuration common_configuration;
common_configuration.host = named_collection->getOrDefault<String>("host", "");
common_configuration.port = named_collection->getOrDefault<UInt64>("port", 0);
common_configuration.username = named_collection->getOrDefault<String>("user", "");
common_configuration.password = named_collection->getOrDefault<String>("password", "");
common_configuration.database = named_collection->getAnyOrDefault<String>({"database", "db"}, "");
common_configuration.schema = named_collection->getOrDefault<String>("schema", "");
common_configuration.table = named_collection->getOrDefault<String>("table", "");
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration{
.db = common_configuration.database,
.schema = common_configuration.schema,
.table = common_configuration.table,
.query = named_collection->getOrDefault<String>("query", ""),
.where = named_collection->getOrDefault<String>("where", ""),
.invalidate_query = named_collection->getOrDefault<String>("invalidate_query", ""),
.update_field = named_collection->getOrDefault<String>("update_field", ""),
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
});
replicas_by_priority[0].emplace_back(common_configuration);
}
else
{
validateConfigKeys(config, settings_config_prefix);
StoragePostgreSQL::Configuration common_configuration;
common_configuration.host = config.getString(settings_config_prefix + ".host", "");
common_configuration.port = config.getUInt(settings_config_prefix + ".port", 0);
common_configuration.username = config.getString(settings_config_prefix + ".user", "");
common_configuration.password = config.getString(settings_config_prefix + ".password", "");
common_configuration.database = config.getString(fmt::format("{}.database", settings_config_prefix), config.getString(fmt::format("{}.db", settings_config_prefix), ""));
common_configuration.schema = config.getString(fmt::format("{}.schema", settings_config_prefix), "");
common_configuration.table = config.getString(fmt::format("{}.table", settings_config_prefix), "");
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration
{
.db = common_configuration.database,
.schema = common_configuration.schema,
.table = common_configuration.table,
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
});
if (config.has(settings_config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(settings_config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
String replica_name = settings_config_prefix + "." + config_key;
StoragePostgreSQL::Configuration replica_configuration{common_configuration};
size_t priority = config.getInt(replica_name + ".priority", 0);
replica_configuration.host = config.getString(replica_name + ".host", common_configuration.host);
replica_configuration.port = config.getUInt(replica_name + ".port", common_configuration.port);
replica_configuration.username = config.getString(replica_name + ".user", common_configuration.username);
replica_configuration.password = config.getString(replica_name + ".password", common_configuration.password);
if (replica_configuration.host.empty() || replica_configuration.port == 0
|| replica_configuration.username.empty() || replica_configuration.password.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some "
"of the parameters and no other dictionary parameters are added");
}
replicas_by_priority[priority].emplace_back(replica_configuration);
}
}
}
else
{
replicas_by_priority[0].emplace_back(common_configuration);
}
}
if (created_from_ddl)
{
for (const auto & replicas : configuration.replicas_configurations)
for (const auto & replica : replicas.second)
for (const auto & [_, replicas] : replicas_by_priority)
for (const auto & replica : replicas)
context->getRemoteHostFilter().checkHostAndPort(replica.host, toString(replica.port));
}
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration.replicas_configurations,
replicas_by_priority,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
settings.postgresql_connection_pool_retries,
settings.postgresql_connection_pool_auto_close_connection,
settings.postgresql_connection_attempt_timeout);
PostgreSQLDictionarySource::Configuration dictionary_configuration
{
.db = configuration.database,
.schema = configuration.schema,
.table = configuration.table,
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
};
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration, pool, sample_block);
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration.value(), pool, sample_block);
#else
(void)dict_struct;
(void)config;
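
The replica handling above builds a map from priority to the list of effective replica configurations, where each replica block overrides host/port/user/password of the common configuration and everything else is inherited. A compact Python sketch of that expansion, assuming the configuration has already been parsed into plain dictionaries:

from collections import defaultdict

def build_replicas_by_priority(common: dict, replicas: list) -> dict:
    # priority -> list of effective replica configurations
    by_priority = defaultdict(list)
    if not replicas:
        by_priority[0].append(dict(common))
        return dict(by_priority)
    for replica in replicas:
        overrides = {k: v for k, v in replica.items() if k != "priority"}
        by_priority[replica.get("priority", 0)].append({**common, **overrides})
    return dict(by_priority)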

View File

@ -1,288 +0,0 @@
#include "ExternalDataSourceConfiguration.h"
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "quota_key", "db",
"database", "table", "schema", "replica",
"update_field", "update_lag", "invalidate_query", "query",
"where", "name", "secure", "uri", "collection"};
template<typename T>
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<T> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
SettingsChanges config_settings;
for (const auto & setting : settings.all())
{
const auto & setting_name = setting.getName();
auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
if (!setting_value.empty())
config_settings.emplace_back(setting_name, setting_value);
}
return config_settings;
}
String ExternalDataSourceConfiguration::toString() const
{
WriteBufferFromOwnString configuration_info;
configuration_info << "username: " << username << "\t";
if (addresses.empty())
{
configuration_info << "host: " << host << "\t";
configuration_info << "port: " << port << "\t";
}
else
{
for (const auto & [replica_host, replica_port] : addresses)
{
configuration_info << "host: " << replica_host << "\t";
configuration_info << "port: " << replica_port << "\t";
}
}
return configuration_info.str();
}
void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration & conf)
{
host = conf.host;
port = conf.port;
username = conf.username;
password = conf.password;
quota_key = conf.quota_key;
database = conf.database;
table = conf.table;
schema = conf.schema;
addresses = conf.addresses;
addresses_expr = conf.addresses_expr;
}
static void validateConfigKeys(
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix, HasConfigKeyFunc has_config_key_func)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (!has_config_key_func(config_key))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
}
}
template <typename T>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings)
{
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
ExternalDataSourceConfiguration configuration;
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
if (!collection_name.empty())
{
const auto & config = context->getConfigRef();
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
validateConfigKeys(dict_config, collection_prefix, has_config_key);
auto config_settings = getSettingsChangesFromConfig(settings, config, collection_prefix);
auto dict_settings = getSettingsChangesFromConfig(settings, dict_config, dict_config_prefix);
/// dictionary config settings override collection settings.
config_settings.insert(config_settings.end(), dict_settings.begin(), dict_settings.end());
if (!config.has(collection_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(collection_prefix + ".host", ""));
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(collection_prefix + ".port", 0));
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(collection_prefix + ".user", ""));
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(collection_prefix + ".password", ""));
configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", config.getString(collection_prefix + ".quota_key", ""));
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(dict_config_prefix + ".database",
config.getString(collection_prefix + ".db", config.getString(collection_prefix + ".database", ""))));
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(collection_prefix + ".table", ""));
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(collection_prefix + ".schema", ""));
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.table.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some "
"of the parameters and dictionary parameters are not added");
}
return ExternalDataSourceInfo{.configuration = configuration, .settings_changes = config_settings};
}
return std::nullopt;
}
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
URLBasedDataSourceConfiguration configuration;
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
if (!collection_name.empty())
{
const auto & config = context->getConfigRef();
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
if (!config.has(collection_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
configuration.url =
dict_config.getString(dict_config_prefix + ".url", config.getString(collection_prefix + ".url", ""));
configuration.endpoint =
dict_config.getString(dict_config_prefix + ".endpoint", config.getString(collection_prefix + ".endpoint", ""));
configuration.format =
dict_config.getString(dict_config_prefix + ".format", config.getString(collection_prefix + ".format", ""));
configuration.compression_method =
dict_config.getString(dict_config_prefix + ".compression", config.getString(collection_prefix + ".compression_method", ""));
configuration.structure =
dict_config.getString(dict_config_prefix + ".structure", config.getString(collection_prefix + ".structure", ""));
configuration.user =
dict_config.getString(dict_config_prefix + ".credentials.user", config.getString(collection_prefix + ".credentials.user", ""));
configuration.password =
dict_config.getString(dict_config_prefix + ".credentials.password", config.getString(collection_prefix + ".credentials.password", ""));
String headers_prefix;
const Poco::Util::AbstractConfiguration *headers_config = nullptr;
if (dict_config.has(dict_config_prefix + ".headers"))
{
headers_prefix = dict_config_prefix + ".headers";
headers_config = &dict_config;
}
else
{
headers_prefix = collection_prefix + ".headers";
headers_config = &config;
}
if (headers_config)
{
Poco::Util::AbstractConfiguration::Keys header_keys;
headers_config->keys(headers_prefix, header_keys);
headers_prefix += ".";
for (const auto & header : header_keys)
{
const auto header_prefix = headers_prefix + header;
configuration.headers.emplace_back(
headers_config->getString(header_prefix + ".name"),
headers_config->getString(header_prefix + ".value"));
}
}
return URLBasedDataSourceConfig{ .configuration = configuration };
}
return std::nullopt;
}
ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key)
{
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
ExternalDataSourceConfiguration common_configuration;
auto named_collection = getExternalDataSourceConfiguration(dict_config, dict_config_prefix, context, has_config_key);
if (named_collection)
{
common_configuration = named_collection->configuration;
}
else
{
common_configuration.host = dict_config.getString(dict_config_prefix + ".host", "");
common_configuration.port = dict_config.getUInt(dict_config_prefix + ".port", 0);
common_configuration.username = dict_config.getString(dict_config_prefix + ".user", "");
common_configuration.password = dict_config.getString(dict_config_prefix + ".password", "");
common_configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", "");
common_configuration.database = dict_config.getString(dict_config_prefix + ".db", dict_config.getString(dict_config_prefix + ".database", ""));
common_configuration.table = dict_config.getString(fmt::format("{}.table", dict_config_prefix), "");
common_configuration.schema = dict_config.getString(fmt::format("{}.schema", dict_config_prefix), "");
}
ExternalDataSourcesByPriority configuration
{
.database = common_configuration.database,
.table = common_configuration.table,
.schema = common_configuration.schema,
.replicas_configurations = {}
};
if (dict_config.has(dict_config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(dict_config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
ExternalDataSourceConfiguration replica_configuration(common_configuration);
String replica_name = dict_config_prefix + "." + config_key;
validateConfigKeys(dict_config, replica_name, has_config_key);
size_t priority = dict_config.getInt(replica_name + ".priority", 0);
replica_configuration.host = dict_config.getString(replica_name + ".host", common_configuration.host);
replica_configuration.port = dict_config.getUInt(replica_name + ".port", common_configuration.port);
replica_configuration.username = dict_config.getString(replica_name + ".user", common_configuration.username);
replica_configuration.password = dict_config.getString(replica_name + ".password", common_configuration.password);
replica_configuration.quota_key = dict_config.getString(replica_name + ".quota_key", common_configuration.quota_key);
if (replica_configuration.host.empty() || replica_configuration.port == 0
|| replica_configuration.username.empty() || replica_configuration.password.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some "
"of the parameters and no other dictionary parameters are added");
}
configuration.replicas_configurations[priority].emplace_back(replica_configuration);
}
}
}
else
{
configuration.replicas_configurations[0].emplace_back(common_configuration);
}
return configuration;
}
void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration & conf)
{
url = conf.url;
format = conf.format;
compression_method = conf.compression_method;
structure = conf.structure;
http_method = conf.http_method;
headers = conf.headers;
}
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<EmptySettingsTraits> & settings);
template
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<EmptySettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
}

View File

@ -1,92 +0,0 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/S3Settings.h>
#include <IO/HTTPHeaderEntries.h>
namespace DB
{
#define EMPTY_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
struct EmptySettings : public BaseSettings<EmptySettingsTraits> {};
struct ExternalDataSourceConfiguration
{
String host;
UInt16 port = 0;
String username = "default";
String password;
String quota_key;
String database;
String table;
String schema;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
String addresses_expr;
String toString() const;
void set(const ExternalDataSourceConfiguration & conf);
};
using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>;
struct ExternalDataSourceInfo
{
ExternalDataSourceConfiguration configuration;
SettingsChanges settings_changes;
};
using HasConfigKeyFunc = std::function<bool(const String &)>;
template <typename T = EmptySettingsTraits>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings = {});
/// Highest priority is 0, the bigger the number in map, the less the priority.
using ExternalDataSourcesConfigurationByPriority = std::map<size_t, std::vector<ExternalDataSourceConfiguration>>;
struct ExternalDataSourcesByPriority
{
String database;
String table;
String schema;
ExternalDataSourcesConfigurationByPriority replicas_configurations;
};
ExternalDataSourcesByPriority
getExternalDataSourceConfigurationByPriority(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key);
struct URLBasedDataSourceConfiguration
{
String url;
String endpoint;
String format = "auto";
String compression_method = "auto";
String structure = "auto";
String user;
String password;
HTTPHeaderEntries headers;
String http_method;
void set(const URLBasedDataSourceConfiguration & conf);
};
struct URLBasedDataSourceConfig
{
URLBasedDataSourceConfiguration configuration;
};
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
}

View File

@ -133,7 +133,7 @@ void validateNamedCollection(
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unexpected key {} in named collection. Required keys: {}, optional keys: {}",
"Unexpected key `{}` in named collection. Required keys: {}, optional keys: {}",
backQuoteIfNeed(key), fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
}
}

View File

@ -8,8 +8,6 @@
namespace DB
{
struct ExternalDataSourceConfiguration;
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
/// This class unites multiple storages with replicas into multiple shards with replicas.
/// A query to external database is passed to one replica on each shard, the result is united.

View File

@ -1,5 +1,4 @@
#include <Storages/StorageMongoDB.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Common/Exception.h>

View File

@ -15,7 +15,6 @@
#include <Storages/StorageRedis.h>
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB

View File

@ -50,7 +50,6 @@ from github_helper import GitHub
from pr_info import PRInfo
from report import (
ERROR,
FAILURE,
PENDING,
SUCCESS,
BuildResult,
@ -62,11 +61,11 @@ from report import (
FAIL,
)
from s3_helper import S3Helper
from stopwatch import Stopwatch
from tee_popen import TeePopen
from ci_cache import CiCache
from ci_settings import CiSettings
from ci_buddy import CIBuddy
from stopwatch import Stopwatch
from version_helper import get_version_from_repo
# pylint: disable=too-many-lines
@ -370,8 +369,8 @@ def _pre_action(s3, job_name, batch, indata, pr_info):
# skip_status = SUCCESS already there
GH.print_in_group("Commit Status Data", job_status)
# create pre report
jr = JobReport.create_pre_report(status=skip_status, job_skipped=to_be_skipped)
# create dummy report
jr = JobReport.create_dummy(status=skip_status, job_skipped=to_be_skipped)
jr.dump()
if not to_be_skipped:
@ -990,19 +989,21 @@ def _run_test(job_name: str, run_command: str) -> int:
stopwatch = Stopwatch()
job_log = Path(TEMP_PATH) / "job_log.txt"
with TeePopen(run_command, job_log, env, timeout) as process:
print(f"Job process started, pid [{process.process.pid}]")
retcode = process.wait()
if retcode != 0:
print(f"Run action failed for: [{job_name}] with exit code [{retcode}]")
if timeout and process.timeout_exceeded:
print(f"Timeout {timeout} exceeded, dumping the job report")
JobReport(
status=FAILURE,
description=f"Timeout {timeout} exceeded",
test_results=[TestResult.create_check_timeout_expired(timeout)],
start_time=stopwatch.start_time_str,
duration=stopwatch.duration_seconds,
additional_files=[job_log],
).dump()
if process.timeout_exceeded:
print(f"Job timed out: [{job_name}] exit code [{retcode}]")
assert JobReport.exist(), "JobReport real or dummy must be present"
jr = JobReport.load()
if jr.dummy:
print(
f"ERROR: Run action failed with timeout and did not generate JobReport - update dummy report with execution time"
)
jr.test_results = [TestResult.create_check_timeout_expired()]
jr.duration = stopwatch.duration_seconds
jr.additional_files += [job_log]
print(f"Run action done for: [{job_name}]")
return retcode
@ -1205,7 +1206,7 @@ def main() -> int:
job_report
), "BUG. There must be job report either real report, or pre-report if job was killed"
error_description = ""
if not job_report.pre_report:
if not job_report.dummy:
# it's a real job report
ch_helper = ClickHouseHelper()
check_url = ""
@ -1329,10 +1330,20 @@ def main() -> int:
if CI.is_test_job(args.job_name):
gh = GitHub(get_best_robot_token(), per_page=100)
commit = get_commit(gh, pr_info.sha)
check_url = ""
if job_report.test_results or job_report.additional_files:
check_url = upload_result_helper.upload_results(
s3,
pr_info.number,
pr_info.sha,
job_report.test_results,
job_report.additional_files,
job_report.check_name or _get_ext_check_name(args.job_name),
)
post_commit_status(
commit,
ERROR,
"",
check_url,
"Error: " + error_description,
_get_ext_check_name(args.job_name),
pr_info,

View File

@ -163,6 +163,7 @@ class CI:
tidy=True,
comment="clang-tidy is used for static analysis",
),
timeout=10800,
),
BuildNames.BINARY_DARWIN: CommonJobConfigs.BUILD.with_properties(
build_config=BuildConfig(
@ -316,6 +317,7 @@ class CI:
JobNames.STATEFUL_TEST_PARALLEL_REPL_TSAN: CommonJobConfigs.STATEFUL_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_TSAN],
random_bucket="parrepl_with_sanitizer",
timeout=3600,
),
JobNames.STATELESS_TEST_ASAN: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_ASAN], num_batches=2
@ -343,17 +345,17 @@ class CI:
runner_type=Runners.FUNC_TESTER_ARM,
),
JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_RELEASE], num_batches=4
required_builds=[BuildNames.PACKAGE_RELEASE], num_batches=2
),
JobNames.STATELESS_TEST_S3_DEBUG: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_DEBUG], num_batches=2
required_builds=[BuildNames.PACKAGE_DEBUG], num_batches=1
),
JobNames.STATELESS_TEST_AZURE_ASAN: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_ASAN], num_batches=3, release_only=True
),
JobNames.STATELESS_TEST_S3_TSAN: CommonJobConfigs.STATELESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_TSAN],
num_batches=4,
num_batches=3,
),
JobNames.STRESS_TEST_DEBUG: CommonJobConfigs.STRESS_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_DEBUG],
@ -401,7 +403,8 @@ class CI:
required_builds=[BuildNames.PACKAGE_ASAN], release_only=True, num_batches=4
),
JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER: CommonJobConfigs.INTEGRATION_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_ASAN], num_batches=6
required_builds=[BuildNames.PACKAGE_ASAN],
num_batches=6,
),
JobNames.INTEGRATION_TEST_TSAN: CommonJobConfigs.INTEGRATION_TEST.with_properties(
required_builds=[BuildNames.PACKAGE_TSAN], num_batches=6

View File

@ -332,7 +332,7 @@ class JobConfig:
# will be triggered for the job if omitted in CI workflow yml
run_command: str = ""
# job timeout, seconds
timeout: Optional[int] = None
timeout: int = 7200
# sets number of batches for a multi-batch job
num_batches: int = 1
# label that enables job in CI, if set digest isn't used
@ -421,7 +421,6 @@ class CommonJobConfigs:
),
run_command='functional_test_check.py "$CHECK_NAME"',
runner_type=Runners.FUNC_TESTER,
timeout=9000,
)
STATEFUL_TEST = JobConfig(
job_name_keyword="stateful",
@ -466,6 +465,7 @@ class CommonJobConfigs:
),
run_command="upgrade_check.py",
runner_type=Runners.STRESS_TESTER,
timeout=3600,
)
INTEGRATION_TEST = JobConfig(
job_name_keyword="integration",

View File

@ -5,10 +5,11 @@ import csv
import logging
import os
import re
import signal
import subprocess
import sys
from pathlib import Path
from typing import List, Tuple
from typing import List, Tuple, Optional
from build_download_helper import download_all_deb_packages
from clickhouse_helper import CiLogsCredentials
@ -25,11 +26,12 @@ from report import (
TestResults,
read_test_results,
FAILURE,
TestResult,
)
from stopwatch import Stopwatch
from tee_popen import TeePopen
from ci_config import CI
from ci_utils import Utils
from ci_utils import Utils, Shell
NO_CHANGES_MSG = "Nothing to run"
@ -113,10 +115,6 @@ def get_run_command(
if flaky_check:
envs.append("-e NUM_TRIES=50")
envs.append("-e MAX_RUN_TIME=2800")
else:
max_run_time = os.getenv("MAX_RUN_TIME", "0")
envs.append(f"-e MAX_RUN_TIME={max_run_time}")
envs += [f"-e {e}" for e in additional_envs]
@ -128,7 +126,7 @@ def get_run_command(
)
return (
f"docker run --volume={builds_path}:/package_folder "
f"docker run --rm --name func-tester --volume={builds_path}:/package_folder "
# For dmesg and sysctl
"--privileged "
f"{ci_logs_args}"
@ -198,7 +196,7 @@ def process_results(
state, description = status[0][0], status[0][1]
if ret_code != 0:
state = ERROR
description += " (but script exited with an error)"
description = f"Job failed, exit code: {ret_code}. " + description
try:
results_path = result_directory / "test_results.tsv"
@ -240,7 +238,19 @@ def parse_args():
return parser.parse_args()
test_process = None # type: Optional[TeePopen]
timeout_expired = False
def handle_sigterm(signum, _frame):
print(f"WARNING: Received signal {signum}")
global timeout_expired
timeout_expired = True
Shell.check(f"docker exec func-tester pkill -f clickhouse-test", verbose=True)
def main():
signal.signal(signal.SIGTERM, handle_sigterm)
logging.basicConfig(level=logging.INFO)
for handler in logging.root.handlers:
# pylint: disable=protected-access
@ -328,11 +338,13 @@ def main():
logging.info("Going to run func tests: %s", run_command)
with TeePopen(run_command, run_log_path) as process:
global test_process
test_process = process
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
logging.info("Run failed, exit code %s", retcode)
try:
subprocess.check_call(
@ -348,6 +360,13 @@ def main():
state, description, test_results, additional_logs = process_results(
retcode, result_path, server_log_path
)
if timeout_expired:
description = "Timeout expired"
state = FAILURE
test_results.insert(
0, TestResult.create_check_timeout_expired(stopwatch.duration_seconds)
)
else:
print(
"This is validate bugfix or flaky check run, but no changes test to run - skip with success"

View File

@ -9,6 +9,7 @@ import random
import re
import shlex
import shutil
import signal
import string
import subprocess
import sys
@ -16,11 +17,13 @@ import time
import zlib # for crc32
from collections import defaultdict
from itertools import chain
from typing import Any, Dict
from typing import Any, Dict, Optional
from env_helper import IS_CI
from integration_test_images import IMAGES
from tee_popen import TeePopen
from report import JOB_TIMEOUT_TEST_NAME
from stopwatch import Stopwatch
MAX_RETRY = 1
NUM_WORKERS = 5
@ -621,6 +624,9 @@ class ClickhouseIntegrationTestsRunner:
test_data_dirs = {}
for i in range(num_tries):
if timeout_expired:
print("Timeout expired - break test group execution")
break
logging.info("Running test group %s for the %s retry", test_group, i)
clear_ip_tables_and_restart_daemons()
@ -657,6 +663,8 @@ class ClickhouseIntegrationTestsRunner:
logging.info("Executing cmd: %s", cmd)
# ignore retcode, since it meaningful due to pipe to tee
with subprocess.Popen(cmd, shell=True, stderr=log, stdout=log) as proc:
global runner_subprocess
runner_subprocess = proc
proc.wait()
extra_logs_names = [log_basename]
@ -780,6 +788,9 @@ class ClickhouseIntegrationTestsRunner:
logs = []
tries_num = 1 if should_fail else FLAKY_TRIES_COUNT
for i in range(tries_num):
if timeout_expired:
print("Timeout expired - break flaky check execution")
break
final_retry += 1
logging.info("Running tests for the %s time", i)
counters, tests_times, log_paths = self.try_run_test_group(
@ -839,6 +850,7 @@ class ClickhouseIntegrationTestsRunner:
return result_state, status_text, test_result, logs
def run_impl(self, repo_path, build_path):
stopwatch = Stopwatch()
if self.flaky_check or self.bugfix_validate_check:
return self.run_flaky_check(
repo_path, build_path, should_fail=self.bugfix_validate_check
@ -921,6 +933,9 @@ class ClickhouseIntegrationTestsRunner:
random.shuffle(items_to_run)
for group, tests in items_to_run:
if timeout_expired:
print("Timeout expired - break tests execution")
break
logging.info("Running test group %s containing %s tests", group, len(tests))
group_counters, group_test_times, log_paths = self.try_run_test_group(
repo_path, group, tests, MAX_RETRY, NUM_WORKERS, 0
@ -981,6 +996,17 @@ class ClickhouseIntegrationTestsRunner:
status_text = "Timeout, " + status_text
result_state = "failure"
if timeout_expired:
logging.error(
"Job killed by external timeout signal - setting status to failure!"
)
status_text = "Job timeout expired, " + status_text
result_state = "failure"
# add mock test case to make timeout visible in job report and in ci db
test_result.insert(
0, (JOB_TIMEOUT_TEST_NAME, "FAIL", f"{stopwatch.duration_seconds}", "")
)
if not counters or sum(len(counter) for counter in counters.values()) == 0:
status_text = "No tests found for some reason! It's a bug"
result_state = "failure"
@ -1001,6 +1027,7 @@ def write_results(results_file, status_file, results, status):
def run():
signal.signal(signal.SIGTERM, handle_sigterm)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
repo_path = os.environ.get("CLICKHOUSE_TESTS_REPO_PATH")
@ -1035,5 +1062,17 @@ def run():
logging.info("Result written")
timeout_expired = False
runner_subprocess = None # type:Optional[subprocess.Popen]
def handle_sigterm(signum, _frame):
print(f"WARNING: Received signal {signum}")
global timeout_expired
timeout_expired = True
if runner_subprocess:
runner_subprocess.send_signal(signal.SIGTERM)
if __name__ == "__main__":
run()
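A minimal sketch of the same idea inside the runner: the SIGTERM handler flips a global flag and forwards the signal to the currently running child process, and every retry loop checks the flag before starting another group. The names run_groups and the echo command are placeholders, not the runner's real API.

import signal
import subprocess
from typing import List, Optional

timeout_expired = False
runner_subprocess: Optional[subprocess.Popen] = None

def handle_sigterm(signum, _frame):
    global timeout_expired
    timeout_expired = True
    if runner_subprocess is not None:
        # Pass the termination request down to the running child.
        runner_subprocess.send_signal(signal.SIGTERM)

def run_groups(groups: List[str]) -> None:
    global runner_subprocess
    signal.signal(signal.SIGTERM, handle_sigterm)
    for group in groups:
        if timeout_expired:
            print("Timeout expired - break tests execution")
            break
        runner_subprocess = subprocess.Popen(["echo", group])
        runner_subprocess.wait()

if __name__ == "__main__":
    run_groups(["group_a", "group_b"])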

View File

@ -249,6 +249,7 @@ JOB_REPORT_FILE = Path(GITHUB_WORKSPACE) / "job_report.json"
JOB_STARTED_TEST_NAME = "STARTED"
JOB_FINISHED_TEST_NAME = "COMPLETED"
JOB_TIMEOUT_TEST_NAME = "Job Timeout Expired"
@dataclass
@ -277,8 +278,8 @@ class TestResult:
self.log_files.append(log_path)
@staticmethod
def create_check_timeout_expired(timeout: float) -> "TestResult":
return TestResult("Check timeout expired", "FAIL", timeout)
def create_check_timeout_expired(duration: Optional[float] = None) -> "TestResult":
return TestResult(JOB_TIMEOUT_TEST_NAME, "FAIL", time=duration)
TestResults = List[TestResult]
@ -303,7 +304,7 @@ class JobReport:
# indicates that this is not a real job report but a report for a job that was skipped by the rerun check
job_skipped: bool = False
# indicates that the report was generated by the CI script so we can later check whether the job was killed before the real report was generated
pre_report: bool = False
dummy: bool = False
exit_code: int = -1
@staticmethod
@ -311,7 +312,7 @@ class JobReport:
return datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
@classmethod
def create_pre_report(cls, status: str, job_skipped: bool) -> "JobReport":
def create_dummy(cls, status: str, job_skipped: bool) -> "JobReport":
return JobReport(
status=status,
description="",
@ -320,7 +321,7 @@ class JobReport:
duration=0.0,
additional_files=[],
job_skipped=job_skipped,
pre_report=True,
dummy=True,
)
def update_duration(self):
@ -741,10 +742,21 @@ def create_test_html_report(
has_test_time = any(tr.time is not None for tr in test_results)
has_log_urls = False
# Display entries with logs at the top (they correspond to failed tests)
test_results.sort(
key=lambda result: result.raw_logs is None and result.log_files is None
)
def sort_key(status):
if "fail" in status.lower():
return 0
elif "error" in status.lower():
return 1
elif "not" in status.lower():
return 2
elif "ok" in status.lower():
return 10
elif "success" in status.lower():
return 9
else:
return 5
test_results.sort(key=lambda result: sort_key(result.status))
for test_result in test_results:
colspan = 0
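The new ordering is easy to check in isolation: FAIL-like statuses sort first, then errors, with OK and success pushed to the bottom. A quick sanity check of that key function, assuming a handful of typical status strings:

def sort_key(status):
    if "fail" in status.lower():
        return 0
    elif "error" in status.lower():
        return 1
    elif "not" in status.lower():
        return 2
    elif "ok" in status.lower():
        return 10
    elif "success" in status.lower():
        return 9
    else:
        return 5

statuses = ["OK", "FAIL", "SKIPPED", "ERROR", "success", "Timeout"]
print(sorted(statuses, key=sort_key))
# ['FAIL', 'ERROR', 'SKIPPED', 'Timeout', 'success', 'OK']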

View File

@ -2,6 +2,7 @@
import logging
import os
import signal
import sys
from io import TextIOWrapper
from pathlib import Path
@ -30,20 +31,34 @@ class TeePopen:
self._process = None # type: Optional[Popen]
self.timeout = timeout
self.timeout_exceeded = False
self.terminated_by_sigterm = False
self.terminated_by_sigkill = False
def _check_timeout(self) -> None:
if self.timeout is None:
return
sleep(self.timeout)
logging.warning(
"Timeout exceeded. Send SIGTERM to process %s, timeout %s",
self.process.pid,
self.timeout,
)
self.send_signal(signal.SIGTERM)
time_wait = 0
self.terminated_by_sigterm = True
self.timeout_exceeded = True
while self.process.poll() is None and time_wait < 100:
print("wait...")
wait = 5
sleep(wait)
time_wait += wait
while self.process.poll() is None:
logging.warning(
"Killing process %s, timeout %s exceeded",
self.process.pid,
self.timeout,
)
os.killpg(self.process.pid, 9)
sleep(10)
logging.error(
"Process is still running. Send SIGKILL",
)
self.send_signal(signal.SIGKILL)
self.terminated_by_sigkill = True
sleep(5)
def __enter__(self) -> "TeePopen":
self.process = Popen(
@ -57,6 +72,8 @@ class TeePopen:
bufsize=1,
errors="backslashreplace",
)
sleep(1)
print(f"Subprocess started, pid [{self.process.pid}]")
if self.timeout is not None and self.timeout > 0:
t = Thread(target=self._check_timeout)
t.daemon = True # does not block the program from exit
@ -85,6 +102,12 @@ class TeePopen:
return self.process.wait()
def poll(self):
return self.process.poll()
def send_signal(self, signal_num):
os.killpg(self.process.pid, signal_num)
@property
def process(self) -> Popen:
if self._process is not None:
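The escalation implemented above (SIGTERM to the whole process group, a bounded grace period, then SIGKILL) is a common watchdog pattern. Below is a reduced standalone sketch, not the TeePopen class itself; it assumes a POSIX system, and the 100-second grace window and 5-second polling step mirror the values in the diff.

import os
import signal
import subprocess
from threading import Thread
from time import sleep

def watchdog(proc: subprocess.Popen, timeout: float) -> None:
    sleep(timeout)
    if proc.poll() is not None:
        return  # finished in time, nothing to do
    # Ask the whole process group to stop first.
    os.killpg(proc.pid, signal.SIGTERM)
    waited = 0
    while proc.poll() is None and waited < 100:
        sleep(5)
        waited += 5
    if proc.poll() is None:
        # Still alive after the grace period, kill it for good.
        os.killpg(proc.pid, signal.SIGKILL)

# start_new_session=True makes the child a process group leader, so killpg works.
proc = subprocess.Popen(["sleep", "600"], start_new_session=True)
Thread(target=watchdog, args=(proc, 10), daemon=True).start()
proc.wait()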

View File

@ -2572,12 +2572,12 @@ def do_run_tests(jobs, test_suite: TestSuite):
try:
clickhouse_execute(
args,
query="SELECT 1 /*hang up check*/",
max_http_retries=5,
timeout=20,
query="SELECT 1 /*hung check*/",
max_http_retries=20,
timeout=10,
)
except Exception:
print("Hang up check failed")
print("Hung check failed")
server_died.set()
if server_died.is_set():

View File

@ -1,6 +1,7 @@
[
"test_dns_cache/test.py::test_dns_cache_update",
"test_dns_cache/test.py::test_ip_change_drop_dns_cache",
"test_dns_cache/test.py::test_dns_resolver_filter",
"test_dns_cache/test.py::test_ip_change_update_dns_cache",
"test_dns_cache/test.py::test_user_access_ip_change[node0]",
"test_dns_cache/test.py::test_user_access_ip_change[node1]",

View File

@ -530,10 +530,61 @@ def test_bad_configuration(started_cluster):
"""
)
node1.query_and_get_error(
assert "Unexpected key `dbbb`" in node1.query_and_get_error(
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))"
)
assert node1.contains_in_log("Unexpected key `dbbb`")
def test_named_collection_from_ddl(started_cluster):
cursor = started_cluster.postgres_conn.cursor()
cursor.execute("DROP TABLE IF EXISTS test_table")
cursor.execute("CREATE TABLE test_table (id integer, value integer)")
node1.query(
"""
DROP NAMED COLLECTION IF EXISTS pg_conn;
CREATE NAMED COLLECTION pg_conn
AS user = 'postgres', password = 'mysecretpassword', host = 'postgres1', port = 5432, database = 'postgres', table = 'test_table';
"""
)
cursor.execute(
"INSERT INTO test_table SELECT i, i FROM generate_series(0, 99) as t(i)"
)
node1.query(
"""
DROP DICTIONARY IF EXISTS postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME pg_conn))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert int(result.strip()) == 99
node1.query(
"""
DROP NAMED COLLECTION IF EXISTS pg_conn_2;
CREATE NAMED COLLECTION pg_conn_2
AS user = 'postgres', password = 'mysecretpassword', host = 'postgres1', port = 5432, dbbb = 'postgres', table = 'test_table';
"""
)
node1.query(
"""
DROP DICTIONARY IF EXISTS postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME pg_conn_2))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
assert "Unexpected key `dbbb`" in node1.query_and_get_error(
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))"
)
if __name__ == "__main__":

View File

@ -317,3 +317,74 @@ def test_host_is_drop_from_cache_after_consecutive_failures(
assert node4.wait_for_log_line(
"Cached hosts dropped:.*InvalidHostThatDoesNotExist.*"
)
node7 = cluster.add_instance(
"node7",
main_configs=["configs/listen_host.xml", "configs/dns_update_long.xml"],
with_zookeeper=True,
ipv6_address="2001:3984:3989::1:1117",
ipv4_address="10.5.95.17",
)
def _render_filter_config(allow_ipv4, allow_ipv6):
config = f"""
<clickhouse>
<dns_allow_resolve_names_to_ipv4>{int(allow_ipv4)}</dns_allow_resolve_names_to_ipv4>
<dns_allow_resolve_names_to_ipv6>{int(allow_ipv6)}</dns_allow_resolve_names_to_ipv6>
</clickhouse>
"""
return config
@pytest.mark.parametrize(
"allow_ipv4, allow_ipv6",
[
(True, False),
(False, True),
(False, False),
],
)
def test_dns_resolver_filter(cluster_without_dns_cache_update, allow_ipv4, allow_ipv6):
node = node7
host_ipv6 = node.ipv6_address
host_ipv4 = node.ipv4_address
node.set_hosts(
[
(host_ipv6, "test_host"),
(host_ipv4, "test_host"),
]
)
node.replace_config(
"/etc/clickhouse-server/config.d/dns_filter.xml",
_render_filter_config(allow_ipv4, allow_ipv6),
)
node.query("SYSTEM RELOAD CONFIG")
node.query("SYSTEM DROP DNS CACHE")
node.query("SYSTEM DROP CONNECTIONS CACHE")
if not allow_ipv4 and not allow_ipv6:
with pytest.raises(QueryRuntimeException):
node.query("SELECT * FROM remote('lost_host', 'system', 'one')")
else:
node.query("SELECT * FROM remote('test_host', system, one)")
assert (
node.query(
"SELECT ip_address FROM system.dns_cache WHERE hostname='test_host'"
)
== f"{host_ipv4 if allow_ipv4 else host_ipv6}\n"
)
node.exec_in_container(
[
"bash",
"-c",
"rm /etc/clickhouse-server/config.d/dns_filter.xml",
],
privileged=True,
user="root",
)
node.query("SYSTEM RELOAD CONFIG")

View File

@ -19,7 +19,13 @@ def test_and_check(name, a, b, t_stat, p_value):
)
client.query(
"INSERT INTO mann_whitney VALUES {};".format(
", ".join(["({},{}), ({},{})".format(i, 0, j, 1) for i, j in zip(a, b)])
", ".join(["({},{})".format(i, 0) for i in a])
)
)
client.query(
"INSERT INTO mann_whitney VALUES {};".format(
", ".join(["({},{})".format(i, 1) for i in b])
)
)
@ -59,6 +65,15 @@ def test_mann_whitney():
test_and_check("mannWhitneyUTest('greater')", rvs1, rvs2, s, p)
def test_mann_whitney_skew():
rvs1 = [1]
rvs2 = [0, 2, 4]
s, p = stats.mannwhitneyu(rvs1, rvs2, alternative="two-sided")
test_and_check("mannWhitneyUTest", rvs1, rvs2, s, p)
test_and_check("mannWhitneyUTest('two-sided')", rvs1, rvs2, s, p)
if __name__ == "__main__":
test_mann_whitney()
test_mann_whitney_skew()
print("Ok.")