Merge branch 'master' into tighten-limits-functional-tests

This commit is contained in:
Alexey Milovidov 2024-08-03 19:29:58 +02:00
commit 900cc8afc6
89 changed files with 706 additions and 587 deletions

View File

@ -26,7 +26,6 @@ sed -i '/onBrokenMarkdownLinks:/ s/ignore/error/g' docusaurus.config.js
if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
export CI=true
yarn install
exec yarn build "$@"
fi

View File

@ -119,11 +119,6 @@ Minimum size of blocks of uncompressed data required for compression when writin
You can also specify this setting in the global settings (see [min_compress_block_size](/docs/en/operations/settings/settings.md/#min-compress-block-size) setting).
The value specified when table is created overrides the global value for this setting.
## max_partitions_to_read
Limits the maximum number of partitions that can be accessed in one query.
You can also specify setting [max_partitions_to_read](/docs/en/operations/settings/merge-tree-settings.md/#max-partitions-to-read) in the global setting.
## max_suspicious_broken_parts
If the number of broken parts in a single partition exceeds the `max_suspicious_broken_parts` value, automatic deletion is denied.
@ -691,6 +686,8 @@ Possible values:
Default value: -1 (unlimited).
You can also specify a query complexity setting [max_partitions_to_read](query-complexity#max-partitions-to-read) at a query / session / profile level.
## min_age_to_force_merge_seconds {#min_age_to_force_merge_seconds}
Merge parts if every part in the range is older than the value of `min_age_to_force_merge_seconds`.

View File

@ -188,7 +188,7 @@ If you set `timeout_before_checking_execution_speed `to 0, ClickHouse will use c
What to do if the query is run longer than `max_execution_time` or the estimated running time is longer than `max_estimated_execution_time`: `throw` or `break`. By default, `throw`.
# max_execution_time_leaf
## max_execution_time_leaf
Similar semantic to `max_execution_time` but only apply on leaf node for distributed or remote queries.
@ -204,7 +204,7 @@ We can use `max_execution_time_leaf` as the query settings:
SELECT count() FROM cluster(cluster, view(SELECT * FROM t)) SETTINGS max_execution_time_leaf = 10;
```
# timeout_overflow_mode_leaf
## timeout_overflow_mode_leaf
What to do when the query in leaf node run longer than `max_execution_time_leaf`: `throw` or `break`. By default, `throw`.
@ -426,3 +426,17 @@ Example:
```
Default value: 0 (Infinite count of simultaneous sessions).
## max_partitions_to_read {#max-partitions-to-read}
Limits the maximum number of partitions that can be accessed in one query.
The setting value specified when the table is created can be overridden via query-level setting.
Possible values:
- Any positive integer.
Default value: -1 (unlimited).
You can also specify a MergeTree setting [max_partitions_to_read](merge-tree-settings#max-partitions-to-read) in tables' setting.

View File

@ -218,20 +218,27 @@ AsyncLoader::~AsyncLoader()
{
// All `LoadTask` objects should be destructed before AsyncLoader destruction because they hold a reference.
// To make sure we check for all pending jobs to be finished.
std::unique_lock lock{mutex};
if (scheduled_jobs.empty() && finished_jobs.empty())
return;
{
std::unique_lock lock{mutex};
if (!scheduled_jobs.empty() || !finished_jobs.empty())
{
std::vector<String> scheduled;
std::vector<String> finished;
scheduled.reserve(scheduled_jobs.size());
finished.reserve(finished_jobs.size());
for (const auto & [job, _] : scheduled_jobs)
scheduled.push_back(job->name);
for (const auto & job : finished_jobs)
finished.push_back(job->name);
LOG_ERROR(log, "Bug. Destruction with pending ({}) and finished ({}) load jobs.", fmt::join(scheduled, ", "), fmt::join(finished, ", "));
abort();
}
}
std::vector<String> scheduled;
std::vector<String> finished;
scheduled.reserve(scheduled_jobs.size());
finished.reserve(finished_jobs.size());
for (const auto & [job, _] : scheduled_jobs)
scheduled.push_back(job->name);
for (const auto & job : finished_jobs)
finished.push_back(job->name);
LOG_ERROR(log, "Bug. Destruction with pending ({}) and finished ({}) load jobs.", fmt::join(scheduled, ", "), fmt::join(finished, ", "));
abort();
// When all jobs are done we could still have finalizing workers.
// These workers could call updateCurrentPriorityAndSpawn() that scans all pools.
// We need to stop all of them before destructing any of them.
stop();
}
void AsyncLoader::start()

View File

@ -306,6 +306,8 @@
\
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
\
M(S3DiskNoKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -4,8 +4,6 @@
#include <Common/ExponentiallySmoothedCounter.h>
#include <numbers>
namespace DB
{
@ -14,9 +12,10 @@ namespace DB
class EventRateMeter
{
public:
explicit EventRateMeter(double now, double period_)
explicit EventRateMeter(double now, double period_, size_t heating_ = 0)
: period(period_)
, half_decay_time(period * std::numbers::ln2) // for `ExponentiallySmoothedAverage::sumWeights()` to be equal to `1/period`
, max_interval(period * 10)
, heating(heating_)
{
reset(now);
}
@ -29,16 +28,11 @@ public:
{
// Remove data for initial heating stage that can present at the beginning of a query.
// Otherwise it leads to wrong gradual increase of average value, turning algorithm into not very reactive.
if (count != 0.0 && ++data_points < 5)
{
start = events.time;
events = ExponentiallySmoothedAverage();
}
if (count != 0.0 && data_points++ <= heating)
reset(events.time, data_points);
if (now - period <= start) // precise counting mode
events = ExponentiallySmoothedAverage(events.value + count, now);
else // exponential smoothing mode
events.add(count, now, half_decay_time);
duration.add(std::min(max_interval, now - duration.time), now, period);
events.add(count, now, period);
}
/// Compute average event rate throughout `[now - period, now]` period.
@ -49,24 +43,26 @@ public:
add(now, 0);
if (unlikely(now <= start))
return 0;
if (now - period <= start) // precise counting mode
return events.value / (now - start);
else // exponential smoothing mode
return events.get(half_decay_time); // equals to `events.value / period`
// We do not use .get() because sum of weights will anyway be canceled out (optimization)
return events.value / duration.value;
}
void reset(double now)
void reset(double now, size_t data_points_ = 0)
{
start = now;
events = ExponentiallySmoothedAverage();
data_points = 0;
duration = ExponentiallySmoothedAverage();
data_points = data_points_;
}
private:
const double period;
const double half_decay_time;
const double max_interval;
const size_t heating;
double start; // Instant in past without events before it; when measurement started or reset
ExponentiallySmoothedAverage events; // Estimated number of events in the last `period`
ExponentiallySmoothedAverage duration; // Current duration of a period
ExponentiallySmoothedAverage events; // Estimated number of events in last `duration` seconds
size_t data_points = 0;
};

View File

@ -105,7 +105,7 @@ private:
bool write_progress_on_update = false;
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 2'000'000'000 /*ns*/}; // average cpu utilization last 2 second
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 2'000'000'000 /*ns*/, 4}; // average cpu utilization last 2 second, skip first 4 points
HostToTimesMap hosts_data;
/// In case of all of the above:
/// - clickhouse-local

View File

@ -3,6 +3,8 @@
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/Priority.h>
#include <Common/EventRateMeter.h>
#include <Common/Stopwatch.h>
#include <base/defines.h>
#include <base/types.h>
@ -176,6 +178,14 @@ protected:
/// Postponed to be handled in scheduler thread, so it is intended to be called from outside.
void scheduleActivation();
/// Helper for introspection metrics
void incrementDequeued(ResourceCost cost)
{
dequeued_requests++;
dequeued_cost += cost;
throughput.add(static_cast<double>(clock_gettime_ns())/1e9, cost);
}
public:
EventQueue * const event_queue;
String basename;
@ -189,6 +199,10 @@ public:
std::atomic<ResourceCost> dequeued_cost{0};
std::atomic<ResourceCost> canceled_cost{0};
std::atomic<UInt64> busy_periods{0};
/// Average dequeued_cost per second
/// WARNING: Should only be accessed from the scheduler thread, so that locking is not required
EventRateMeter throughput{static_cast<double>(clock_gettime_ns())/1e9, 2, 1};
};
using SchedulerNodePtr = std::shared_ptr<ISchedulerNode>;

View File

@ -188,8 +188,7 @@ public:
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, heap_size > 0};
}
}

View File

@ -59,8 +59,7 @@ public:
if (requests.empty())
busy_periods++;
queue_cost -= result->cost;
dequeued_requests++;
dequeued_cost += result->cost;
incrementDequeued(result->cost);
return {result, !requests.empty()};
}

View File

@ -122,8 +122,7 @@ public:
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, !items.empty()};
}
}

View File

@ -81,8 +81,7 @@ public:
child_active = child_now_active;
if (!active())
busy_periods++;
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, active()};
}

View File

@ -89,8 +89,7 @@ public:
child_active = child_now_active;
if (!active())
busy_periods++;
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, active()};
}

View File

@ -162,8 +162,7 @@ public:
if (request == nullptr) // Possible in case of request cancel, just retry
continue;
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, current != nullptr};
}
}

View File

