Merge branch 'master' into parallel-log-appending

Antonio Andelic 2022-11-22 14:14:17 +01:00 committed by GitHub
commit 4827bcb6bb
107 changed files with 1560 additions and 611 deletions

View File

@ -0,0 +1,31 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.8.9.24-lts (a1b69551d40) FIXME as compared to v22.8.8.3-lts (ac5a6cababc)
#### Performance Improvement
* Backported in [#43012](https://github.com/ClickHouse/ClickHouse/issues/43012): Keeper performance improvement: improve commit performance for cases when many different nodes have uncommitted states. This should help with cases when a follower node can't sync fast enough. [#42926](https://github.com/ClickHouse/ClickHouse/pull/42926) ([Antonio Andelic](https://github.com/antonio2368)).
#### Improvement
* Backported in [#42840](https://github.com/ClickHouse/ClickHouse/issues/42840): Update tzdata to 2022f. Mexico will no longer observe DST except near the US border: https://www.timeanddate.com/news/time/mexico-abolishes-dst-2022.html. Chihuahua moves to year-round UTC-6 on 2022-10-30. Fiji no longer observes DST. See https://github.com/google/cctz/pull/235 and https://bugs.launchpad.net/ubuntu/+source/tzdata/+bug/1995209. [#42796](https://github.com/ClickHouse/ClickHouse/pull/42796) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### Build/Testing/Packaging Improvement
* Backported in [#42964](https://github.com/ClickHouse/ClickHouse/issues/42964): Before the fix, the user-defined config was preserved by RPM in `$file.rpmsave`. The PR fixes it and won't replace the user's files from packages. [#42936](https://github.com/ClickHouse/ClickHouse/pull/42936) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Backported in [#43040](https://github.com/ClickHouse/ClickHouse/issues/43040): Add a CI step to mark commits as ready for release; soft-forbid launching a release script from branches other than master. [#43017](https://github.com/ClickHouse/ClickHouse/pull/43017) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#42720](https://github.com/ClickHouse/ClickHouse/issues/42720): Fixed `Unknown identifier (aggregate-function)` exception which appears when a user tries to calculate WINDOW ORDER BY/PARTITION BY expressions over aggregate functions: ``` CREATE TABLE default.tenk1 ( `unique1` Int32, `unique2` Int32, `ten` Int32 ) ENGINE = MergeTree ORDER BY tuple() SETTINGS index_granularity = 8192; SELECT ten, sum(unique1) + sum(unique2) AS res, rank() OVER (ORDER BY sum(unique1) + sum(unique2) ASC) AS rank FROM _complex GROUP BY ten ORDER BY ten ASC; ``` which gives: ``` Code: 47. DB::Exception: Received from localhost:9000. DB::Exception: Unknown identifier: sum(unique1); there are columns: unique1, unique2, ten: While processing sum(unique1) + sum(unique2) ASC. (UNKNOWN_IDENTIFIER) ```. [#39762](https://github.com/ClickHouse/ClickHouse/pull/39762) ([Vladimir Chebotaryov](https://github.com/quickhouse)).
* Backported in [#42748](https://github.com/ClickHouse/ClickHouse/issues/42748): A segmentation fault related to DNS & c-ares has been reported. The below error ocurred in multiple threads: ``` 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008088 [ 356 ] {} <Fatal> BaseDaemon: ######################################## 2022-09-28 15:41:19.008,"2022.09.28 15:41:19.008147 [ 356 ] {} <Fatal> BaseDaemon: (version 22.8.5.29 (official build), build id: 92504ACA0B8E2267) (from thread 353) (no query) Received signal Segmentation fault (11)" 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008196 [ 356 ] {} <Fatal> BaseDaemon: Address: 0xf Access: write. Address not mapped to object. 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008216 [ 356 ] {} <Fatal> BaseDaemon: Stack trace: 0x188f8212 0x1626851b 0x1626a69e 0x16269b3f 0x16267eab 0x13cf8284 0x13d24afc 0x13c5217e 0x14ec2495 0x15ba440f 0x15b9d13b 0x15bb2699 0x1891ccb3 0x1891e00d 0x18ae0769 0x18ade022 0x7f76aa985609 0x7f76aa8aa133 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008274 [ 356 ] {} <Fatal> BaseDaemon: 2. Poco::Net::IPAddress::family() const @ 0x188f8212 in /usr/bin/clickhouse 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008297 [ 356 ] {} <Fatal> BaseDaemon: 3. ? @ 0x1626851b in /usr/bin/clickhouse 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008309 [ 356 ] {} <Fatal> BaseDaemon: 4. ? @ 0x1626a69e in /usr/bin/clickhouse ```. [#42234](https://github.com/ClickHouse/ClickHouse/pull/42234) ([Arthur Passos](https://github.com/arthurpassos)).
* Backported in [#43062](https://github.com/ClickHouse/ClickHouse/issues/43062): Fix rare NOT_FOUND_COLUMN_IN_BLOCK error when projection is possible to use but there is no projection available. This fixes [#42771](https://github.com/ClickHouse/ClickHouse/issues/42771) . The bug was introduced in https://github.com/ClickHouse/ClickHouse/pull/25563. [#42938](https://github.com/ClickHouse/ClickHouse/pull/42938) ([Amos Bird](https://github.com/amosbird)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Do not warn about kvm-clock [#41217](https://github.com/ClickHouse/ClickHouse/pull/41217) ([Sergei Trifonov](https://github.com/serxa)).
* Revert revert 41268 disable s3 parallel write for part moves to disk s3 [#42617](https://github.com/ClickHouse/ClickHouse/pull/42617) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Always run `BuilderReport` and `BuilderSpecialReport` in all CI types [#42684](https://github.com/ClickHouse/ClickHouse/pull/42684) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -57,7 +57,7 @@ Internal coordination settings are located in the `<keeper_server>.<coordination
- `auto_forwarding` — Allow forwarding write requests from followers to the leader (default: true).
- `shutdown_timeout` — Wait to finish internal connections and shutdown (ms) (default: 5000).
- `startup_timeout` — If the server doesn't connect to other quorum participants within the specified timeout, it will terminate (ms) (default: 30000).
- `four_letter_word_white_list` — White list of 4lw commands (default: `conf,cons,crst,envi,ruok,srst,srvr,stat,wchc,wchs,dirs,mntr,isro`).
- `four_letter_word_white_list` — White list of 4lw commands (default: `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld`).
Quorum configuration is located in the `<keeper_server>.<raft_configuration>` section and contains server descriptions.
@ -126,7 +126,7 @@ clickhouse keeper --config /etc/your_path_to_config/config.xml
ClickHouse Keeper also provides 4lw commands that are almost the same as in ZooKeeper. Each command is composed of four letters, such as `mntr`, `stat`, etc. There are some more interesting commands: `stat` gives some general information about the server and connected clients, while `srvr` and `cons` give extended details on the server and connections respectively.
The 4lw commands have a white list configuration, `four_letter_word_white_list`, whose default value is `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif`.
The 4lw commands have a white list configuration, `four_letter_word_white_list`, whose default value is `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld`.
You can issue the commands to ClickHouse Keeper via telnet or nc, at the client port.
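For illustration, here is a minimal standalone sketch (not part of ClickHouse or these docs) of what such a client does: it opens a TCP connection, writes the four letters, and reads the plain-text reply until the server closes the connection. It assumes Keeper listens on `localhost:9181` (adjust to your configured client port) and uses POSIX sockets.

```cpp
// Minimal 4lw client sketch (POSIX sockets). Assumes ClickHouse Keeper listens
// on localhost:9181 -- adjust the address/port to match your configuration.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cstdio>
#include <string>

int main()
{
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
    {
        perror("socket");
        return 1;
    }

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9181);                      /// assumed Keeper client port
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

    if (::connect(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0)
    {
        perror("connect");
        return 1;
    }

    /// A 4lw command is just its four ASCII letters written to the client port.
    const char command[4] = {'m', 'n', 't', 'r'};
    ::write(fd, command, sizeof(command));

    /// The server answers with plain text and then closes the connection.
    std::string reply;
    char buf[4096];
    ssize_t n;
    while ((n = ::read(fd, buf, sizeof(buf))) > 0)
        reply.append(buf, static_cast<size_t>(n));

    std::printf("%s\n", reply.c_str());
    ::close(fd);
    return 0;
}
```

Run against a live Keeper instance, this should print the same `mntr` output you would get from `echo mntr | nc localhost 9181`.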
@ -328,6 +328,12 @@ target_committed_log_idx 101
last_snapshot_idx 50
```
- `rqld`: Request to become the new leader. Returns `Sent leadership request to leader.` if the request was sent, or `Failed to send leadership request to leader.` if it was not. Note that if the node is already the leader, the outcome is the same as if the request had been sent.
```
Sent leadership request to leader.
```
## Migration from ZooKeeper {#migration-from-zookeeper}
Seamless migration from ZooKeeper to ClickHouse Keeper is impossible: you have to stop your ZooKeeper cluster, convert the data, and start ClickHouse Keeper. The `clickhouse-keeper-converter` tool allows converting ZooKeeper logs and snapshots to a ClickHouse Keeper snapshot. It works only with ZooKeeper > 3.4. Steps for migration:

View File

@ -0,0 +1,75 @@
---
slug: /en/sql-reference/table-functions/format
sidebar_position: 56
sidebar_label: format
---
# format
Extracts a table structure from the data and parses the data according to the specified input format.
**Syntax**
``` sql
format(format_name, data)
```
**Parameters**
- `format_name` — The [format](../../interfaces/formats.md#formats) of the data.
- `data` — String literal or constant expression that returns a string containing data in the specified format.
**Returned value**
A table with data parsed from the `data` argument according to the specified format, using the extracted schema.
**Examples**
**Query:**
``` sql
:) select * from format(JSONEachRow,
$$
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 112}
{"a": "World", "b": 124}
$$)
```
**Result:**
```text
┌───b─┬─a─────┐
│ 111 │ Hello │
│ 123 │ World │
│ 112 │ Hello │
│ 124 │ World │
└─────┴───────┘
```
**Query:**
```sql
:) desc format(JSONEachRow,
$$
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 112}
{"a": "World", "b": 124}
$$)
```
**Result:**
```text
┌─name─┬─type──────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ b │ Nullable(Float64) │ │ │ │ │ │
│ a │ Nullable(String) │ │ │ │ │ │
└──────┴───────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
```
**See Also**
- [Formats](../../interfaces/formats.md)
[Original article](https://clickhouse.com/docs/en/sql-reference/table-functions/format) <!--hide-->

View File

@ -0,0 +1 @@
../../../en/sql-reference/table-functions/format.md

View File

@ -0,0 +1 @@
../../../en/sql-reference/table-functions/format.md

View File

@ -160,7 +160,7 @@ void ClusterCopierApp::mainImpl()
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
static const std::string default_database = "_local";

View File

@ -176,7 +176,7 @@ int DisksApp::main(const std::vector<String> & /*args*/)
Poco::Logger::root().setLevel(Poco::Logger::parseLevel(log_level));
}
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
shared_context = Context::createShared();

View File

@ -413,7 +413,7 @@ try
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
processConfig();

View File

@ -679,7 +679,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ false);
registerFormats();
registerRemoteFileMetadatas();
@ -1148,6 +1148,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
bool allow_use_jemalloc_memory = config->getBool("allow_use_jemalloc_memory", true);
total_memory_tracker.setAllowUseJemallocMemory(allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);

View File

@ -13,6 +13,7 @@ struct Settings;
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int CORRUPTED_DATA;
}
@ -89,6 +90,13 @@ public:
{
this->data(place).result.read(buf, *serialization_res, arena);
this->data(place).value.read(buf, *serialization_val, arena);
if (unlikely(this->data(place).value.has() != this->data(place).result.has()))
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Invalid state of the aggregate function {}: has_value ({}) != has_result ({})",
getName(),
this->data(place).value.has(),
this->data(place).result.has());
}
bool allocatesMemoryInArena() const override

View File

@ -448,6 +448,34 @@ public:
};
struct Compatibility
{
/// Old versions used to store terminating null-character in SingleValueDataString.
/// Then -WithTerminatingZero methods were removed from IColumn interface,
/// because these methods are quite dangerous and easy to misuse. It introduced incompatibility.
/// See https://github.com/ClickHouse/ClickHouse/pull/41431 and https://github.com/ClickHouse/ClickHouse/issues/42916
/// Here we keep these functions for compatibility.
/// It's safe because there's no way unsanitized user input (without \0 at the end) can reach these functions.
static StringRef getDataAtWithTerminatingZero(const ColumnString & column, size_t n)
{
auto res = column.getDataAt(n);
/// ColumnString always reserves extra byte for null-character after string.
/// But getDataAt returns StringRef without the null-character. Let's add it.
chassert(res.data[res.size] == '\0');
++res.size;
return res;
}
static void insertDataWithTerminatingZero(ColumnString & column, const char * pos, size_t length)
{
/// String already has terminating null-character.
/// But insertData will add another one unconditionally. Trim existing null-character to avoid duplication.
chassert(0 < length);
chassert(pos[length - 1] == '\0');
column.insertData(pos, length - 1);
}
};
/** For strings. Short strings are stored in the object itself, and long strings are allocated separately.
* NOTE It could also be suitable for arrays of numbers.
@ -477,20 +505,31 @@ public:
return size >= 0;
}
const char * getData() const
private:
char * getDataMutable()
{
return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data;
}
const char * getData() const
{
const char * data_ptr = size <= MAX_SMALL_STRING_SIZE ? small_data : large_data;
/// It must always be terminated with null-character
chassert(0 < size);
chassert(data_ptr[size - 1] == '\0');
return data_ptr;
}
StringRef getStringRef() const
{
return StringRef(getData(), size);
}
public:
void insertResultInto(IColumn & to) const
{
if (has())
assert_cast<ColumnString &>(to).insertData(getData(), size);
Compatibility::insertDataWithTerminatingZero(assert_cast<ColumnString &>(to), getData(), size);
else
assert_cast<ColumnString &>(to).insertDefault();
}
@ -502,44 +541,76 @@ public:
buf.write(getData(), size);
}
void allocateLargeDataIfNeeded(Int64 size_to_reserve, Arena * arena)
{
if (capacity < size_to_reserve)
{
capacity = static_cast<Int32>(roundUpToPowerOfTwoOrZero(size_to_reserve));
/// It might happen if the size was too big and the rounded value does not fit a size_t
if (unlikely(capacity < size_to_reserve))
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", size_to_reserve);
/// Don't free large_data here.
large_data = arena->alloc(capacity);
}
}
void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena * arena)
{
Int32 rhs_size;
readBinary(rhs_size, buf);
if (rhs_size >= 0)
{
if (rhs_size <= MAX_SMALL_STRING_SIZE)
{
/// Don't free large_data here.
size = rhs_size;
if (size > 0)
buf.readStrict(small_data, size);
}
else
{
if (capacity < rhs_size)
{
capacity = static_cast<Int32>(roundUpToPowerOfTwoOrZero(rhs_size));
/// It might happen if the size was too big and the rounded value does not fit a size_t
if (unlikely(capacity < rhs_size))
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", rhs_size);
/// Don't free large_data here.
large_data = arena->alloc(capacity);
}
size = rhs_size;
buf.readStrict(large_data, size);
}
}
else
if (rhs_size < 0)
{
/// Don't free large_data here.
size = rhs_size;
return;
}
if (rhs_size <= MAX_SMALL_STRING_SIZE)
{
/// Don't free large_data here.
size = rhs_size;
if (size > 0)
buf.readStrict(small_data, size);
}
else
{
/// Reserve one byte more for null-character
Int64 rhs_size_to_reserve = rhs_size;
rhs_size_to_reserve += 1; /// Avoid overflow
allocateLargeDataIfNeeded(rhs_size_to_reserve, arena);
size = rhs_size;
buf.readStrict(large_data, size);
}
/// Check if the string we read is null-terminated (getDataMutable does not have the assertion)
if (0 < size && getDataMutable()[size - 1] == '\0')
return;
/// It's not null-terminated, but it must be (for historical reasons). There are two variants:
/// - The value was serialized by one of the incompatible versions of ClickHouse. We had some range of versions
/// that used to serialize SingleValueDataString without terminating '\0'. Let's just append it.
/// - An attacker sent crafted data. Sanitize it and append '\0'.
/// In all other cases the string must be already null-terminated.
/// NOTE We cannot add '\0' unconditionally, because it will be duplicated.
/// NOTE It's possible that a string that actually ends with '\0' was written by one of the incompatible versions.
/// Unfortunately, we cannot distinguish it from normal string written by normal version.
/// So such strings will be trimmed.
if (size == MAX_SMALL_STRING_SIZE)
{
/// Special case: We have to move value to large_data
allocateLargeDataIfNeeded(size + 1, arena);
memcpy(large_data, small_data, size);
}
/// We have enough space to append
++size;
getDataMutable()[size - 1] = '\0';
}
/// Assuming to.has()
@ -557,13 +628,7 @@ public:
}
else
{
if (capacity < value_size)
{
/// Don't free large_data here.
capacity = static_cast<Int32>(roundUpToPowerOfTwoOrZero(value_size));
large_data = arena->alloc(capacity);
}
allocateLargeDataIfNeeded(value_size, arena);
size = value_size;
memcpy(large_data, value.data, size);
}
@ -571,7 +636,7 @@ public:
void change(const IColumn & column, size_t row_num, Arena * arena)
{
changeImpl(assert_cast<const ColumnString &>(column).getDataAt(row_num), arena);
changeImpl(Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num), arena);
}
void change(const Self & to, Arena * arena)
@ -620,7 +685,7 @@ public:
bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
{
if (!has() || assert_cast<const ColumnString &>(column).getDataAt(row_num) < getStringRef())
if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num) < getStringRef())
{
change(column, row_num, arena);
return true;
@ -642,7 +707,7 @@ public:
bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
{
if (!has() || assert_cast<const ColumnString &>(column).getDataAt(row_num) > getStringRef())
if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num) > getStringRef())
{
change(column, row_num, arena);
return true;
@ -669,7 +734,7 @@ public:
bool isEqualTo(const IColumn & column, size_t row_num) const
{
return has() && assert_cast<const ColumnString &>(column).getDataAt(row_num) == getStringRef();
return has() && Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num) == getStringRef();
}
static bool allocatesMemoryInArena()

View File

@ -27,6 +27,14 @@ public:
/// NOTE: Adding events into distant past (further than `period`) must be avoided.
void add(double now, double count)
{
// Remove data for the initial heating stage that can be present at the beginning of a query.
// Otherwise it leads to a wrong gradual increase of the average value, making the algorithm not very reactive.
if (count != 0.0 && ++data_points < 5)
{
start = events.time;
events = ExponentiallySmoothedAverage();
}
if (now - period <= start) // precise counting mode
events = ExponentiallySmoothedAverage(events.value + count, now);
else // exponential smoothing mode
@ -51,6 +59,7 @@ public:
{
start = now;
events = ExponentiallySmoothedAverage();
data_points = 0;
}
private:
@ -58,6 +67,7 @@ private:
const double half_decay_time;
double start; // Instant in past without events before it; when measurement started or reset
ExponentiallySmoothedAverage events; // Estimated number of events in the last `period`
size_t data_points = 0;
};
}
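For intuition, here is a standalone sketch (illustrative only, not the ClickHouse `EventRateMeter`/`ExponentiallySmoothedAverage` interface) of the "ignore the initial heating stage" idea: restart the estimate for the first few non-zero samples so a slowly-starting query does not drag the average down.

```cpp
// Standalone sketch of the "ignore the initial heating stage" idea
// (illustrative only; the real class has a different interface).
#include <cstdio>

struct HeatingAwareAverage
{
    double average = 0.0;
    double alpha = 0.1;        // smoothing factor
    int data_points = 0;

    void add(double sample)
    {
        // While still in the heating stage, restart the estimate instead of
        // letting the first few small samples drag the average down.
        if (sample != 0.0 && ++data_points < 5)
        {
            average = sample;
            return;
        }
        average = alpha * sample + (1.0 - alpha) * average;
    }
};

int main()
{
    HeatingAwareAverage avg;
    for (double s : {0.1, 0.2, 5.0, 5.1, 5.0, 4.9, 5.2})
        avg.add(s);
    std::printf("average ~ %.2f\n", avg.average);   // close to the steady-state ~5
}
```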

View File

@ -15,6 +15,7 @@
#include <Common/formatReadable.h>
#include <Common/filesystemHelpers.h>
#include <Common/ErrorCodes.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <filesystem>
@ -63,11 +64,18 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
ErrorCodes::increment(code, remote, msg, trace);
}
Exception::Exception(const std::string & msg, int code, bool remote_)
: Poco::Exception(msg, code)
Exception::MessageMasked::MessageMasked(const std::string & msg_)
: msg(msg_)
{
if (auto * masker = SensitiveDataMasker::getInstance())
masker->wipeSensitiveData(msg);
}
Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
: Poco::Exception(msg_masked.msg, code)
, remote(remote_)
{
handle_error_code(msg, code, remote, getStackFramePointers());
handle_error_code(msg_masked.msg, code, remote, getStackFramePointers());
}
Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)

View File

@ -27,7 +27,19 @@ public:
using FramePointers = std::vector<void *>;
Exception() = default;
Exception(const std::string & msg, int code, bool remote_ = false);
// used to remove the sensitive information from exceptions if query_masking_rules is configured
struct MessageMasked
{
std::string msg;
MessageMasked(const std::string & msg_);
};
Exception(const MessageMasked & msg_masked, int code, bool remote_);
// delegating constructor to mask sensitive information from the message
Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_)
{}
Exception(int code, const std::string & message)
: Exception(message, code)
@ -54,12 +66,17 @@ public:
template <typename... Args>
void addMessage(fmt::format_string<Args...> format, Args &&... args)
{
extendedMessage(fmt::format(format, std::forward<Args>(args)...));
addMessage(fmt::format(format, std::forward<Args>(args)...));
}
void addMessage(const std::string& message)
{
extendedMessage(message);
addMessage(MessageMasked(message));
}
void addMessage(const MessageMasked & msg_masked)
{
extendedMessage(msg_masked.msg);
}
/// Used to distinguish local exceptions from the one that was received from remote node.
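As a rough illustration of what the masking step amounts to: every message passed to `Exception` is rewritten by the configured rules before it is stored or logged. The following is a standalone analogue only, not ClickHouse's `SensitiveDataMasker` API, and the replacement rule is invented for the example.

```cpp
// Standalone analogue of exception-message masking (illustrative only; the
// real implementation applies the rules from the query_masking_rules config
// via SensitiveDataMasker, not std::regex as shown here).
#include <iostream>
#include <regex>
#include <string>

std::string maskMessage(std::string msg)
{
    /// Hypothetical rule: hide the contents of any PASSWORD '...' clause.
    static const std::regex rule(R"(PASSWORD\s+'[^']*')", std::regex::icase);
    return std::regex_replace(msg, rule, "PASSWORD '[HIDDEN]'");
}

int main()
{
    std::string raw = "Syntax error while parsing: ... PASSWORD 'my_secret' ...";
    std::cout << maskMessage(raw) << '\n';   /// prints ... PASSWORD '[HIDDEN]' ...
}
```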

View File

@ -220,7 +220,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
Int64 limit_to_check = current_hard_limit;
#if USE_JEMALLOC
if (level == VariableContext::Global)
if (level == VariableContext::Global && allow_use_jemalloc_memory.load(std::memory_order_relaxed))
{
/// Jemalloc arenas may keep some extra memory.
/// This memory was subtracted from RSS to decrease memory drift.

View File

@ -55,6 +55,7 @@ private:
std::atomic<Int64> soft_limit {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};
std::atomic_bool allow_use_jemalloc_memory {true};
static std::atomic<Int64> free_memory_in_allocator_arenas;
@ -125,6 +126,10 @@ public:
{
return soft_limit.load(std::memory_order_relaxed);
}
void setAllowUseJemallocMemory(bool value)
{
allow_use_jemalloc_memory.store(value, std::memory_order_relaxed);
}
/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.

View File

@ -90,7 +90,7 @@ private:
bool write_progress_on_update = false;
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 3'000'000'000 /*ns*/}; // average CPU utilization over the last 3 seconds
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 2'000'000'000 /*ns*/}; // average CPU utilization over the last 2 seconds
HostToThreadTimesMap thread_data;
/// In case of all of the above:
/// - clickhouse-local

View File

@ -36,7 +36,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -142,6 +142,9 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr log_info_command = std::make_shared<LogInfoCommand>(keeper_dispatcher);
factory.registerCommand(log_info_command);
FourLetterCommandPtr request_leader_command = std::make_shared<RequestLeaderCommand>(keeper_dispatcher);
factory.registerCommand(request_leader_command);
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
@ -507,4 +510,9 @@ String LogInfoCommand::run()
return ret.str();
}
String RequestLeaderCommand::run()
{
return keeper_dispatcher.requestLeader() ? "Sent leadership request to leader." : "Failed to send leadership request to leader.";
}
}

View File

@ -364,4 +364,17 @@ struct LogInfoCommand : public IFourLetterCommand
~LogInfoCommand() override = default;
};
/// Request to be leader.
struct RequestLeaderCommand : public IFourLetterCommand
{
explicit RequestLeaderCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "rqld"; }
String run() override;
~RequestLeaderCommand() override = default;
};
}

View File

@ -215,6 +215,12 @@ public:
{
return server->getKeeperLogInfo();
}
/// Request to be leader.
bool requestLeader()
{
return server->requestLeader();
}
};
}

View File

@ -935,4 +935,9 @@ KeeperLogInfo KeeperServer::getKeeperLogInfo()
return log_info;
}
bool KeeperServer::requestLeader()
{
return isLeader() || raft_instance->request_leadership();
}
}

View File

@ -135,6 +135,8 @@ public:
uint64_t createSnapshot();
KeeperLogInfo getKeeperLogInfo();
bool requestLeader();
};
}

View File

@ -4,7 +4,10 @@
namespace DB
{
DiskDecorator::DiskDecorator(const DiskPtr & delegate_) : delegate(delegate_)
DiskDecorator::DiskDecorator(const DiskPtr & delegate_)
: IDisk(/* name_= */ "<decorator>")
, delegate(delegate_)
{
}
@ -226,9 +229,9 @@ void DiskDecorator::shutdown()
delegate->shutdown();
}
void DiskDecorator::startup(ContextPtr context)
void DiskDecorator::startupImpl(ContextPtr context)
{
delegate->startup(context);
delegate->startupImpl(context);
}
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map)

View File

@ -81,7 +81,7 @@ public:
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;
void startup(ContextPtr context) override;
void startupImpl(ContextPtr context) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
bool supportsCache() const override { return delegate->supportsCache(); }

View File

@ -210,7 +210,7 @@ DiskEncrypted::DiskEncrypted(
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
: DiskDecorator(settings_->wrapped_disk)
, name(name_)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
@ -369,15 +369,19 @@ void DiskEncrypted::applyNewSettings(
current_settings.set(std::move(new_settings));
}
void registerDiskEncrypted(DiskFactory & factory)
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr /*context*/,
const DisksMap & map) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & map) -> DiskPtr
{
return std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("encrypted", creator);
}

View File

@ -33,7 +33,7 @@ public:
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_);
const String & getName() const override { return name; }
const String & getName() const override { return encrypted_name; }
const String & getPath() const override { return disk_absolute_path; }
ReservationPtr reserve(UInt64 bytes) override;
@ -261,7 +261,7 @@ private:
return disk_path + path;
}
const String name;
const String encrypted_name;
const String disk_path;
const String disk_absolute_path;
MultiVersion<DiskEncryptedSettings> current_settings;

View File

@ -500,7 +500,7 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi
}
DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_)
: IDisk(name_)
, disk_path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
@ -528,26 +528,6 @@ DataSourceDescription DiskLocal::getDataSourceDescription() const
return data_source_description;
}
void DiskLocal::startup(ContextPtr)
{
try
{
broken = false;
disk_checker_magic_number = -1;
disk_checker_can_check_read = true;
readonly = !setup();
}
catch (...)
{
tryLogCurrentException(logger, fmt::format("Disk {} is marked as broken during startup", name));
broken = true;
/// Disk checker is disabled when failing to start up.
disk_checker_can_check_read = false;
}
if (disk_checker && disk_checker_can_check_read)
disk_checker->startup();
}
void DiskLocal::shutdown()
{
if (disk_checker)
@ -641,18 +621,30 @@ DiskObjectStoragePtr DiskLocal::createDiskObjectStorage()
);
}
bool DiskLocal::setup()
void DiskLocal::checkAccessImpl(const String & path)
{
try
{
fs::create_directories(disk_path);
if (!FS::canWrite(disk_path))
{
LOG_ERROR(logger, "Cannot write to the root directory of disk {} ({}).", name, disk_path);
readonly = true;
return;
}
}
catch (...)
{
LOG_ERROR(logger, "Cannot create the directory of disk {} ({}).", name, disk_path);
throw;
LOG_ERROR(logger, "Cannot create the root directory of disk {} ({}).", name, disk_path);
readonly = true;
return;
}
IDisk::checkAccessImpl(path);
}
void DiskLocal::setup()
{
try
{
if (!FS::canRead(disk_path))
@ -666,7 +658,7 @@ bool DiskLocal::setup()
/// If disk checker is disabled, just assume RW by default.
if (!disk_checker)
return true;
return;
try
{
@ -690,6 +682,7 @@ bool DiskLocal::setup()
/// Try to create a new checker file. The disk status can be either broken or readonly.
if (disk_checker_magic_number == -1)
{
try
{
pcg32_fast rng(randomSeed());
@ -709,12 +702,33 @@ bool DiskLocal::setup()
disk_checker_path,
name);
disk_checker_can_check_read = false;
return true;
return;
}
}
if (disk_checker_magic_number == -1)
throw Exception("disk_checker_magic_number is not initialized. It's a bug", ErrorCodes::LOGICAL_ERROR);
return true;
}
void DiskLocal::startupImpl(ContextPtr)
{
broken = false;
disk_checker_magic_number = -1;
disk_checker_can_check_read = true;
try
{
setup();
}
catch (...)
{
tryLogCurrentException(logger, fmt::format("Disk {} is marked as broken during startup", name));
broken = true;
/// Disk checker is disabled when failing to start up.
disk_checker_can_check_read = false;
}
if (disk_checker && disk_checker_can_check_read)
disk_checker->startup();
}
struct stat DiskLocal::stat(const String & path) const
@ -741,13 +755,14 @@ MetadataStoragePtr DiskLocal::getMetadataStorage()
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
}
void registerDiskLocal(DiskFactory & factory)
void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & map) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & map) -> DiskPtr
{
String path;
UInt64 keep_free_space_bytes;
@ -757,9 +772,10 @@ void registerDiskLocal(DiskFactory & factory)
if (path == disk_ptr->getPath())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
disk->startup(context);
disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(disk);
};
factory.registerDiskType("local", creator);

View File

@ -28,8 +28,6 @@ public:
ContextPtr context,
UInt64 local_disk_check_period_ms);
const String & getName() const override { return name; }
const String & getPath() const override { return disk_path; }
ReservationPtr reserve(UInt64 bytes) override;
@ -112,8 +110,9 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;
bool isBroken() const override { return broken; }
bool isReadOnly() const override { return readonly; }
void startup(ContextPtr) override;
void startupImpl(ContextPtr context) override;
void shutdown() override;
@ -133,17 +132,19 @@ public:
MetadataStoragePtr getMetadataStorage() override;
protected:
void checkAccessImpl(const String & path) override;
private:
std::optional<UInt64> tryReserve(UInt64 bytes);
/// Setup the disk for the health check. Returns true if it's read-write, false if read-only.
/// Setup the disk for the health check.
/// Throw an exception if it's not possible to set up the necessary files and directories.
bool setup();
void setup();
/// Read magic number from disk checker file. Return std::nullopt if exception happens.
std::optional<UInt32> readDiskCheckerMagicNumber() const noexcept;
const String name;
const String disk_path;
const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes;

View File

@ -141,6 +141,11 @@ private:
};
DiskMemory::DiskMemory(const String & name_)
: IDisk(name_)
, disk_path("memory(" + name_ + ')')
{}
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
{
throw Exception("Method reserve is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
@ -456,13 +461,20 @@ MetadataStoragePtr DiskMemory::getMetadataStorage()
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;
void registerDiskMemory(DiskFactory & factory)
void registerDiskMemory(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & /*config*/,
const String & /*config_prefix*/,
ContextPtr /*context*/,
const DisksMap & /*map*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskMemory>(name);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("memory", creator);
}

View File

@ -8,7 +8,7 @@
namespace DB
{
class DiskMemory;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
@ -22,9 +22,7 @@ class WriteBufferFromFileBase;
class DiskMemory : public IDisk
{
public:
explicit DiskMemory(const String & name_) : name(name_), disk_path("memory://" + name_ + '/') {}
const String & getName() const override { return name; }
explicit DiskMemory(const String & name_);
const String & getPath() const override { return disk_path; }
@ -121,7 +119,6 @@ private:
};
using Files = std::unordered_map<String, FileData>; /// file path -> file data
const String name;
const String disk_path;
Files files;
mutable std::mutex mutex;

View File

@ -79,7 +79,8 @@ private:
};
DiskRestartProxy::DiskRestartProxy(DiskPtr & delegate_)
: DiskDecorator(delegate_) { }
: DiskDecorator(delegate_)
{}
ReservationPtr DiskRestartProxy::reserve(UInt64 bytes)
{
@ -368,7 +369,8 @@ void DiskRestartProxy::restart(ContextPtr context)
LOG_INFO(log, "Restart lock acquired. Restarting disk {}", DiskDecorator::getName());
DiskDecorator::startup(context);
/// NOTE: access checking will cause deadlock here, so skip it.
DiskDecorator::startup(context, /* skip_access_check= */ true);
LOG_INFO(log, "Disk restarted {}", DiskDecorator::getName());
}

View File

@ -6,6 +6,7 @@
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Core/ServerUUID.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
@ -17,6 +18,8 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_READ_ALL_DATA;
extern const int LOGICAL_ERROR;
}
bool IDisk::isDirectoryEmpty(const String & path) const
@ -126,4 +129,87 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
void IDisk::startup(ContextPtr context, bool skip_access_check)
{
if (!skip_access_check)
{
if (isReadOnly())
{
LOG_DEBUG(&Poco::Logger::get("IDisk"),
"Skip access check for disk {} (read-only disk).",
getName());
}
else
checkAccess();
}
startupImpl(context);
}
void IDisk::checkAccess()
{
DB::UUID server_uuid = DB::ServerUUID::get();
if (server_uuid == DB::UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
const String path = fmt::format("clickhouse_access_check_{}", DB::toString(server_uuid));
checkAccessImpl(path);
}
/// NOTE: should we mark the disk readonly if the write/unlink fails instead of throwing?
void IDisk::checkAccessImpl(const String & path)
try
{
const std::string_view payload("test", 4);
/// write
{
auto file = writeFile(path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
try
{
file->write(payload.data(), payload.size());
}
catch (...)
{
/// Log current exception, because finalize() can throw a different exception.
tryLogCurrentException(__PRETTY_FUNCTION__);
file->finalize();
throw;
}
}
/// read
{
auto file = readFile(path);
String buf(payload.size(), '0');
file->readStrict(buf.data(), buf.size());
if (buf != payload)
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Content of {}::{} does not matches after read ({} vs {})", name, path, buf, payload);
}
}
/// read with offset
{
auto file = readFile(path);
auto offset = 2;
String buf(payload.size() - offset, '0');
file->seek(offset, 0);
file->readStrict(buf.data(), buf.size());
if (buf != payload.substr(offset))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Content of {}::{} does not matches after read with offset ({} vs {})", name, path, buf, payload.substr(offset));
}
}
/// remove
removeFile(path);
}
catch (Exception & e)
{
e.addMessage(fmt::format("While checking access for disk {}", name));
throw;
}
}

View File

@ -107,8 +107,9 @@ class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
: executor(executor_)
explicit IDisk(const String & name_, std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
: name(name_)
, executor(executor_)
{
}
@ -121,6 +122,9 @@ public:
/// It's not required to be a local filesystem path.
virtual const String & getPath() const = 0;
/// Return disk name.
const String & getName() const override { return name; }
/// Total available space on the disk.
virtual UInt64 getTotalSpace() const = 0;
@ -316,8 +320,11 @@ public:
/// Invoked when Global Context is shutdown.
virtual void shutdown() {}
/// Performs action on disk startup.
virtual void startup(ContextPtr) {}
/// Performs access check and custom action on disk startup.
void startup(ContextPtr context, bool skip_access_check);
/// Performs custom action on disk startup.
virtual void startupImpl(ContextPtr) {}
/// Return some uniq string for file, overrode for IDiskRemote
/// Required for distinguish different copies of the same part on remote disk
@ -400,6 +407,8 @@ public:
protected:
friend class DiskDecorator;
const String name;
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
@ -408,8 +417,13 @@ protected:
/// A derived class may override copy() to provide a faster implementation.
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir = true);
virtual void checkAccessImpl(const String & path);
private:
std::shared_ptr<Executor> executor;
/// Check access to the disk.
void checkAccess();
};
using Disks = std::vector<DiskPtr>;

View File

@ -42,7 +42,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
if (read_hint.has_value())
estimated_size = *read_hint;
else if (file_size.has_value())
estimated_size = file_size.has_value() ? *file_size : 0;
estimated_size = *file_size;
if (!existing_memory
&& settings.local_fs_method == LocalFSReadMethod::mmap
@ -158,7 +158,15 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
#endif
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
return create(settings.local_fs_buffer_size, flags);
size_t buffer_size = settings.local_fs_buffer_size;
/// Check if the buffer can be smaller than default
if (read_hint.has_value() && *read_hint > 0 && *read_hint < buffer_size)
buffer_size = *read_hint;
if (file_size.has_value() && *file_size < buffer_size)
buffer_size = *file_size;
return create(buffer_size, flags);
}
}

View File

@ -17,55 +17,9 @@
namespace DB
{
namespace ErrorCodes
void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access_check)
{
extern const int PATH_ACCESS_DENIED;
}
namespace
{
constexpr char test_file[] = "test.txt";
constexpr char test_str[] = "test";
constexpr size_t test_str_size = 4;
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile(test_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write(test_str, test_str_size);
}
void checkReadAccess(IDisk & disk)
{
auto file = disk.readFile(test_file);
String buf(test_str_size, '0');
file->readStrict(buf.data(), test_str_size);
if (buf != test_str)
throw Exception("No read access to disk", ErrorCodes::PATH_ACCESS_DENIED);
}
void checkReadWithOffset(IDisk & disk)
{
auto file = disk.readFile(test_file);
auto offset = 2;
auto test_size = test_str_size - offset;
String buf(test_size, '0');
file->seek(offset, 0);
file->readStrict(buf.data(), test_size);
if (buf != test_str + offset)
throw Exception("Failed to read file with offset", ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk)
{
disk.removeFile(test_file);
}
}
void registerDiskAzureBlobStorage(DiskFactory & factory)
{
auto creator = [](
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
@ -94,15 +48,8 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
copy_thread_pool_size
);
if (!config.getBool(config_prefix + ".skip_access_check", false))
{
checkWriteAccess(*azure_blob_storage_disk);
checkReadAccess(*azure_blob_storage_disk);
checkReadWithOffset(*azure_blob_storage_disk);
checkRemoveAccess(*azure_blob_storage_disk);
}
azure_blob_storage_disk->startup(context);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
azure_blob_storage_disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(azure_blob_storage_disk);
};
@ -117,7 +64,7 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
namespace DB
{
void registerDiskAzureBlobStorage(DiskFactory &) {}
void registerDiskAzureBlobStorage(DiskFactory &, bool /* global_skip_access_check */) {}
}

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void registerDiskCache(DiskFactory & factory)
void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check */)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,

View File

@ -109,8 +109,7 @@ DiskObjectStorage::DiskObjectStorage(
ObjectStoragePtr object_storage_,
bool send_metadata_,
uint64_t thread_pool_size_)
: IDisk(getAsyncExecutor(log_name, thread_pool_size_))
, name(name_)
: IDisk(name_, getAsyncExecutor(log_name, thread_pool_size_))
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, metadata_storage(std::move(metadata_storage_))
@ -420,9 +419,8 @@ void DiskObjectStorage::shutdown()
LOG_INFO(log, "Disk {} shut down", name);
}
void DiskObjectStorage::startup(ContextPtr context)
void DiskObjectStorage::startupImpl(ContextPtr context)
{
LOG_INFO(log, "Starting up disk {}", name);
object_storage->startup();

View File

@ -45,8 +45,6 @@ public:
bool supportParallelWrite() const override { return object_storage->supportParallelWrite(); }
const String & getName() const override { return name; }
const String & getPath() const override { return metadata_storage->getPath(); }
StoredObjects getStorageObjects(const String & local_path) const override;
@ -138,7 +136,7 @@ public:
void shutdown() override;
void startup(ContextPtr context) override;
void startupImpl(ContextPtr context) override;
ReservationPtr reserve(UInt64 bytes) override;
@ -212,7 +210,6 @@ private:
/// execution.
DiskTransactionPtr createObjectStorageTransaction();
const String name;
const String object_storage_root_path;
Poco::Logger * log;

View File

@ -14,13 +14,14 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void registerDiskHDFS(DiskFactory & factory)
void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context_,
const DisksMap & /*map*/) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String uri{config.getString(config_prefix + ".endpoint")};
checkHDFSURL(uri);
@ -31,19 +32,20 @@ void registerDiskHDFS(DiskFactory & factory)
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
context_->getSettingsRef().hdfs_replication
context->getSettingsRef().hdfs_replication
);
/// FIXME Cache currently unsupported :(
ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(uri, std::move(settings), config);
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context_);
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk_result = std::make_shared<DiskObjectStorage>(
DiskPtr disk = std::make_shared<DiskObjectStorage>(
name,
uri,
"DiskHDFS",
@ -51,8 +53,9 @@ void registerDiskHDFS(DiskFactory & factory)
std::move(hdfs_storage),
/* send_metadata = */ false,
copy_thread_pool_size);
disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(disk_result);
return std::make_shared<DiskRestartProxy>(disk);
};
factory.registerDiskType("hdfs", creator);

View File

@ -4,6 +4,7 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <optional>
#include <ranges>
#include <filesystem>
@ -62,7 +63,7 @@ UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk
void UnlinkFileOperation::execute(std::unique_lock<std::shared_mutex> &)
{
auto buf = disk.readFile(path);
auto buf = disk.readFile(path, ReadSettings{}, std::nullopt, disk.getFileSize(path));
readStringUntilEOF(prev_data, *buf);
disk.removeFile(path);
}

View File

@ -22,6 +22,7 @@
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <Core/ServerUUID.h>
namespace DB
@ -30,90 +31,78 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PATH_ACCESS_DENIED;
extern const int LOGICAL_ERROR;
}
namespace
{
void checkWriteAccess(IDisk & disk)
class CheckAccess
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
try
{
file->write("test", 4);
}
catch (...)
{
/// Log current exception, because finalize() can throw a different exception.
tryLogCurrentException(__PRETTY_FUNCTION__);
file->finalize();
throw;
}
}
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl");
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk)
{
disk.removeFile("test_acl");
}
bool checkBatchRemoveIsMissing(S3ObjectStorage & storage, const String & key_with_trailing_slash)
{
StoredObject object(key_with_trailing_slash + "_test_remove_objects_capability");
try
{
auto file = storage.writeObject(object, WriteMode::Rewrite);
file->write("test", 4);
file->finalize();
}
catch (...)
public:
static bool checkBatchRemove(S3ObjectStorage & storage, const String & key_with_trailing_slash)
{
/// NOTE: key_with_trailing_slash is the disk prefix, it is required
/// because access is done via S3ObjectStorage not via IDisk interface
/// (since we don't have disk yet).
const String path = fmt::format("{}clickhouse_remove_objects_capability_{}", key_with_trailing_slash, getServerUUID());
StoredObject object(path);
try
{
storage.removeObject(object);
auto file = storage.writeObject(object, WriteMode::Rewrite);
file->write("test", 4);
file->finalize();
}
catch (...)
{
try
{
storage.removeObject(object);
}
catch (...)
{
}
return true; /// We don't have write access, therefore no information about batch remove.
}
return false; /// We don't have write access, therefore no information about batch remove.
}
try
{
/// Uses `DeleteObjects` request (batch delete).
storage.removeObjects({object});
return false;
}
catch (const Exception &)
{
try
{
storage.removeObject(object);
/// Uses `DeleteObjects` request (batch delete).
storage.removeObjects({object});
return true;
}
catch (...)
catch (const Exception &)
{
try
{
storage.removeObject(object);
}
catch (...)
{
}
return false;
}
return true;
}
}
private:
static String getServerUUID()
{
DB::UUID server_uuid = DB::ServerUUID::get();
if (server_uuid == DB::UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
return DB::toString(server_uuid);
}
};
}
void registerDiskS3(DiskFactory & factory)
void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
@ -144,12 +133,12 @@ void registerDiskS3(DiskFactory & factory)
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
}
bool skip_access_check = config.getBool(config_prefix + ".skip_access_check", false);
/// NOTE: should we still perform this check for clickhouse-disks?
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
if (!skip_access_check)
{
/// If `support_batch_delete` is turned on (default), check and possibly switch it off.
if (s3_capabilities.support_batch_delete && checkBatchRemoveIsMissing(*s3_storage, uri.key))
if (s3_capabilities.support_batch_delete && !CheckAccess::checkBatchRemove(*s3_storage, uri.key))
{
LOG_WARNING(
&Poco::Logger::get("registerDiskS3"),
@ -165,7 +154,7 @@ void registerDiskS3(DiskFactory & factory)
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
std::shared_ptr<DiskObjectStorage> s3disk = std::make_shared<DiskObjectStorage>(
DiskObjectStoragePtr s3disk = std::make_shared<DiskObjectStorage>(
name,
uri.key,
type == "s3" ? "DiskS3" : "DiskS3Plain",
@ -174,15 +163,7 @@ void registerDiskS3(DiskFactory & factory)
send_metadata,
copy_thread_pool_size);
/// This code is used only to check access to the corresponding disk.
if (!skip_access_check)
{
checkWriteAccess(*s3disk);
checkReadAccess(name, *s3disk);
checkRemoveAccess(*s3disk);
}
s3disk->startup(context);
s3disk->startup(context, skip_access_check);
std::shared_ptr<IDisk> disk_result = s3disk;
@ -196,6 +177,6 @@ void registerDiskS3(DiskFactory & factory)
#else
void registerDiskS3(DiskFactory &) {}
void registerDiskS3(DiskFactory &, bool /* global_skip_access_check */) {}
#endif

View File

@ -14,15 +14,17 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void registerDiskWebServer(DiskFactory & factory)
void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & disk_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
auto creator = [global_skip_access_check](
const String & disk_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String uri{config.getString(config_prefix + ".endpoint")};
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
if (!uri.ends_with('/'))
throw Exception(
@ -41,7 +43,7 @@ void registerDiskWebServer(DiskFactory & factory)
auto metadata_storage = std::make_shared<MetadataStorageFromStaticFilesWebServer>(assert_cast<const WebObjectStorage &>(*object_storage));
std::string root_path;
return std::make_shared<DiskObjectStorage>(
DiskPtr disk = std::make_shared<DiskObjectStorage>(
disk_name,
root_path,
"DiskWebServer",
@ -49,6 +51,8 @@ void registerDiskWebServer(DiskFactory & factory)
object_storage,
/* send_metadata */false,
/* threadpool_size */16);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("web", creator);

View File

@ -7,55 +7,55 @@
namespace DB
{
void registerDiskLocal(DiskFactory & factory);
void registerDiskMemory(DiskFactory & factory);
void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check);
void registerDiskMemory(DiskFactory & factory, bool global_skip_access_check);
#if USE_AWS_S3
void registerDiskS3(DiskFactory & factory);
void registerDiskS3(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_AZURE_BLOB_STORAGE
void registerDiskAzureBlobStorage(DiskFactory & factory);
void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_SSL
void registerDiskEncrypted(DiskFactory & factory);
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_HDFS
void registerDiskHDFS(DiskFactory & factory);
void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check);
#endif
void registerDiskWebServer(DiskFactory & factory);
void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check);
void registerDiskCache(DiskFactory & factory);
void registerDiskCache(DiskFactory & factory, bool global_skip_access_check);
void registerDisks()
void registerDisks(bool global_skip_access_check)
{
auto & factory = DiskFactory::instance();
registerDiskLocal(factory);
registerDiskMemory(factory);
registerDiskLocal(factory, global_skip_access_check);
registerDiskMemory(factory, global_skip_access_check);
#if USE_AWS_S3
registerDiskS3(factory);
registerDiskS3(factory, global_skip_access_check);
#endif
#if USE_AZURE_BLOB_STORAGE
registerDiskAzureBlobStorage(factory);
registerDiskAzureBlobStorage(factory, global_skip_access_check);
#endif
#if USE_SSL
registerDiskEncrypted(factory);
registerDiskEncrypted(factory, global_skip_access_check);
#endif
#if USE_HDFS
registerDiskHDFS(factory);
registerDiskHDFS(factory, global_skip_access_check);
#endif
registerDiskWebServer(factory);
registerDiskWebServer(factory, global_skip_access_check);
registerDiskCache(factory);
registerDiskCache(factory, global_skip_access_check);
}
}

View File

@ -2,5 +2,10 @@
namespace DB
{
void registerDisks();
/// @param global_skip_access_check - skip the access check regardless of the
/// .skip_access_check config directive (used for clickhouse-disks)
void registerDisks(bool global_skip_access_check);
}
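The new signature lets registerDisks() thread the flag down into every per-disk creator. As a hedged illustration of what a registration function looks like under this scheme (modeled on the registerDiskWebServer creator shown above; DiskExample and registerDiskExample are placeholder names, not real ClickHouse types):
```
/// Hedged sketch only: DiskExample and registerDiskExample are hypothetical,
/// the flag handling mirrors the registerDiskWebServer creator shown above.
void registerDiskExample(DiskFactory & factory, bool global_skip_access_check)
{
    auto creator = [global_skip_access_check](
        const String & disk_name,
        const Poco::Util::AbstractConfiguration & config,
        const String & config_prefix,
        ContextPtr context,
        const DisksMap & /*map*/) -> DiskPtr
    {
        /// The global flag wins; otherwise the per-disk setting decides.
        bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
        DiskPtr disk = std::make_shared<DiskExample>(disk_name);
        /// startup() now performs the access check unless it is skipped.
        disk->startup(context, skip_access_check);
        return disk;
    };
    factory.registerDiskType("example", creator);
}
```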

View File

@ -1,31 +1,40 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/IDisk.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <filesystem>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_DISK;
}
namespace
{
struct FilesystemAvailable
{
static constexpr auto name = "filesystemAvailable";
static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.available; }
static std::uintmax_t get(const DiskPtr & disk) { return disk->getAvailableSpace(); }
};
struct FilesystemFree
struct FilesystemUnreserved
{
static constexpr auto name = "filesystemFree";
static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.free; }
static constexpr auto name = "filesystemUnreserved";
static std::uintmax_t get(const DiskPtr & disk) { return disk->getUnreservedSpace(); }
};
struct FilesystemCapacity
{
static constexpr auto name = "filesystemCapacity";
static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.capacity; }
static std::uintmax_t get(const DiskPtr & disk) { return disk->getTotalSpace(); }
};
template <typename Impl>
@ -34,34 +43,72 @@ class FilesystemImpl : public IFunction
public:
static constexpr auto name = Impl::name;
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FilesystemImpl<Impl>>(std::filesystem::space(context->getPath()));
}
static FunctionPtr create(ContextPtr context_) { return std::make_shared<FilesystemImpl<Impl>>(context_); }
explicit FilesystemImpl(ContextPtr context_) : context(context_) { }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override
{
return false;
}
explicit FilesystemImpl(std::filesystem::space_info spaceinfo_) : spaceinfo(spaceinfo_) { }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isDeterministic() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() > 1)
{
throw Exception("Arguments size of function " + getName() + " should be 0 or 1", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (arguments.size() == 1 && !isStringOrFixedString(arguments[0]))
{
throw Exception(
"Arguments of function " + getName() + " should be String or FixedString", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return std::make_shared<DataTypeUInt64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeUInt64().createColumnConst(input_rows_count, static_cast<UInt64>(Impl::get(spaceinfo)));
if (arguments.empty())
{
auto disk = context->getDisk("default");
return DataTypeUInt64().createColumnConst(input_rows_count, Impl::get(disk));
}
else
{
auto col = arguments[0].column;
if (const ColumnString * col_str = checkAndGetColumn<ColumnString>(col.get()))
{
auto disk_map = context->getDisksMap();
auto col_res = ColumnVector<UInt64>::create(col_str->size());
auto & data = col_res->getData();
for (size_t i = 0; i < col_str->size(); ++i)
{
auto disk_name = col_str->getDataAt(i).toString();
if (auto it = disk_map.find(disk_name); it != disk_map.end())
data[i] = Impl::get(it->second);
else
throw Exception(
"Unknown disk name " + disk_name + " while execute function " + getName(), ErrorCodes::UNKNOWN_DISK);
}
return col_res;
}
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
}
}
private:
std::filesystem::space_info spaceinfo;
ContextPtr context;
};
}
@ -70,7 +117,7 @@ REGISTER_FUNCTION(Filesystem)
{
factory.registerFunction<FilesystemImpl<FilesystemAvailable>>();
factory.registerFunction<FilesystemImpl<FilesystemCapacity>>();
factory.registerFunction<FilesystemImpl<FilesystemFree>>();
factory.registerFunction<FilesystemImpl<FilesystemUnreserved>>();
}
}
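Since the Impl structs now read their numbers from IDisk rather than std::filesystem::space_info, adding another metric is just one more struct over the same getters. A hedged sketch of a hypothetical filesystemUsed metric, not part of this change, built only from the getters already used above:
```
/// Hypothetical example (not part of this change): a metric composed from the
/// same IDisk getters used by the Impl structs above.
struct FilesystemUsed
{
    static constexpr auto name = "filesystemUsed";
    static std::uintmax_t get(const DiskPtr & disk)
    {
        /// Used space = total space minus whatever is still available.
        return disk->getTotalSpace() - disk->getAvailableSpace();
    }
};

/// It would be registered like the others:
/// factory.registerFunction<FilesystemImpl<FilesystemUsed>>();
```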

View File

@ -1016,6 +1016,7 @@ public:
size_t getNumberOfArguments() const override { return 3; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForNothing() const override { return false; }
bool isShortCircuit(ShortCircuitSettings & settings, size_t /*number_of_arguments*/) const override
{
settings.enable_lazy_execution_for_first_argument = false;

View File

@ -50,6 +50,7 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForNothing() const override { return false; }
ColumnNumbers getArgumentsThatDontImplyNullableReturnType(size_t number_of_arguments) const override
{

View File

@ -33,7 +33,7 @@ try
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
return true;

View File

@ -925,7 +925,7 @@ public:
, ErrorCodes::SYNTAX_ERROR);
}
if (allow_function_parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected))
if (allow_function_parameters && !parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected))
{
parameters = std::make_shared<ASTExpressionList>();
std::swap(parameters->children, elements);

View File

@ -22,7 +22,7 @@ bool ParserTableExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
auto res = std::make_shared<ASTTableExpression>();
if (!ParserWithOptionalAlias(std::make_unique<ParserSubquery>(), true).parse(pos, res->subquery, expected)
&& !ParserWithOptionalAlias(std::make_unique<ParserFunction>(true, true), true).parse(pos, res->table_function, expected)
&& !ParserWithOptionalAlias(std::make_unique<ParserFunction>(false, true), true).parse(pos, res->table_function, expected)
&& !ParserWithOptionalAlias(std::make_unique<ParserCompoundIdentifier>(true, true), true)
.parse(pos, res->database_and_table_name, expected))
return false;

View File

@ -99,32 +99,24 @@ Pipe StorageHDFSCluster::read(
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());
for (const auto & replicas : cluster->getShardsAddresses())
const auto & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"HDFSClusterInititiator",
node.compression,
node.secure
);
/// For an unknown reason the global context is passed to the IStorage::read() method,
/// so task_identifier is passed as a constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
shard_info.pool,
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}

View File

@ -315,7 +315,9 @@ MergeTreeReadTaskColumns getReadTaskColumns(
/// 1. Columns for row level filter
if (prewhere_info->row_level_filter)
{
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
injectRequiredColumns(
data_part_info_for_reader, storage_snapshot, with_subcolumns, row_filter_column_names);
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names));
pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end());
}
@ -323,7 +325,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
/// 2. Columns for prewhere
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
const auto injected_pre_columns = injectRequiredColumns(
injectRequiredColumns(
data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names);
for (const auto & name : all_pre_column_names)

View File

@ -1,7 +1,7 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageDelta.h>
#include <Storages/StorageDeltaLake.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromS3.h>
@ -47,7 +47,7 @@ void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
}
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
{
std::vector<String> keys;
keys.reserve(file_update_time.size());
@ -61,10 +61,10 @@ std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
: base_configuration(configuration_), table_path(table_path_)
{
Init(context);
init(context);
}
void JsonMetadataGetter::Init(ContextPtr context)
void JsonMetadataGetter::init(ContextPtr context)
{
auto keys = getJsonLogFiles();
@ -180,7 +180,53 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
}
}
StorageDelta::StorageDelta(
namespace
{
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
{
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
}
// DeltaLake stores the data parts in different files.
// `keys` is a vector of parts with the latest version.
// generateQueryFromKeys constructs a query from the part filenames for the
// underlying StorageS3 engine.
String generateQueryFromKeys(const std::vector<String> & keys)
{
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
}
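The comments above describe the contract: the selected keys are joined into a `{key1,key2,...}` glob that is appended to the table URI so the underlying StorageS3 engine reads exactly those parts. A small standalone sketch of that transformation (file names are made up for illustration):
```
// Standalone illustration of the key-to-glob transformation above; the file
// names are made up.
#include <string>
#include <vector>
#include <fmt/format.h>
#include <fmt/ranges.h>

int main()
{
    std::vector<std::string> keys = {"part-0000.parquet", "part-0001.parquet"};
    std::string glob = fmt::format("{{{}}}", fmt::join(keys, ","));
    // glob == "{part-0000.parquet,part-0001.parquet}"
    // Appended to the table URI it yields something like
    //   https://bucket.s3.amazonaws.com/table/{part-0000.parquet,part-0001.parquet}
    fmt::print("{}\n", glob);
    return 0;
}
```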
StorageS3Configuration getAdjustedS3Configuration(
const ContextPtr & context,
StorageS3::S3Configuration & base_configuration,
const StorageS3Configuration & configuration,
const std::string & table_path,
Poco::Logger * log)
{
JsonMetadataGetter getter{base_configuration, table_path, context};
auto keys = getter.getFiles();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
// set new url in configuration
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
new_configuration.format = configuration.format;
return new_configuration;
}
}
StorageDeltaLake::StorageDeltaLake(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
@ -189,28 +235,14 @@ StorageDelta::StorageDelta(
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, base_configuration{getBaseConfiguration(configuration_)}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
JsonMetadataGetter getter{base_configuration, table_path, context_};
auto keys = getter.getFiles();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
// set new url in configuration
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
new_configuration.format = configuration_.format;
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log);
if (columns_.empty())
{
@ -238,7 +270,7 @@ StorageDelta::StorageDelta(
nullptr);
}
Pipe StorageDelta::read(
Pipe StorageDeltaLake::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -252,16 +284,18 @@ Pipe StorageDelta::read(
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
ColumnsDescription StorageDeltaLake::getTableStructureFromData(
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
// DeltaLake stores data parts in different files.
// keys are the filenames of the parts.
// For StorageS3 to read all parts we need the format {key1,key2,key3,...keyn}.
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
auto base_configuration = getBaseConfiguration(configuration);
StorageS3::updateS3Configuration(ctx, base_configuration);
auto new_configuration = getAdjustedS3Configuration(
ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
}
void registerStorageDelta(StorageFactory & factory)
void registerStorageDeltaLake(StorageFactory & factory)
{
factory.registerStorage(
"DeltaLake",
@ -287,7 +321,7 @@ void registerStorageDelta(StorageFactory & factory)
configuration.format = "Parquet";
}
return std::make_shared<StorageDelta>(
return std::make_shared<StorageDeltaLake>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt);
},
{

View File

@ -32,7 +32,7 @@ public:
void setLastModifiedTime(const String & filename, uint64_t timestamp);
void remove(const String & filename, uint64_t timestamp);
std::vector<String> ListCurrentFiles() &&;
std::vector<String> listCurrentFiles() &&;
private:
std::unordered_map<String, uint64_t> file_update_time;
@ -44,10 +44,10 @@ class JsonMetadataGetter
public:
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
private:
void Init(ContextPtr context);
void init(ContextPtr context);
std::vector<String> getJsonLogFiles();
@ -60,13 +60,13 @@ private:
DeltaLakeMetadata metadata;
};
class StorageDelta : public IStorage
class StorageDeltaLake : public IStorage
{
public:
// 1. Parses the internal file structure of the table
// 2. Finds the parts with the latest version
// 3. Creates a URL for the underlying StorageS3 engine to handle reads
StorageDelta(
StorageDeltaLake(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
@ -87,14 +87,12 @@ public:
size_t max_block_size,
size_t num_streams) override;
static ColumnsDescription getTableStructureFromData(
const StorageS3Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
private:
void Init();
// DeltaLake stores the data parts in different files.
// `keys` is a vector of parts with the latest version.
// generateQueryFromKeys constructs a query from the part filenames for the
// underlying StorageS3 engine.
static String generateQueryFromKeys(std::vector<String> && keys);
void init();
StorageS3::S3Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;

View File

@ -28,115 +28,20 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
StorageHudi::StorageHudi(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
namespace
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
auto keys = getKeysFromS3();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
new_configuration.format = configuration_.format;
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
{
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
}
Pipe StorageHudi::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
std::vector<std::string> StorageHudi::getKeysFromS3()
{
std::vector<std::string> keys;
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket);
request.SetPrefix(table_path);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without the table path prefix.
keys.push_back(filename);
LOG_DEBUG(log, "Found file: {}", filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
/// Apache Hudi stores parts of data in different files.
/// Every part file has a timestamp in it.
/// Every partition (directory) in Apache Hudi has different versions of a part.
/// To find the needed parts we need to find the latest part file for every partition.
/// The part format is usually Parquet, but it can differ.
String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
{
/// For each partition path take only the latest file.
struct FileInfo
@ -187,6 +92,138 @@ String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys,
return "{" + list_of_keys + "}";
}
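The rule described in the comments, keep only the newest part file per partition, can be sketched in isolation. This is a hedged illustration only; the real selection logic is elided above and parses the actual Hudi naming scheme, while the sketch assumes keys of the form `partition/part_<timestamp>.parquet`:
```
// Hedged sketch of "latest file per partition": not the real implementation,
// which is elided above. Assumes keys look like "partition/part_<timestamp>.parquet"
// with fixed-width timestamps so lexicographic comparison orders them correctly.
#include <map>
#include <string>
#include <vector>

std::vector<std::string> latestPerPartition(const std::vector<std::string> & keys)
{
    // partition path -> newest key seen so far
    std::map<std::string, std::string> latest;
    for (const auto & key : keys)
    {
        auto slash = key.find_last_of('/');
        std::string partition = slash == std::string::npos ? "" : key.substr(0, slash);
        auto & current = latest[partition];
        if (current.empty() || key > current)
            current = key;
    }
    std::vector<std::string> result;
    for (const auto & [partition, key] : latest)
        result.push_back(key);
    return result;
}
```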
std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log)
{
std::vector<std::string> keys;
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket);
request.SetPrefix(table_path);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without the table path prefix.
keys.push_back(filename);
LOG_DEBUG(log, "Found file: {}", filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
StorageS3Configuration getAdjustedS3Configuration(
StorageS3::S3Configuration & base_configuration,
const StorageS3Configuration & configuration,
const std::string & table_path,
Poco::Logger * log)
{
auto keys = getKeysFromS3(base_configuration, table_path, log);
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
new_configuration.format = configuration.format;
return new_configuration;
}
}
StorageHudi::StorageHudi(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{getBaseConfiguration(configuration_)}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log);
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
}
Pipe StorageHudi::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
ColumnsDescription StorageHudi::getTableStructureFromData(
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
auto base_configuration = getBaseConfiguration(configuration);
StorageS3::updateS3Configuration(ctx, base_configuration);
auto new_configuration = getAdjustedS3Configuration(
base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageHudi"));
return StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
}
void registerStorageHudi(StorageFactory & factory)
{

View File

@ -48,16 +48,11 @@ public:
size_t max_block_size,
size_t num_streams) override;
static ColumnsDescription getTableStructureFromData(
const StorageS3Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
private:
std::vector<std::string> getKeysFromS3();
/// Apache Hudi stores parts of data in different files.
/// Every part file has a timestamp in it.
/// Every partition (directory) in Apache Hudi has different versions of a part.
/// To find the needed parts we need to find the latest part file for every partition.
/// The part format is usually Parquet, but it can differ.
static String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format);
StorageS3::S3Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;

View File

@ -214,7 +214,7 @@ private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
friend class StorageHudi;
friend class StorageDelta;
friend class StorageDeltaLake;
S3Configuration s3_configuration;
std::vector<String> keys;

View File

@ -34,7 +34,7 @@ void registerStorageS3(StorageFactory & factory);
void registerStorageCOS(StorageFactory & factory);
void registerStorageOSS(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageDelta(StorageFactory & factory);
void registerStorageDeltaLake(StorageFactory & factory);
#endif
#if USE_HDFS
@ -123,7 +123,7 @@ void registerStorages()
registerStorageCOS(factory);
registerStorageOSS(factory);
registerStorageHudi(factory);
registerStorageDelta(factory);
registerStorageDeltaLake(factory);
#endif
#if USE_HDFS

View File

@ -86,6 +86,16 @@ private:
struct TableFunctionProperties
{
Documentation documentation;
/** It is determined by the possibility of modifying any data or making requests to arbitrary hostnames.
*
* If users can make a request to an arbitrary hostname, they can get the info from the internal network
* or manipulate internal APIs (say - put some data into Memcached, which is available only in the corporate network).
* This is known as an "SSRF attack".
* Or a user can use an open ClickHouse server to amplify DoS attacks.
*
* In those cases, the table function should not be allowed in readonly mode.
*/
bool allow_readonly = false;
};
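As the format() registration later in this commit shows, the flag is simply the second member of the properties passed at registration time. A hedged sketch of both cases (TableFunctionLocalOnly and TableFunctionRemoteFetch are placeholder names, not real table functions):
```
/// Hedged sketch; TableFunctionLocalOnly and TableFunctionRemoteFetch are
/// placeholder names used for illustration only.
void registerExampleTableFunctions(TableFunctionFactory & factory)
{
    /// Operates only on data supplied in the query itself, so it is safe in readonly mode.
    factory.registerFunction<TableFunctionLocalOnly>({.documentation = {}, .allow_readonly = true});

    /// Can reach arbitrary hostnames (SSRF / DoS amplification risk), so keep the default.
    factory.registerFunction<TableFunctionRemoteFetch>({.documentation = {}, .allow_readonly = false});
}
```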

View File

@ -10,7 +10,7 @@
# include <Interpreters/evaluateConstantExpression.h>
# include <Interpreters/parseColumnsListForTableFunction.h>
# include <Parsers/ASTLiteral.h>
# include <Storages/StorageDelta.h>
# include <Storages/StorageDeltaLake.h>
# include <Storages/StorageURL.h>
# include <Storages/checkAndGetLiteralArgument.h>
# include <TableFunctions/TableFunctionDeltaLake.h>
@ -27,7 +27,7 @@ namespace ErrorCodes
}
void TableFunctionDelta::parseArgumentsImpl(
void TableFunctionDeltaLake::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & base_configuration)
{
if (args.empty() || args.size() > 6)
@ -100,7 +100,7 @@ void TableFunctionDelta::parseArgumentsImpl(
= checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
}
void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr context)
void TableFunctionDeltaLake::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Parse args
ASTs & args_func = ast_function->children;
@ -125,18 +125,18 @@ void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr
parseArgumentsImpl(message, args, context, configuration);
}
ColumnsDescription TableFunctionDelta::getActualTableStructure(ContextPtr context) const
ColumnsDescription TableFunctionDeltaLake::getActualTableStructure(ContextPtr context) const
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return StorageDeltaLake::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
}
StoragePtr TableFunctionDelta::executeImpl(
StoragePtr TableFunctionDeltaLake::executeImpl(
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
Poco::URI uri(configuration.url);
@ -146,7 +146,7 @@ StoragePtr TableFunctionDelta::executeImpl(
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
StoragePtr storage = std::make_shared<StorageDelta>(
StoragePtr storage = std::make_shared<StorageDeltaLake>(
configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt);
storage->startup();
@ -155,9 +155,9 @@ StoragePtr TableFunctionDelta::executeImpl(
}
void registerTableFunctionDelta(TableFunctionFactory & factory)
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDelta>(
factory.registerFunction<TableFunctionDeltaLake>(
{.documentation
= {R"(The table function can be used to read the DeltaLake table stored on object store.)",
Documentation::Examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)"}},

View File

@ -16,7 +16,7 @@ class TableFunctionS3Cluster;
/* deltaLake(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary DeltaLake table on S3.
*/
class TableFunctionDelta : public ITableFunction
class TableFunctionDeltaLake : public ITableFunction
{
public:
static constexpr auto name = "deltaLake";

View File

@ -89,9 +89,72 @@ StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, Con
return res;
}
static const Documentation format_table_function_documentation =
{
R"(
Extracts the table structure from the data and parses it according to the specified input format.
Syntax: `format(format_name, data)`.
Parameters:
- `format_name` - the format of the data.
- `data` - String literal or constant expression that returns a string containing data in the specified format.
Returned value: A table with data parsed from the `data` argument according to the specified format and the extracted schema.
)",
Documentation::Examples
{
{
"First example",
R"(
Query:
```
:) select * from format(JSONEachRow,
$$
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 112}
{"a": "World", "b": 124}
$$)
```
Result:
```
┌───b─┬─a─────┐
│ 111 │ Hello │
│ 123 │ World │
│ 112 │ Hello │
│ 124 │ World │
└─────┴───────┘
```
)"
},
{
"Second example",
R"(
Query:
```
:) desc format(JSONEachRow,
$$
{"a": "Hello", "b": 111}
{"a": "World", "b": 123}
{"a": "Hello", "b": 112}
{"a": "World", "b": 124}
$$)
```
Result:
```
┌─name─┬─type──────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ b    │ Nullable(Float64) │              │                    │         │                  │                │
│ a    │ Nullable(String)  │              │                    │         │                  │                │
└──────┴───────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
```
)"
},
},
Documentation::Categories{"format", "table-functions"}
};
void registerTableFunctionFormat(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionFormat>({}, TableFunctionFactory::CaseInsensitive);
factory.registerFunction<TableFunctionFormat>({format_table_function_documentation, false}, TableFunctionFactory::CaseInsensitive);
}
}

View File

@ -130,7 +130,7 @@ ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);

View File

@ -28,7 +28,7 @@ void registerTableFunctions()
registerTableFunctionS3Cluster(factory);
registerTableFunctionCOS(factory);
registerTableFunctionHudi(factory);
registerTableFunctionDelta(factory);
registerTableFunctionDeltaLake(factory);
registerTableFunctionOSS(factory);
#endif

View File

@ -25,7 +25,7 @@ void registerTableFunctionS3(TableFunctionFactory & factory);
void registerTableFunctionS3Cluster(TableFunctionFactory & factory);
void registerTableFunctionCOS(TableFunctionFactory & factory);
void registerTableFunctionHudi(TableFunctionFactory & factory);
void registerTableFunctionDelta(TableFunctionFactory & factory);
void registerTableFunctionDeltaLake(TableFunctionFactory & factory);
void registerTableFunctionOSS(TableFunctionFactory & factory);
#endif

View File

@ -1,13 +0,0 @@
FROM public.ecr.aws/lambda/python:3.9
# Install the function's dependencies using file requirements.txt
# from your project folder.
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from base64 import b64decode
from collections import namedtuple
from typing import Any, Dict, List
from threading import Thread
@ -19,26 +20,25 @@ NEED_RERUN_OR_CANCELL_WORKFLOWS = {
"BackportPR",
}
# https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run
#
API_URL = os.getenv("API_URL", "https://api.github.com/repos/ClickHouse/ClickHouse")
MAX_RETRY = 5
DEBUG_INFO = {} # type: Dict[str, Any]
class Worker(Thread):
def __init__(self, request_queue: Queue, ignore_exception: bool = False):
def __init__(
self, request_queue: Queue, token: str, ignore_exception: bool = False
):
Thread.__init__(self)
self.queue = request_queue
self.token = token
self.ignore_exception = ignore_exception
self.response = {} # type: Dict
def run(self):
m = self.queue.get()
try:
self.response = _exec_get_with_retry(m)
self.response = _exec_get_with_retry(m, self.token)
except Exception as e:
if not self.ignore_exception:
raise
@ -98,10 +98,11 @@ def get_token_from_aws():
return get_access_token(encoded_jwt, installation_id)
def _exec_get_with_retry(url):
def _exec_get_with_retry(url: str, token: str) -> dict:
headers = {"Authorization": f"token {token}"}
for i in range(MAX_RETRY):
try:
response = requests.get(url)
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except Exception as ex:
@ -113,23 +114,25 @@ def _exec_get_with_retry(url):
WorkflowDescription = namedtuple(
"WorkflowDescription",
["run_id", "head_sha", "status", "rerun_url", "cancel_url", "conclusion"],
["url", "run_id", "head_sha", "status", "rerun_url", "cancel_url", "conclusion"],
)
def get_workflows_description_for_pull_request(
pull_request_event,
token,
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
print("PR", pull_request_event["number"], "has head ref", head_branch)
workflows_data = []
request_url = f"{API_URL}/actions/runs?per_page=100"
repo_url = pull_request_event["base"]["repo"]["url"]
request_url = f"{repo_url}/actions/runs?per_page=100"
# Get all workflows for the current branch
for i in range(1, 11):
workflows = _exec_get_with_retry(
f"{request_url}&event=pull_request&branch={head_branch}&page={i}"
f"{request_url}&event=pull_request&branch={head_branch}&page={i}", token
)
if not workflows["workflow_runs"]:
break
@ -164,6 +167,7 @@ def get_workflows_description_for_pull_request(
):
workflow_descriptions.append(
WorkflowDescription(
url=workflow["url"],
run_id=workflow["id"],
head_sha=workflow["head_sha"],
status=workflow["status"],
@ -176,19 +180,22 @@ def get_workflows_description_for_pull_request(
return workflow_descriptions
def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescription]:
def get_workflow_description_fallback(
pull_request_event, token
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
print("Get last 500 workflows from API to search related there")
# Fallback for a case of an already deleted branch and no workflows received
request_url = f"{API_URL}/actions/runs?per_page=100"
repo_url = pull_request_event["base"]["repo"]["url"]
request_url = f"{repo_url}/actions/runs?per_page=100"
q = Queue() # type: Queue
workers = []
workflows_data = []
i = 1
for i in range(1, 6):
q.put(f"{request_url}&page={i}")
worker = Worker(q, True)
worker = Worker(q, token, True)
worker.start()
workers.append(worker)
@ -220,6 +227,7 @@ def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescri
workflow_descriptions = [
WorkflowDescription(
url=wf["url"],
run_id=wf["id"],
head_sha=wf["head_sha"],
status=wf["status"],
@ -233,9 +241,10 @@ def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescri
return workflow_descriptions
def get_workflow_description(workflow_id) -> WorkflowDescription:
workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}")
def get_workflow_description(workflow_url, token) -> WorkflowDescription:
workflow = _exec_get_with_retry(workflow_url, token)
return WorkflowDescription(
url=workflow["url"],
run_id=workflow["id"],
head_sha=workflow["head_sha"],
status=workflow["status"],
@ -268,8 +277,11 @@ def exec_workflow_url(urls_to_cancel, token):
def main(event):
token = get_token_from_aws()
DEBUG_INFO["event_body"] = event["body"]
event_data = json.loads(event["body"])
DEBUG_INFO["event"] = event
if event["isBase64Encoded"]:
event_data = json.loads(b64decode(event["body"]))
else:
event_data = json.loads(event["body"])
print("Got event for PR", event_data["number"])
action = event_data["action"]
@ -279,9 +291,12 @@ def main(event):
print("PR has labels", labels)
if action == "closed" or "do not test" in labels:
print("PR merged/closed or manually labeled 'do not test' will kill workflows")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions or get_workflow_description_fallback(pull_request)
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
urls_to_cancel = []
for workflow_description in workflow_descriptions:
@ -294,9 +309,12 @@ def main(event):
exec_workflow_url(urls_to_cancel, token)
elif action == "synchronize":
print("PR is synchronized, going to stop old actions")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions or get_workflow_description_fallback(pull_request)
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
urls_to_cancel = []
for workflow_description in workflow_descriptions:
@ -308,11 +326,14 @@ def main(event):
urls_to_cancel.append(workflow_description.cancel_url)
print(f"Found {len(urls_to_cancel)} workflows to cancel")
exec_workflow_url(urls_to_cancel, token)
elif action == "labeled" and "can be tested" in labels:
elif action == "labeled" and event_data["label"]["name"] == "can be tested":
print("PR marked with can be tested label, rerun workflow")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions or get_workflow_description_fallback(pull_request)
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
if not workflow_descriptions:
print("Not found any workflows")
@ -330,7 +351,10 @@ def main(event):
print("Cancelled")
for _ in range(45):
latest_workflow_desc = get_workflow_description(most_recent_workflow.run_id)
# If the number of retries is changed: tune the lambda limits accordingly
latest_workflow_desc = get_workflow_description(
most_recent_workflow.url, token
)
print("Checking latest workflow", latest_workflow_desc)
if latest_workflow_desc.status in ("completed", "cancelled"):
print("Finally latest workflow done, going to rerun")
@ -347,6 +371,12 @@ def main(event):
def handler(event, _):
try:
main(event)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": '{"status": "OK"}',
}
finally:
for name, value in DEBUG_INFO.items():
print(f"Value of {name}: ", value)

View File

@ -0,0 +1 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python3
import logging
import boto3 # type: ignore
from github import Github # type: ignore
@ -9,14 +11,30 @@ def get_parameter_from_ssm(name, decrypt=True, client=None):
return client.get_parameter(Name=name, WithDecryption=decrypt)["Parameter"]["Value"]
def get_best_robot_token(token_prefix_env_name="github_robot_token_", total_tokens=4):
def get_best_robot_token(token_prefix_env_name="github_robot_token_"):
client = boto3.client("ssm", region_name="us-east-1")
tokens = {}
for i in range(1, total_tokens + 1):
token_name = token_prefix_env_name + str(i)
token = get_parameter_from_ssm(token_name, True, client)
gh = Github(token, per_page=100)
rest, _ = gh.rate_limiting
tokens[token] = rest
parameters = client.describe_parameters(
ParameterFilters=[
{"Key": "Name", "Option": "BeginsWith", "Values": [token_prefix_env_name]}
]
)["Parameters"]
assert parameters
token = {"login": "", "value": "", "rest": 0}
return max(tokens.items(), key=lambda x: x[1])[0]
for token_name in [p["Name"] for p in parameters]:
value = get_parameter_from_ssm(token_name, True, client)
gh = Github(value, per_page=100)
# Do not spend an additional API request by accessing user.login unless
# the token is chosen by the number of remaining requests
user = gh.get_user()
rest, _ = gh.rate_limiting
logging.info("Get token with %s remaining requests", rest)
if token["rest"] < rest:
token = {"user": user, "value": value, "rest": rest}
assert token["value"]
logging.info(
"User %s with %s remaining requests is used", token["user"].login, token["rest"]
)
return token["value"]

View File

@ -491,6 +491,12 @@ def main(event):
def handler(event, _):
try:
main(event)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": '{"status": "OK"}',
}
except Exception:
print("Received event: ", event)
raise

View File

@ -249,7 +249,7 @@
"cosh"
"basename"
"evalMLMethod"
"filesystemFree"
"filesystemUnreserved"
"filesystemCapacity"
"reinterpretAsDate"
"filesystemAvailable"

View File

@ -39,3 +39,15 @@ def wait_until_quorum_lost(cluster, node, port=9181):
def wait_nodes(cluster, nodes):
for node in nodes:
wait_until_connected(cluster, node)
def is_leader(cluster, node, port=9181):
stat = send_4lw_cmd(cluster, node, "stat", port)
return "Mode: leader" in stat
def get_leader(cluster, nodes):
for node in nodes:
if is_leader(cluster, node):
return node
raise Exception("No leader in Keeper cluster.")

View File

@ -13,7 +13,11 @@ upstream = cluster.add_instance("upstream")
backward = cluster.add_instance(
"backward",
image="clickhouse/clickhouse-server",
tag="22.9",
# Note that a bug changed the string representation of several aggregations in 22.9 and 22.10 and some minor
# releases of 22.8, 22.7 and 22.3
# See https://github.com/ClickHouse/ClickHouse/issues/42916
# Affected at least: singleValueOrNull, last_value, min, max, any, anyLast, anyHeavy, first_value, argMin, argMax
tag="22.6",
with_installed_binary=True,
)
@ -139,6 +143,9 @@ def test_string_functions(start_cluster):
"substring",
"CAST",
# NOTE: no need to ignore now()/now64() since they will fail because they don't accept any argument
# 22.8 Backward Incompatible Change: Extended range of Date32
"toDate32OrZero",
"toDate32OrDefault",
]
functions = filter(lambda x: x not in excludes, functions)
@ -149,14 +156,15 @@ def test_string_functions(start_cluster):
failed = 0
passed = 0
def get_function_value(node, function_name, value="foo"):
def get_function_value(node, function_name, value):
return node.query(f"select {function_name}('{value}')").strip()
v = "foo"
for function in functions:
logging.info("Checking %s", function)
logging.info("Checking %s('%s')", function, v)
try:
backward_value = get_function_value(backward, function)
backward_value = get_function_value(backward, function, v)
except QueryRuntimeException as e:
error_message = str(e)
allowed_errors = [
@ -199,11 +207,12 @@ def test_string_functions(start_cluster):
failed += 1
continue
upstream_value = get_function_value(upstream, function)
upstream_value = get_function_value(upstream, function, v)
if upstream_value != backward_value:
logging.info(
"Failed %s, %s (backward) != %s (upstream)",
logging.warning(
"Failed %s('%s') %s (backward) != %s (upstream)",
function,
v,
backward_value,
upstream_value,
)

View File

@ -12,7 +12,7 @@
</disk_memory>
<disk_hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/data/</endpoint>
<endpoint>hdfs://hdfs1:9000/</endpoint>
</disk_hdfs>
<disk_encrypted>
<type>encrypted</type>

View File

@ -22,6 +22,7 @@ def cluster():
with_hdfs=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()

View File

@ -1,3 +1,4 @@
<clickhouse>
<max_server_memory_usage>2000000000</max_server_memory_usage>
<allow_use_jemalloc_memory>false</allow_use_jemalloc_memory>
</clickhouse>

View File

@ -33,7 +33,7 @@
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>false</can_become_leader>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>

View File

@ -33,7 +33,7 @@
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>false</can_become_leader>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>

View File

@ -33,7 +33,7 @@
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>false</can_become_leader>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>

View File

@ -148,10 +148,11 @@ def test_cmd_mntr(started_cluster):
wait_nodes()
clear_znodes()
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
# reset stat first
reset_node_stats(node1)
reset_node_stats(leader)
zk = get_fake_zk(node1.name, timeout=30.0)
zk = get_fake_zk(leader.name, timeout=30.0)
do_some_action(
zk,
create_cnt=10,
@ -162,7 +163,7 @@ def test_cmd_mntr(started_cluster):
delete_cnt=2,
)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="mntr")
data = keeper_utils.send_4lw_cmd(cluster, leader, cmd="mntr")
# print(data.decode())
reader = csv.reader(data.split("\n"), delimiter="\t")
@ -307,12 +308,13 @@ def test_cmd_srvr(started_cluster):
wait_nodes()
clear_znodes()
reset_node_stats(node1)
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
reset_node_stats(leader)
zk = get_fake_zk(node1.name, timeout=30.0)
zk = get_fake_zk(leader.name, timeout=30.0)
do_some_action(zk, create_cnt=10)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="srvr")
data = keeper_utils.send_4lw_cmd(cluster, leader, cmd="srvr")
print("srvr output -------------------------------------")
print(data)
@ -329,7 +331,7 @@ def test_cmd_srvr(started_cluster):
assert result["Received"] == "10"
assert result["Sent"] == "10"
assert int(result["Connections"]) == 1
assert int(result["Zxid"]) > 14
assert int(result["Zxid"]) > 10
assert result["Mode"] == "leader"
assert result["Node count"] == "13"
@ -342,13 +344,15 @@ def test_cmd_stat(started_cluster):
try:
wait_nodes()
clear_znodes()
reset_node_stats(node1)
reset_conn_stats(node1)
zk = get_fake_zk(node1.name, timeout=30.0)
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
reset_node_stats(leader)
reset_conn_stats(leader)
zk = get_fake_zk(leader.name, timeout=30.0)
do_some_action(zk, create_cnt=10)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="stat")
data = keeper_utils.send_4lw_cmd(cluster, leader, cmd="stat")
print("stat output -------------------------------------")
print(data)
@ -604,6 +608,10 @@ def test_cmd_csnp(started_cluster):
wait_nodes()
zk = get_fake_zk(node1.name, timeout=30.0)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="csnp")
print("csnp output -------------------------------------")
print(data)
try:
int(data)
assert True
@ -623,7 +631,10 @@ def test_cmd_lgif(started_cluster):
do_some_action(zk, create_cnt=100)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="lgif")
print("lgif output -------------------------------------")
print(data)
reader = csv.reader(data.split("\n"), delimiter="\t")
result = {}
@ -641,3 +652,28 @@ def test_cmd_lgif(started_cluster):
assert int(result["last_snapshot_idx"]) >= 1
finally:
destroy_zk_client(zk)
def test_cmd_rqld(started_cluster):
wait_nodes()
# node2 can not be leader
for node in [node1, node3]:
data = keeper_utils.send_4lw_cmd(cluster, node, cmd="rqld")
assert data == "Sent leadership request to leader."
print("rqld output -------------------------------------")
print(data)
if not keeper_utils.is_leader(cluster, node):
# poll and wait for the node to become leader
retry = 0
# TODO: not a strict check
while not keeper_utils.is_leader(cluster, node) and retry < 30:
time.sleep(1)
retry += 1
if retry == 30:
print(
node.name
+ " does not become leader after 30s, maybe there is something wrong."
)
assert keeper_utils.is_leader(cluster, node)

View File

@ -4,6 +4,8 @@
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs>
</disks>
</storage_configuration>

View File

@ -4,6 +4,8 @@
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs>
<hdd>
<type>local</type>

View File

@ -84,6 +84,8 @@ def test_restart_zookeeper(start_cluster):
time.sleep(5)
for table_id in range(NUM_TABLES):
node1.query(
f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);"
node1.query_with_retry(
sql=f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);",
retry_count=10,
sleep_time=1,
)

View File

@ -4,14 +4,20 @@
<hdfs1>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse1/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs1>
<hdfs1_again>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse1/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs1_again>
<hdfs2>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse2/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs2>
</disks>
<policies>

View File

@ -1,7 +1,6 @@
import logging
import os
import json
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster
@ -143,3 +142,25 @@ def test_select_query(started_cluster):
),
).splitlines()
assert len(result) > 0
def test_describe_query(started_cluster):
instance = started_cluster.instances["main_server"]
bucket = started_cluster.minio_bucket
result = instance.query(
f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
)
assert result == TSV(
[
["begin_lat", "Nullable(Float64)"],
["begin_lon", "Nullable(Float64)"],
["driver", "Nullable(String)"],
["end_lat", "Nullable(Float64)"],
["end_lon", "Nullable(Float64)"],
["fare", "Nullable(Float64)"],
["rider", "Nullable(String)"],
["ts", "Nullable(Int64)"],
["uuid", "Nullable(String)"],
]
)

View File

@ -0,0 +1,18 @@
<clickhouse>
<remote_servers>
<cluster_non_existent_port>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node1</host>
<port>19000</port>
</replica>
</shard>
</cluster_non_existent_port>
</remote_servers>
</clickhouse>

View File

@ -9,7 +9,11 @@ from pyhdfs import HdfsClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/macro.xml", "configs/schema_cache.xml"],
main_configs=[
"configs/macro.xml",
"configs/schema_cache.xml",
"configs/cluster.xml",
],
with_hdfs=True,
)
@ -783,6 +787,32 @@ def test_schema_inference_cache(started_cluster):
check_cache_misses(node1, files, 4)
def test_hdfsCluster_skip_unavailable_shards(started_cluster):
hdfs_api = started_cluster.hdfs_api
node = started_cluster.instances["node1"]
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/skip_unavailable_shards", data)
assert (
node1.query(
"select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1"
)
== data
)
def test_hdfsCluster_unskip_unavailable_shards(started_cluster):
hdfs_api = started_cluster.hdfs_api
node = started_cluster.instances["node1"]
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/unskip_unavailable_shards", data)
error = node.query_and_get_error(
"select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')"
)
assert "NETWORK_ERROR" in error
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -161,7 +161,7 @@ def test_select_query(started_cluster):
result = run_query(instance, distinct_select_query)
result_table_function = run_query(
instance,
distinct_select_query.format(
distinct_select_table_function_query.format(
ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket
),
)
@ -173,3 +173,31 @@ def test_select_query(started_cluster):
assert TSV(result) == TSV(expected)
assert TSV(result_table_function) == TSV(expected)
def test_describe_query(started_cluster):
instance = started_cluster.instances["main_server"]
bucket = started_cluster.minio_bucket
result = instance.query(
f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
)
assert result == TSV(
[
["_hoodie_commit_time", "Nullable(String)"],
["_hoodie_commit_seqno", "Nullable(String)"],
["_hoodie_record_key", "Nullable(String)"],
["_hoodie_partition_path", "Nullable(String)"],
["_hoodie_file_name", "Nullable(String)"],
["begin_lat", "Nullable(Float64)"],
["begin_lon", "Nullable(Float64)"],
["driver", "Nullable(String)"],
["end_lat", "Nullable(Float64)"],
["end_lon", "Nullable(Float64)"],
["fare", "Nullable(Float64)"],
["partitionpath", "Nullable(String)"],
["rider", "Nullable(String)"],
["ts", "Nullable(Int64)"],
["uuid", "Nullable(String)"],
]
)
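The five `_hoodie_*` columns in the `DESCRIBE` output are Hudi bookkeeping rather than payload; a sketch of reading only the data columns, with the same placeholder endpoint and credentials and assuming the structure argument can be omitted:

```sql
-- Drop the Hudi metadata columns and keep only the trip data.
SELECT * EXCEPT (_hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
                 _hoodie_partition_path, _hoodie_file_name)
FROM hudi('http://minio1:9001/root/test_table/', 'minio', 'minio123')
LIMIT 10;
```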


@@ -1 +1 @@
SELECT filesystemCapacity() >= filesystemFree() AND filesystemFree() >= filesystemAvailable() AND filesystemAvailable() >= 0;
SELECT filesystemCapacity() >= filesystemAvailable() AND filesystemAvailable() >= 0 AND filesystemUnreserved() >= 0;


@@ -1,11 +1,14 @@
1
2
3
3.1
4
5
5.1
6
7
7.1
7.2
8
9
text_log non empty


@@ -37,12 +37,20 @@ rm -f "$tmp_file" >/dev/null 2>&1
echo 3
# failure before query start
$CLICKHOUSE_CLIENT \
--query="SELECT 'find_me_TOPSECRET=TOPSECRET' FROM non_existing_table FORMAT Null" \
--query="SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" \
--log_queries=1 --ignore-error --multiquery |& grep -v '^(query: ' > "$tmp_file"
grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3a'
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3b'
echo '3.1'
echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @- >"$tmp_file" 2>&1
grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3.1a'
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3.1b'
#echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | curl -sSg http://172.17.0.3:8123/ -d @-
rm -f "$tmp_file" >/dev/null 2>&1
echo 4
# failure at the end of query
@@ -100,6 +108,21 @@ $CLICKHOUSE_CLIENT \
--server_logs_file=/dev/null \
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';"
echo '7.1'
# query_log exceptions
$CLICKHOUSE_CLIENT \
--server_logs_file=/dev/null \
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and exception like '%TOPSECRET%'"
echo '7.2'
# Not perfect: when run in parallel with other tests, this check can give a false-negative result,
# because another test can overwrite last_error_message, where we check for the absence of sensitive data.
# It is still good enough for CI: in case of a regression the test will start flapping (normally it shouldn't).
$CLICKHOUSE_CLIENT \
--server_logs_file=/dev/null \
--query="select * from system.errors where last_error_message like '%TOPSECRET%';"
rm -f "$tmp_file" >/dev/null 2>&1
echo 8
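Steps 7.1 and 7.2 rest on the observation that masking must cover not only the logged query text but also exception messages and `system.errors`; a sketch of the checks they perform, reduced to plain SQL:

```sql
-- Neither the exception text in query_log nor the last error message may leak the secret.
SELECT count() = 0
FROM system.query_log
WHERE current_database = currentDatabase()
  AND event_date >= yesterday()
  AND exception LIKE '%TOPSECRET%';

SELECT count() = 0
FROM system.errors
WHERE last_error_message LIKE '%TOPSECRET%';
```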


@@ -21,5 +21,5 @@ SYSTEM FLUSH LOGS;
SELECT read_rows < 110000 FROM system.query_log
WHERE type = 'QueryFinish' AND current_database = currentDatabase()
AND event_time > now() - INTERVAL 10 SECOND
AND event_date >= yesterday()
AND lower(query) LIKE lower('SELECT s FROM order_by_desc ORDER BY u%');


@@ -5,4 +5,4 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CUR_DIR"/../shell_config.sh
# Checks that these functions are working inside clickhouse-local. Does not check specific values.
$CLICKHOUSE_LOCAL --query "SELECT filesystemAvailable() > 0, filesystemFree() <= filesystemCapacity()"
$CLICKHOUSE_LOCAL --query "SELECT filesystemAvailable() > 0, filesystemUnreserved() <= filesystemCapacity()"


@@ -3,7 +3,6 @@ clusterAllReplicas
dictionary
executable
file
format
generateRandom
input
jdbc


@@ -322,7 +322,7 @@ farmHash64
file
filesystemAvailable
filesystemCapacity
filesystemFree
filesystemUnreserved
finalizeAggregation
firstSignificantSubdomainCustom
firstSignificantSubdomainCustomRFC


@@ -0,0 +1,2 @@
1
1


@@ -0,0 +1,6 @@
-- Tags: no-fasttest
select filesystemCapacity('s3_disk') >= filesystemAvailable('s3_disk') and filesystemAvailable('s3_disk') >= filesystemUnreserved('s3_disk');
select filesystemCapacity('default') >= filesystemAvailable('default') and filesystemAvailable('default') >= 0 and filesystemUnreserved('default') >= 0;
select filesystemCapacity('__un_exists_disk'); -- { serverError UNKNOWN_DISK }
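Taken together with the rename elsewhere in this diff (`filesystemFree` giving way to `filesystemUnreserved`), these tests suggest all three filesystem functions now take an optional disk name; a minimal sketch, with the disk names as assumptions:

```sql
-- Per-disk figures; 'default' exists in any configuration, other names depend on the server.
SELECT
    filesystemCapacity('default')   AS capacity,
    filesystemAvailable('default')  AS available,
    filesystemUnreserved('default') AS unreserved;

-- A disk that is not declared in the configuration is rejected.
SELECT filesystemCapacity('no_such_disk'); -- { serverError UNKNOWN_DISK }
```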


@@ -0,0 +1,25 @@
1
22.8.5.29 10
22.8.6.71 10
1
22.8.5.29 52
22.8.6.71 52
1
22.8.5.29 0
22.8.6.71 0
46_OK 0123456789012345678901234567890123456789012345
46_KO 0123456789012345678901234567890123456789012345
47_OK 01234567890123456789012345678901234567890123456
47_KO 01234567890123456789012345678901234567890123456
48_OK 012345678901234567890123456789012345678901234567
48_KO 012345678901234567890123456789012345678901234567
63_OK 012345678901234567890123456789012345678901234567890123456789012
63_KO 012345678901234567890123456789012345678901234567890123456789012
64_OK 0123456789012345678901234567890123456789012345678901234567890123
64_KO 0123456789012345678901234567890123456789012345678901234567890123
-1 0
-2 0
-2^31 0
1M without 0 1048576
1M with 0 1048575
fuzz2 0123 4


@@ -0,0 +1,109 @@
-- Context: https://github.com/ClickHouse/ClickHouse/issues/42916
-- STRING WITH 10 CHARACTERS
-- SELECT version() AS v, hex(argMaxState('0123456789', number)) AS state FROM numbers(1) FORMAT CSV
CREATE TABLE argmaxstate_hex_small
(
`v` String,
`state` String
)
ENGINE = TinyLog;
INSERT into argmaxstate_hex_small VALUES ('22.8.5.29','0B0000003031323334353637383900010000000000000000'), ('22.8.6.71','0A00000030313233343536373839010000000000000000');
-- Assert that the current version will write the same as 22.8.5 (last known good 22.8 minor)
SELECT
(SELECT hex(argMaxState('0123456789', number)) FROM numbers(1)) = state
FROM argmaxstate_hex_small
WHERE v = '22.8.5.29';
-- Assert that the current version can read correctly both the old and the regression states
SELECT
v,
length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64))))
FROM argmaxstate_hex_small;
-- STRING WITH 54 characters
-- SELECT version() AS v, hex(argMaxState('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', number)) AS state FROM numbers(1) FORMAT CSV
CREATE TABLE argmaxstate_hex_large
(
`v` String,
`state` String
)
ENGINE = TinyLog;
INSERT into argmaxstate_hex_large VALUES ('22.8.5.29','350000004142434445464748494A4B4C4D4E4F505152535455565758595A6162636465666768696A6B6C6D6E6F707172737475767778797A00010000000000000000'), ('22.8.6.71','340000004142434445464748494A4B4C4D4E4F505152535455565758595A6162636465666768696A6B6C6D6E6F707172737475767778797A010000000000000000');
SELECT
(SELECT hex(argMaxState('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', number)) FROM numbers(1)) = state
FROM argmaxstate_hex_large
WHERE v = '22.8.5.29';
SELECT
v,
length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64))))
FROM argmaxstate_hex_large;
-- STRING WITH 0 characters
-- SELECT version() AS v, hex(argMaxState('', number)) AS state FROM numbers(1) FORMAT CSV
CREATE TABLE argmaxstate_hex_empty
(
`v` String,
`state` String
)
ENGINE = TinyLog;
INSERT into argmaxstate_hex_empty VALUES ('22.8.5.29','0100000000010000000000000000'), ('22.8.6.71','00000000010000000000000000');
SELECT
(SELECT hex(argMaxState('', number)) FROM numbers(1)) = state
FROM argmaxstate_hex_empty
WHERE v = '22.8.5.29';
SELECT v, length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64))))
FROM argmaxstate_hex_empty;
-- Right in the border of small and large buffers
-- SELECT hex(argMaxState('0123456789012345678901234567890123456789012345' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '46_OK', finalizeAggregation(CAST(unhex('2F0000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343500010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '46_KO', finalizeAggregation(CAST(unhex('2E00000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
-- SELECT hex(argMaxState('01234567890123456789012345678901234567890123456' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '47_OK', finalizeAggregation(CAST(unhex('30000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '47_KO', finalizeAggregation(CAST(unhex('2F0000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
-- SELECT hex(argMaxState('012345678901234567890123456789012345678901234567' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '48_OK', finalizeAggregation(CAST(unhex('3100000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363700010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '48_KO', finalizeAggregation(CAST(unhex('30000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
-- Right in the allocation limit (power of 2)
-- SELECT hex(argMaxState('012345678901234567890123456789012345678901234567890123456789012' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '63_OK', finalizeAggregation(CAST(unhex('4000000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313200010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '63_KO', finalizeAggregation(CAST(unhex('3F000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
-- SELECT hex(argMaxState('0123456789012345678901234567890123456789012345678901234567890123' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '64_OK', finalizeAggregation(CAST(unhex('410000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323300010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '64_KO', finalizeAggregation(CAST(unhex('4000000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '-1', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('ffffffff') || randomString(100500), 'AggregateFunction(max, String)') as x);
SELECT '-2', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('fffffffe') || randomString(100500), 'AggregateFunction(max, String)') as x);
SELECT '-2^31', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('00000080') || randomString(100500), 'AggregateFunction(max, String)') as x);
SELECT '2^31-2', maxMerge(x) from (select CAST(unhex('feffff7f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }
SELECT '2^31-1', maxMerge(x) from (select CAST(unhex('ffffff7f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }
SELECT '2^30', maxMerge(x) from (select CAST(unhex('00000040') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }
SELECT '2^30+1', maxMerge(x) from (select CAST(unhex('01000040') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }
SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError CANNOT_READ_ALL_DATA }
-- The following query works, but it's too long and consumes too much memory
-- SELECT '2^30-1', length(maxMerge(x)) from (select CAST(unhex('ffffff3f') || randomString(0x3FFFFFFF - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x);
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x);
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA }
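The hex literals above are just the output of the commented-out generator queries; a quick round-trip sketch using the same `hex`/`unhex`/`CAST` pattern, checking that a freshly produced state finalizes back to the original string:

```sql
-- Expected result: 1 (the state survives serialization and deserialization).
SELECT
    finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64))) = '0123456789'
FROM
(
    SELECT hex(argMaxState('0123456789', number)) AS state
    FROM numbers(1)
);
```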

Some files were not shown because too many files have changed in this diff.