Merge branch 'ClickHouse:master' into aliasFeature

This commit is contained in:
Peter Nguyen 2024-08-03 21:38:42 -06:00 committed by GitHub
commit a31e2a4a5a
94 changed files with 678 additions and 595 deletions

View File

@ -26,7 +26,6 @@ sed -i '/onBrokenMarkdownLinks:/ s/ignore/error/g' docusaurus.config.js
if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
export CI=true
yarn install
exec yarn build "$@"
fi

View File

@ -849,7 +849,7 @@ try
#endif
#if defined(SANITIZER)
LOG_INFO(log, "Query Profiler disabled because they cannot work under sanitizers"
LOG_INFO(log, "Query Profiler is disabled because it cannot work under sanitizers"
" when two different stack unwinding methods will interfere with each other.");
#endif

View File

@ -1130,8 +1130,7 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_views_log>
<!-- Uncomment if use part log.
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).-->
<!-- Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads). -->
<part_log>
<database>system</database>
<table>part_log</table>
@ -1143,9 +1142,9 @@
<flush_on_crash>false</flush_on_crash>
</part_log>
<!-- Uncomment to write text log into table.
Text log contains all information from usual server log but stores it in structured and efficient way.
<!-- Text log contains all information from the usual server log but stores it in a structured and efficient way.
The level of the messages that go to the table can be limited (<level>); if not specified, all messages will go to the table.
-->
<text_log>
<database>system</database>
<table>text_log</table>
@ -1154,9 +1153,8 @@
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
<level></level>
<level>trace</level>
</text_log>
-->
<!-- Metric log contains rows with current values of ProfileEvents, CurrentMetrics collected with "collect_interval_milliseconds" interval. -->
<metric_log>

View File

@ -17,7 +17,7 @@
--input-shadow-color: rgba(0, 255, 0, 1);
--error-color: red;
--global-error-color: white;
--legend-background: rgba(255, 255, 255, 0.75);
--legend-background: rgba(255, 255, 0, 0.75);
--title-color: #666;
--text-color: black;
--edit-title-background: #FEE;
@ -41,7 +41,7 @@
--moving-shadow-color: rgba(255, 255, 255, 0.25);
--input-shadow-color: rgba(255, 128, 0, 0.25);
--error-color: #F66;
--legend-background: rgba(255, 255, 255, 0.25);
--legend-background: rgba(0, 96, 128, 0.75);
--title-color: white;
--text-color: white;
--edit-title-background: #364f69;
@ -218,6 +218,7 @@
#chart-params .param {
width: 6%;
font-family: monospace;
}
input {
@ -256,6 +257,7 @@
font-weight: bold;
user-select: none;
cursor: pointer;
margin-bottom: 1rem;
}
#run:hover {
@ -309,7 +311,7 @@
color: var(--param-text-color);
display: inline-block;
box-shadow: 1px 1px 0 var(--shadow-color);
margin-bottom: 1rem;
margin-bottom: 0.5rem;
}
input:focus {
@ -657,6 +659,10 @@ function insertParam(name, value) {
param_value.value = value;
param_value.spellcheck = false;
let setWidth = e => { e.style.width = (e.value.length + 1) + 'ch' };
if (value) { setWidth(param_value); }
param_value.addEventListener('input', e => setWidth(e.target));
param_wrapper.appendChild(param_name);
param_wrapper.appendChild(param_value);
document.getElementById('chart-params').appendChild(param_wrapper);
@ -945,6 +951,7 @@ function showMassEditor() {
let editor = document.getElementById('mass-editor-textarea');
editor.value = JSON.stringify({params: params, queries: queries}, null, 2);
editor.focus();
mass_editor_active = true;
}
@ -1004,7 +1011,7 @@ function legendAsTooltipPlugin({ className, style = { background: "var(--legend-
className && legendEl.classList.add(className);
uPlot.assign(legendEl.style, {
textAlign: "left",
textAlign: "right",
pointerEvents: "none",
display: "none",
position: "absolute",
@ -1051,8 +1058,10 @@ function legendAsTooltipPlugin({ className, style = { background: "var(--legend-
function update(u) {
let { left, top } = u.cursor;
left -= legendEl.clientWidth / 2;
top -= legendEl.clientHeight / 2;
/// This will place the balloon to the right of the cursor when the cursor is on the left side, and vice versa,
/// avoiding the borders of the chart.
left -= legendEl.clientWidth * (left / u.width);
top -= legendEl.clientHeight;
legendEl.style.transform = "translate(" + left + "px, " + top + "px)";
if (multiline) {
@ -1139,7 +1148,7 @@ async function draw(idx, chart, url_params, query) {
let {reply, error} = await doFetch(query, url_params);
if (!error) {
if (reply.rows.length == 0) {
if (reply.rows == 0) {
error = "Query returned empty result.";
} else if (reply.meta.length < 2) {
error = "Query should return at least two columns: unix timestamp and value.";
@ -1229,14 +1238,53 @@ async function draw(idx, chart, url_params, query) {
let sync = uPlot.sync("sync");
let axis = {
function formatDateTime(t) {
return (new Date(t * 1000)).toISOString().replace('T', '\n').replace('.000Z', '');
}
function formatDateTimes(self, ticks) {
return ticks.map((t, idx) => {
let res = formatDateTime(t);
if (idx == 0 || res.substring(0, 10) != formatDateTime(ticks[idx - 1]).substring(0, 10)) {
return res;
} else {
return res.substring(11);
}
});
}
function formatValue(v) {
const a = Math.abs(v);
if (a >= 1000000000000000) { return (v / 1000000000000000) + 'P'; }
if (a >= 1000000000000) { return (v / 1000000000000) + 'T'; }
if (a >= 1000000000) { return (v / 1000000000) + 'G'; }
if (a >= 1000000) { return (v / 1000000) + 'M'; }
if (a >= 1000) { return (v / 1000) + 'K'; }
if (a > 0 && a < 0.001) { return (v * 1000000) + "μ"; }
return v;
}
let axis_x = {
stroke: axes_color,
grid: { width: 1 / devicePixelRatio, stroke: grid_color },
ticks: { width: 1 / devicePixelRatio, stroke: grid_color }
ticks: { width: 1 / devicePixelRatio, stroke: grid_color },
values: formatDateTimes,
space: 80,
incrs: [1, 5, 10, 15, 30,
60, 60 * 5, 60 * 10, 60 * 15, 60 * 30,
3600, 3600 * 2, 3600 * 3, 3600 * 4, 3600 * 6, 3600 * 12,
3600 * 24],
};
let axes = [axis, axis];
let series = [{ label: "x" }];
let axis_y = {
stroke: axes_color,
grid: { width: 1 / devicePixelRatio, stroke: grid_color },
ticks: { width: 1 / devicePixelRatio, stroke: grid_color },
values: (self, ticks) => ticks.map(formatValue)
};
let axes = [axis_x, axis_y];
let series = [{ label: "time", value: (self, t) => formatDateTime(t) }];
let data = [reply.data[reply.meta[0].name]];
// Treat every column as series
@ -1254,9 +1302,10 @@ async function draw(idx, chart, url_params, query) {
const opts = {
width: chart.clientWidth,
height: chart.clientHeight,
scales: { x: { time: false } }, /// Because we want to split and format time on our own.
axes,
series,
padding: [ null, null, null, (Math.round(max_value * 100) / 100).toString().length * 6 - 10 ],
padding: [ null, null, null, 3 ],
plugins: [ legendAsTooltipPlugin() ],
cursor: {
sync: {

View File

@ -43,6 +43,12 @@ size_t getCompoundTypeDepth(const IDataType & type)
const auto & tuple_elements = assert_cast<const DataTypeTuple &>(*current_type).getElements();
if (!tuple_elements.empty())
current_type = tuple_elements.at(0).get();
else
{
/// Special case: tuple with no element - tuple(). In this case, what's the compound type depth?
/// I'm not certain about the theoretical answer, but from experiment, 1 is the most reasonable choice.
return 1;
}
++result;
}

View File

@ -61,8 +61,6 @@ private:
void createRootNodes();
void removeAllNodes();
class ReplicatedDatabasesMetadataSync;
/// get_zookeeper will provide a zookeeper client without any fault injection
const zkutil::GetZooKeeper get_zookeeper;
const String root_zookeeper_path;

View File

@ -222,12 +222,21 @@ void RestorerFromBackup::setStage(const String & new_stage, const String & messa
if (restore_coordination)
{
restore_coordination->setStage(new_stage, message);
/// The initiator of a RESTORE ON CLUSTER query waits for other hosts to complete their work (see waitForStage(Stage::COMPLETED) in BackupsWorker::doRestore),
/// but other hosts shouldn't wait for each other's completion. (That's simply unnecessary and also
/// the initiator may start cleaning up (e.g. removing restore-coordination ZooKeeper nodes) once all other hosts are in Stage::COMPLETED.)
bool need_wait = (new_stage != Stage::COMPLETED);
if (need_wait)
{
if (new_stage == Stage::FINDING_TABLES_IN_BACKUP)
restore_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout);
else
restore_coordination->waitForStage(new_stage);
}
}
}
void RestorerFromBackup::schedule(std::function<void()> && task_, const char * thread_name_)
{

View File

@ -218,10 +218,10 @@ AsyncLoader::~AsyncLoader()
{
// All `LoadTask` objects should be destructed before AsyncLoader destruction because they hold a reference.
// To make sure of that, we check that all pending jobs are finished.
{
std::unique_lock lock{mutex};
if (scheduled_jobs.empty() && finished_jobs.empty())
return;
if (!scheduled_jobs.empty() || !finished_jobs.empty())
{
std::vector<String> scheduled;
std::vector<String> finished;
scheduled.reserve(scheduled_jobs.size());
@ -233,6 +233,13 @@ AsyncLoader::~AsyncLoader()
LOG_ERROR(log, "Bug. Destruction with pending ({}) and finished ({}) load jobs.", fmt::join(scheduled, ", "), fmt::join(finished, ", "));
abort();
}
}
// When all jobs are done we could still have finalizing workers.
// These workers could call updateCurrentPriorityAndSpawn() that scans all pools.
// We need to stop all of them before destructing any of them.
stop();
}
void AsyncLoader::start()
{

View File

@ -306,6 +306,8 @@
\
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
\
M(S3DiskNoKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -19,7 +19,7 @@ Epoll::Epoll() : events_count(0)
{
epoll_fd = epoll_create1(0);
if (epoll_fd == -1)
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Cannot open epoll descriptor");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Cannot open epoll descriptor");
}
Epoll::Epoll(Epoll && other) noexcept : epoll_fd(other.epoll_fd), events_count(other.events_count.load())
@ -47,7 +47,7 @@ void Epoll::add(int fd, void * ptr, uint32_t events)
++events_count;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1)
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Cannot add new descriptor to epoll");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Cannot add new descriptor to epoll");
}
void Epoll::remove(int fd)
@ -55,7 +55,7 @@ void Epoll::remove(int fd)
--events_count;
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == -1)
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Cannot remove descriptor from epoll");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Cannot remove descriptor from epoll");
}
size_t Epoll::getManyReady(int max_events, epoll_event * events_out, int timeout) const
@ -82,7 +82,7 @@ size_t Epoll::getManyReady(int max_events, epoll_event * events_out, int timeout
continue;
}
else
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Error in epoll_wait");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Error in epoll_wait");
}
else
break;

View File

@ -4,8 +4,6 @@
#include <Common/ExponentiallySmoothedCounter.h>
#include <numbers>
namespace DB
{
@ -14,9 +12,10 @@ namespace DB
class EventRateMeter
{
public:
explicit EventRateMeter(double now, double period_)
explicit EventRateMeter(double now, double period_, size_t heating_ = 0)
: period(period_)
, half_decay_time(period * std::numbers::ln2) // for `ExponentiallySmoothedAverage::sumWeights()` to be equal to `1/period`
, max_interval(period * 10)
, heating(heating_)
{
reset(now);
}
@ -29,16 +28,11 @@ public:
{
// Remove data from the initial heating stage that can be present at the beginning of a query.
// Otherwise it leads to a wrongly gradual increase of the average value, making the algorithm insufficiently reactive.
if (count != 0.0 && ++data_points < 5)
{
start = events.time;
events = ExponentiallySmoothedAverage();
}
if (count != 0.0 && data_points++ <= heating)
reset(events.time, data_points);
if (now - period <= start) // precise counting mode
events = ExponentiallySmoothedAverage(events.value + count, now);
else // exponential smoothing mode
events.add(count, now, half_decay_time);
duration.add(std::min(max_interval, now - duration.time), now, period);
events.add(count, now, period);
}
/// Compute average event rate throughout `[now - period, now]` period.
@ -49,24 +43,26 @@ public:
add(now, 0);
if (unlikely(now <= start))
return 0;
if (now - period <= start) // precise counting mode
return events.value / (now - start);
else // exponential smoothing mode
return events.get(half_decay_time); // equals to `events.value / period`
// We do not use .get() because the sum of weights would be canceled out anyway (optimization)
return events.value / duration.value;
}
void reset(double now)
void reset(double now, size_t data_points_ = 0)
{
start = now;
events = ExponentiallySmoothedAverage();
data_points = 0;
duration = ExponentiallySmoothedAverage();
data_points = data_points_;
}
private:
const double period;
const double half_decay_time;
const double max_interval;
const size_t heating;
double start; // Instant in past without events before it; when measurement started or reset
ExponentiallySmoothedAverage events; // Estimated number of events in the last `period`
ExponentiallySmoothedAverage duration; // Current duration of a period
ExponentiallySmoothedAverage events; // Estimated number of events in last `duration` seconds
size_t data_points = 0;
};
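
A minimal usage sketch of the reworked interface (the 100 ms cadence and counts are made up for illustration; the meter is fed a fixed count per step and then queried for the average rate, with the first `heating` data points discarded as described above):

#include <Common/EventRateMeter.h>
#include <iostream>

int main()
{
    double now = 0.0;
    DB::EventRateMeter meter(now, /*period=*/2.0, /*heating=*/4);
    for (int i = 1; i <= 100; ++i)
    {
        now = i * 0.1;         // 100 ms steps
        meter.add(now, 10.0);  // 10 events per step -> ~100 events/sec
    }
    std::cout << "rate ~ " << meter.rate(now) << " events/sec\n";
}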

View File

@ -253,16 +253,16 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
}
}
for (auto & rec : merged)
for (auto & record : merged)
{
if (!rec.failed)
if (!record.failed || !record.consecutive_fail_count)
continue;
/// Exponentially increased ban time for each consecutive fail
auto banned_until = now - Poco::Timespan(history.totalMicroseconds() * (1ull << (rec.consecutive_fail_count - 1)));
if (rec.fail_time < banned_until)
auto banned_until = now - Poco::Timespan(history.totalMicroseconds() * (1ull << (record.consecutive_fail_count - 1)));
if (record.fail_time < banned_until)
{
rec.failed = false;
record.failed = false;
CurrentMetrics::sub(metrics.banned_count);
}
}
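
A standalone sketch of the ban-window arithmetic used above (the 2-second history value is an arbitrary assumption for illustration; the real value comes from the resolver's history setting): a record stays failed while its fail_time is newer than now - history * 2^(consecutive_fail_count - 1).

#include <cstdint>
#include <iostream>

int main()
{
    const uint64_t history_us = 2'000'000; // assumed 2 s history, in microseconds
    for (uint32_t fails = 1; fails <= 5; ++fails)
    {
        uint64_t ban_us = history_us * (1ull << (fails - 1));
        std::cout << fails << " consecutive fail(s) -> banned for " << ban_us / 1000 << " ms\n";
    }
}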

View File

@ -105,7 +105,7 @@ private:
bool write_progress_on_update = false;
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 2'000'000'000 /*ns*/}; // average cpu utilization last 2 second
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 2'000'000'000 /*ns*/, 4}; // average cpu utilization over the last 2 seconds, skip the first 4 points
HostToTimesMap hosts_data;
/// In case of all of the above:
/// - clickhouse-local

View File

@ -3,6 +3,8 @@
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/Priority.h>
#include <Common/EventRateMeter.h>
#include <Common/Stopwatch.h>
#include <base/defines.h>
#include <base/types.h>
@ -176,6 +178,14 @@ protected:
/// Postponed to be handled in scheduler thread, so it is intended to be called from outside.
void scheduleActivation();
/// Helper for introspection metrics
void incrementDequeued(ResourceCost cost)
{
dequeued_requests++;
dequeued_cost += cost;
throughput.add(static_cast<double>(clock_gettime_ns())/1e9, cost);
}
public:
EventQueue * const event_queue;
String basename;
@ -189,6 +199,10 @@ public:
std::atomic<ResourceCost> dequeued_cost{0};
std::atomic<ResourceCost> canceled_cost{0};
std::atomic<UInt64> busy_periods{0};
/// Average dequeued_cost per second
/// WARNING: Should only be accessed from the scheduler thread, so that locking is not required
EventRateMeter throughput{static_cast<double>(clock_gettime_ns())/1e9, 2, 1};
};
using SchedulerNodePtr = std::shared_ptr<ISchedulerNode>;

View File

@ -188,8 +188,7 @@ public:
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, heap_size > 0};
}
}

View File

@ -59,8 +59,7 @@ public:
if (requests.empty())
busy_periods++;
queue_cost -= result->cost;
dequeued_requests++;
dequeued_cost += result->cost;
incrementDequeued(result->cost);
return {result, !requests.empty()};
}

View File

@ -122,8 +122,7 @@ public:
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, !items.empty()};
}
}

View File

@ -81,8 +81,7 @@ public:
child_active = child_now_active;
if (!active())
busy_periods++;
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, active()};
}

View File

@ -89,8 +89,7 @@ public:
child_active = child_now_active;
if (!active())
busy_periods++;
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, active()};
}

View File

@ -162,8 +162,7 @@ public:
if (request == nullptr) // Possible in case of request cancel, just retry
continue;
dequeued_requests++;
dequeued_cost += request->cost;
incrementDequeued(request->cost);
return {request, current != nullptr};
}
}

View File

@ -2,6 +2,7 @@
#include <Common/TimerDescriptor.h>
#include <Common/Exception.h>
#include <Common/Epoll.h>
#include <Common/logger_useful.h>
#include <sys/timerfd.h>
@ -75,10 +76,22 @@ void TimerDescriptor::drain() const
/// or since the last successful read(2), then the buffer given to read(2) returns an unsigned 8-byte integer (uint64_t)
/// containing the number of expirations that have occurred.
/// (The returned value is in host byte order—that is, the native byte order for integers on the host machine.)
/// Due to a bug in the Linux kernel, reading from timerfd in non-blocking mode can still block.
/// Avoid this by polling first.
Epoll epoll;
epoll.add(timer_fd);
epoll_event event;
event.data.fd = -1;
size_t ready_count = epoll.getManyReady(1, &event, 0);
if (!ready_count)
return;
uint64_t buf;
while (true)
{
ssize_t res = ::read(timer_fd, &buf, sizeof(buf));
if (res < 0)
{
/// man timerfd_create:
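
For illustration only, a standalone sketch of the same workaround using plain POSIX calls rather than the classes from this commit: poll the timerfd with a zero timeout and only read() when epoll reports it ready, so the read can never block.

#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

int main()
{
    int timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
    itimerspec spec{};
    spec.it_value.tv_nsec = 1'000'000; /// fire once, 1 ms from now
    timerfd_settime(timer_fd, 0, &spec, nullptr);
    usleep(2000); /// let the timer expire

    int epoll_fd = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = timer_fd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev);

    epoll_event out{};
    if (epoll_wait(epoll_fd, &out, 1, /*timeout_ms=*/0) > 0) /// zero timeout: never blocks
    {
        uint64_t expirations = 0;
        if (read(timer_fd, &expirations, sizeof(expirations)) > 0)
            std::printf("timer expired %llu time(s)\n", static_cast<unsigned long long>(expirations));
    }
    close(epoll_fd);
    close(timer_fd);
}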

View File

@ -0,0 +1,68 @@
#include <gtest/gtest.h>
#include <Common/EventRateMeter.h>
#include <cmath>
TEST(EventRateMeter, ExponentiallySmoothedAverage)
{
double target = 100.0;
// The test is only correct for a time step of 1 second because of
// how the sum of weights is implemented inside `ExponentiallySmoothedAverage`
double time_step = 1.0;
for (double half_decay_time : { 0.1, 1.0, 10.0, 100.0})
{
DB::ExponentiallySmoothedAverage esa;
int steps = static_cast<int>(half_decay_time * 30 / time_step);
for (int i = 1; i <= steps; ++i)
esa.add(target * time_step, i * time_step, half_decay_time);
double measured = esa.get(half_decay_time);
ASSERT_LE(std::fabs(measured - target), 1e-5 * target);
}
}
TEST(EventRateMeter, ConstantRate)
{
double target = 100.0;
for (double period : {0.1, 1.0, 10.0})
{
for (double time_step : {0.001, 0.01, 0.1, 1.0})
{
DB::EventRateMeter erm(0.0, period);
int steps = static_cast<int>(period * 30 / time_step);
for (int i = 1; i <= steps; ++i)
erm.add(i * time_step, target * time_step);
double measured = erm.rate(steps * time_step);
// std::cout << "T=" << period << " dt=" << time_step << " measured=" << measured << std::endl;
ASSERT_LE(std::fabs(measured - target), 1e-5 * target);
}
}
}
TEST(EventRateMeter, PreciseStart)
{
double target = 100.0;
for (double period : {0.1, 1.0, 10.0})
{
for (double time_step : {0.001, 0.01, 0.1, 1.0})
{
DB::EventRateMeter erm(0.0, period);
int steps = static_cast<int>(period / time_step);
for (int i = 1; i <= steps; ++i)
{
erm.add(i * time_step, target * time_step);
double measured = erm.rate(i * time_step);
// std::cout << "T=" << period << " dt=" << time_step << " measured=" << measured << std::endl;
ASSERT_LE(std::fabs(measured - target), 1e-5 * target);
}
}
}
}

View File

@ -936,6 +936,7 @@ class IColumn;
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \
M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing", 0) \
M(Bool, allow_archive_path_syntax, true, "File/S3 engines/table functions will parse paths with '::' as '<archive> :: <file>' if the archive has a supported extension", 0) \
\
M(Bool, allow_experimental_inverted_index, false, "If it is set to true, allow to use experimental inverted index.", 0) \
M(Bool, allow_experimental_full_text_index, false, "If it is set to true, allow to use experimental full-text index.", 0) \

View File

@ -76,6 +76,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"24.8",
{
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
}
},
{"24.7",
@ -151,6 +152,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"cast_string_to_dynamic_use_inference", false, false, "Add setting to allow converting String to Dynamic through parsing"},
{"allow_experimental_dynamic_type", false, false, "Add new experimental Dynamic type"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},
{"allow_archive_path_syntax", false, true, "Added new setting to allow disabling archive path syntax."},
}
},
{"24.4",

View File

@ -103,7 +103,15 @@ static std::string getSortDescriptionDump(const SortDescription & description, c
WriteBufferFromOwnString buffer;
for (size_t i = 0; i < description.size(); ++i)
buffer << header_types[i]->getName() << ' ' << description[i].direction << ' ' << description[i].nulls_direction;
{
if (i != 0)
buffer << ", ";
buffer << "(type: " << header_types[i]->getName()
<< ", direction: " << description[i].direction
<< ", nulls_direction: " << description[i].nulls_direction
<< ")";
}
return buffer.str();
}

View File

@ -44,7 +44,7 @@ namespace ErrorCodes
DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_, time_t expiration_time_, ContextPtr context_)
: DatabaseOnDisk(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseLazy (" + name_ + ")", context_)
: DatabaseOnDisk(name_, metadata_path_, std::filesystem::path("data") / escapeForFileName(name_) / "", "DatabaseLazy (" + name_ + ")", context_)
, expiration_time(expiration_time_)
{
}

View File

@ -12,7 +12,7 @@ class DatabaseLazyIterator;
class Context;
/** Lazy engine of databases.
* Works like DatabaseOrdinary, but stores in memory only cache.
* Works like DatabaseOrdinary, but stores in memory only the cache.
* Can be used only with *Log engines.
*/
class DatabaseLazy final : public DatabaseOnDisk

View File

@ -313,7 +313,7 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri
std::lock_guard lock(mutex);
if (const auto it = snapshot_detached_tables.find(table_name); it == snapshot_detached_tables.end())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Snapshot doesn't contain info about detached table={}", table_name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Snapshot doesn't contain info about detached table `{}`", table_name);
}
else
{

View File

@ -16,6 +16,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ParserCreateQuery.h>
@ -250,6 +251,8 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
convertMergeTreeToReplicatedIfNeeded(ast, qualified_name, file_name);
NormalizeSelectWithUnionQueryVisitor::Data data{local_context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(ast);
std::lock_guard lock{metadata.mutex};
metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
metadata.total_dictionaries += create_query->is_dictionary;

View File

@ -12,7 +12,6 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/PoolId.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
@ -339,12 +338,9 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, params);
}
ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const
std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr & cluster_) const
{
Strings paths_get, paths_exists;
paths_get.emplace_back(fs::path(zookeeper_path) / "max_log_ptr");
Strings paths;
const auto & addresses_with_failover = cluster_->getShardsAddresses();
const auto & shards_info = cluster_->getShardsInfo();
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
@ -352,59 +348,32 @@ ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_)
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
paths_exists.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
paths_get.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr");
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
}
}
try
{
auto current_zookeeper = getZooKeeper();
auto get_res = current_zookeeper->get(paths_get);
auto exist_res = current_zookeeper->exists(paths_exists);
chassert(get_res.size() == exist_res.size() + 1);
auto res = current_zookeeper->exists(paths);
auto max_log_ptr_zk = get_res[0];
if (max_log_ptr_zk.error != Coordination::Error::ZOK)
throw Coordination::Exception(max_log_ptr_zk.error);
std::vector<UInt8> statuses;
statuses.resize(paths.size());
UInt32 max_log_ptr = parse<UInt32>(max_log_ptr_zk.data);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZOK)
statuses[i] = 1;
ReplicasInfo replicas_info;
replicas_info.resize(exist_res.size());
size_t global_replica_index = 0;
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
for (const auto & replica : addresses_with_failover[shard_index])
{
auto replica_active = exist_res[global_replica_index];
auto replica_log_ptr = get_res[global_replica_index + 1];
if (replica_active.error != Coordination::Error::ZOK && replica_active.error != Coordination::Error::ZNONODE)
throw Coordination::Exception(replica_active.error);
if (replica_log_ptr.error != Coordination::Error::ZOK)
throw Coordination::Exception(replica_log_ptr.error);
replicas_info[global_replica_index] = ReplicaInfo{
.is_active = replica_active.error == Coordination::Error::ZOK,
.replication_lag = max_log_ptr - parse<UInt32>(replica_log_ptr.data),
.recovery_time = replica.is_local ? ddl_worker->getCurrentInitializationDurationMs() : 0,
};
++global_replica_index;
return statuses;
}
}
return replicas_info;
} catch (...)
catch (...)
{
tryLogCurrentException(log);
return {};
}
}
void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref)
{
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);

View File

@ -17,14 +17,6 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct ReplicaInfo
{
bool is_active;
UInt32 replication_lag;
UInt64 recovery_time;
};
using ReplicasInfo = std::vector<ReplicaInfo>;
class DatabaseReplicated : public DatabaseAtomic
{
public:
@ -92,7 +84,7 @@ public:
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
ReplicasInfo tryGetReplicasInfo(const ClusterPtr & cluster_) const;
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
void renameDatabase(ContextPtr query_context, const String & new_name) override;

View File

@ -32,12 +32,6 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
bool DatabaseReplicatedDDLWorker::initializeMainThread()
{
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.emplace();
initialization_duration_timer->start();
}
while (!stop_flag)
{
try
@ -75,10 +69,6 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
initializeReplication();
initialized = true;
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return true;
}
catch (...)
@ -88,11 +78,6 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
}
}
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return false;
}
@ -474,10 +459,4 @@ UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
return max_id.load();
}
UInt64 DatabaseReplicatedDDLWorker::getCurrentInitializationDurationMs() const
{
std::lock_guard lock(initialization_duration_timer_mutex);
return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0;
}
}

View File

@ -36,8 +36,6 @@ public:
DatabaseReplicated * const database, bool committed = false); /// NOLINT
UInt32 getLogPointer() const;
UInt64 getCurrentInitializationDurationMs() const;
private:
bool initializeMainThread() override;
void initializeReplication();
@ -58,9 +56,6 @@ private:
ZooKeeperPtr active_node_holder_zookeeper;
/// It will remove "active" node when database is detached
zkutil::EphemeralNodeHolderPtr active_node_holder;
std::optional<Stopwatch> initialization_duration_timer;
mutable std::mutex initialization_duration_timer_mutex;
};
}

View File

@ -2,6 +2,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Backups/RestorerFromBackup.h>
#include <Core/Settings.h>
#include <Functions/FunctionFactory.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
@ -9,6 +10,7 @@
#include <Functions/UserDefined/UserDefinedSQLObjectsBackup.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -80,13 +82,15 @@ namespace
validateFunctionRecursiveness(*function_body, name);
}
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query)
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query, const ContextPtr & context)
{
auto ptr = create_function_query.clone();
auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr);
res.if_not_exists = false;
res.or_replace = false;
FunctionNameNormalizer::visit(res.function_core.get());
NormalizeSelectWithUnionQueryVisitor::Data data{context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(res.function_core);
return ptr;
}
}
@ -125,7 +129,7 @@ void UserDefinedSQLFunctionFactory::checkCanBeUnregistered(const ContextPtr & co
bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & context, const String & function_name, ASTPtr create_function_query, bool throw_if_exists, bool replace_if_exists)
{
checkCanBeRegistered(context, function_name, *create_function_query);
create_function_query = normalizeCreateFunctionQuery(*create_function_query);
create_function_query = normalizeCreateFunctionQuery(*create_function_query, context);
try
{

View File

@ -1,7 +1,7 @@
#include "Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h"
#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h"
#include "Functions/UserDefined/UserDefinedSQLObjectType.h"
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>
#include <Common/StringUtils.h>
#include <Common/atomicRename.h>
@ -54,7 +54,7 @@ namespace
}
UserDefinedSQLObjectsDiskStorage::UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_)
: global_context(global_context_)
: UserDefinedSQLObjectsStorageBase(global_context_)
, dir_path{makeDirectoryPathCanonical(dir_path_)}
, log{getLogger("UserDefinedSQLObjectsLoaderFromDisk")}
{

View File

@ -42,7 +42,6 @@ private:
ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & file_path, bool check_file_exists);
String getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const;
ContextPtr global_context;
String dir_path;
LoggerPtr log;
std::atomic<bool> objects_loaded = false;

View File

@ -2,7 +2,10 @@
#include <boost/container/flat_set.hpp>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Parsers/ASTCreateFunctionQuery.h>
namespace DB
@ -17,18 +20,24 @@ namespace ErrorCodes
namespace
{
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query)
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query, const ContextPtr & context)
{
auto ptr = create_function_query.clone();
auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr);
res.if_not_exists = false;
res.or_replace = false;
FunctionNameNormalizer::visit(res.function_core.get());
NormalizeSelectWithUnionQueryVisitor::Data data{context->getSettingsRef().union_default_mode};
NormalizeSelectWithUnionQueryVisitor{data}.visit(res.function_core);
return ptr;
}
}
UserDefinedSQLObjectsStorageBase::UserDefinedSQLObjectsStorageBase(ContextPtr global_context_)
: global_context(std::move(global_context_))
{}
ASTPtr UserDefinedSQLObjectsStorageBase::get(const String & object_name) const
{
std::lock_guard lock(mutex);
@ -148,7 +157,7 @@ void UserDefinedSQLObjectsStorageBase::setAllObjects(const std::vector<std::pair
{
std::unordered_map<String, ASTPtr> normalized_functions;
for (const auto & [function_name, create_query] : new_objects)
normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query);
normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query, global_context);
std::lock_guard lock(mutex);
object_name_to_create_object_map = std::move(normalized_functions);
@ -166,7 +175,7 @@ std::vector<std::pair<String, ASTPtr>> UserDefinedSQLObjectsStorageBase::getAllO
void UserDefinedSQLObjectsStorageBase::setObject(const String & object_name, const IAST & create_object_query)
{
std::lock_guard lock(mutex);
object_name_to_create_object_map[object_name] = normalizeCreateFunctionQuery(create_object_query);
object_name_to_create_object_map[object_name] = normalizeCreateFunctionQuery(create_object_query, global_context);
}
void UserDefinedSQLObjectsStorageBase::removeObject(const String & object_name)

View File

@ -4,6 +4,7 @@
#include <mutex>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST.h>
@ -13,6 +14,7 @@ namespace DB
class UserDefinedSQLObjectsStorageBase : public IUserDefinedSQLObjectsStorage
{
public:
explicit UserDefinedSQLObjectsStorageBase(ContextPtr global_context_);
ASTPtr get(const String & object_name) const override;
ASTPtr tryGet(const String & object_name) const override;
@ -64,6 +66,8 @@ protected:
std::unordered_map<String, ASTPtr> object_name_to_create_object_map;
mutable std::recursive_mutex mutex;
ContextPtr global_context;
};
}

View File

@ -48,7 +48,7 @@ namespace
UserDefinedSQLObjectsZooKeeperStorage::UserDefinedSQLObjectsZooKeeperStorage(
const ContextPtr & global_context_, const String & zookeeper_path_)
: global_context{global_context_}
: UserDefinedSQLObjectsStorageBase(global_context_)
, zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }}
, zookeeper_path{zookeeper_path_}
, watch_queue{std::make_shared<ConcurrentBoundedQueue<std::pair<UserDefinedSQLObjectType, String>>>(std::numeric_limits<size_t>::max())}

View File

@ -68,8 +68,6 @@ private:
void refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type);
void syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type);
ContextPtr global_context;
zkutil::ZooKeeperCachingGetter zookeeper_getter;
String zookeeper_path;
std::atomic<bool> objects_loaded = false;

View File

@ -0,0 +1,50 @@
#include <IO/Archives/ArchiveUtils.h>
#include <string_view>
#include <array>
namespace DB
{
namespace
{
using namespace std::literals;
constexpr std::array tar_extensions{".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv, ".tar.bz2"sv, ".tar.lzma"sv};
constexpr std::array zip_extensions{".zip"sv, ".zipx"sv};
constexpr std::array sevenz_extensions{".7z"sv};
bool hasSupportedExtension(std::string_view path, const auto & supported_extensions)
{
for (auto supported_extension : supported_extensions)
{
if (path.ends_with(supported_extension))
return true;
}
return false;
}
}
bool hasSupportedTarExtension(std::string_view path)
{
return hasSupportedExtension(path, tar_extensions);
}
bool hasSupportedZipExtension(std::string_view path)
{
return hasSupportedExtension(path, zip_extensions);
}
bool hasSupported7zExtension(std::string_view path)
{
return hasSupportedExtension(path, sevenz_extensions);
}
bool hasSupportedArchiveExtension(std::string_view path)
{
return hasSupportedTarExtension(path) || hasSupportedZipExtension(path) || hasSupported7zExtension(path);
}
}

View File

@ -10,3 +10,17 @@
#include <archive.h>
#include <archive_entry.h>
#endif
#include <string_view>
namespace DB
{
bool hasSupportedTarExtension(std::string_view path);
bool hasSupportedZipExtension(std::string_view path);
bool hasSupported7zExtension(std::string_view path);
bool hasSupportedArchiveExtension(std::string_view path);
}
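
A small sketch (assuming the header added above) of how the helpers classify paths; the output is 1 for recognised archive extensions and 0 otherwise:

#include <IO/Archives/ArchiveUtils.h>
#include <iostream>
#include <string_view>

int main()
{
    for (std::string_view path : {"backup.tar.zst", "data.zipx", "dump.7z", "plain.csv"})
        std::cout << path << " -> archive: " << DB::hasSupportedArchiveExtension(path) << '\n';
}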

View File

@ -1,6 +1,7 @@
#include <IO/Archives/LibArchiveReader.h>
#include <IO/Archives/ZipArchiveReader.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/ArchiveUtils.h>
#include <Common/Exception.h>
@ -12,7 +13,6 @@ extern const int CANNOT_UNPACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
}
std::shared_ptr<IArchiveReader> createArchiveReader(const String & path_to_archive)
{
return createArchiveReader(path_to_archive, {}, 0);
@ -24,11 +24,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
[[maybe_unused]] const std::function<std::unique_ptr<SeekableReadBuffer>()> & archive_read_function,
[[maybe_unused]] size_t archive_size)
{
using namespace std::literals;
static constexpr std::array tar_extensions{
".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv, ".tar.bz2"sv, ".tar.lzma"sv};
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
if (hasSupportedZipExtension(path_to_archive))
{
#if USE_MINIZIP
return std::make_shared<ZipArchiveReader>(path_to_archive, archive_read_function, archive_size);
@ -36,8 +32,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
else if (std::any_of(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
else if (hasSupportedTarExtension(path_to_archive))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveReader>(path_to_archive, archive_read_function);
@ -45,7 +40,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif
}
else if (path_to_archive.ends_with(".7z"))
else if (hasSupported7zExtension(path_to_archive))
{
#if USE_LIBARCHIVE
return std::make_shared<SevenZipArchiveReader>(path_to_archive);

View File

@ -1,3 +1,4 @@
#include <IO/Archives/ArchiveUtils.h>
#include <IO/Archives/LibArchiveWriter.h>
#include <IO/Archives/TarArchiveWriter.h>
#include <IO/Archives/ZipArchiveWriter.h>
@ -24,10 +25,7 @@ std::shared_ptr<IArchiveWriter> createArchiveWriter(const String & path_to_archi
std::shared_ptr<IArchiveWriter>
createArchiveWriter(const String & path_to_archive, [[maybe_unused]] std::unique_ptr<WriteBuffer> archive_write_buffer)
{
using namespace std::literals;
static constexpr std::array tar_extensions{
".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.bz2"sv, ".tar.lzma"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv};
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
if (hasSupportedZipExtension(path_to_archive))
{
#if USE_MINIZIP
return std::make_shared<ZipArchiveWriter>(path_to_archive, std::move(archive_write_buffer));
@ -35,8 +33,7 @@ createArchiveWriter(const String & path_to_archive, [[maybe_unused]] std::unique
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
else if (std::any_of(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
else if (hasSupportedTarExtension(path_to_archive))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveWriter>(path_to_archive, std::move(archive_write_buffer));

View File

@ -24,6 +24,7 @@
#include <Common/assert_cast.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Core/Settings.h>
@ -43,6 +44,11 @@ namespace ProfileEvents
extern const Event TinyS3Clients;
}
namespace CurrentMetrics
{
extern const Metric S3DiskNoKeyErrors;
}
namespace DB
{
@ -381,7 +387,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
/// The next call is NOT a recursive call
/// This is a virtual call to Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&)
return enrichErrorMessage(
return processRequestResult(
HeadObject(static_cast<const Model::HeadObjectRequest&>(request)));
}
@ -402,7 +408,7 @@ Model::ListObjectsOutcome Client::ListObjects(ListObjectsRequest & request) cons
Model::GetObjectOutcome Client::GetObject(GetObjectRequest & request) const
{
return enrichErrorMessage(
return processRequestResult(
doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }));
}
@ -689,11 +695,14 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
}
template <typename RequestResult>
RequestResult Client::enrichErrorMessage(RequestResult && outcome) const
RequestResult Client::processRequestResult(RequestResult && outcome) const
{
if (outcome.IsSuccess() || !isClientForDisk())
return std::forward<RequestResult>(outcome);
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
CurrentMetrics::add(CurrentMetrics::S3DiskNoKeyErrors);
String enriched_message = fmt::format(
"{} {}",
outcome.GetError().GetMessage(),

View File

@ -271,7 +271,7 @@ private:
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
template <typename RequestResult>
RequestResult enrichErrorMessage(RequestResult && outcome) const;
RequestResult processRequestResult(RequestResult && outcome) const;
String initial_endpoint;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;

View File

@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <Common/re2.h>
#include <IO/Archives/ArchiveUtils.h>
#include <boost/algorithm/string/case_conv.hpp>
@ -29,7 +30,7 @@ namespace ErrorCodes
namespace S3
{
URI::URI(const std::string & uri_)
URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
{
/// Case when the bucket name is represented in the domain name of the S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
@ -54,10 +55,11 @@ URI::URI(const std::string & uri_)
static constexpr auto OSS = "OSS";
static constexpr auto EOS = "EOS";
if (containsArchive(uri_))
std::tie(uri_str, archive_pattern) = getPathToArchiveAndArchivePattern(uri_);
if (allow_archive_path_syntax)
std::tie(uri_str, archive_pattern) = getURIAndArchivePattern(uri_);
else
uri_str = uri_;
uri = Poco::URI(uri_str);
std::unordered_map<std::string, std::string> mapper;
@ -167,32 +169,37 @@ void URI::validateBucket(const String & bucket, const Poco::URI & uri)
!uri.empty() ? " (" + uri.toString() + ")" : "");
}
bool URI::containsArchive(const std::string & source)
std::pair<std::string, std::optional<std::string>> URI::getURIAndArchivePattern(const std::string & source)
{
size_t pos = source.find("::");
return (pos != std::string::npos);
if (pos == String::npos)
return {source, std::nullopt};
std::string_view path_to_archive_view = std::string_view{source}.substr(0, pos);
bool contains_spaces_around_operator = false;
while (path_to_archive_view.ends_with(' '))
{
contains_spaces_around_operator = true;
path_to_archive_view.remove_suffix(1);
}
std::pair<std::string, std::string> URI::getPathToArchiveAndArchivePattern(const std::string & source)
std::string_view archive_pattern_view = std::string_view{source}.substr(pos + 2);
while (archive_pattern_view.starts_with(' '))
{
size_t pos = source.find("::");
assert(pos != std::string::npos);
contains_spaces_around_operator = true;
archive_pattern_view.remove_prefix(1);
}
std::string path_to_archive = source.substr(0, pos);
while ((!path_to_archive.empty()) && path_to_archive.ends_with(' '))
path_to_archive.pop_back();
/// The first part can be an archive path only if one of the following is true:
/// - it has a supported archive extension
/// - there are spaces before or after :: (a URI cannot contain spaces)
/// - it contains characters that could be part of a glob expression
if (archive_pattern_view.empty() || path_to_archive_view.empty()
|| (!contains_spaces_around_operator && !hasSupportedArchiveExtension(path_to_archive_view)
&& path_to_archive_view.find_first_of("*?{") == std::string_view::npos))
return {source, std::nullopt};
if (path_to_archive.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
std::string_view path_in_archive_view = std::string_view{source}.substr(pos + 2);
while (path_in_archive_view.front() == ' ')
path_in_archive_view.remove_prefix(1);
if (path_in_archive_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
return {path_to_archive, std::string{path_in_archive_view}};
return std::pair{std::string{path_to_archive_view}, std::string{archive_pattern_view}};
}
}
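
A simplified, standalone re-implementation of the split performed above, for illustration only (splitArchivePath is a hypothetical helper, not part of this commit): it trims spaces around '::' but omits the extension and glob checks that decide whether the first part is really an archive.

#include <iostream>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

std::pair<std::string, std::optional<std::string>> splitArchivePath(std::string_view source)
{
    const auto pos = source.find("::");
    if (pos == std::string_view::npos)
        return {std::string{source}, std::nullopt};

    std::string_view archive = source.substr(0, pos);
    std::string_view pattern = source.substr(pos + 2);
    while (archive.ends_with(' '))
        archive.remove_suffix(1);
    while (pattern.starts_with(' '))
        pattern.remove_prefix(1);

    if (archive.empty() || pattern.empty())
        return {std::string{source}, std::nullopt};
    return {std::string{archive}, std::string{pattern}};
}

int main()
{
    auto [uri, pattern] = splitArchivePath("https://bucket.s3.amazonaws.com/data.zip :: *.csv");
    std::cout << uri << " | " << pattern.value_or("<none>") << '\n';
}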

View File

@ -36,14 +36,13 @@ struct URI
bool is_virtual_hosted_style;
URI() = default;
explicit URI(const std::string & uri_);
explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false);
void addRegionToURI(const std::string & region);
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
private:
bool containsArchive(const std::string & source);
std::pair<std::string, std::string> getPathToArchiveAndArchivePattern(const std::string & source);
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
};
}

View File

@ -4,8 +4,6 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Parsers/IAST_fwd.h>
#include <unordered_set>
namespace DB
{

View File

@ -657,7 +657,7 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
{
if (auto ctx = context.lock())
{
res.query_settings = std::make_shared<Settings>(ctx->getSettingsRef());
res.query_settings = std::make_shared<Settings>(ctx->getSettingsCopy());
res.current_database = ctx->getCurrentDatabase();
}
}

View File

@ -142,14 +142,14 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection, ContextPtr context)
{
const auto settings = context->getSettingsRef();
const auto & settings = context->getSettingsRef();
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
auto filename = collection.getOrDefault<String>("filename", "");
if (!filename.empty())
url = S3::URI(std::filesystem::path(collection.get<String>("url")) / filename);
url = S3::URI(std::filesystem::path(collection.get<String>("url")) / filename, settings.allow_archive_path_syntax);
else
url = S3::URI(collection.get<String>("url"));
url = S3::URI(collection.get<String>("url"), settings.allow_archive_path_syntax);
auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
@ -330,7 +330,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
}
/// This argument is always the first
url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"), context->getSettingsRef().allow_archive_path_syntax);
if (engine_args_to_idx.contains("format"))
{

View File

@ -25,6 +25,7 @@
#include <IO/WriteHelpers.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/ArchiveUtils.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
@ -2207,7 +2208,11 @@ void registerStorageFile(StorageFactory & factory)
else if (type == Field::Types::UInt64)
source_fd = static_cast<int>(literal->value.get<UInt64>());
else if (type == Field::Types::String)
StorageFile::parseFileSource(literal->value.get<String>(), source_path, storage_args.path_to_archive);
StorageFile::parseFileSource(
literal->value.get<String>(),
source_path,
storage_args.path_to_archive,
factory_args.getLocalContext()->getSettingsRef().allow_archive_path_syntax);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second argument must be path or file descriptor");
}
@ -2234,8 +2239,14 @@ SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
return schema_cache;
}
void StorageFile::parseFileSource(String source, String & filename, String & path_to_archive)
void StorageFile::parseFileSource(String source, String & filename, String & path_to_archive, bool allow_archive_path_syntax)
{
if (!allow_archive_path_syntax)
{
filename = std::move(source);
return;
}
size_t pos = source.find("::");
if (pos == String::npos)
{
@ -2247,18 +2258,21 @@ void StorageFile::parseFileSource(String source, String & filename, String & pat
while (path_to_archive_view.ends_with(' '))
path_to_archive_view.remove_suffix(1);
if (path_to_archive_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
path_to_archive = path_to_archive_view;
std::string_view filename_view = std::string_view{source}.substr(pos + 2);
while (filename_view.front() == ' ')
while (filename_view.starts_with(' '))
filename_view.remove_prefix(1);
if (filename_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
/// The first part can be an archive path only if one of the following is true:
/// - it has a supported archive extension
/// - it contains characters that could be part of a glob expression
if (filename_view.empty() || path_to_archive_view.empty()
|| (!hasSupportedArchiveExtension(path_to_archive_view) && path_to_archive_view.find_first_of("*?{") == std::string_view::npos))
{
filename = std::move(source);
return;
}
path_to_archive = path_to_archive_view;
filename = filename_view;
}

View File

@ -128,7 +128,7 @@ public:
static SchemaCache & getSchemaCache(const ContextPtr & context);
static void parseFileSource(String source, String & filename, String & path_to_archive);
static void parseFileSource(String source, String & filename, String & path_to_archive, bool allow_archive_path_syntax);
static ArchiveInfo getArchiveInfo(
const std::string & path_to_archive,

View File

@ -31,8 +31,6 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription()
{"database_shard_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database shard (for clusters that belong to a `Replicated` database)."},
{"database_replica_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database replica (for clusters that belong to a `Replicated` database)."},
{"is_active", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The status of the Replicated database replica (for clusters that belong to a Replicated database): 1 means 'replica is online', 0 means 'replica is offline', NULL means 'unknown'."},
{"replication_lag", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt32>()), "The replication lag of the `Replicated` database replica (for clusters that belong to a Replicated database)."},
{"recovery_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The recovery time of the `Replicated` database replica (for clusters that belong to a Replicated database), in milliseconds."},
};
description.setAliases({
@ -48,30 +46,31 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co
writeCluster(res_columns, name_and_cluster, {});
const auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & [database_name, database] : databases)
for (const auto & name_and_database : databases)
{
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(database.get()))
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(name_and_database.second.get()))
{
if (auto database_cluster = replicated->tryGetCluster())
writeCluster(res_columns, {database_name, database_cluster},
replicated->tryGetReplicasInfo(database_cluster));
writeCluster(res_columns, {name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
if (auto database_cluster = replicated->tryGetAllGroupsCluster())
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + database_name, database_cluster},
replicated->tryGetReplicasInfo(database_cluster));
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
}
}
}
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster,
const ReplicasInfo & replicas_info)
const std::vector<UInt8> & is_active)
{
const String & cluster_name = name_and_cluster.first;
const ClusterPtr & cluster = name_and_cluster.second;
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
size_t global_replica_idx = 0;
size_t replica_idx = 0;
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
const auto & shard_info = shards_info[shard_index];
@ -100,24 +99,10 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
res_columns[i++]->insert(address.database_shard_name);
res_columns[i++]->insert(address.database_replica_name);
if (replicas_info.empty())
{
if (is_active.empty())
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
else
{
const auto & replica_info = replicas_info[global_replica_idx];
res_columns[i++]->insert(replica_info.is_active);
res_columns[i++]->insert(replica_info.replication_lag);
if (replica_info.recovery_time != 0)
res_columns[i++]->insert(replica_info.recovery_time);
else
res_columns[i++]->insertDefault();
}
++global_replica_idx;
res_columns[i++]->insert(is_active[replica_idx++]);
}
}
}

View File

@ -1,10 +1,10 @@
#pragma once
#include <Databases/DatabaseReplicated.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
@ -27,7 +27,7 @@ protected:
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const ReplicasInfo & replicas_info);
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector<UInt8> & is_active);
};
}

View File

@ -31,6 +31,7 @@ ColumnsDescription StorageSystemScheduler::getColumnsDescription()
{"dequeued_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests dequeued from this node."},
{"canceled_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests canceled from this node."},
{"dequeued_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests dequeued from this node."},
{"throughput", std::make_shared<DataTypeFloat64>(), "Current average throughput (dequeued cost per second)."},
{"canceled_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests canceled from this node."},
{"busy_periods", std::make_shared<DataTypeUInt64>(), "The total number of deactivations of this node."},
{"vruntime", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>()),
@ -96,6 +97,7 @@ void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr c
res_columns[i++]->insert(node->dequeued_requests.load());
res_columns[i++]->insert(node->canceled_requests.load());
res_columns[i++]->insert(node->dequeued_cost.load());
res_columns[i++]->insert(node->throughput.rate(static_cast<double>(clock_gettime_ns())/1e9));
res_columns[i++]->insert(node->canceled_cost.load());
res_columns[i++]->insert(node->busy_periods.load());
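A minimal SQL sketch of how the new throughput column can be read back alongside the existing counters; the resource name 'network_read' is only an illustrative assumption:
-- minimal sketch; 'network_read' is a hypothetical workload scheduling resource
SELECT resource, path, type, dequeued_requests, dequeued_cost, throughput
FROM system.scheduler
WHERE resource = 'network_read'
ORDER BY path;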

View File

@ -26,7 +26,7 @@ void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr
if (context->getApplicationType() != Context::ApplicationType::LOCAL)
{
ITableFunctionFileLike::parseFirstArguments(arg, context);
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive);
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive, context->getSettingsRef().allow_archive_path_syntax);
return;
}
@ -42,7 +42,8 @@ void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr
else if (filename == "stderr")
fd = STDERR_FILENO;
else
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive);
StorageFile::parseFileSource(
std::move(filename), filename, path_to_archive, context->getSettingsRef().allow_archive_path_syntax);
}
else if (type == Field::Types::Int64 || type == Field::Types::UInt64)
{
@ -63,9 +64,12 @@ std::optional<String> TableFunctionFile::tryGetFormatFromFirstArgument()
return FormatFactory::instance().tryGetFormatFromFileName(filename);
}
StoragePtr TableFunctionFile::getStorage(const String & source,
const String & format_, const ColumnsDescription & columns,
ContextPtr global_context, const std::string & table_name,
StoragePtr TableFunctionFile::getStorage(
const String & source,
const String & format_,
const ColumnsDescription & columns,
ContextPtr global_context,
const std::string & table_name,
const std::string & compression_method_) const
{
// For `file` table function, we are going to use format settings from the

View File

@ -1019,7 +1019,9 @@ def _get_ext_check_name(check_name: str) -> str:
return check_name_with_group
def _cancel_pr_wf(s3: S3Helper, pr_number: int, cancel_sync: bool = False) -> None:
def _cancel_pr_workflow(
s3: S3Helper, pr_number: int, cancel_sync: bool = False
) -> None:
wf_data = CiMetadata(s3, pr_number).fetch_meta()
if not cancel_sync:
if not wf_data.run_id:
@ -1368,12 +1370,12 @@ def main() -> int:
assert indata, "Run config must be provided via --infile"
_update_gh_statuses_action(indata=indata, s3=s3)
### CANCEL PREVIOUS WORKFLOW RUN
### CANCEL THE PREVIOUS WORKFLOW RUN
elif args.cancel_previous_run:
if pr_info.is_merge_queue:
_cancel_pr_wf(s3, pr_info.merged_pr)
_cancel_pr_workflow(s3, pr_info.merged_pr)
elif pr_info.is_pr:
_cancel_pr_wf(s3, pr_info.number, cancel_sync=True)
_cancel_pr_workflow(s3, pr_info.number, cancel_sync=True)
else:
assert False, "BUG! Not supported scenario"

View File

@ -3,6 +3,7 @@ import fileinput
import json
import logging
import time
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
@ -298,6 +299,11 @@ class CiLogsCredentials:
def get_docker_arguments(
self, pr_info: PRInfo, check_start_time: str, check_name: str
) -> str:
run_by_hash_total = int(os.getenv("RUN_BY_HASH_TOTAL", "0"))
if run_by_hash_total > 1:
run_by_hash_num = int(os.getenv("RUN_BY_HASH_NUM", "0"))
check_name = f"{check_name} [{run_by_hash_num + 1}/{run_by_hash_total}]"
self.create_ci_logs_credentials()
if not self.config_path.exists():
logging.info("Do not use external logs pushing")

View File

@ -301,7 +301,7 @@ def get_worst_state(statuses: CommitStatuses) -> StatusType:
def create_ci_report(pr_info: PRInfo, statuses: CommitStatuses) -> str:
"""The function converst the statuses to TestResults and uploads the report
"""The function converts the statuses to TestResults and uploads the report
to S3 tests bucket. Then it returns the URL"""
test_results = [] # type: TestResults
for status in statuses:

View File

@ -293,9 +293,9 @@ class JobReport:
start_time: str
duration: float
additional_files: Union[Sequence[str], Sequence[Path]]
# clickhouse version, build job only
# ClickHouse version, build job only
version: str = ""
# checkname to set in commit status, set if differs from jjob name
# check_name to be set in commit status, set it if it differs from the job name
check_name: str = ""
# directory with artifacts to upload on s3
build_dir_for_upload: Union[Path, str] = ""
@ -667,11 +667,7 @@ ColorTheme = Tuple[str, str, str]
def _format_header(
header: str, branch_name: str, branch_url: Optional[str] = None
) -> str:
# Following line does not lower CI->Ci and SQLancer->Sqlancer. It only
# capitalizes the first letter and doesn't touch the rest of the word
result = " ".join([w[0].upper() + w[1:] for w in header.split(" ") if w])
result = result.replace("Clickhouse", "ClickHouse")
result = result.replace("clickhouse", "ClickHouse")
result = header
if "ClickHouse" not in result:
result = f"ClickHouse {result}"
if branch_url:

View File

@ -4,6 +4,8 @@ import logging
import pytest
import os
import minio
import random
import string
from helpers.cluster import ClickHouseCluster
from helpers.mock_servers import start_s3_mock
@ -45,6 +47,11 @@ def cluster():
cluster.shutdown()
def randomize_query_id(query_id, random_suffix_length=10):
letters = string.ascii_letters + string.digits
return f"{query_id}_{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
@pytest.fixture(scope="module")
def init_broken_s3(cluster):
yield start_s3_mock(cluster, "broken_s3", "8083")
@ -61,6 +68,7 @@ def test_upload_after_check_works(cluster, broken_s3):
node.query(
"""
DROP TABLE IF EXISTS s3_upload_after_check_works;
CREATE TABLE s3_upload_after_check_works (
id Int64,
data String
@ -127,7 +135,9 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
broken_s3.setup_at_create_multi_part_upload()
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_CREATE_MPU_{compression}"
insert_query_id = randomize_query_id(
f"INSERT_INTO_TABLE_FUNCTION_FAIL_CREATE_MPU_{compression}"
)
error = node.query_and_get_error(
f"""
INSERT INTO
@ -169,7 +179,9 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(count=1, after=2)
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_UPLOAD_PART_{compression}"
insert_query_id = randomize_query_id(
f"INSERT_INTO_TABLE_FUNCTION_FAIL_UPLOAD_PART_{compression}"
)
error = node.query_and_get_error(
f"""
INSERT INTO
@ -221,7 +233,7 @@ def test_when_error_is_retried(cluster, broken_s3, action_and_message):
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(count=3, after=2, action=action)
insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED"
insert_query_id = randomize_query_id(f"INSERT_INTO_TABLE_{action}_RETRIED")
node.query(
f"""
INSERT INTO
@ -250,7 +262,7 @@ def test_when_error_is_retried(cluster, broken_s3, action_and_message):
assert s3_errors == 3
broken_s3.setup_at_part_upload(count=1000, after=2, action=action)
insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED_1"
insert_query_id = randomize_query_id(f"INSERT_INTO_TABLE_{action}_RETRIED_1")
error = node.query_and_get_error(
f"""
INSERT INTO
@ -285,7 +297,7 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
insert_query_id = randomize_query_id(f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD")
node.query(
f"""
INSERT INTO
@ -319,7 +331,7 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
insert_query_id = randomize_query_id(f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1")
error = node.query_and_get_error(
f"""
INSERT INTO
@ -361,7 +373,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}"
)
node.query(
@ -398,7 +410,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
action="connection_reset_by_peer",
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}_1"
)
error = node.query_and_get_error(
@ -443,7 +455,7 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}"
)
node.query(
@ -481,7 +493,7 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
action_args=["1"] if send_something else ["0"],
)
insert_query_id = (
insert_query_id = randomize_query_id(
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}_1"
)
error = node.query_and_get_error(
@ -521,7 +533,7 @@ def test_query_is_canceled_with_inf_retries(cluster, broken_s3):
action="connection_refused",
)
insert_query_id = f"TEST_QUERY_IS_CANCELED_WITH_INF_RETRIES"
insert_query_id = randomize_query_id(f"TEST_QUERY_IS_CANCELED_WITH_INF_RETRIES")
request = node.get_query_request(
f"""
INSERT INTO
@ -579,7 +591,7 @@ def test_adaptive_timeouts(cluster, broken_s3, node_name):
count=1000000,
)
insert_query_id = f"TEST_ADAPTIVE_TIMEOUTS_{node_name}"
insert_query_id = randomize_query_id(f"TEST_ADAPTIVE_TIMEOUTS_{node_name}")
node.query(
f"""
INSERT INTO
@ -631,6 +643,7 @@ def test_no_key_found_disk(cluster, broken_s3):
node.query(
"""
DROP TABLE IF EXISTS no_key_found_disk;
CREATE TABLE no_key_found_disk (
id Int64
) ENGINE=MergeTree()
@ -689,3 +702,15 @@ def test_no_key_found_disk(cluster, broken_s3):
"DB::Exception: The specified key does not exist. This error happened for S3 disk."
in error
)
s3_disk_no_key_errors_metric_value = int(
node.query(
"""
SELECT value
FROM system.metrics
WHERE metric = 'S3DiskNoKeyErrors'
"""
).strip()
)
assert s3_disk_no_key_errors_metric_value > 0

View File

@ -1,41 +0,0 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<profile>default</profile>
<no_password></no_password>
</default>
</users>
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<session_timeout_ms>20000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>9444</port>
</server>
</raft_configuration>
</keeper_server>
<zookeeper>
<node index="1">
<host>localhost</host>
<port>2181</port>
</node>
<session_timeout_ms>20000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -1,61 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/config.xml"],
stay_alive=True,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_recovery_time_metric(start_cluster):
node.query(
"""
DROP DATABASE IF EXISTS rdb;
CREATE DATABASE rdb
ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1')
"""
)
node.query(
"""
DROP TABLE IF EXISTS rdb.t;
CREATE TABLE rdb.t
(
`x` UInt32
)
ENGINE = MergeTree
ORDER BY x
"""
)
node.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"])
node.restart_clickhouse()
ret = int(
node.query(
"""
SELECT recovery_time
FROM system.clusters
WHERE cluster = 'rdb'
"""
).strip()
)
assert ret > 0
node.query(
"""
DROP DATABASE rdb
"""
)

View File

@ -141,6 +141,9 @@ def test_drop_if_exists():
def test_replication():
node1.query("CREATE FUNCTION f2 AS (x, y) -> x - y")
node1.query(
"CREATE FUNCTION f3 AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
)
assert (
node1.query("SELECT create_query FROM system.functions WHERE name='f2'")
@ -154,7 +157,11 @@ def test_replication():
assert node1.query("SELECT f2(12,3)") == "9\n"
assert node2.query("SELECT f2(12,3)") == "9\n"
assert node1.query("SELECT f3()") == "2\n"
assert node2.query("SELECT f3()") == "2\n"
node1.query("DROP FUNCTION f2")
node1.query("DROP FUNCTION f3")
assert (
node1.query("SELECT create_query FROM system.functions WHERE name='f2'") == ""
)
@ -214,7 +221,9 @@ def test_reload_zookeeper():
)
# config reloads, but can still work
node1.query("CREATE FUNCTION f2 AS (x, y) -> x - y")
node1.query(
"CREATE FUNCTION f2 AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
)
assert_eq_with_retry(
node2,
"SELECT name FROM system.functions WHERE name IN ['f1', 'f2'] ORDER BY name",
@ -269,7 +278,7 @@ def test_reload_zookeeper():
TSV(["f1", "f2", "f3"]),
)
assert node2.query("SELECT f1(12, 3), f2(12, 3), f3(12, 3)") == TSV([[15, 9, 4]])
assert node2.query("SELECT f1(12, 3), f2(), f3(12, 3)") == TSV([[15, 2, 4]])
active_zk_connections = get_active_zk_connections()
assert (
@ -307,3 +316,13 @@ def test_start_without_zookeeper():
"CREATE FUNCTION f1 AS (x, y) -> (x + y)\n",
)
node1.query("DROP FUNCTION f1")
def test_server_restart():
node1.query(
"CREATE FUNCTION f1 AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
)
assert node1.query("SELECT f1()") == "2\n"
node1.restart_clickhouse()
assert node1.query("SELECT f1()") == "2\n"
node1.query("DROP FUNCTION f1")

View File

@ -8,6 +8,8 @@ import os
import json
import time
import glob
import random
import string
import pyspark
import delta
@ -52,6 +54,11 @@ def get_spark():
return builder.master("local").getOrCreate()
def randomize_table_name(table_name, random_suffix_length=10):
letters = string.ascii_letters + string.digits
return f"{table_name}{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -151,7 +158,7 @@ def test_single_log_file(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_single_log_file"
TABLE_NAME = randomize_table_name("test_single_log_file")
inserted_data = "SELECT number as a, toString(number + 1) as b FROM numbers(100)"
parquet_data_path = create_initial_data_file(
@ -175,7 +182,7 @@ def test_partition_by(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_partition_by"
TABLE_NAME = randomize_table_name("test_partition_by")
write_delta_from_df(
spark,
@ -197,7 +204,7 @@ def test_checkpoint(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_checkpoint"
TABLE_NAME = randomize_table_name("test_checkpoint")
write_delta_from_df(
spark,
@ -272,7 +279,7 @@ def test_multiple_log_files(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_multiple_log_files"
TABLE_NAME = randomize_table_name("test_multiple_log_files")
write_delta_from_df(
spark, generate_data(spark, 0, 100), f"/{TABLE_NAME}", mode="overwrite"
@ -310,7 +317,7 @@ def test_metadata(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_metadata"
TABLE_NAME = randomize_table_name("test_metadata")
parquet_data_path = create_initial_data_file(
started_cluster,
@ -339,9 +346,9 @@ def test_metadata(started_cluster):
def test_types(started_cluster):
TABLE_NAME = "test_types"
TABLE_NAME = randomize_table_name("test_types")
spark = started_cluster.spark_session
result_file = f"{TABLE_NAME}_result_2"
result_file = randomize_table_name(f"{TABLE_NAME}_result_2")
delta_table = (
DeltaTable.create(spark)
@ -415,7 +422,7 @@ def test_restart_broken(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = "broken"
TABLE_NAME = "test_restart_broken"
TABLE_NAME = randomize_table_name("test_restart_broken")
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
@ -452,6 +459,18 @@ def test_restart_broken(started_cluster):
f"SELECT count() FROM {TABLE_NAME}"
)
s3_disk_no_key_errors_metric_value = int(
instance.query(
"""
SELECT value
FROM system.metrics
WHERE metric = 'S3DiskNoKeyErrors'
"""
).strip()
)
assert s3_disk_no_key_errors_metric_value == 0
minio_client.make_bucket(bucket)
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
@ -464,7 +483,7 @@ def test_restart_broken_table_function(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = "broken2"
TABLE_NAME = "test_restart_broken_table_function"
TABLE_NAME = randomize_table_name("test_restart_broken_table_function")
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
@ -518,7 +537,7 @@ def test_partition_columns(started_cluster):
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_partition_columns"
TABLE_NAME = randomize_table_name("test_partition_columns")
result_file = f"{TABLE_NAME}"
partition_columns = ["b", "c", "d", "e"]

View File

@ -13,9 +13,8 @@ node = cluster.add_instance(
)
system_logs = [
# disabled by default
("system.text_log", 0),
# enabled by default
("system.text_log", 1),
("system.query_log", 1),
("system.query_thread_log", 1),
("system.part_log", 1),

View File

@ -18,20 +18,25 @@ def started_cluster():
def test_persistence():
create_function_query1 = "CREATE FUNCTION MySum1 AS (a, b) -> a + b"
create_function_query2 = "CREATE FUNCTION MySum2 AS (a, b) -> MySum1(a, b) + b"
create_function_query3 = "CREATE FUNCTION MyUnion AS () -> (SELECT sum(s) FROM (SELECT 1 as s UNION ALL SELECT 1 as s))"
instance.query(create_function_query1)
instance.query(create_function_query2)
instance.query(create_function_query3)
assert instance.query("SELECT MySum1(1,2)") == "3\n"
assert instance.query("SELECT MySum2(1,2)") == "5\n"
assert instance.query("SELECT MyUnion()") == "2\n"
instance.restart_clickhouse()
assert instance.query("SELECT MySum1(1,2)") == "3\n"
assert instance.query("SELECT MySum2(1,2)") == "5\n"
assert instance.query("SELECT MyUnion()") == "2\n"
instance.query("DROP FUNCTION MySum2")
instance.query("DROP FUNCTION MySum1")
instance.query("DROP FUNCTION MyUnion")
instance.restart_clickhouse()
@ -48,3 +53,10 @@ def test_persistence():
or "Function with name 'MySum2' does not exist. In scope SELECT MySum2(1, 2)"
in error_message
)
error_message = instance.query_and_get_error("SELECT MyUnion()")
assert (
"Unknown function MyUnion" in error_message
or "Function with name 'MyUnion' does not exist. In scope SELECT MyUnion"
in error_message
)

View File

@ -8,6 +8,18 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -e -o pipefail
# Wait until the dictionary updates the value for 13 on its own:
function wait_for_dict_update()
{
for ((i = 0; i < 100; ++i)); do
if [ "$(${CLICKHOUSE_CLIENT} --query "SELECT dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))")" != -1 ]; then
return 0
fi
sleep 0.5
done
return 1
}
$CLICKHOUSE_CLIENT <<EOF
CREATE TABLE ${CLICKHOUSE_DATABASE}.table(x Int64, y Int64, insert_time DateTime) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (12, 102, now());
@ -19,7 +31,7 @@ CREATE DICTIONARY ${CLICKHOUSE_DATABASE}.dict
insert_time DateTime
)
PRIMARY KEY x
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table' DB '${CLICKHOUSE_DATABASE}' UPDATE_FIELD 'insert_time'))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table' DB '${CLICKHOUSE_DATABASE}' UPDATE_FIELD 'insert_time' UPDATE_LAG 60))
LAYOUT(FLAT())
LIFETIME(1);
EOF
@ -29,11 +41,10 @@ $CLICKHOUSE_CLIENT --query "SELECT '12 -> ', dictGetInt64('${CLICKHOUSE_DATABASE
$CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (13, 103, now())"
$CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (14, 104, now() - INTERVAL 1 DAY)"
# Wait until the dictionary updates the value for 13 on its own:
while [ "$(${CLICKHOUSE_CLIENT} --query "SELECT dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))")" = -1 ]
do
sleep 0.5
done
if ! wait_for_dict_update; then
echo "Dictionary has not been reloaded in time" >&2
exit 1
fi
$CLICKHOUSE_CLIENT --query "SELECT '13 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))"

View File

@ -52,8 +52,6 @@ CREATE TABLE system.clusters
`database_shard_name` String,
`database_replica_name` String,
`is_active` Nullable(UInt8),
`replication_lag` Nullable(UInt32),
`recovery_time` Nullable(UInt64),
`name` String ALIAS cluster
)
ENGINE = SystemClusters

View File

@ -1 +0,0 @@
SELECT * FROM file('::a'); -- { serverError BAD_ARGUMENTS }

View File

@ -1,35 +0,0 @@
#!/usr/bin/env bash
function test()
{
echo "test"
$CH_CLIENT -q "insert into test select number, number from numbers(100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, 'str_' || toString(number) from numbers(100000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1)) from numbers(200000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, NULL from numbers(300000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, multiIf(number % 4 == 3, 'str_' || toString(number), number % 4 == 2, NULL, number % 4 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1))) from numbers(400000, 400000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, [range((number % 10 + 1)::UInt64)]::Array(Array(Dynamic)) from numbers(100000, 100000) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "select distinct dynamicType(d) as type from test order by type"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'UInt64'"
$CH_CLIENT -q "select count() from test where d.UInt64 is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'String'"
$CH_CLIENT -q "select count() from test where d.String is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Date'"
$CH_CLIENT -q "select count() from test where d.Date is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Variant(String, UInt64))'"
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Variant(String, UInt64))\`)"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Array(Dynamic))'"
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Array(Dynamic))\`)"
$CH_CLIENT -q "select count() from test where d is NULL"
$CH_CLIENT -q "select count() from test where not empty(d.\`Tuple(a Array(Dynamic))\`.a.String)"
$CH_CLIENT -q "select d, d.UInt64, d.String, d.\`Array(Variant(String, UInt64))\` from test format Null"
$CH_CLIENT -q "select d.UInt64, d.String, d.\`Array(Variant(String, UInt64))\` from test format Null"
$CH_CLIENT -q "select d.Int8, d.Date, d.\`Array(String)\` from test format Null"
$CH_CLIENT -q "select d, d.UInt64, d.Date, d.\`Array(Variant(String, UInt64))\`, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
$CH_CLIENT -q "select d.UInt64, d.Date, d.\`Array(Variant(String, UInt64))\`, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64, d.\`Array(Variant(String, UInt64))\`.String from test format Null"
$CH_CLIENT -q "select d, d.\`Tuple(a UInt64, b String)\`.a, d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
$CH_CLIENT -q "select d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64, d.\`Array(Dynamic)\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
$CH_CLIENT -q "select d.\`Array(Array(Dynamic))\`.size1, d.\`Array(Array(Dynamic))\`.UInt64, d.\`Array(Array(Dynamic))\`.\`Map(String, Tuple(a UInt64))\`.values.a from test format Null"
}

View File

@ -1,19 +0,0 @@
Memory
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
None
String
UInt64
200000
200000
200000
200000
0
0
200000
200000
100000
100000
200000
0

View File

@ -1,19 +0,0 @@
#!/usr/bin/env bash
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./03036_dynamic_read_subcolumns.lib
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
$CH_CLIENT -q "drop table if exists test;"
echo "Memory"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=Memory"
test
$CH_CLIENT -q "drop table test;"

View File

@ -1,19 +0,0 @@
MergeTree compact
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
None
String
UInt64
200000
200000
200000
200000
0
0
200000
200000
100000
100000
200000
0

View File

@ -1,19 +0,0 @@
#!/usr/bin/env bash
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./03036_dynamic_read_subcolumns.lib
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
$CH_CLIENT -q "drop table if exists test;"
echo "MergeTree compact"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000;"
test
$CH_CLIENT -q "drop table test;"

View File

@ -1,19 +0,0 @@
MergeTree wide
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
None
String
UInt64
200000
200000
200000
200000
0
0
200000
200000
100000
100000
200000
0

View File

@ -1,19 +0,0 @@
#!/usr/bin/env bash
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./03036_dynamic_read_subcolumns.lib
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
$CH_CLIENT -q "drop table if exists test;"
echo "MergeTree wide"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
test
$CH_CLIENT -q "drop table test;"

View File

@ -21,8 +21,9 @@ SYSTEM DROP MARK CACHE;
SELECT count() FROM t_compact_bytes_s3 WHERE NOT ignore(c2, c4);
SYSTEM FLUSH LOGS;
-- Errors in S3 requests are retried automatically, but the failed attempts still increase the ProfileEvents counters. That is why we subtract the errors.
SELECT
ProfileEvents['S3ReadRequestsCount'],
ProfileEvents['S3ReadRequestsCount'] - ProfileEvents['S3ReadRequestsErrors'],
ProfileEvents['ReadBufferFromS3Bytes'] < ProfileEvents['ReadCompressedBytes'] * 1.1
FROM system.query_log
WHERE event_date >= yesterday() AND type = 'QueryFinish'
@ -30,7 +31,7 @@ WHERE event_date >= yesterday() AND type = 'QueryFinish'
AND query ilike '%INSERT INTO t_compact_bytes_s3 SELECT number, number, number%';
SELECT
ProfileEvents['S3ReadRequestsCount'],
ProfileEvents['S3ReadRequestsCount'] - ProfileEvents['S3ReadRequestsErrors'],
ProfileEvents['ReadBufferFromS3Bytes'] < ProfileEvents['ReadCompressedBytes'] * 1.1
FROM system.query_log
WHERE event_date >= yesterday() AND type = 'QueryFinish'

View File

@ -1,11 +0,0 @@
-- Tags: no-parallel
CREATE DATABASE rdb1 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica1');
CREATE DATABASE rdb2 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica2');
SET distributed_ddl_task_timeout = 0;
CREATE TABLE rdb1.t (id UInt32) ENGINE = ReplicatedMergeTree ORDER BY id;
SELECT replication_lag FROM system.clusters WHERE cluster IN ('rdb1', 'rdb2') ORDER BY cluster ASC, replica_num ASC;
DROP DATABASE rdb1;
DROP DATABASE rdb2;

View File

@ -0,0 +1 @@
SELECT tuple() IN tuple(1) SETTINGS allow_experimental_map_type = 1; -- { serverError INCORRECT_ELEMENT_OF_SET }

View File

@ -0,0 +1,16 @@
::nonexistentfile.csv
1
nonexistent::nonexistentfile.csv
1
nonexistent :: nonexistentfile.csv
1
nonexistent ::nonexistentfile.csv
1
nonexistent.tar.gz :: nonexistentfile.csv
1
nonexistent.zip:: nonexistentfile.csv
1
nonexistent.tar.gz :: nonexistentfile.csv SETTINGS allow_archive_path_syntax=0
1
nonexistent.zip:: nonexistentfile.csv SETTINGS allow_archive_path_syntax=0
1

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function try_to_read_file()
{
file_to_read=$1
file_argument=$2
settings=$3
echo $file_argument $settings
$CLICKHOUSE_LOCAL -q "SELECT * FROM file('$file_argument') $settings" 2>&1 | grep -c "Cannot stat file.*$file_to_read"
}
# if no archive extension is detected in the part before '::', the path is taken as is
try_to_read_file "::nonexistentfile.csv" "::nonexistentfile.csv"
try_to_read_file "nonexistent::nonexistentfile.csv" "nonexistent::nonexistentfile.csv"
try_to_read_file "nonexistent :: nonexistentfile.csv" "nonexistent :: nonexistentfile.csv"
try_to_read_file "nonexistent ::nonexistentfile.csv" "nonexistent ::nonexistentfile.csv"
# if an archive extension is detected in the part before '::', the path is split into an archive and a filename
try_to_read_file "nonexistent.tar.gz" "nonexistent.tar.gz :: nonexistentfile.csv"
try_to_read_file "nonexistent.zip" "nonexistent.zip:: nonexistentfile.csv"
# when the archive path syntax is disabled, the path is always parsed as is
try_to_read_file "nonexistent.tar.gz :: nonexistentfile.csv" "nonexistent.tar.gz :: nonexistentfile.csv" "SETTINGS allow_archive_path_syntax=0"
try_to_read_file "nonexistent.zip:: nonexistentfile.csv" "nonexistent.zip:: nonexistentfile.csv" "SETTINGS allow_archive_path_syntax=0"

View File

@ -0,0 +1,3 @@
::03215_archive.csv test/::03215_archive.csv
test::03215_archive.csv test/test::03215_archive.csv
test.zip::03215_archive.csv test/test.zip::03215_archive.csv

View File

@ -0,0 +1,7 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
SELECT _file, _path FROM s3(s3_conn, filename='::03215_archive.csv') ORDER BY (_file, _path);
SELECT _file, _path FROM s3(s3_conn, filename='test :: 03215_archive.csv') ORDER BY (_file, _path); -- { serverError S3_ERROR }
SELECT _file, _path FROM s3(s3_conn, filename='test::03215_archive.csv') ORDER BY (_file, _path);
SELECT _file, _path FROM s3(s3_conn, filename='test.zip::03215_archive.csv') ORDER BY (_file, _path) SETTINGS allow_archive_path_syntax=0;

View File

@ -0,0 +1 @@
2

View File

@ -0,0 +1,14 @@
DROP FUNCTION IF EXISTS 03215_udf_with_union;
CREATE FUNCTION 03215_udf_with_union AS () -> (
SELECT sum(s)
FROM
(
SELECT 1 AS s
UNION ALL
SELECT 1 AS s
)
);
SELECT 03215_udf_with_union();
DROP FUNCTION 03215_udf_with_union;

View File

@ -0,0 +1 @@
1
1 1

View File

@ -0,0 +1 @@
1
1 1

View File

@ -0,0 +1 @@
1
1 1