@ -0,0 +1,68 @@
#include <gtest/gtest.h>
#include <Common/EventRateMeter.h>
#include <cmath>
TEST(EventRateMeter, ExponentiallySmoothedAverage)
{
double target = 100.0;
// The test is only correct for timestep of 1 second because of
// how sum of weights is implemented inside `ExponentiallySmoothedAverage`
double time_step = 1.0;
for (double half_decay_time : { 0.1, 1.0, 10.0, 100.0})
{
DB::ExponentiallySmoothedAverage esa;
int steps = static_cast<int>(half_decay_time * 30 / time_step);
for (int i = 1; i <= steps; ++i)
esa.add(target * time_step, i * time_step, half_decay_time);
double measured = esa.get(half_decay_time);
ASSERT_LE(std::fabs(measured - target), 1e-5 * target);
}
}
TEST(EventRateMeter, ConstantRate)
{
double target = 100.0;
for (double period : {0.1, 1.0, 10.0})
{
for (double time_step : {0.001, 0.01, 0.1, 1.0})
{
DB::EventRateMeter erm(0.0, period);
int steps = static_cast<int>(period * 30 / time_step);
for (int i = 1; i <= steps; ++i)
erm.add(i * time_step, target * time_step);
double measured = erm.rate(steps * time_step);
// std::cout << "T=" << period << " dt=" << time_step << " measured=" << measured << std::endl;
ASSERT_LE(std::fabs(measured - target), 1e-5 * target);
}
}
}
TEST(EventRateMeter, PreciseStart)
{
double target = 100.0;
for (double period : {0.1, 1.0, 10.0})
{
for (double time_step : {0.001, 0.01, 0.1, 1.0})
{
DB::EventRateMeter erm(0.0, period);
int steps = static_cast<int>(period / time_step);
for (int i = 1; i <= steps; ++i)
{
erm.add(i * time_step, target * time_step);
double measured = erm.rate(i * time_step);
// std::cout << "T=" << period << " dt=" << time_step << " measured=" << measured << std::endl;
ASSERT_LE(std::fabs(measured - target), 1e-5 * target);
}
}
}
}

View File

@ -936,6 +936,7 @@ class IColumn;
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \
M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing", 0) \
M(Bool, allow_archive_path_syntax, true, "File/S3 engines/table function will parse paths with '::' as '<archive> :: <file>' if archive has correct extension", 0) \
\
M(Bool, allow_experimental_inverted_index, false, "If it is set to true, allow to use experimental inverted index.", 0) \
M(Bool, allow_experimental_full_text_index, false, "If it is set to true, allow to use experimental full-text index.", 0) \

View File

@ -76,6 +76,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"24.8",
{
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
}
},
{"24.7",
@ -151,6 +152,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"cast_string_to_dynamic_use_inference", false, false, "Add setting to allow converting String to Dynamic through parsing"},
{"allow_experimental_dynamic_type", false, false, "Add new experimental Dynamic type"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},
{"allow_archive_path_syntax", false, true, "Added new setting to allow disabling archive path syntax."},
}
},
{"24.4",

View File

@ -103,7 +103,15 @@ static std::string getSortDescriptionDump(const SortDescription & description, c
WriteBufferFromOwnString buffer;
for (size_t i = 0; i < description.size(); ++i)
buffer << header_types[i]->getName() << ' ' << description[i].direction << ' ' << description[i].nulls_direction;
{
if (i != 0)
buffer << ", ";
buffer << "(type: " << header_types[i]->getName()
<< ", direction: " << description[i].direction
<< ", nulls_direction: " << description[i].nulls_direction
<< ")";
}
return buffer.str();
}

View File

@ -16,6 +16,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ParserCreateQuery.h>
@ -250,6 +251,8 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
convertMergeTreeToReplicatedIfNeeded(ast, qualified_name, file_name);
NormalizeSelectWithUnionQueryVisitor::Data data{local_context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(ast);
std::lock_guard lock{metadata.mutex};
metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
metadata.total_dictionaries += create_query->is_dictionary;

View File

@ -12,7 +12,6 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/PoolId.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
@ -339,12 +338,9 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, params);
}
ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const
std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr & cluster_) const
{
Strings paths_get, paths_exists;
paths_get.emplace_back(fs::path(zookeeper_path) / "max_log_ptr");
Strings paths;
const auto & addresses_with_failover = cluster_->getShardsAddresses();
const auto & shards_info = cluster_->getShardsInfo();
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
@ -352,59 +348,32 @@ ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_)
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
paths_exists.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
paths_get.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr");
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
}
}
try
{
auto current_zookeeper = getZooKeeper();
auto get_res = current_zookeeper->get(paths_get);
auto exist_res = current_zookeeper->exists(paths_exists);
chassert(get_res.size() == exist_res.size() + 1);
auto res = current_zookeeper->exists(paths);
auto max_log_ptr_zk = get_res[0];
if (max_log_ptr_zk.error != Coordination::Error::ZOK)
throw Coordination::Exception(max_log_ptr_zk.error);
std::vector<UInt8> statuses;
statuses.resize(paths.size());
UInt32 max_log_ptr = parse<UInt32>(max_log_ptr_zk.data);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZOK)
statuses[i] = 1;
ReplicasInfo replicas_info;
replicas_info.resize(exist_res.size());
size_t global_replica_index = 0;
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
for (const auto & replica : addresses_with_failover[shard_index])
{
auto replica_active = exist_res[global_replica_index];
auto replica_log_ptr = get_res[global_replica_index + 1];
if (replica_active.error != Coordination::Error::ZOK && replica_active.error != Coordination::Error::ZNONODE)
throw Coordination::Exception(replica_active.error);
if (replica_log_ptr.error != Coordination::Error::ZOK)
throw Coordination::Exception(replica_log_ptr.error);
replicas_info[global_replica_index] = ReplicaInfo{
.is_active = replica_active.error == Coordination::Error::ZOK,
.replication_lag = max_log_ptr - parse<UInt32>(replica_log_ptr.data),
.recovery_time = replica.is_local ? ddl_worker->getCurrentInitializationDurationMs() : 0,
};
++global_replica_index;
}
}
return replicas_info;
} catch (...)
return statuses;
}
catch (...)
{
tryLogCurrentException(log);
return {};
}
}
void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref)
{
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);

View File

@ -17,14 +17,6 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct ReplicaInfo
{
bool is_active;
UInt32 replication_lag;
UInt64 recovery_time;
};
using ReplicasInfo = std::vector<ReplicaInfo>;
class DatabaseReplicated : public DatabaseAtomic
{
public:
@ -92,7 +84,7 @@ public:
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
ReplicasInfo tryGetReplicasInfo(const ClusterPtr & cluster_) const;
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
void renameDatabase(ContextPtr query_context, const String & new_name) override;

View File

@ -32,12 +32,6 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
bool DatabaseReplicatedDDLWorker::initializeMainThread()
{
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.emplace();
initialization_duration_timer->start();
}
while (!stop_flag)
{
try
@ -75,10 +69,6 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
initializeReplication();
initialized = true;
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return true;
}
catch (...)
@ -88,11 +78,6 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
}
}
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return false;
}
@ -474,10 +459,4 @@ UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
return max_id.load();
}
UInt64 DatabaseReplicatedDDLWorker::getCurrentInitializationDurationMs() const
{
std::lock_guard lock(initialization_duration_timer_mutex);
return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0;
}
}

View File

@ -36,8 +36,6 @@ public:
DatabaseReplicated * const database, bool committed = false); /// NOLINT
UInt32 getLogPointer() const;
UInt64 getCurrentInitializationDurationMs() const;
private:
bool initializeMainThread() override;
void initializeReplication();
@ -58,9 +56,6 @@ private:
ZooKeeperPtr active_node_holder_zookeeper;
/// It will remove "active" node when database is detached
zkutil::EphemeralNodeHolderPtr active_node_holder;
std::optional<Stopwatch> initialization_duration_timer;
mutable std::mutex initialization_duration_timer_mutex;
};
}

View File

@ -24,7 +24,7 @@ namespace DB
static constexpr auto millisecond_multiplier = 1'000;
static constexpr auto microsecond_multiplier = 1'000'000;
static constexpr auto nanosecond_multiplier = 1'000'000'000;
static constexpr auto nanosecond_multiplier = 1'000'000'000;
static constexpr FormatSettings::DateTimeOverflowBehavior default_date_time_overflow_behavior = FormatSettings::DateTimeOverflowBehavior::Ignore;
@ -381,11 +381,13 @@ struct ToStartOfWeekImpl
static UInt16 execute(Int64 t, UInt8 week_mode, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t), week_mode);
const int res = time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t), week_mode);
return std::max(res, 0);
}
static UInt16 execute(UInt32 t, UInt8 week_mode, const DateLUTImpl & time_zone)
{
return time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t), week_mode);
const int res = time_zone.toFirstDayNumOfWeek(time_zone.toDayNum(t), week_mode);
return std::max(res, 0);
}
static UInt16 execute(Int32 d, UInt8 week_mode, const DateLUTImpl & time_zone)
{

View File

@ -2,6 +2,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Backups/RestorerFromBackup.h>
#include <Core/Settings.h>
#include <Functions/FunctionFactory.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
@ -9,6 +10,7 @@
#include <Functions/UserDefined/UserDefinedSQLObjectsBackup.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -80,13 +82,15 @@ namespace
validateFunctionRecursiveness(*function_body, name);
}
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query)
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query, const ContextPtr & context)
{
auto ptr = create_function_query.clone();
auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr);
res.if_not_exists = false;
res.or_replace = false;
FunctionNameNormalizer::visit(res.function_core.get());
NormalizeSelectWithUnionQueryVisitor::Data data{context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(res.function_core);
return ptr;
}
}
@ -125,7 +129,7 @@ void UserDefinedSQLFunctionFactory::checkCanBeUnregistered(const ContextPtr & co
bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & context, const String & function_name, ASTPtr create_function_query, bool throw_if_exists, bool replace_if_exists)
{
checkCanBeRegistered(context, function_name, *create_function_query);
create_function_query = normalizeCreateFunctionQuery(*create_function_query);
create_function_query = normalizeCreateFunctionQuery(*create_function_query, context);
try
{

View File

@ -1,7 +1,7 @@
#include "Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h"
#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h"
#include "Functions/UserDefined/UserDefinedSQLObjectType.h"
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>
#include <Common/StringUtils.h>
#include <Common/atomicRename.h>
@ -54,7 +54,7 @@ namespace
}
UserDefinedSQLObjectsDiskStorage::UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_)
: global_context(global_context_)
: UserDefinedSQLObjectsStorageBase(global_context_)
, dir_path{makeDirectoryPathCanonical(dir_path_)}
, log{getLogger("UserDefinedSQLObjectsLoaderFromDisk")}
{

View File

@ -42,7 +42,6 @@ private:
ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & file_path, bool check_file_exists);
String getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const;
ContextPtr global_context;
String dir_path;
LoggerPtr log;
std::atomic<bool> objects_loaded = false;

View File

@ -2,7 +2,10 @@
#include <boost/container/flat_set.hpp>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Parsers/ASTCreateFunctionQuery.h>
namespace DB
@ -17,18 +20,24 @@ namespace ErrorCodes
namespace
{
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query)
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query, const ContextPtr & context)
{
auto ptr = create_function_query.clone();
auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr);
res.if_not_exists = false;
res.or_replace = false;
FunctionNameNormalizer::visit(res.function_core.get());
NormalizeSelectWithUnionQueryVisitor::Data data{context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(res.function_core);
return ptr;
}
}
UserDefinedSQLObjectsStorageBase::UserDefinedSQLObjectsStorageBase(ContextPtr global_context_)
: global_context(std::move(global_context_))
{}
ASTPtr UserDefinedSQLObjectsStorageBase::get(const String & object_name) const
{
std::lock_guard lock(mutex);
@ -148,7 +157,7 @@ void UserDefinedSQLObjectsStorageBase::setAllObjects(const std::vector<std::pair
{
std::unordered_map<String, ASTPtr> normalized_functions;
for (const auto & [function_name, create_query] : new_objects)
normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query);
normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query, global_context);
std::lock_guard lock(mutex);
object_name_to_create_object_map = std::move(normalized_functions);
@ -166,7 +175,7 @@ std::vector<std::pair<String, ASTPtr>> UserDefinedSQLObjectsStorageBase::getAllO
void UserDefinedSQLObjectsStorageBase::setObject(const String & object_name, const IAST & create_object_query)
{
std::lock_guard lock(mutex);
object_name_to_create_object_map[object_name] = normalizeCreateFunctionQuery(create_object_query);
object_name_to_create_object_map[object_name] = normalizeCreateFunctionQuery(create_object_query, global_context);
}
void UserDefinedSQLObjectsStorageBase::removeObject(const String & object_name)

View File

@ -4,6 +4,7 @@
#include <mutex>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST.h>
@ -13,6 +14,7 @@ namespace DB
class UserDefinedSQLObjectsStorageBase : public IUserDefinedSQLObjectsStorage
{
public:
explicit UserDefinedSQLObjectsStorageBase(ContextPtr global_context_);
ASTPtr get(const String & object_name) const override;
ASTPtr tryGet(const String & object_name) const override;
@ -64,6 +66,8 @@ protected:
std::unordered_map<String, ASTPtr> object_name_to_create_object_map;
mutable std::recursive_mutex mutex;
ContextPtr global_context;
};
}

View File

@ -48,7 +48,7 @@ namespace
UserDefinedSQLObjectsZooKeeperStorage::UserDefinedSQLObjectsZooKeeperStorage(
const ContextPtr & global_context_, const String & zookeeper_path_)
: global_context{global_context_}
: UserDefinedSQLObjectsStorageBase(global_context_)
, zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }}
, zookeeper_path{zookeeper_path_}
, watch_queue{std::make_shared<ConcurrentBoundedQueue<std::pair<UserDefinedSQLObjectType, String>>>(std::numeric_limits<size_t>::max())}

View File

@ -68,8 +68,6 @@ private:
void refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type);
void syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type);
ContextPtr global_context;
zkutil::ZooKeeperCachingGetter zookeeper_getter;
String zookeeper_path;
std::atomic<bool> objects_loaded = false;

View File

@ -0,0 +1,50 @@
#include <IO/Archives/ArchiveUtils.h>
#include <string_view>
#include <array>
namespace DB
{
namespace
{
using namespace std::literals;
constexpr std::array tar_extensions{".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv, ".tar.bz2"sv, ".tar.lzma"sv};
constexpr std::array zip_extensions{".zip"sv, ".zipx"sv};
constexpr std::array sevenz_extensiosns{".7z"sv};
bool hasSupportedExtension(std::string_view path, const auto & supported_extensions)
{
for (auto supported_extension : supported_extensions)
{
if (path.ends_with(supported_extension))
return true;
}
return false;
}
}
bool hasSupportedTarExtension(std::string_view path)
{
return hasSupportedExtension(path, tar_extensions);
}
bool hasSupportedZipExtension(std::string_view path)
{
return hasSupportedExtension(path, zip_extensions);
}
bool hasSupported7zExtension(std::string_view path)
{
return hasSupportedExtension(path, sevenz_extensiosns);
}
bool hasSupportedArchiveExtension(std::string_view path)
{
return hasSupportedTarExtension(path) || hasSupportedZipExtension(path) || hasSupported7zExtension(path);
}
}

View File

@ -10,3 +10,17 @@
#include <archive.h>
#include <archive_entry.h>
#endif
#include <string_view>
namespace DB
{
bool hasSupportedTarExtension(std::string_view path);
bool hasSupportedZipExtension(std::string_view path);
bool hasSupported7zExtension(std::string_view path);
bool hasSupportedArchiveExtension(std::string_view path);
}

View File

@ -1,6 +1,7 @@
#include <IO/Archives/LibArchiveReader.h>
#include <IO/Archives/ZipArchiveReader.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/ArchiveUtils.h>
#include <Common/Exception.h>
@ -12,7 +13,6 @@ extern const int CANNOT_UNPACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
}
std::shared_ptr<IArchiveReader> createArchiveReader(const String & path_to_archive)
{
return createArchiveReader(path_to_archive, {}, 0);
@ -24,11 +24,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
[[maybe_unused]] const std::function<std::unique_ptr<SeekableReadBuffer>()> & archive_read_function,
[[maybe_unused]] size_t archive_size)
{
using namespace std::literals;
static constexpr std::array tar_extensions{
".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv, ".tar.bz2"sv, ".tar.lzma"sv};
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
if (hasSupportedZipExtension(path_to_archive))
{
#if USE_MINIZIP
return std::make_shared<ZipArchiveReader>(path_to_archive, archive_read_function, archive_size);
@ -36,8 +32,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
else if (std::any_of(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
else if (hasSupportedTarExtension(path_to_archive))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveReader>(path_to_archive, archive_read_function);
@ -45,7 +40,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif
}
else if (path_to_archive.ends_with(".7z"))
else if (hasSupported7zExtension(path_to_archive))
{
#if USE_LIBARCHIVE
return std::make_shared<SevenZipArchiveReader>(path_to_archive);

View File

@ -1,3 +1,4 @@
#include <IO/Archives/ArchiveUtils.h>
#include <IO/Archives/LibArchiveWriter.h>
#include <IO/Archives/TarArchiveWriter.h>
#include <IO/Archives/ZipArchiveWriter.h>
@ -24,10 +25,7 @@ std::shared_ptr<IArchiveWriter> createArchiveWriter(const String & path_to_archi
std::shared_ptr<IArchiveWriter>
createArchiveWriter(const String & path_to_archive, [[maybe_unused]] std::unique_ptr<WriteBuffer> archive_write_buffer)
{
using namespace std::literals;
static constexpr std::array tar_extensions{
".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.bz2"sv, ".tar.lzma"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv};
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
if (hasSupportedZipExtension(path_to_archive))
{
#if USE_MINIZIP
return std::make_shared<ZipArchiveWriter>(path_to_archive, std::move(archive_write_buffer));
@ -35,8 +33,7 @@ createArchiveWriter(const String & path_to_archive, [[maybe_unused]] std::unique
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
else if (std::any_of(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
else if (hasSupportedTarExtension(path_to_archive))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveWriter>(path_to_archive, std::move(archive_write_buffer));

View File

@ -24,6 +24,7 @@
#include <Common/assert_cast.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Core/Settings.h>
@ -43,6 +44,11 @@ namespace ProfileEvents
extern const Event TinyS3Clients;
}
namespace CurrentMetrics
{
extern const Metric S3DiskNoKeyErrors;
}
namespace DB
{
@ -381,7 +387,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
/// The next call is NOT a recurcive call
/// This is a virtuall call Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&)
return enrichErrorMessage(
return processRequestResult(
HeadObject(static_cast<const Model::HeadObjectRequest&>(request)));
}
@ -402,7 +408,7 @@ Model::ListObjectsOutcome Client::ListObjects(ListObjectsRequest & request) cons
Model::GetObjectOutcome Client::GetObject(GetObjectRequest & request) const
{
return enrichErrorMessage(
return processRequestResult(
doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }));
}
@ -689,11 +695,14 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
}
template <typename RequestResult>
RequestResult Client::enrichErrorMessage(RequestResult && outcome) const
RequestResult Client::processRequestResult(RequestResult && outcome) const
{
if (outcome.IsSuccess() || !isClientForDisk())
return std::forward<RequestResult>(outcome);
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
CurrentMetrics::add(CurrentMetrics::S3DiskNoKeyErrors);
String enriched_message = fmt::format(
"{} {}",
outcome.GetError().GetMessage(),

View File

@ -271,7 +271,7 @@ private:
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
template <typename RequestResult>
RequestResult enrichErrorMessage(RequestResult && outcome) const;
RequestResult processRequestResult(RequestResult && outcome) const;
String initial_endpoint;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;

View File

@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <Common/re2.h>
#include <IO/Archives/ArchiveUtils.h>
#include <boost/algorithm/string/case_conv.hpp>
@ -29,7 +30,7 @@ namespace ErrorCodes
namespace S3
{
URI::URI(const std::string & uri_)
URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
@ -54,10 +55,11 @@ URI::URI(const std::string & uri_)
static constexpr auto OSS = "OSS";
static constexpr auto EOS = "EOS";
if (containsArchive(uri_))
std::tie(uri_str, archive_pattern) = getPathToArchiveAndArchivePattern(uri_);
if (allow_archive_path_syntax)
std::tie(uri_str, archive_pattern) = getURIAndArchivePattern(uri_);
else
uri_str = uri_;
uri = Poco::URI(uri_str);
std::unordered_map<std::string, std::string> mapper;
@ -167,32 +169,37 @@ void URI::validateBucket(const String & bucket, const Poco::URI & uri)
!uri.empty() ? " (" + uri.toString() + ")" : "");
}
bool URI::containsArchive(const std::string & source)
std::pair<std::string, std::optional<std::string>> URI::getURIAndArchivePattern(const std::string & source)
{
size_t pos = source.find("::");
return (pos != std::string::npos);
}
if (pos == String::npos)
return {source, std::nullopt};
std::pair<std::string, std::string> URI::getPathToArchiveAndArchivePattern(const std::string & source)
{
size_t pos = source.find("::");
assert(pos != std::string::npos);
std::string_view path_to_archive_view = std::string_view{source}.substr(0, pos);
bool contains_spaces_around_operator = false;
while (path_to_archive_view.ends_with(' '))
{
contains_spaces_around_operator = true;
path_to_archive_view.remove_suffix(1);
}
std::string path_to_archive = source.substr(0, pos);
while ((!path_to_archive.empty()) && path_to_archive.ends_with(' '))
path_to_archive.pop_back();
std::string_view archive_pattern_view = std::string_view{source}.substr(pos + 2);
while (archive_pattern_view.starts_with(' '))
{
contains_spaces_around_operator = true;
archive_pattern_view.remove_prefix(1);
}
if (path_to_archive.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
/// possible situations when the first part can be archive is only if one of the following is true:
/// - it contains supported extension
/// - it contains spaces after or before :: (URI cannot contain spaces)
/// - it contains characters that could mean glob expression
if (archive_pattern_view.empty() || path_to_archive_view.empty()
|| (!contains_spaces_around_operator && !hasSupportedArchiveExtension(path_to_archive_view)
&& path_to_archive_view.find_first_of("*?{") == std::string_view::npos))
return {source, std::nullopt};
std::string_view path_in_archive_view = std::string_view{source}.substr(pos + 2);
while (path_in_archive_view.front() == ' ')
path_in_archive_view.remove_prefix(1);
if (path_in_archive_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
return {path_to_archive, std::string{path_in_archive_view}};
return std::pair{std::string{path_to_archive_view}, std::string{archive_pattern_view}};
}
}

View File

@ -36,14 +36,13 @@ struct URI
bool is_virtual_hosted_style;
URI() = default;
explicit URI(const std::string & uri_);
explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false);
void addRegionToURI(const std::string & region);
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
private:
bool containsArchive(const std::string & source);
std::pair<std::string, std::string> getPathToArchiveAndArchivePattern(const std::string & source);
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
};
}

View File

@ -4,8 +4,6 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Parsers/IAST_fwd.h>
#include <unordered_set>
namespace DB
{

View File

@ -142,14 +142,14 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection, ContextPtr context)
{
const auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
auto filename = collection.getOrDefault<String>("filename", "");
if (!filename.empty())
url = S3::URI(std::filesystem::path(collection.get<String>("url")) / filename);
url = S3::URI(std::filesystem::path(collection.get<String>("url")) / filename, settings.allow_archive_path_syntax);
else
url = S3::URI(collection.get<String>("url"));
url = S3::URI(collection.get<String>("url"), settings.allow_archive_path_syntax);
auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
@ -330,7 +330,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
}
/// This argument is always the first
url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"), context->getSettingsRef().allow_archive_path_syntax);
if (engine_args_to_idx.contains("format"))
{

View File

@ -25,6 +25,7 @@
#include <IO/WriteHelpers.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/ArchiveUtils.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
@ -2207,7 +2208,11 @@ void registerStorageFile(StorageFactory & factory)
else if (type == Field::Types::UInt64)
source_fd = static_cast<int>(literal->value.get<UInt64>());
else if (type == Field::Types::String)
StorageFile::parseFileSource(literal->value.get<String>(), source_path, storage_args.path_to_archive);
StorageFile::parseFileSource(
literal->value.get<String>(),
source_path,
storage_args.path_to_archive,
factory_args.getLocalContext()->getSettingsRef().allow_archive_path_syntax);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second argument must be path or file descriptor");
}
@ -2234,8 +2239,14 @@ SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
return schema_cache;
}
void StorageFile::parseFileSource(String source, String & filename, String & path_to_archive)
void StorageFile::parseFileSource(String source, String & filename, String & path_to_archive, bool allow_archive_path_syntax)
{
if (!allow_archive_path_syntax)
{
filename = std::move(source);
return;
}
size_t pos = source.find("::");
if (pos == String::npos)
{
@ -2247,18 +2258,21 @@ void StorageFile::parseFileSource(String source, String & filename, String & pat
while (path_to_archive_view.ends_with(' '))
path_to_archive_view.remove_suffix(1);
if (path_to_archive_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
path_to_archive = path_to_archive_view;
std::string_view filename_view = std::string_view{source}.substr(pos + 2);
while (filename_view.front() == ' ')
while (filename_view.starts_with(' '))
filename_view.remove_prefix(1);
if (filename_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
/// possible situations when the first part can be archive is only if one of the following is true:
/// - it contains supported extension
/// - it contains characters that could mean glob expression
if (filename_view.empty() || path_to_archive_view.empty()
|| (!hasSupportedArchiveExtension(path_to_archive_view) && path_to_archive_view.find_first_of("*?{") == std::string_view::npos))
{
filename = std::move(source);
return;
}
path_to_archive = path_to_archive_view;
filename = filename_view;
}

View File

@ -128,7 +128,7 @@ public:
static SchemaCache & getSchemaCache(const ContextPtr & context);
static void parseFileSource(String source, String & filename, String & path_to_archive);
static void parseFileSource(String source, String & filename, String & path_to_archive, bool allow_archive_path_syntax);
static ArchiveInfo getArchiveInfo(
const std::string & path_to_archive,

View File

@ -31,8 +31,6 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription()
{"database_shard_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database shard (for clusters that belong to a `Replicated` database)."},
{"database_replica_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database replica (for clusters that belong to a `Replicated` database)."},
{"is_active", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The status of the Replicated database replica (for clusters that belong to a Replicated database): 1 means 'replica is online', 0 means 'replica is offline', NULL means 'unknown'."},
{"replication_lag", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt32>()), "The replication lag of the `Replicated` database replica (for clusters that belong to a Replicated database)."},
{"recovery_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The recovery time of the `Replicated` database replica (for clusters that belong to a Replicated database), in milliseconds."},
};
description.setAliases({
@ -48,30 +46,31 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co
writeCluster(res_columns, name_and_cluster, {});
const auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & [database_name, database] : databases)
for (const auto & name_and_database : databases)
{
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(database.get()))
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(name_and_database.second.get()))
{
if (auto database_cluster = replicated->tryGetCluster())
writeCluster(res_columns, {database_name, database_cluster},
replicated->tryGetReplicasInfo(database_cluster));
writeCluster(res_columns, {name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
if (auto database_cluster = replicated->tryGetAllGroupsCluster())
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + database_name, database_cluster},
replicated->tryGetReplicasInfo(database_cluster));
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
}
}
}
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster,
const ReplicasInfo & replicas_info)
const std::vector<UInt8> & is_active)
{
const String & cluster_name = name_and_cluster.first;
const ClusterPtr & cluster = name_and_cluster.second;
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
size_t global_replica_idx = 0;
size_t replica_idx = 0;
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
const auto & shard_info = shards_info[shard_index];
@ -100,24 +99,10 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
res_columns[i++]->insert(address.database_shard_name);
res_columns[i++]->insert(address.database_replica_name);
if (replicas_info.empty())
{
if (is_active.empty())
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
else
{
const auto & replica_info = replicas_info[global_replica_idx];
res_columns[i++]->insert(replica_info.is_active);
res_columns[i++]->insert(replica_info.replication_lag);
if (replica_info.recovery_time != 0)
res_columns[i++]->insert(replica_info.recovery_time);
else
res_columns[i++]->insertDefault();
}
++global_replica_idx;
res_columns[i++]->insert(is_active[replica_idx++]);
}
}
}

View File

@ -1,10 +1,10 @@
#pragma once
#include <Databases/DatabaseReplicated.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
@ -27,7 +27,7 @@ protected:
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const ReplicasInfo & replicas_info);
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector<UInt8> & is_active);
};
}

View File

@ -31,6 +31,7 @@ ColumnsDescription StorageSystemScheduler::getColumnsDescription()
{"dequeued_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests dequeued from this node."},
{"canceled_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests canceled from this node."},
{"dequeued_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests dequeued from this node."},
{"throughput", std::make_shared<DataTypeFloat64>(), "Current average throughput (dequeued cost per second)."},
{"canceled_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests canceled from this node."},
{"busy_periods", std::make_shared<DataTypeUInt64>(), "The total number of deactivations of this node."},
{"vruntime", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>()),
@ -96,6 +97,7 @@ void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr c
res_columns[i++]->insert(node->dequeued_requests.load());
res_columns[i++]->insert(node->canceled_requests.load());
res_columns[i++]->insert(node->dequeued_cost.load());
res_columns[i++]->insert(node->throughput.rate(static_cast<double>(clock_gettime_ns())/1e9));
res_columns[i++]->insert(node->canceled_cost.load());
res_columns[i++]->insert(node->busy_periods.load());

View File

@ -26,7 +26,7 @@ void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr
if (context->getApplicationType() != Context::ApplicationType::LOCAL)
{
ITableFunctionFileLike::parseFirstArguments(arg, context);
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive);
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive, context->getSettingsRef().allow_archive_path_syntax);
return;
}
@ -42,7 +42,8 @@ void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr
else if (filename == "stderr")
fd = STDERR_FILENO;
else
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive);
StorageFile::parseFileSource(
std::move(filename), filename, path_to_archive, context->getSettingsRef().allow_archive_path_syntax);
}
else if (type == Field::Types::Int64 || type == Field::Types::UInt64)
{
@ -63,9 +64,12 @@ std::optional<String> TableFunctionFile::tryGetFormatFromFirstArgument()
return FormatFactory::instance().tryGetFormatFromFileName(filename);
}
StoragePtr TableFunctionFile::getStorage(const String & source,
const String & format_, const ColumnsDescription & columns,
ContextPtr global_context, const std::string & table_name,
StoragePtr TableFunctionFile::getStorage(
const String & source,
const String & format_,
const ColumnsDescription & columns,
ContextPtr global_context,
const std::string & table_name,
const std::string & compression_method_) const
{
// For `file` table function, we are going to use format settings from the

View File

@ -1019,7 +1019,9 @@ def _get_ext_check_name(check_name: str) -> str:
return check_name_with_group
def _cancel_pr_wf(s3: S3Helper, pr_number: int, cancel_sync: bool = False) -> None:
def _cancel_pr_workflow(
s3: S3Helper, pr_number: int, cancel_sync: bool = False
) -> None:
wf_data = CiMetadata(s3, pr_number).fetch_meta()
if not cancel_sync:
if not wf_data.run_id:
@ -1368,12 +1370,12 @@ def main() -> int:
assert indata, "Run config must be provided via --infile"
_update_gh_statuses_action(indata=indata, s3=s3)
### CANCEL PREVIOUS WORKFLOW RUN
### CANCEL THE PREVIOUS WORKFLOW RUN
elif args.cancel_previous_run:
if pr_info.is_merge_queue:
_cancel_pr_wf(s3, pr_info.merged_pr)
_cancel_pr_workflow(s3, pr_info.merged_pr)
elif pr_info.is_pr:
_cancel_pr_wf(s3, pr_info.number, cancel_sync=True)
_cancel_pr_workflow(s3, pr_info.number, cancel_sync=True)
else:
assert False, "BUG! Not supported scenario"

View File

@ -3,6 +3,7 @@ import fileinput
import json
import logging
import time
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
@ -298,6 +299,11 @@ class CiLogsCredentials:
def get_docker_arguments(
self, pr_info: PRInfo, check_start_time: str, check_name: str
) -> str:
run_by_hash_total = int(os.getenv("RUN_BY_HASH_TOTAL", "0"))
if run_by_hash_total > 1:
run_by_hash_num = int(os.getenv("RUN_BY_HASH_NUM", "0"))
check_name = f"{check_name} [{run_by_hash_num + 1}/{run_by_hash_total}]"
self.create_ci_logs_credentials()
if not self.config_path.exists():
logging.info("Do not use external logs pushing")

View File

@ -301,7 +301,7 @@ def get_worst_state(statuses: CommitStatuses) -> StatusType:
def create_ci_report(pr_info: PRInfo, statuses: CommitStatuses) -> str:
"""The function converst the statuses to TestResults and uploads the report
"""The function converts the statuses to TestResults and uploads the report
to S3 tests bucket. Then it returns the URL"""
test_results = [] # type: TestResults
for status in statuses:

View File

@ -293,9 +293,9 @@ class JobReport:
start_time: str
duration: float
additional_files: Union[Sequence[str], Sequence[Path]]
# clickhouse version, build job only
# ClickHouse version, build job only
version: str = ""
# checkname to set in commit status, set if differs from jjob name
# check_name to be set in commit status, set it if it differs from the job name
check_name: str = ""
# directory with artifacts to upload on s3
build_dir_for_upload: Union[Path, str] = ""
@ -667,11 +667,7 @@ ColorTheme = Tuple[str, str, str]
def _format_header(
header: str, branch_name: str, branch_url: Optional[str] = None
) -> str:
# Following line does not lower CI->Ci and SQLancer->Sqlancer. It only
# capitalizes the first letter and doesn't touch the rest of the word
result = " ".join([w[0].upper() + w[1:] for w in header.split(" ") if w])
result = result.replace("Clickhouse", "ClickHouse")
result = result.replace("clickhouse", "ClickHouse")
result = header
if "ClickHouse" not in result:
result = f"ClickHouse {result}"
if branch_url:

View File

@ -1,16 +1,19 @@
#!/usr/bin/env python3
# pylint: disable=unused-argument
# pylint: disable=broad-exception-raised
import logging
import os
import pytest # pylint:disable=import-error; for style check
from helpers.cluster import run_and_check
from helpers.cluster import run_and_check, is_port_free
from helpers.network import _NetworkManager
# This is a workaround for a problem with logging in pytest [1].
#
# [1]: https://github.com/pytest-dev/pytest/issues/5502
logging.raiseExceptions = False
PORTS_PER_WORKER = 50
@pytest.fixture(scope="session", autouse=True)
@ -111,5 +114,40 @@ def pytest_addoption(parser):
)
def get_unique_free_ports(total):
ports = []
for port in range(30000, 55000):
if is_port_free(port) and port not in ports:
ports.append(port)
if len(ports) == total:
return ports
raise Exception(f"Can't collect {total} ports. Collected: {len(ports)}")
def pytest_configure(config):
os.environ["INTEGRATION_TESTS_RUN_ID"] = config.option.run_id
# When running tests without pytest-xdist,
# the `pytest_xdist_setupnodes` hook is not executed
worker_ports = os.getenv("WORKER_FREE_PORTS", None)
if worker_ports is None:
master_ports = get_unique_free_ports(PORTS_PER_WORKER)
os.environ["WORKER_FREE_PORTS"] = " ".join([str(p) for p in master_ports])
def pytest_xdist_setupnodes(config, specs):
# Find {PORTS_PER_WORKER} * {number of xdist workers} ports and
# allocate pool of {PORTS_PER_WORKER} ports to each worker
# Get number of xdist workers
num_workers = len(specs)
# Get free ports which will be distributed across workers
ports = get_unique_free_ports(num_workers * PORTS_PER_WORKER)
# Iterate over specs of workers and add allocated ports to env variable
for i, spec in enumerate(specs):
start_range = i * PORTS_PER_WORKER
per_workrer_ports = ports[start_range : start_range + PORTS_PER_WORKER]
spec.env["WORKER_FREE_PORTS"] = " ".join([str(p) for p in per_workrer_ports])

View File

@ -135,6 +135,52 @@ def get_free_port():
return s.getsockname()[1]
def is_port_free(port: int) -> bool:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", port))
return True
except socket.error:
return False
class PortPoolManager:
"""
This class is used for distribution of ports allocated to single pytest-xdist worker
It can be used by multiple ClickHouseCluster instances
"""
# Shared between instances
all_ports = None
free_ports = None
def __init__(self):
self.used_ports = []
if self.all_ports is None:
worker_ports = os.getenv("WORKER_FREE_PORTS")
ports = [int(p) for p in worker_ports.split(" ")]
# Static vars
PortPoolManager.all_ports = ports
PortPoolManager.free_ports = ports
def get_port(self):
for port in self.free_ports:
if is_port_free(port):
self.free_ports.remove(port)
self.used_ports.append(port)
return port
raise Exception(
f"No free ports: {self.all_ports}",
)
def return_used_ports(self):
self.free_ports.extend(self.used_ports)
self.used_ports.clear()
def retry_exception(num, delay, func, exception=Exception, *args, **kwargs):
"""
Retry if `func()` throws, `num` times.
@ -248,7 +294,7 @@ def check_rabbitmq_is_available(rabbitmq_id, cookie):
),
stdout=subprocess.PIPE,
)
p.communicate()
p.wait(timeout=60)
return p.returncode == 0
@ -716,62 +762,67 @@ class ClickHouseCluster:
.stop()
)
self.port_pool = PortPoolManager()
@property
def kafka_port(self):
if self._kafka_port:
return self._kafka_port
self._kafka_port = get_free_port()
self._kafka_port = self.port_pool.get_port()
return self._kafka_port
@property
def schema_registry_port(self):
if self._schema_registry_port:
return self._schema_registry_port
self._schema_registry_port = get_free_port()
self._schema_registry_port = self.port_pool.get_port()
return self._schema_registry_port
@property
def schema_registry_auth_port(self):
if self._schema_registry_auth_port:
return self._schema_registry_auth_port
self._schema_registry_auth_port = get_free_port()
self._schema_registry_auth_port = self.port_pool.get_port()
return self._schema_registry_auth_port
@property
def kerberized_kafka_port(self):
if self._kerberized_kafka_port:
return self._kerberized_kafka_port
self._kerberized_kafka_port = get_free_port()
self._kerberized_kafka_port = self.port_pool.get_port()
return self._kerberized_kafka_port
@property
def azurite_port(self):
if self._azurite_port:
return self._azurite_port
self._azurite_port = get_free_port()
self._azurite_port = self.port_pool.get_port()
return self._azurite_port
@property
def mongo_port(self):
if self._mongo_port:
return self._mongo_port
self._mongo_port = get_free_port()
self._mongo_port = self.port_pool.get_port()
return self._mongo_port
@property
def mongo_no_cred_port(self):
if self._mongo_no_cred_port:
return self._mongo_no_cred_port
self._mongo_no_cred_port = get_free_port()
self._mongo_no_cred_port = self.port_pool.get_port()
return self._mongo_no_cred_port
@property
def redis_port(self):
if self._redis_port:
return self._redis_port
self._redis_port = get_free_port()
self._redis_port = self.port_pool.get_port()
return self._redis_port
def __exit__(self, exc_type, exc_val, exc_tb):
self.port_pool.return_used_ports()
def print_all_docker_pieces(self):
res_networks = subprocess.check_output(
f"docker network ls --filter name='{self.project_name}*'",

View File

@ -4,6 +4,8 @@ import logging
import pytest
import os
import minio
import random
import string
from helpers.cluster import ClickHouseCluster
from helpers.mock_servers import start_s3_mock
@ -45,6 +47,11 @@ def cluster():
cluster.shutdown()
def randomize_query_id(query_id, random_suffix_length=10):
letters = string.ascii_letters + string.digits
return f"{query_id}_{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
@pytest.fixture(scope="module")
def init_broken_s3(cluster):
yield start_s3_mock(cluster, "broken_s3", "8083")
@ -61,6 +68,7 @@ def test_upload_after_check_works(cluster, broken_s3):
node.query(
"""
DROP TABLE IF EXISTS s3_upload_after_check_works;
CREATE TABLE s3_upload_after_check_works (
id Int64,
data String
@ -127,7 +135,9 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
broken_s3.setup_at_create_multi_part_upload()
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_CREATE_MPU_{compression}"
insert_query_id = randomize_query_id(
f"INSERT_INTO_TABLE_FUNCTION_FAIL_CREATE_MPU_{compression}"
)
error = node.query_and_get_error(
f"""
INSERT INTO
@ -169,7 +179,9 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(count=1, after=2)
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_UPLOAD_PART_{compression}"
insert_query_id = randomize_query_id(
f"INSERT_INTO_TABLE_FUNCTION_FAIL_UPLOAD_PART_{compression}"
)
error = node.query_and_get_error(
f"""
INSERT INTO
@ -221,7 +233,7 @@ def test_when_error_is_retried(cluster, broken_s3, action_and_message):
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(count=3, after=2, action=action)
insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED"
insert_query_id = randomize_query_id(f"INSERT_INTO_TABLE_{action}_RETRIED")
node.query(
f"""
INSERT INTO
@ -250,7 +262,7 @@ def test_when_error_is_retried(cluster, broken_s3, action_and_message):
assert s3_errors == 3
broken_s3.setup_at_part_upload(count=1000, after=2, action=action)
insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED_1"
insert_query_id = randomize_query_id(f"INSERT_INTO_TABLE_{action}_RETRIED_1")
error = node.query_and_get_error(
f"""
INSERT INTO
@ -285,7 +297,7 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
insert_query_id = randomize_query_id(f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD")
node.query(
f"""
INSERT INTO
@ -319,7 +331,7 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
insert_query_id = randomize_query_id(f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1")
error = node.query_and_get_error(
f"""
INSERT INTO
@ -361,7 +373,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}"
)
node.query(
@ -398,7 +410,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
action="connection_reset_by_peer",
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}_1"
)
error = node.query_and_get_error(
@ -443,7 +455,7 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}"
)
node.query(
@ -481,7 +493,7 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}_1"
)
error = node.query_and_get_error(
@ -521,7 +533,7 @@ def test_query_is_canceled_with_inf_retries(cluster, broken_s3):
action="connection_refused",
)
insert_query_id = f"TEST_QUERY_IS_CANCELED_WITH_INF_RETRIES"
insert_query_id = randomize_query_id(f"TEST_QUERY_IS_CANCELED_WITH_INF_RETRIES")
request = node.get_query_request(
f"""
INSERT INTO
@ -579,7 +591,7 @@ def test_adaptive_timeouts(cluster, broken_s3, node_name):
count=1000000,
)
insert_query_id = f"TEST_ADAPTIVE_TIMEOUTS_{node_name}"
insert_query_id = randomize_query_id(f"TEST_ADAPTIVE_TIMEOUTS_{node_name}")
node.query(
f"""
INSERT INTO
@ -631,6 +643,7 @@ def test_no_key_found_disk(cluster, broken_s3):
node.query(
"""
DROP TABLE IF EXISTS no_key_found_disk;
CREATE TABLE no_key_found_disk (
id Int64
) ENGINE=MergeTree()
@ -689,3 +702,15 @@ def test_no_key_found_disk(cluster, broken_s3):
"DB::Exception: The specified key does not exist. This error happened for S3 disk."
in error
)
s3_disk_no_key_errors_metric_value = int(
node.query(
"""
SELECT value
FROM system.metrics
WHERE metric = 'S3DiskNoKeyErrors'
"""
).strip()
)
assert s3_disk_no_key_errors_metric_value > 0

View File

@ -1,41 +0,0 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<profile>default</profile>
<no_password></no_password>
</default>
</users>
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<session_timeout_ms>20000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>9444</port>
</server>
</raft_configuration>
</keeper_server>
<zookeeper>
<node index="1">
<host>localhost</host>
<port>2181</port>
</node>
<session_timeout_ms>20000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -1,61 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/config.xml"],
stay_alive=True,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_recovery_time_metric(start_cluster):
node.query(
"""
DROP DATABASE IF EXISTS rdb;
CREATE DATABASE rdb
ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1')
"""
)
node.query(
"""
DROP TABLE IF EXISTS rdb.t;
CREATE TABLE rdb.t
(
`x` UInt32
)
ENGINE = MergeTree
ORDER BY x
"""
)
node.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"])
node.restart_clickhouse()
ret = int(
node.query(
"""
SELECT recovery_time
FROM system.clusters
WHERE cluster = 'rdb'
"""
).strip()
)
assert ret > 0
node.query(
"""
DROP DATABASE rdb
"""
)

View File

@ -141,6 +141,9 @@ def test_drop_if_exists():
def test_replication():
node1.query("CREATE FUNCTION f2 AS (x, y) -> x - y")
node1.query(
"CREATE FUNCTION f3 AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
)
assert (
node1.query("SELECT create_query FROM system.functions WHERE name='f2'")
@ -154,7 +157,11 @@ def test_replication():
assert node1.query("SELECT f2(12,3)") == "9\n"
assert node2.query("SELECT f2(12,3)") == "9\n"
assert node1.query("SELECT f3()") == "2\n"
assert node2.query("SELECT f3()") == "2\n"
node1.query("DROP FUNCTION f2")
node1.query("DROP FUNCTION f3")
assert (
node1.query("SELECT create_query FROM system.functions WHERE name='f2'") == ""
)
@ -214,7 +221,9 @@ def test_reload_zookeeper():
)
# config reloads, but can still work
node1.query("CREATE FUNCTION f2 AS (x, y) -> x - y")
node1.query(
"CREATE FUNCTION f2 AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
)
assert_eq_with_retry(
node2,
"SELECT name FROM system.functions WHERE name IN ['f1', 'f2'] ORDER BY name",
@ -269,7 +278,7 @@ def test_reload_zookeeper():
TSV(["f1", "f2", "f3"]),
)
assert node2.query("SELECT f1(12, 3), f2(12, 3), f3(12, 3)") == TSV([[15, 9, 4]])
assert node2.query("SELECT f1(12, 3), f2(), f3(12, 3)") == TSV([[15, 2, 4]])
active_zk_connections = get_active_zk_connections()
assert (
@ -307,3 +316,13 @@ def test_start_without_zookeeper():
"CREATE FUNCTION f1 AS (x, y) -> (x + y)\n",
)
node1.query("DROP FUNCTION f1")
def test_server_restart():
node1.query(
"CREATE FUNCTION f1 AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
)
assert node1.query("SELECT f1()") == "2\n"
node1.restart_clickhouse()
assert node1.query("SELECT f1()") == "2\n"
node1.query("DROP FUNCTION f1")

View File

@ -8,6 +8,8 @@ import os
import json
import time
import glob
import random
import string
import pyspark
import delta
@ -52,6 +54,11 @@ def get_spark():
return builder.master("local").getOrCreate()
def randomize_table_name(table_name, random_suffix_length=10):
letters = string.ascii_letters + string.digits
return f"{table_name}{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -151,7 +158,7 @@ def test_single_log_file(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_single_log_file"
TABLE_NAME = randomize_table_name("test_single_log_file")
inserted_data = "SELECT number as a, toString(number + 1) as b FROM numbers(100)"
parquet_data_path = create_initial_data_file(
@ -175,7 +182,7 @@ def test_partition_by(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_partition_by"
TABLE_NAME = randomize_table_name("test_partition_by")
write_delta_from_df(
spark,
@ -197,7 +204,7 @@ def test_checkpoint(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_checkpoint"
TABLE_NAME = randomize_table_name("test_checkpoint")
write_delta_from_df(
spark,
@ -272,7 +279,7 @@ def test_multiple_log_files(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_multiple_log_files"
TABLE_NAME = randomize_table_name("test_multiple_log_files")
write_delta_from_df(
spark, generate_data(spark, 0, 100), f"/{TABLE_NAME}", mode="overwrite"
@ -310,7 +317,7 @@ def test_metadata(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_metadata"
TABLE_NAME = randomize_table_name("test_metadata")
parquet_data_path = create_initial_data_file(
started_cluster,
@ -339,9 +346,9 @@ def test_metadata(started_cluster):
def test_types(started_cluster):
TABLE_NAME = "test_types"
TABLE_NAME = randomize_table_name("test_types")
spark = started_cluster.spark_session
result_file = f"{TABLE_NAME}_result_2"
result_file = randomize_table_name(f"{TABLE_NAME}_result_2")
delta_table = (
DeltaTable.create(spark)
@ -415,7 +422,7 @@ def test_restart_broken(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = "broken"
TABLE_NAME = "test_restart_broken"
TABLE_NAME = randomize_table_name("test_restart_broken")
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
@ -452,6 +459,18 @@ def test_restart_broken(started_cluster):
f"SELECT count() FROM {TABLE_NAME}"
)
s3_disk_no_key_errors_metric_value = int(
instance.query(
"""
SELECT value
FROM system.metrics
WHERE metric = 'S3DiskNoKeyErrors'
"""
).strip()
)
assert s3_disk_no_key_errors_metric_value == 0
minio_client.make_bucket(bucket)
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
@ -464,7 +483,7 @@ def test_restart_broken_table_function(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = "broken2"
TABLE_NAME = "test_restart_broken_table_function"
TABLE_NAME = randomize_table_name("test_restart_broken_table_function")
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
@ -518,7 +537,7 @@ def test_partition_columns(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_partition_columns"
TABLE_NAME = randomize_table_name("test_partition_columns")
result_file = f"{TABLE_NAME}"
partition_columns = ["b", "c", "d", "e"]

View File

@ -10,9 +10,9 @@
<type>cache</type>
<disk>local_disk</disk>
<path>/tiny_local_cache/</path>
<max_size>10M</max_size>
<max_file_segment_size>1M</max_file_segment_size>
<boundary_alignment>1M</boundary_alignment>
<max_size>12M</max_size>
<max_file_segment_size>100K</max_file_segment_size>
<boundary_alignment>100K</boundary_alignment>
<cache_on_write_operations>1</cache_on_write_operations>
</tiny_local_cache>

View File

@ -7,6 +7,9 @@ import fnmatch
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
MB = 1024 * 1024
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
@ -36,15 +39,30 @@ def test_cache_evicted_by_temporary_data(start_cluster):
q("SELECT sum(size) FROM system.filesystem_cache").strip()
)
assert get_cache_size() == 0
dump_debug_info = lambda: "\n".join(
[
">>> filesystem_cache <<<",
q("SELECT * FROM system.filesystem_cache FORMAT Vertical"),
">>> remote_data_paths <<<",
q("SELECT * FROM system.remote_data_paths FORMAT Vertical"),
">>> tiny_local_cache_local_disk <<<",
q(
"SELECT * FROM system.disks WHERE name = 'tiny_local_cache_local_disk' FORMAT Vertical"
),
]
)
assert get_free_space() > 8 * 1024 * 1024
q("SYSTEM DROP FILESYSTEM CACHE")
q("DROP TABLE IF EXISTS t1 SYNC")
assert get_cache_size() == 0, dump_debug_info()
assert get_free_space() > 8 * MB, dump_debug_info()
# Codec is NONE to make cache size predictable
q(
"CREATE TABLE t1 (x UInt64 CODEC(NONE), y UInt64 CODEC(NONE)) ENGINE = MergeTree ORDER BY x SETTINGS storage_policy = 'tiny_local_cache'"
"CREATE TABLE t1 (x UInt64 CODEC(NONE)) ENGINE = MergeTree ORDER BY x SETTINGS storage_policy = 'tiny_local_cache'"
)
q("INSERT INTO t1 SELECT number, number FROM numbers(1024 * 1024)")
q("INSERT INTO t1 SELECT number FROM numbers(1024 * 1024)")
# To be sure that nothing is reading the cache and entries for t1 can be evited
q("OPTIMIZE TABLE t1 FINAL")
@ -54,11 +72,11 @@ def test_cache_evicted_by_temporary_data(start_cluster):
q("SELECT sum(x) FROM t1")
cache_size_with_t1 = get_cache_size()
assert cache_size_with_t1 > 8 * 1024 * 1024
assert cache_size_with_t1 > 8 * MB, dump_debug_info()
# Almost all disk space is occupied by t1 cache
free_space_with_t1 = get_free_space()
assert free_space_with_t1 < 4 * 1024 * 1024
assert free_space_with_t1 < 4 * MB, dump_debug_info()
# Try to sort the table, but fail because of lack of disk space
with pytest.raises(QueryRuntimeException) as exc:
@ -76,31 +94,27 @@ def test_cache_evicted_by_temporary_data(start_cluster):
# Some data evicted from cache by temporary data
cache_size_after_eviction = get_cache_size()
assert cache_size_after_eviction < cache_size_with_t1
assert cache_size_after_eviction < cache_size_with_t1, dump_debug_info()
# Disk space freed, at least 3 MB, because temporary data tried to write 4 MB
assert get_free_space() > free_space_with_t1 + 3 * 1024 * 1024
assert get_free_space() > free_space_with_t1 + 3 * MB, dump_debug_info()
# Read some data to fill the cache again
q("SELECT avg(y) FROM t1")
q("SELECT avg(x) FROM t1")
cache_size_with_t1 = get_cache_size()
assert cache_size_with_t1 > 8 * 1024 * 1024, q(
"SELECT * FROM system.filesystem_cache FORMAT Vertical"
)
assert cache_size_with_t1 > 8 * MB, dump_debug_info()
# Almost all disk space is occupied by t1 cache
free_space_with_t1 = get_free_space()
assert free_space_with_t1 < 4 * 1024 * 1024, q(
"SELECT * FROM system.disks WHERE name = 'tiny_local_cache_local_disk' FORMAT Vertical"
)
assert free_space_with_t1 < 4 * MB, dump_debug_info()
node.http_query(
"SELECT randomPrintableASCII(1024) FROM numbers(8 * 1024) FORMAT TSV",
params={"buffer_size": 0, "wait_end_of_query": 1},
)
assert get_free_space() > free_space_with_t1 + 3 * 1024 * 1024
assert get_free_space() > free_space_with_t1 + 3 * MB, dump_debug_info()
# not enough space for buffering 32 MB
with pytest.raises(Exception) as exc:
@ -112,4 +126,4 @@ def test_cache_evicted_by_temporary_data(start_cluster):
str(exc.value), "*Failed to reserve * for temporary file*"
), exc.value
q("DROP TABLE IF EXISTS t1")
q("DROP TABLE IF EXISTS t1 SYNC")

View File

@ -18,20 +18,25 @@ def started_cluster():
def test_persistence():
create_function_query1 = "CREATE FUNCTION MySum1 AS (a, b) -> a + b"
create_function_query2 = "CREATE FUNCTION MySum2 AS (a, b) -> MySum1(a, b) + b"
create_function_query3 = "CREATE FUNCTION MyUnion AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
instance.query(create_function_query1)
instance.query(create_function_query2)
instance.query(create_function_query3)
assert instance.query("SELECT MySum1(1,2)") == "3\n"
assert instance.query("SELECT MySum2(1,2)") == "5\n"
assert instance.query("SELECT MyUnion()") == "2\n"
instance.restart_clickhouse()
assert instance.query("SELECT MySum1(1,2)") == "3\n"
assert instance.query("SELECT MySum2(1,2)") == "5\n"
assert instance.query("SELECT MyUnion()") == "2\n"
instance.query("DROP FUNCTION MySum2")
instance.query("DROP FUNCTION MySum1")
instance.query("DROP FUNCTION MyUnion")
instance.restart_clickhouse()
@ -48,3 +53,10 @@ def test_persistence():
or "Function with name 'MySum2' does not exist. In scope SELECT MySum2(1, 2)"
in error_message
)
error_message = instance.query_and_get_error("SELECT MyUnion()")
assert (
"Unknown function MyUnion" in error_message
or "Function with name 'MyUnion' does not exist. In scope SELECT MyUnion"
in error_message
)

View File

@ -52,8 +52,6 @@ CREATE TABLE system.clusters
`database_shard_name` String,
`database_replica_name` String,
`is_active` Nullable(UInt8),
`replication_lag` Nullable(UInt32),
`recovery_time` Nullable(UInt64),
`name` String ALIAS cluster
)
ENGINE = SystemClusters

View File

@ -64,7 +64,7 @@ toStartOfMonth;toDateTime64;false 2099-07-07
type;toStartOfMonth;toDateTime64;false Date
toStartOfWeek;toDate32;false 2099-07-07
type;toStartOfWeek;toDate32;false Date
toStartOfWeek;toDateTime64;false 2099-07-07
toStartOfWeek;toDateTime64;false 1970-01-01
type;toStartOfWeek;toDateTime64;false Date
toMonday;toDate32;false 2099-07-08
type;toMonday;toDate32;false Date

View File

@ -1,14 +1,14 @@
#!/usr/bin/expect -f
log_user 0
set timeout 60
set timeout 30
match_max 100000
spawn bash -c "clickhouse-local"
expect ":) "
send -- "SET send_logs_level = 't'\r"
expect "Exception on client:"
expect "Unexpected value of LogsLevel:" {} timeout {exit 1}
expect ":) "
send -- "exit\r"
expect eof

View File

@ -1 +0,0 @@
SELECT * FROM file('::a'); -- { serverError BAD_ARGUMENTS }

View File

@ -1,35 +0,0 @@
#!/usr/bin/env bash
function test()
{
echo "test"
$CH_CLIENT -q "insert into test select number, number from numbers(100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, 'str_' || toString(number) from numbers(100000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1)) from numbers(200000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, NULL from numbers(300000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, multiIf(number % 4 == 3, 'str_' || toString(number), number % 4 == 2, NULL, number % 4 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1))) from numbers(400000, 400000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, [range((number % 10 + 1)::UInt64)]::Array(Array(Dynamic)) from numbers(100000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "select distinct dynamicType(d) as type from test order by type"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'UInt64'"
$CH_CLIENT -q "select count() from test where d.UInt64 is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'String'"
$CH_CLIENT -q "select count() from test where d.String is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Date'"
$CH_CLIENT -q "select count() from test where d.Date is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Variant(String, UInt64))'"
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Variant(String, UInt64))\`)"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Array(Dynamic))'"
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Array(Dynamic))\`)"
$CH_CLIENT -q "select count() from test where d is NULL"
$CH_CLIENT -q "select count() from test where not empty(d.\`Tuple(a Array(Dynamic))\`.a.String)"
$CH_CLIENT -q "select d, d.UInt64, d.String, d.\`Array(Variant(String, UInt64))\` from test format Null"
$CH_CLIENT -q "select d.UInt64, d.String, d.\`Array(Variant(String, UInt64))\` from test format Null"
$CH_CLIENT -q "select d.Int8, d.Date, d.\`Array(String)\` from test format Null"
$CH_CLIENT -q "select d, d.UInt64, d.Date, d.\`Array(Variant(String, UInt64))\`, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
$CH_CLIENT -q "select d.UInt64, d.Date, d.\`Array(Variant(String, UInt64))\`, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64, d.\`Array(Variant(String, UInt64))\`.String from test format Null"
$CH_CLIENT -q "select d, d.\`Tuple(a UInt64, b String)\`.a, d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
$CH_CLIENT -q "select d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64, d.\`Array(Dynamic)\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
$CH_CLIENT -q "select d.\`Array(Array(Dynamic))\`.size1, d.\`Array(Array(Dynamic))\`.UInt64, d.\`Array(Array(Dynamic))\`.\`Map(String, Tuple(a UInt64))\`.values.a from test format Null"
}

View File

@ -1,19 +0,0 @@
Memory
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
None
String
UInt64
200000
200000
200000
200000
0
0
200000
200000
100000
100000
200000
0

View File

@ -1,19 +0,0 @@
#!/usr/bin/env bash
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./03036_dynamic_read_subcolumns.lib
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
$CH_CLIENT -q "drop table if exists test;"
echo "Memory"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=Memory"
test
$CH_CLIENT -q "drop table test;"

View File

@ -1,19 +0,0 @@
MergeTree compact
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
None
String
UInt64
200000
200000
200000
200000
0
0
200000
200000
100000
100000
200000
0

View File

@ -1,19 +0,0 @@
#!/usr/bin/env bash
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./03036_dynamic_read_subcolumns.lib
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
$CH_CLIENT -q "drop table if exists test;"
echo "MergeTree compact"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000;"
test
$CH_CLIENT -q "drop table test;"

View File

@ -1,19 +0,0 @@
MergeTree wide
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
None
String
UInt64
200000
200000
200000
200000
0
0
200000
200000
100000
100000
200000
0

View File

@ -1,19 +0,0 @@
#!/usr/bin/env bash
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./03036_dynamic_read_subcolumns.lib
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
$CH_CLIENT -q "drop table if exists test;"
echo "MergeTree wide"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
test
$CH_CLIENT -q "drop table test;"

View File

@ -21,8 +21,9 @@ SYSTEM DROP MARK CACHE;
SELECT count() FROM t_compact_bytes_s3 WHERE NOT ignore(c2, c4);
SYSTEM FLUSH LOGS;
-- Errors in S3 requests will be automatically retried, however ProfileEvents can be wrong. That is why we subtract errors.
SELECT
ProfileEvents['S3ReadRequestsCount'],
ProfileEvents['S3ReadRequestsCount'] - ProfileEvents['S3ReadRequestsErrors'],
ProfileEvents['ReadBufferFromS3Bytes'] < ProfileEvents['ReadCompressedBytes'] * 1.1
FROM system.query_log
WHERE event_date >= yesterday() AND type = 'QueryFinish'
@ -30,7 +31,7 @@ WHERE event_date >= yesterday() AND type = 'QueryFinish'
AND query ilike '%INSERT INTO t_compact_bytes_s3 SELECT number, number, number%';
SELECT
ProfileEvents['S3ReadRequestsCount'],
ProfileEvents['S3ReadRequestsCount'] - ProfileEvents['S3ReadRequestsErrors'],
ProfileEvents['ReadBufferFromS3Bytes'] < ProfileEvents['ReadCompressedBytes'] * 1.1
FROM system.query_log
WHERE event_date >= yesterday() AND type = 'QueryFinish'

View File

@ -5,10 +5,10 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CUR_DIR"/../shell_config.sh
output=$(${CLICKHOUSE_CLIENT} -t -q "SELECT sleepEachRow(2) FORMAT Null" 2>&1)
{ echo "$output" | grep -q "^2\." && echo "Ok"; } || { echo "Fail"; echo "'$output'"; }
{ number=$(echo "$output" | grep -o "^[0-9]"); [[ -n "$number" && "$number" -ge 2 ]] && echo "Ok"; } || { echo "Fail"; echo "'$output'"; }
output=$(${CLICKHOUSE_CLIENT} --time -q "SELECT sleepEachRow(2) FORMAT Null" 2>&1)
{ echo "$output" | grep -q "^2\." && echo "Ok"; } || { echo "Fail"; echo "'$output'"; }
{ number=$(echo "$output" | grep -o "^[0-9]"); [[ -n "$number" && "$number" -ge 2 ]] && echo "Ok"; } || { echo "Fail"; echo "'$output'"; }
output=$(${CLICKHOUSE_CLIENT} --memory-usage -q "SELECT sum(number) FROM numbers(10_000) FORMAT Null" 2>&1)
{ echo "$output" | grep -q "^[0-9]\+$" && echo "Ok"; } || { echo "Fail"; echo "'$output'"; }

View File

@ -1,11 +0,0 @@
-- Tags: no-parallel
CREATE DATABASE rdb1 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica1');
CREATE DATABASE rdb2 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica2');
SET distributed_ddl_task_timeout = 0;
CREATE TABLE rdb1.t (id UInt32) ENGINE = ReplicatedMergeTree ORDER BY id;
SELECT replication_lag FROM system.clusters WHERE cluster IN ('rdb1', 'rdb2') ORDER BY cluster ASC, replica_num ASC;
DROP DATABASE rdb1;
DROP DATABASE rdb2;

View File

@ -0,0 +1,16 @@
::nonexistentfile.csv
1
nonexistent::nonexistentfile.csv
1
nonexistent :: nonexistentfile.csv
1
nonexistent ::nonexistentfile.csv
1
nonexistent.tar.gz :: nonexistentfile.csv
1
nonexistent.zip:: nonexistentfile.csv
1
nonexistent.tar.gz :: nonexistentfile.csv SETTINGS allow_archive_path_syntax=0
1
nonexistent.zip:: nonexistentfile.csv SETTINGS allow_archive_path_syntax=0
1

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function try_to_read_file()
{
file_to_read=$1
file_argument=$2
settings=$3
echo $file_argument $settings
$CLICKHOUSE_LOCAL -q "SELECT * FROM file('$file_argument') $settings" 2>&1 | grep -c "Cannot stat file.*$file_to_read"
}
# if archive extension is not detected for part before '::', path is taken as is
try_to_read_file "::nonexistentfile.csv" "::nonexistentfile.csv"
try_to_read_file "nonexistent::nonexistentfile.csv" "nonexistent::nonexistentfile.csv"
try_to_read_file "nonexistent :: nonexistentfile.csv" "nonexistent :: nonexistentfile.csv"
try_to_read_file "nonexistent ::nonexistentfile.csv" "nonexistent ::nonexistentfile.csv"
# if archive extension is detected for part before '::', path is split into archive and filename
try_to_read_file "nonexistent.tar.gz" "nonexistent.tar.gz :: nonexistentfile.csv"
try_to_read_file "nonexistent.zip" "nonexistent.zip:: nonexistentfile.csv"
# disabling archive syntax will always parse path as is
try_to_read_file "nonexistent.tar.gz :: nonexistentfile.csv" "nonexistent.tar.gz :: nonexistentfile.csv" "SETTINGS allow_archive_path_syntax=0"
try_to_read_file "nonexistent.zip:: nonexistentfile.csv" "nonexistent.zip:: nonexistentfile.csv" "SETTINGS allow_archive_path_syntax=0"

View File

@ -0,0 +1,3 @@
::03215_archive.csv test/::03215_archive.csv
test::03215_archive.csv test/test::03215_archive.csv
test.zip::03215_archive.csv test/test.zip::03215_archive.csv

View File

@ -0,0 +1,7 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
SELECT _file, _path FROM s3(s3_conn, filename='::03215_archive.csv') ORDER BY (_file, _path);
SELECT _file, _path FROM s3(s3_conn, filename='test :: 03215_archive.csv') ORDER BY (_file, _path); -- { serverError S3_ERROR }
SELECT _file, _path FROM s3(s3_conn, filename='test::03215_archive.csv') ORDER BY (_file, _path);
SELECT _file, _path FROM s3(s3_conn, filename='test.zip::03215_archive.csv') ORDER BY (_file, _path) SETTINGS allow_archive_path_syntax=0;

View File

@ -0,0 +1,2 @@
1970-01-01
1970-01-01

View File

@ -0,0 +1,2 @@
SELECT toStartOfWeek(toDateTime64('1970-01-01', 6));
SELECT toStartOfWeek(toDateTime('1970-01-01'));

View File

@ -0,0 +1 @@
2

View File

@ -0,0 +1,14 @@
DROP FUNCTION IF EXISTS 03215_udf_with_union;
CREATE FUNCTION 03215_udf_with_union AS () -> (
SELECT sum(s)
FROM
(
SELECT 1 AS s
UNION ALL
SELECT 1 AS s
)
);
SELECT 03215_udf_with_union();
DROP FUNCTION 03215_udf_with_union;

View File

@ -0,0 +1 @@
1
1 1

View File

@ -0,0 +1 @@
1
1 1

View File

@ -0,0 +1 @@
1
1 1