Merge branch 'master' into tavplubix-patch-5
Commit 91b2cdb94a
@ -1148,6 +1148,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);

bool allow_use_jemalloc_memory = config->getBool("allow_use_jemalloc_memory", true);
total_memory_tracker.setAllowUseJemallocMemory(allow_use_jemalloc_memory);

auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
@ -13,6 +13,7 @@ struct Settings;
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int CORRUPTED_DATA;
}

@ -89,6 +90,13 @@ public:
{
this->data(place).result.read(buf, *serialization_res, arena);
this->data(place).value.read(buf, *serialization_val, arena);
if (unlikely(this->data(place).value.has() != this->data(place).result.has()))
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Invalid state of the aggregate function {}: has_value ({}) != has_result ({})",
getName(),
this->data(place).value.has(),
this->data(place).result.has());
}

bool allocatesMemoryInArena() const override
@ -448,6 +448,34 @@ public:
};

struct Compatibility
{
/// Old versions used to store terminating null-character in SingleValueDataString.
/// Then -WithTerminatingZero methods were removed from IColumn interface,
/// because these methods are quite dangerous and easy to misuse. It introduced incompatibility.
/// See https://github.com/ClickHouse/ClickHouse/pull/41431 and https://github.com/ClickHouse/ClickHouse/issues/42916
/// Here we keep these functions for compatibility.
/// It's safe because there's no way unsanitized user input (without \0 at the end) can reach these functions.

static StringRef getDataAtWithTerminatingZero(const ColumnString & column, size_t n)
{
auto res = column.getDataAt(n);
/// ColumnString always reserves extra byte for null-character after string.
/// But getDataAt returns StringRef without the null-character. Let's add it.
chassert(res.data[res.size] == '\0');
++res.size;
return res;
}

static void insertDataWithTerminatingZero(ColumnString & column, const char * pos, size_t length)
{
/// String already has terminating null-character.
/// But insertData will add another one unconditionally. Trim existing null-character to avoid duplication.
chassert(0 < length);
chassert(pos[length - 1] == '\0');
column.insertData(pos, length - 1);
}
};
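A minimal standalone sketch of the store/trim round-trip these helpers implement (plain C++ with illustrative names, not ClickHouse code; values are stored with a trailing '\0' while the column keeps them without it):

#include <cassert>
#include <string>
#include <string_view>

/// Analogue of getDataAtWithTerminatingZero: extend the view by one byte so the
/// stored representation includes the terminating null-character.
std::string_view getWithTerminatingZero(const std::string & column_value)
{
    /// std::string guarantees a '\0' right after the last character.
    return {column_value.data(), column_value.size() + 1};
}

/// Analogue of insertDataWithTerminatingZero: trim the trailing '\0' before
/// putting the value back into the column.
std::string insertWithTerminatingZero(std::string_view stored)
{
    assert(!stored.empty() && stored.back() == '\0');
    return std::string(stored.substr(0, stored.size() - 1));
}

int main()
{
    std::string column_value = "foo";
    std::string_view stored = getWithTerminatingZero(column_value);
    assert(stored.size() == column_value.size() + 1);          /// "foo" plus '\0'
    assert(insertWithTerminatingZero(stored) == column_value); /// the round-trip is lossless
}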

/** For strings. Short strings are stored in the object itself, and long strings are allocated separately.
* NOTE It could also be suitable for arrays of numbers.
@ -477,20 +505,31 @@ public:
return size >= 0;
}

const char * getData() const
private:
char * getDataMutable()
{
return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data;
}

const char * getData() const
{
const char * data_ptr = size <= MAX_SMALL_STRING_SIZE ? small_data : large_data;
/// It must always be terminated with null-character
chassert(0 < size);
chassert(data_ptr[size - 1] == '\0');
return data_ptr;
}

StringRef getStringRef() const
{
return StringRef(getData(), size);
}

public:
void insertResultInto(IColumn & to) const
{
if (has())
assert_cast<ColumnString &>(to).insertData(getData(), size);
Compatibility::insertDataWithTerminatingZero(assert_cast<ColumnString &>(to), getData(), size);
else
assert_cast<ColumnString &>(to).insertDefault();
}
@ -502,44 +541,76 @@ public:
|
||||
buf.write(getData(), size);
|
||||
}
|
||||
|
||||
void allocateLargeDataIfNeeded(Int64 size_to_reserve, Arena * arena)
|
||||
{
|
||||
if (capacity < size_to_reserve)
|
||||
{
|
||||
capacity = static_cast<Int32>(roundUpToPowerOfTwoOrZero(size_to_reserve));
|
||||
/// It might happen if the size was too big and the rounded value does not fit a size_t
|
||||
if (unlikely(capacity < size_to_reserve))
|
||||
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", size_to_reserve);
|
||||
|
||||
/// Don't free large_data here.
|
||||
large_data = arena->alloc(capacity);
|
||||
}
|
||||
}
|
||||
|
||||
void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena * arena)
|
||||
{
|
||||
Int32 rhs_size;
|
||||
readBinary(rhs_size, buf);
|
||||
|
||||
if (rhs_size >= 0)
|
||||
{
|
||||
if (rhs_size <= MAX_SMALL_STRING_SIZE)
|
||||
{
|
||||
/// Don't free large_data here.
|
||||
|
||||
size = rhs_size;
|
||||
|
||||
if (size > 0)
|
||||
buf.readStrict(small_data, size);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (capacity < rhs_size)
|
||||
{
|
||||
capacity = static_cast<Int32>(roundUpToPowerOfTwoOrZero(rhs_size));
|
||||
/// It might happen if the size was too big and the rounded value does not fit a size_t
|
||||
if (unlikely(capacity < rhs_size))
|
||||
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", rhs_size);
|
||||
|
||||
/// Don't free large_data here.
|
||||
large_data = arena->alloc(capacity);
|
||||
}
|
||||
|
||||
size = rhs_size;
|
||||
buf.readStrict(large_data, size);
|
||||
}
|
||||
}
|
||||
else
|
||||
if (rhs_size < 0)
|
||||
{
|
||||
/// Don't free large_data here.
|
||||
size = rhs_size;
|
||||
return;
|
||||
}
|
||||
|
||||
if (rhs_size <= MAX_SMALL_STRING_SIZE)
|
||||
{
|
||||
/// Don't free large_data here.
|
||||
|
||||
size = rhs_size;
|
||||
|
||||
if (size > 0)
|
||||
buf.readStrict(small_data, size);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Reserve one byte more for null-character
|
||||
Int64 rhs_size_to_reserve = rhs_size;
|
||||
rhs_size_to_reserve += 1; /// Avoid overflow
|
||||
allocateLargeDataIfNeeded(rhs_size_to_reserve, arena);
|
||||
size = rhs_size;
|
||||
buf.readStrict(large_data, size);
|
||||
}
|
||||
|
||||
/// Check if the string we read is null-terminated (getDataMutable does not have the assertion)
|
||||
if (0 < size && getDataMutable()[size - 1] == '\0')
|
||||
return;
|
||||
|
||||
/// It's not null-terminated, but it must be (for historical reasons). There are two variants:
|
||||
/// - The value was serialized by one of the incompatible versions of ClickHouse. We had some range of versions
|
||||
/// that used to serialize SingleValueDataString without terminating '\0'. Let's just append it.
|
||||
/// - An attacker sent crafted data. Sanitize it and append '\0'.
|
||||
/// In all other cases the string must be already null-terminated.
|
||||
|
||||
/// NOTE We cannot add '\0' unconditionally, because it will be duplicated.
|
||||
/// NOTE It's possible that a string that actually ends with '\0' was written by one of the incompatible versions.
|
||||
/// Unfortunately, we cannot distinguish it from a normal string written by a normal version.
|
||||
/// So such strings will be trimmed.
|
||||
|
||||
if (size == MAX_SMALL_STRING_SIZE)
|
||||
{
|
||||
/// Special case: We have to move value to large_data
|
||||
allocateLargeDataIfNeeded(size + 1, arena);
|
||||
memcpy(large_data, small_data, size);
|
||||
}
|
||||
|
||||
/// We have enough space to append
|
||||
++size;
|
||||
getDataMutable()[size - 1] = '\0';
|
||||
}
|
||||
|
||||
/// Assuming to.has()
|
||||
@ -557,13 +628,7 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
if (capacity < value_size)
|
||||
{
|
||||
/// Don't free large_data here.
|
||||
capacity = static_cast<Int32>(roundUpToPowerOfTwoOrZero(value_size));
|
||||
large_data = arena->alloc(capacity);
|
||||
}
|
||||
|
||||
allocateLargeDataIfNeeded(value_size, arena);
|
||||
size = value_size;
|
||||
memcpy(large_data, value.data, size);
|
||||
}
|
||||
@ -571,7 +636,7 @@ public:
|
||||
|
||||
void change(const IColumn & column, size_t row_num, Arena * arena)
|
||||
{
|
||||
changeImpl(assert_cast<const ColumnString &>(column).getDataAt(row_num), arena);
|
||||
changeImpl(Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num), arena);
|
||||
}
|
||||
|
||||
void change(const Self & to, Arena * arena)
|
||||
@ -620,7 +685,7 @@ public:
|
||||
|
||||
bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
|
||||
{
|
||||
if (!has() || assert_cast<const ColumnString &>(column).getDataAt(row_num) < getStringRef())
|
||||
if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num) < getStringRef())
|
||||
{
|
||||
change(column, row_num, arena);
|
||||
return true;
|
||||
@ -642,7 +707,7 @@ public:
|
||||
|
||||
bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
|
||||
{
|
||||
if (!has() || assert_cast<const ColumnString &>(column).getDataAt(row_num) > getStringRef())
|
||||
if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num) > getStringRef())
|
||||
{
|
||||
change(column, row_num, arena);
|
||||
return true;
|
||||
@ -669,7 +734,7 @@ public:
|
||||
|
||||
bool isEqualTo(const IColumn & column, size_t row_num) const
|
||||
{
|
||||
return has() && assert_cast<const ColumnString &>(column).getDataAt(row_num) == getStringRef();
|
||||
return has() && Compatibility::getDataAtWithTerminatingZero(assert_cast<const ColumnString &>(column), row_num) == getStringRef();
|
||||
}
|
||||
|
||||
static bool allocatesMemoryInArena()
|
||||
|
@ -27,6 +27,14 @@ public:
|
||||
/// NOTE: Adding events into distant past (further than `period`) must be avoided.
|
||||
void add(double now, double count)
|
||||
{
|
||||
// Remove data for the initial heating stage that can be present at the beginning of a query.
// Otherwise it leads to a wrong, gradual increase of the average value, making the algorithm much less reactive.
|
||||
if (count != 0.0 && ++data_points < 5)
|
||||
{
|
||||
start = events.time;
|
||||
events = ExponentiallySmoothedAverage();
|
||||
}
|
||||
|
||||
if (now - period <= start) // precise counting mode
|
||||
events = ExponentiallySmoothedAverage(events.value + count, now);
|
||||
else // exponential smoothing mode
|
||||
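For context, a minimal standalone sketch of half-decay smoothing like the "exponential smoothing mode" above (illustrative only, not the ClickHouse ExponentiallySmoothedAverage class):

#include <cmath>

/// Every accumulated event loses half of its weight each `half_decay_time` seconds.
struct SmoothedCounter
{
    double value = 0.0;
    double last_time = 0.0;

    void add(double count, double now, double half_decay_time)
    {
        value *= std::exp2((last_time - now) / half_decay_time); /// decay previously added events
        value += count;
        last_time = now;
    }
};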
@ -51,6 +59,7 @@ public:
|
||||
{
|
||||
start = now;
|
||||
events = ExponentiallySmoothedAverage();
|
||||
data_points = 0;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -58,6 +67,7 @@ private:
|
||||
const double half_decay_time;
|
||||
double start; // Instant in past without events before it; when measurement started or reset
|
||||
ExponentiallySmoothedAverage events; // Estimated number of events in the last `period`
|
||||
size_t data_points = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/ErrorCodes.h>
|
||||
#include <Common/SensitiveDataMasker.h>
|
||||
#include <Common/LockMemoryExceptionInThread.h>
|
||||
#include <filesystem>
|
||||
|
||||
@ -63,11 +64,18 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
|
||||
ErrorCodes::increment(code, remote, msg, trace);
|
||||
}
|
||||
|
||||
Exception::Exception(const std::string & msg, int code, bool remote_)
|
||||
: Poco::Exception(msg, code)
|
||||
Exception::MessageMasked::MessageMasked(const std::string & msg_)
|
||||
: msg(msg_)
|
||||
{
|
||||
if (auto * masker = SensitiveDataMasker::getInstance())
|
||||
masker->wipeSensitiveData(msg);
|
||||
}
|
||||
|
||||
Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
|
||||
: Poco::Exception(msg_masked.msg, code)
|
||||
, remote(remote_)
|
||||
{
|
||||
handle_error_code(msg, code, remote, getStackFramePointers());
|
||||
handle_error_code(msg_masked.msg, code, remote, getStackFramePointers());
|
||||
}
|
||||
|
||||
Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
|
||||
|
@ -27,7 +27,19 @@ public:
|
||||
using FramePointers = std::vector<void *>;
|
||||
|
||||
Exception() = default;
|
||||
Exception(const std::string & msg, int code, bool remote_ = false);
|
||||
|
||||
// used to remove the sensitive information from exceptions if query_masking_rules is configured
|
||||
struct MessageMasked
|
||||
{
|
||||
std::string msg;
|
||||
MessageMasked(const std::string & msg_);
|
||||
};
|
||||
|
||||
Exception(const MessageMasked & msg_masked, int code, bool remote_);
|
||||
|
||||
// delegating constructor to mask sensitive information from the message
|
||||
Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_)
|
||||
{}
|
||||
|
||||
Exception(int code, const std::string & message)
|
||||
: Exception(message, code)
|
||||
@ -54,12 +66,17 @@ public:
|
||||
template <typename... Args>
|
||||
void addMessage(fmt::format_string<Args...> format, Args &&... args)
|
||||
{
|
||||
extendedMessage(fmt::format(format, std::forward<Args>(args)...));
|
||||
addMessage(fmt::format(format, std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
void addMessage(const std::string& message)
|
||||
{
|
||||
extendedMessage(message);
|
||||
addMessage(MessageMasked(message));
|
||||
}
|
||||
|
||||
void addMessage(const MessageMasked & msg_masked)
|
||||
{
|
||||
extendedMessage(msg_masked.msg);
|
||||
}
|
||||
|
||||
/// Used to distinguish local exceptions from the one that was received from remote node.
|
||||
|
@ -220,7 +220,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
|
||||
Int64 limit_to_check = current_hard_limit;
|
||||
|
||||
#if USE_JEMALLOC
|
||||
if (level == VariableContext::Global)
|
||||
if (level == VariableContext::Global && allow_use_jemalloc_memory.load(std::memory_order_relaxed))
|
||||
{
|
||||
/// Jemalloc arenas may keep some extra memory.
|
||||
/// This memory was subtracted from RSS to decrease memory drift.
|
||||
|
@ -55,6 +55,7 @@ private:
|
||||
std::atomic<Int64> soft_limit {0};
|
||||
std::atomic<Int64> hard_limit {0};
|
||||
std::atomic<Int64> profiler_limit {0};
|
||||
std::atomic_bool allow_use_jemalloc_memory {true};
|
||||
|
||||
static std::atomic<Int64> free_memory_in_allocator_arenas;
|
||||
|
||||
@ -125,6 +126,10 @@ public:
|
||||
{
|
||||
return soft_limit.load(std::memory_order_relaxed);
|
||||
}
|
||||
void setAllowUseJemallocMemory(bool value)
|
||||
{
|
||||
allow_use_jemalloc_memory.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
/** Set limit if it was not set.
|
||||
* Otherwise, set limit to new value, if new value is greater than previous limit.
|
||||
|
@ -90,7 +90,7 @@ private:
|
||||
|
||||
bool write_progress_on_update = false;
|
||||
|
||||
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 3'000'000'000 /*ns*/}; // average cpu utilization last 3 second
|
||||
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 2'000'000'000 /*ns*/}; // average cpu utilization last 2 second
|
||||
HostToThreadTimesMap thread_data;
|
||||
/// In case of all of the above:
|
||||
/// - clickhouse-local
|
||||
|
@ -42,7 +42,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
||||
if (read_hint.has_value())
|
||||
estimated_size = *read_hint;
|
||||
else if (file_size.has_value())
|
||||
estimated_size = file_size.has_value() ? *file_size : 0;
|
||||
estimated_size = *file_size;
|
||||
|
||||
if (!existing_memory
|
||||
&& settings.local_fs_method == LocalFSReadMethod::mmap
|
||||
@ -158,7 +158,15 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
||||
#endif
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
|
||||
return create(settings.local_fs_buffer_size, flags);
|
||||
|
||||
size_t buffer_size = settings.local_fs_buffer_size;
|
||||
/// Check if the buffer can be smaller than default
|
||||
if (read_hint.has_value() && *read_hint > 0 && *read_hint < buffer_size)
|
||||
buffer_size = *read_hint;
|
||||
if (file_size.has_value() && *file_size < buffer_size)
|
||||
buffer_size = *file_size;
|
||||
|
||||
return create(buffer_size, flags);
|
||||
}
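A standalone sketch of the buffer-size selection above (illustrative names; the same rule expressed as a free function):

#include <cstddef>
#include <optional>

size_t chooseBufferSize(size_t default_size, std::optional<size_t> read_hint, std::optional<size_t> file_size)
{
    size_t buffer_size = default_size;
    if (read_hint.has_value() && *read_hint > 0 && *read_hint < buffer_size)
        buffer_size = *read_hint;   /// a positive read hint may shrink the buffer
    if (file_size.has_value() && *file_size < buffer_size)
        buffer_size = *file_size;   /// never allocate more than the whole file
    return buffer_size;
}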
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <optional>
|
||||
#include <ranges>
|
||||
#include <filesystem>
|
||||
|
||||
@ -62,7 +63,7 @@ UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk
|
||||
|
||||
void UnlinkFileOperation::execute(std::unique_lock<std::shared_mutex> &)
|
||||
{
|
||||
auto buf = disk.readFile(path);
|
||||
auto buf = disk.readFile(path, ReadSettings{}, std::nullopt, disk.getFileSize(path));
|
||||
readStringUntilEOF(prev_data, *buf);
|
||||
disk.removeFile(path);
|
||||
}
|
||||
|
@ -1,31 +1,40 @@
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <filesystem>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int UNKNOWN_DISK;
|
||||
}
|
||||
namespace
|
||||
{
|
||||
|
||||
struct FilesystemAvailable
|
||||
{
|
||||
static constexpr auto name = "filesystemAvailable";
|
||||
static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.available; }
|
||||
static std::uintmax_t get(const DiskPtr & disk) { return disk->getAvailableSpace(); }
|
||||
};
|
||||
|
||||
struct FilesystemFree
|
||||
struct FilesystemUnreserved
|
||||
{
|
||||
static constexpr auto name = "filesystemFree";
|
||||
static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.free; }
|
||||
static constexpr auto name = "filesystemUnreserved";
|
||||
static std::uintmax_t get(const DiskPtr & disk) { return disk->getUnreservedSpace(); }
|
||||
};
|
||||
|
||||
struct FilesystemCapacity
|
||||
{
|
||||
static constexpr auto name = "filesystemCapacity";
|
||||
static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.capacity; }
|
||||
static std::uintmax_t get(const DiskPtr & disk) { return disk->getTotalSpace(); }
|
||||
};
|
||||
|
||||
template <typename Impl>
|
||||
@ -34,34 +43,72 @@ class FilesystemImpl : public IFunction
|
||||
public:
|
||||
static constexpr auto name = Impl::name;
|
||||
|
||||
static FunctionPtr create(ContextPtr context)
|
||||
{
|
||||
return std::make_shared<FilesystemImpl<Impl>>(std::filesystem::space(context->getPath()));
|
||||
}
|
||||
static FunctionPtr create(ContextPtr context_) { return std::make_shared<FilesystemImpl<Impl>>(context_); }
|
||||
|
||||
explicit FilesystemImpl(ContextPtr context_) : context(context_) { }
|
||||
|
||||
bool useDefaultImplementationForConstants() const override { return true; }
|
||||
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
explicit FilesystemImpl(std::filesystem::space_info spaceinfo_) : spaceinfo(spaceinfo_) { }
|
||||
|
||||
String getName() const override { return name; }
|
||||
|
||||
bool isVariadic() const override { return true; }
|
||||
|
||||
size_t getNumberOfArguments() const override { return 0; }
|
||||
bool isDeterministic() const override { return false; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
|
||||
{
|
||||
if (arguments.size() > 1)
|
||||
{
|
||||
throw Exception("Arguments size of function " + getName() + " should be 0 or 1", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
}
|
||||
if (arguments.size() == 1 && !isStringOrFixedString(arguments[0]))
|
||||
{
|
||||
throw Exception(
|
||||
"Arguments of function " + getName() + " should be String or FixedString", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
}
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
|
||||
{
|
||||
return DataTypeUInt64().createColumnConst(input_rows_count, static_cast<UInt64>(Impl::get(spaceinfo)));
|
||||
if (arguments.empty())
|
||||
{
|
||||
auto disk = context->getDisk("default");
|
||||
return DataTypeUInt64().createColumnConst(input_rows_count, Impl::get(disk));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto col = arguments[0].column;
|
||||
if (const ColumnString * col_str = checkAndGetColumn<ColumnString>(col.get()))
|
||||
{
|
||||
auto disk_map = context->getDisksMap();
|
||||
|
||||
auto col_res = ColumnVector<UInt64>::create(col_str->size());
|
||||
auto & data = col_res->getData();
|
||||
for (size_t i = 0; i < col_str->size(); ++i)
|
||||
{
|
||||
auto disk_name = col_str->getDataAt(i).toString();
|
||||
if (auto it = disk_map.find(disk_name); it != disk_map.end())
|
||||
data[i] = Impl::get(it->second);
|
||||
else
|
||||
throw Exception(
|
||||
"Unknown disk name " + disk_name + " while execute function " + getName(), ErrorCodes::UNKNOWN_DISK);
|
||||
}
|
||||
return col_res;
|
||||
}
|
||||
throw Exception(
|
||||
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::filesystem::space_info spaceinfo;
|
||||
ContextPtr context;
|
||||
};
|
||||
|
||||
}
|
||||
@ -70,7 +117,7 @@ REGISTER_FUNCTION(Filesystem)
|
||||
{
|
||||
factory.registerFunction<FilesystemImpl<FilesystemAvailable>>();
|
||||
factory.registerFunction<FilesystemImpl<FilesystemCapacity>>();
|
||||
factory.registerFunction<FilesystemImpl<FilesystemFree>>();
|
||||
factory.registerFunction<FilesystemImpl<FilesystemUnreserved>>();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1016,6 +1016,7 @@ public:
|
||||
size_t getNumberOfArguments() const override { return 3; }
|
||||
|
||||
bool useDefaultImplementationForNulls() const override { return false; }
|
||||
bool useDefaultImplementationForNothing() const override { return false; }
|
||||
bool isShortCircuit(ShortCircuitSettings & settings, size_t /*number_of_arguments*/) const override
|
||||
{
|
||||
settings.enable_lazy_execution_for_first_argument = false;
|
||||
|
@ -50,6 +50,7 @@ public:
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
|
||||
size_t getNumberOfArguments() const override { return 0; }
|
||||
bool useDefaultImplementationForNulls() const override { return false; }
|
||||
bool useDefaultImplementationForNothing() const override { return false; }
|
||||
|
||||
ColumnNumbers getArgumentsThatDontImplyNullableReturnType(size_t number_of_arguments) const override
|
||||
{
|
||||
|
@ -315,7 +315,9 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
||||
/// 1. Columns for row level filter
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
|
||||
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
|
||||
injectRequiredColumns(
|
||||
data_part_info_for_reader, storage_snapshot, with_subcolumns, row_filter_column_names);
|
||||
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names));
|
||||
pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end());
|
||||
}
|
||||
@ -323,7 +325,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
||||
/// 2. Columns for prewhere
|
||||
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
|
||||
|
||||
const auto injected_pre_columns = injectRequiredColumns(
|
||||
injectRequiredColumns(
|
||||
data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names);
|
||||
|
||||
for (const auto & name : all_pre_column_names)
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include "config.h"
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <Storages/StorageDelta.h>
|
||||
#include <Storages/StorageDeltaLake.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
@ -47,7 +47,7 @@ void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
|
||||
}
|
||||
|
||||
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
||||
std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
|
||||
{
|
||||
std::vector<String> keys;
|
||||
keys.reserve(file_update_time.size());
|
||||
@ -61,10 +61,10 @@ std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
||||
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
|
||||
: base_configuration(configuration_), table_path(table_path_)
|
||||
{
|
||||
Init(context);
|
||||
init(context);
|
||||
}
|
||||
|
||||
void JsonMetadataGetter::Init(ContextPtr context)
|
||||
void JsonMetadataGetter::init(ContextPtr context)
|
||||
{
|
||||
auto keys = getJsonLogFiles();
|
||||
|
||||
@ -180,7 +180,53 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
|
||||
}
|
||||
}
|
||||
|
||||
StorageDelta::StorageDelta(
|
||||
namespace
|
||||
{
|
||||
|
||||
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
|
||||
{
|
||||
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
|
||||
}
|
||||
|
||||
// DeltaLake stores data in parts spread across different files.
// `keys` is the vector of parts with the latest version.
// generateQueryFromKeys constructs a query from the part file names for the
// underlying StorageS3 engine.
|
||||
String generateQueryFromKeys(const std::vector<String> & keys)
|
||||
{
|
||||
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
|
||||
return new_query;
|
||||
}
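A hedged usage sketch of the brace expansion this produces (assumes the {fmt} library; the file names are made up):

#include <cassert>
#include <string>
#include <vector>

#include <fmt/format.h>
#include <fmt/ranges.h> /// fmt::join

int main()
{
    std::vector<std::string> keys = {"part-0.parquet", "part-1.parquet"};
    std::string suffix = fmt::format("{{{}}}", fmt::join(keys, ","));
    /// StorageS3 reads {a,b,...} as alternation, so one path suffix covers all of the latest parts.
    assert(suffix == "{part-0.parquet,part-1.parquet}");
}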
|
||||
|
||||
|
||||
StorageS3Configuration getAdjustedS3Configuration(
|
||||
const ContextPtr & context,
|
||||
StorageS3::S3Configuration & base_configuration,
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::string & table_path,
|
||||
Poco::Logger * log)
|
||||
{
|
||||
JsonMetadataGetter getter{base_configuration, table_path, context};
|
||||
|
||||
auto keys = getter.getFiles();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
// set new url in configuration
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration.format;
|
||||
|
||||
return new_configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
StorageDeltaLake::StorageDeltaLake(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
@ -189,28 +235,14 @@ StorageDelta::StorageDelta(
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
|
||||
, base_configuration{getBaseConfiguration(configuration_)}
|
||||
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
JsonMetadataGetter getter{base_configuration, table_path, context_};
|
||||
|
||||
auto keys = getter.getFiles();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
// set new url in configuration
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration_.format;
|
||||
|
||||
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log);
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
@ -238,7 +270,7 @@ StorageDelta::StorageDelta(
|
||||
nullptr);
|
||||
}
|
||||
|
||||
Pipe StorageDelta::read(
|
||||
Pipe StorageDeltaLake::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -252,16 +284,18 @@ Pipe StorageDelta::read(
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
|
||||
ColumnsDescription StorageDeltaLake::getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
|
||||
{
|
||||
// DeltaLake store data parts in different files
|
||||
// keys are filenames of parts
|
||||
// for StorageS3 to read all parts we need format {key1,key2,key3,...keyn}
|
||||
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
|
||||
return new_query;
|
||||
auto base_configuration = getBaseConfiguration(configuration);
|
||||
StorageS3::updateS3Configuration(ctx, base_configuration);
|
||||
auto new_configuration = getAdjustedS3Configuration(
|
||||
ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
|
||||
return StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
|
||||
}
|
||||
|
||||
void registerStorageDelta(StorageFactory & factory)
|
||||
void registerStorageDeltaLake(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage(
|
||||
"DeltaLake",
|
||||
@ -287,7 +321,7 @@ void registerStorageDelta(StorageFactory & factory)
|
||||
configuration.format = "Parquet";
|
||||
}
|
||||
|
||||
return std::make_shared<StorageDelta>(
|
||||
return std::make_shared<StorageDeltaLake>(
|
||||
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt);
|
||||
},
|
||||
{
|
@ -32,7 +32,7 @@ public:
|
||||
void setLastModifiedTime(const String & filename, uint64_t timestamp);
|
||||
void remove(const String & filename, uint64_t timestamp);
|
||||
|
||||
std::vector<String> ListCurrentFiles() &&;
|
||||
std::vector<String> listCurrentFiles() &&;
|
||||
|
||||
private:
|
||||
std::unordered_map<String, uint64_t> file_update_time;
|
||||
@ -44,10 +44,10 @@ class JsonMetadataGetter
|
||||
public:
|
||||
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
|
||||
|
||||
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
|
||||
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
|
||||
|
||||
private:
|
||||
void Init(ContextPtr context);
|
||||
void init(ContextPtr context);
|
||||
|
||||
std::vector<String> getJsonLogFiles();
|
||||
|
||||
@ -60,13 +60,13 @@ private:
|
||||
DeltaLakeMetadata metadata;
|
||||
};
|
||||
|
||||
class StorageDelta : public IStorage
|
||||
class StorageDeltaLake : public IStorage
|
||||
{
|
||||
public:
|
||||
// 1. Parses the internal file structure of the table
// 2. Finds the parts with the latest version
// 3. Creates a URL for the underlying StorageS3 engine to handle reads
|
||||
StorageDelta(
|
||||
StorageDeltaLake(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
@ -87,14 +87,12 @@ public:
|
||||
size_t max_block_size,
|
||||
size_t num_streams) override;
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx);
|
||||
private:
|
||||
void Init();
|
||||
|
||||
// DeltaLake stores data in parts in different files
|
||||
// keys is vector of parts with latest version
|
||||
// generateQueryFromKeys constructs query from parts filenames for
|
||||
// underlying StorageS3 engine
|
||||
static String generateQueryFromKeys(std::vector<String> && keys);
|
||||
void init();
|
||||
|
||||
StorageS3::S3Configuration base_configuration;
|
||||
std::shared_ptr<StorageS3> s3engine;
|
@ -28,115 +28,20 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
StorageHudi::StorageHudi(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
|
||||
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
namespace
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
auto keys = getKeysFromS3();
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration_.format;
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
columns_ = StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
|
||||
storage_metadata.setColumns(columns_);
|
||||
}
|
||||
else
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
s3engine = std::make_shared<StorageS3>(
|
||||
new_configuration,
|
||||
table_id_,
|
||||
columns_,
|
||||
constraints_,
|
||||
comment,
|
||||
context_,
|
||||
format_settings_,
|
||||
/* distributed_processing_ */ false,
|
||||
nullptr);
|
||||
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
|
||||
{
|
||||
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
|
||||
}
|
||||
|
||||
Pipe StorageHudi::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
StorageS3::updateS3Configuration(context, base_configuration);
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
std::vector<std::string> StorageHudi::getKeysFromS3()
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
|
||||
const auto & client = base_configuration.client;
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
Aws::S3::Model::ListObjectsV2Outcome outcome;
|
||||
|
||||
bool is_finished{false};
|
||||
const auto bucket{base_configuration.uri.bucket};
|
||||
|
||||
request.SetBucket(bucket);
|
||||
request.SetPrefix(table_path);
|
||||
|
||||
while (!is_finished)
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(table_path),
|
||||
backQuote(outcome.GetError().GetExceptionName()),
|
||||
quoteString(outcome.GetError().GetMessage()));
|
||||
|
||||
const auto & result_batch = outcome.GetResult().GetContents();
|
||||
for (const auto & obj : result_batch)
|
||||
{
|
||||
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
|
||||
keys.push_back(filename);
|
||||
LOG_DEBUG(log, "Found file: {}", filename);
|
||||
}
|
||||
|
||||
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
|
||||
/// Apache Hudi stores parts of data in different files.
/// Every part file has a timestamp in it.
/// Every partition (directory) in Apache Hudi has different versions of a part.
/// To find the needed parts we need to find the latest part file for every partition.
/// The part format is usually Parquet, but it can differ.
|
||||
String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
|
||||
{
|
||||
/// For each partition path take only latest file.
|
||||
struct FileInfo
|
||||
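A standalone sketch of the per-partition selection described above (illustrative names, not the FileInfo-based code in the actual function): keep, for every partition directory, only the key with the greatest timestamp.

#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

std::vector<std::string> latestPerPartition(const std::vector<std::pair<std::string, uint64_t>> & keys_with_timestamps)
{
    /// partition path -> (timestamp, key)
    std::map<std::string, std::pair<uint64_t, std::string>> latest;
    for (const auto & [key, timestamp] : keys_with_timestamps)
    {
        std::string partition = key.substr(0, key.find_last_of('/') + 1);
        auto it = latest.find(partition);
        if (it == latest.end() || it->second.first < timestamp)
            latest[partition] = {timestamp, key};
    }

    std::vector<std::string> result;
    result.reserve(latest.size());
    for (const auto & kv : latest)
        result.push_back(kv.second.second);
    return result;
}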
@ -187,6 +92,138 @@ String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys,
|
||||
return "{" + list_of_keys + "}";
|
||||
}
|
||||
|
||||
std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log)
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
|
||||
const auto & client = base_configuration.client;
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
Aws::S3::Model::ListObjectsV2Outcome outcome;
|
||||
|
||||
bool is_finished{false};
|
||||
const auto bucket{base_configuration.uri.bucket};
|
||||
|
||||
request.SetBucket(bucket);
|
||||
request.SetPrefix(table_path);
|
||||
|
||||
while (!is_finished)
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(table_path),
|
||||
backQuote(outcome.GetError().GetExceptionName()),
|
||||
quoteString(outcome.GetError().GetMessage()));
|
||||
|
||||
const auto & result_batch = outcome.GetResult().GetContents();
|
||||
for (const auto & obj : result_batch)
|
||||
{
|
||||
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
|
||||
keys.push_back(filename);
|
||||
LOG_DEBUG(log, "Found file: {}", filename);
|
||||
}
|
||||
|
||||
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
|
||||
StorageS3Configuration getAdjustedS3Configuration(
|
||||
StorageS3::S3Configuration & base_configuration,
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::string & table_path,
|
||||
Poco::Logger * log)
|
||||
{
|
||||
auto keys = getKeysFromS3(base_configuration, table_path, log);
|
||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format);
|
||||
|
||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||
LOG_DEBUG(log, "Table path: {}", table_path);
|
||||
|
||||
StorageS3Configuration new_configuration;
|
||||
new_configuration.url = new_uri;
|
||||
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
|
||||
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
|
||||
new_configuration.format = configuration.format;
|
||||
|
||||
return new_configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
StorageHudi::StorageHudi(
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
ColumnsDescription columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_)
|
||||
: IStorage(table_id_)
|
||||
, base_configuration{getBaseConfiguration(configuration_)}
|
||||
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
|
||||
, table_path(base_configuration.uri.key)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
StorageS3::updateS3Configuration(context_, base_configuration);
|
||||
|
||||
auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log);
|
||||
|
||||
if (columns_.empty())
|
||||
{
|
||||
columns_ = StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
|
||||
storage_metadata.setColumns(columns_);
|
||||
}
|
||||
else
|
||||
storage_metadata.setColumns(columns_);
|
||||
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
s3engine = std::make_shared<StorageS3>(
|
||||
new_configuration,
|
||||
table_id_,
|
||||
columns_,
|
||||
constraints_,
|
||||
comment,
|
||||
context_,
|
||||
format_settings_,
|
||||
/* distributed_processing_ */ false,
|
||||
nullptr);
|
||||
}
|
||||
|
||||
Pipe StorageHudi::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
StorageS3::updateS3Configuration(context, base_configuration);
|
||||
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
ColumnsDescription StorageHudi::getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
|
||||
{
|
||||
auto base_configuration = getBaseConfiguration(configuration);
|
||||
StorageS3::updateS3Configuration(ctx, base_configuration);
|
||||
auto new_configuration = getAdjustedS3Configuration(
|
||||
base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
|
||||
return StorageS3::getTableStructureFromData(
|
||||
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
|
||||
}
|
||||
|
||||
void registerStorageHudi(StorageFactory & factory)
|
||||
{
|
||||
|
@ -48,16 +48,11 @@ public:
|
||||
size_t max_block_size,
|
||||
size_t num_streams) override;
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const StorageS3Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx);
|
||||
private:
|
||||
std::vector<std::string> getKeysFromS3();
|
||||
|
||||
/// Apache Hudi store parts of data in different files.
|
||||
/// Every part file has timestamp in it.
|
||||
/// Every partition(directory) in Apache Hudi has different versions of part.
|
||||
/// To find needed parts we need to find out latest part file for every partition.
|
||||
/// Part format is usually parquet, but can differ.
|
||||
static String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format);
|
||||
|
||||
StorageS3::S3Configuration base_configuration;
|
||||
std::shared_ptr<StorageS3> s3engine;
|
||||
Poco::Logger * log;
|
||||
|
@ -214,7 +214,7 @@ private:
|
||||
friend class StorageS3Cluster;
|
||||
friend class TableFunctionS3Cluster;
|
||||
friend class StorageHudi;
|
||||
friend class StorageDelta;
|
||||
friend class StorageDeltaLake;
|
||||
|
||||
S3Configuration s3_configuration;
|
||||
std::vector<String> keys;
|
||||
|
@ -34,7 +34,7 @@ void registerStorageS3(StorageFactory & factory);
|
||||
void registerStorageCOS(StorageFactory & factory);
|
||||
void registerStorageOSS(StorageFactory & factory);
|
||||
void registerStorageHudi(StorageFactory & factory);
|
||||
void registerStorageDelta(StorageFactory & factory);
|
||||
void registerStorageDeltaLake(StorageFactory & factory);
|
||||
#endif
|
||||
|
||||
#if USE_HDFS
|
||||
@ -123,7 +123,7 @@ void registerStorages()
|
||||
registerStorageCOS(factory);
|
||||
registerStorageOSS(factory);
|
||||
registerStorageHudi(factory);
|
||||
registerStorageDelta(factory);
|
||||
registerStorageDeltaLake(factory);
|
||||
#endif
|
||||
|
||||
#if USE_HDFS
|
||||
|
@ -10,7 +10,7 @@
|
||||
# include <Interpreters/evaluateConstantExpression.h>
|
||||
# include <Interpreters/parseColumnsListForTableFunction.h>
|
||||
# include <Parsers/ASTLiteral.h>
|
||||
# include <Storages/StorageDelta.h>
|
||||
# include <Storages/StorageDeltaLake.h>
|
||||
# include <Storages/StorageURL.h>
|
||||
# include <Storages/checkAndGetLiteralArgument.h>
|
||||
# include <TableFunctions/TableFunctionDeltaLake.h>
|
||||
@ -27,7 +27,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
void TableFunctionDelta::parseArgumentsImpl(
|
||||
void TableFunctionDeltaLake::parseArgumentsImpl(
|
||||
const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & base_configuration)
|
||||
{
|
||||
if (args.empty() || args.size() > 6)
|
||||
@ -100,7 +100,7 @@ void TableFunctionDelta::parseArgumentsImpl(
|
||||
= checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
|
||||
}
|
||||
|
||||
void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr context)
|
||||
void TableFunctionDeltaLake::parseArguments(const ASTPtr & ast_function, ContextPtr context)
|
||||
{
|
||||
/// Parse args
|
||||
ASTs & args_func = ast_function->children;
|
||||
@ -125,18 +125,18 @@ void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
parseArgumentsImpl(message, args, context, configuration);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionDelta::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionDeltaLake::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
return StorageDeltaLake::getTableStructureFromData(configuration, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionDelta::executeImpl(
|
||||
StoragePtr TableFunctionDeltaLake::executeImpl(
|
||||
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
Poco::URI uri(configuration.url);
|
||||
@ -146,7 +146,7 @@ StoragePtr TableFunctionDelta::executeImpl(
|
||||
if (configuration.structure != "auto")
|
||||
columns = parseColumnsListFromString(configuration.structure, context);
|
||||
|
||||
StoragePtr storage = std::make_shared<StorageDelta>(
|
||||
StoragePtr storage = std::make_shared<StorageDeltaLake>(
|
||||
configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt);
|
||||
|
||||
storage->startup();
|
||||
@ -155,9 +155,9 @@ StoragePtr TableFunctionDelta::executeImpl(
|
||||
}
|
||||
|
||||
|
||||
void registerTableFunctionDelta(TableFunctionFactory & factory)
|
||||
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<TableFunctionDelta>(
|
||||
factory.registerFunction<TableFunctionDeltaLake>(
|
||||
{.documentation
|
||||
= {R"(The table function can be used to read the DeltaLake table stored on object store.)",
|
||||
Documentation::Examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)"}},
|
||||
|
@ -16,7 +16,7 @@ class TableFunctionS3Cluster;
|
||||
|
||||
/* deltaLake(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary DeltaLake table on S3.
|
||||
*/
|
||||
class TableFunctionDelta : public ITableFunction
|
||||
class TableFunctionDeltaLake : public ITableFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = "deltaLake";
|
||||
|
@ -130,7 +130,7 @@ ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
|
@ -28,7 +28,7 @@ void registerTableFunctions()
|
||||
registerTableFunctionS3Cluster(factory);
|
||||
registerTableFunctionCOS(factory);
|
||||
registerTableFunctionHudi(factory);
|
||||
registerTableFunctionDelta(factory);
|
||||
registerTableFunctionDeltaLake(factory);
|
||||
registerTableFunctionOSS(factory);
|
||||
|
||||
#endif
|
||||
|
@ -25,7 +25,7 @@ void registerTableFunctionS3(TableFunctionFactory & factory);
|
||||
void registerTableFunctionS3Cluster(TableFunctionFactory & factory);
|
||||
void registerTableFunctionCOS(TableFunctionFactory & factory);
|
||||
void registerTableFunctionHudi(TableFunctionFactory & factory);
|
||||
void registerTableFunctionDelta(TableFunctionFactory & factory);
|
||||
void registerTableFunctionDeltaLake(TableFunctionFactory & factory);
|
||||
void registerTableFunctionOSS(TableFunctionFactory & factory);
|
||||
#endif
|
||||
|
||||
|
@ -1,4 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
|
||||
import boto3 # type: ignore
|
||||
from github import Github # type: ignore
|
||||
|
||||
@ -9,14 +11,30 @@ def get_parameter_from_ssm(name, decrypt=True, client=None):
|
||||
return client.get_parameter(Name=name, WithDecryption=decrypt)["Parameter"]["Value"]
|
||||
|
||||
|
||||
def get_best_robot_token(token_prefix_env_name="github_robot_token_", total_tokens=4):
|
||||
def get_best_robot_token(token_prefix_env_name="github_robot_token_"):
|
||||
client = boto3.client("ssm", region_name="us-east-1")
|
||||
tokens = {}
|
||||
for i in range(1, total_tokens + 1):
|
||||
token_name = token_prefix_env_name + str(i)
|
||||
token = get_parameter_from_ssm(token_name, True, client)
|
||||
gh = Github(token, per_page=100)
|
||||
rest, _ = gh.rate_limiting
|
||||
tokens[token] = rest
|
||||
parameters = client.describe_parameters(
|
||||
ParameterFilters=[
|
||||
{"Key": "Name", "Option": "BeginsWith", "Values": [token_prefix_env_name]}
|
||||
]
|
||||
)["Parameters"]
|
||||
assert parameters
|
||||
token = {"login": "", "value": "", "rest": 0}
|
||||
|
||||
return max(tokens.items(), key=lambda x: x[1])[0]
|
||||
for token_name in [p["Name"] for p in parameters]:
|
||||
value = get_parameter_from_ssm(token_name, True, client)
|
||||
gh = Github(value, per_page=100)
|
||||
# Do not spend an additional API request by accessing user.login unless
# the token is chosen by the number of remaining requests
|
||||
user = gh.get_user()
|
||||
rest, _ = gh.rate_limiting
|
||||
logging.info("Get token with %s remaining requests", rest)
|
||||
if token["rest"] < rest:
|
||||
token = {"user": user, "value": value, "rest": rest}
|
||||
|
||||
assert token["value"]
|
||||
logging.info(
|
||||
"User %s with %s remaining requests is used", token["user"].login, token["rest"]
|
||||
)
|
||||
|
||||
return token["value"]
|
||||
|
@ -249,7 +249,7 @@
|
||||
"cosh"
|
||||
"basename"
|
||||
"evalMLMethod"
|
||||
"filesystemFree"
|
||||
"filesystemUnreserved"
|
||||
"filesystemCapacity"
|
||||
"reinterpretAsDate"
|
||||
"filesystemAvailable"
|
||||
|
@ -13,7 +13,11 @@ upstream = cluster.add_instance("upstream")
|
||||
backward = cluster.add_instance(
|
||||
"backward",
|
||||
image="clickhouse/clickhouse-server",
|
||||
tag="22.9",
|
||||
# Note that a bug changed the string representation of several aggregations in 22.9 and 22.10 and some minor
|
||||
# releases of 22.8, 22.7 and 22.3
|
||||
# See https://github.com/ClickHouse/ClickHouse/issues/42916
|
||||
# Affected at least: singleValueOrNull, last_value, min, max, any, anyLast, anyHeavy, first_value, argMin, argMax
|
||||
tag="22.6",
|
||||
with_installed_binary=True,
|
||||
)
|
||||
|
||||
@ -139,6 +143,9 @@ def test_string_functions(start_cluster):
|
||||
"substring",
|
||||
"CAST",
|
||||
# NOTE: no need to ignore now()/now64() since they will fail because they don't accept any argument
|
||||
# 22.8 Backward Incompatible Change: Extended range of Date32
|
||||
"toDate32OrZero",
|
||||
"toDate32OrDefault",
|
||||
]
|
||||
functions = filter(lambda x: x not in excludes, functions)
|
||||
|
||||
@ -149,14 +156,15 @@ def test_string_functions(start_cluster):
|
||||
failed = 0
|
||||
passed = 0
|
||||
|
||||
def get_function_value(node, function_name, value="foo"):
|
||||
def get_function_value(node, function_name, value):
|
||||
return node.query(f"select {function_name}('{value}')").strip()
|
||||
|
||||
v = "foo"
|
||||
for function in functions:
|
||||
logging.info("Checking %s", function)
|
||||
logging.info("Checking %s('%s')", function, v)
|
||||
|
||||
try:
|
||||
backward_value = get_function_value(backward, function)
|
||||
backward_value = get_function_value(backward, function, v)
|
||||
except QueryRuntimeException as e:
|
||||
error_message = str(e)
|
||||
allowed_errors = [
|
||||
@ -199,11 +207,12 @@ def test_string_functions(start_cluster):
|
||||
failed += 1
|
||||
continue
|
||||
|
||||
upstream_value = get_function_value(upstream, function)
|
||||
upstream_value = get_function_value(upstream, function, v)
|
||||
if upstream_value != backward_value:
|
||||
logging.info(
|
||||
"Failed %s, %s (backward) != %s (upstream)",
|
||||
logging.warning(
|
||||
"Failed %s('%s') %s (backward) != %s (upstream)",
|
||||
function,
|
||||
v,
|
||||
backward_value,
|
||||
upstream_value,
|
||||
)
|
||||
|
@ -1,3 +1,4 @@
|
||||
<clickhouse>
|
||||
<max_server_memory_usage>2000000000</max_server_memory_usage>
|
||||
<allow_use_jemalloc_memory>false</allow_use_jemalloc_memory>
|
||||
</clickhouse>
|
@ -1,7 +1,6 @@
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
|
||||
import helpers.client
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
@ -143,3 +142,25 @@ def test_select_query(started_cluster):
        ),
    ).splitlines()
    assert len(result) > 0


def test_describe_query(started_cluster):
    instance = started_cluster.instances["main_server"]
    bucket = started_cluster.minio_bucket
    result = instance.query(
        f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
    )

    assert result == TSV(
        [
            ["begin_lat", "Nullable(Float64)"],
            ["begin_lon", "Nullable(Float64)"],
            ["driver", "Nullable(String)"],
            ["end_lat", "Nullable(Float64)"],
            ["end_lon", "Nullable(Float64)"],
            ["fare", "Nullable(Float64)"],
            ["rider", "Nullable(String)"],
            ["ts", "Nullable(Int64)"],
            ["uuid", "Nullable(String)"],
        ]
    )
@ -161,7 +161,7 @@ def test_select_query(started_cluster):
result = run_query(instance, distinct_select_query)
result_table_function = run_query(
    instance,
    distinct_select_query.format(
    distinct_select_table_function_query.format(
        ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket
    ),
)
@ -173,3 +173,31 @@ def test_select_query(started_cluster):

assert TSV(result) == TSV(expected)
assert TSV(result_table_function) == TSV(expected)


def test_describe_query(started_cluster):
    instance = started_cluster.instances["main_server"]
    bucket = started_cluster.minio_bucket
    result = instance.query(
        f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
    )

    assert result == TSV(
        [
            ["_hoodie_commit_time", "Nullable(String)"],
            ["_hoodie_commit_seqno", "Nullable(String)"],
            ["_hoodie_record_key", "Nullable(String)"],
            ["_hoodie_partition_path", "Nullable(String)"],
            ["_hoodie_file_name", "Nullable(String)"],
            ["begin_lat", "Nullable(Float64)"],
            ["begin_lon", "Nullable(Float64)"],
            ["driver", "Nullable(String)"],
            ["end_lat", "Nullable(Float64)"],
            ["end_lon", "Nullable(Float64)"],
            ["fare", "Nullable(Float64)"],
            ["partitionpath", "Nullable(String)"],
            ["rider", "Nullable(String)"],
            ["ts", "Nullable(Int64)"],
            ["uuid", "Nullable(String)"],
        ]
    )
@ -1 +1 @@
SELECT filesystemCapacity() >= filesystemFree() AND filesystemFree() >= filesystemAvailable() AND filesystemAvailable() >= 0;
SELECT filesystemCapacity() >= filesystemAvailable() AND filesystemAvailable() >= 0 AND filesystemUnreserved() >= 0;
@ -1,11 +1,14 @@
1
2
3
3.1
4
5
5.1
6
7
7.1
7.2
8
9
text_log non empty
@ -37,12 +37,20 @@ rm -f "$tmp_file" >/dev/null 2>&1
echo 3
# failure before the query starts
$CLICKHOUSE_CLIENT \
  --query="SELECT 'find_me_TOPSECRET=TOPSECRET' FROM non_existing_table FORMAT Null" \
  --query="SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" \
  --log_queries=1 --ignore-error --multiquery |& grep -v '^(query: ' > "$tmp_file"

grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3a'
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3b'

echo '3.1'
echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @- >"$tmp_file" 2>&1

grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3.1a'
grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3.1b'

#echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | curl -sSg http://172.17.0.3:8123/ -d @-

rm -f "$tmp_file" >/dev/null 2>&1
echo 4
# failure at the end of query
@ -100,6 +108,21 @@ $CLICKHOUSE_CLIENT \
  --server_logs_file=/dev/null \
  --query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';"

echo '7.1'
# query_log exceptions
$CLICKHOUSE_CLIENT \
  --server_logs_file=/dev/null \
  --query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and exception like '%TOPSECRET%'"

echo '7.2'

# not perfect: when run in parallel with other tests, this check can give a false-negative result
# because other tests can overwrite the last_error_message, where we check the absence of sensitive data.
# But it's still good enough for CI - in case of regressions it will start flapping (normally it shouldn't)
$CLICKHOUSE_CLIENT \
  --server_logs_file=/dev/null \
  --query="select * from system.errors where last_error_message like '%TOPSECRET%';"


rm -f "$tmp_file" >/dev/null 2>&1
echo 8
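For reference, a hedged Python sketch of the same kind of masking check, assuming the clickhouse_driver package and a server whose query_masking_rules rewrite the TOPSECRET marker to 'find_me_[hidden]'; everything here is illustrative, the test itself uses clickhouse-client and curl as shown above:

    from clickhouse_driver import Client

    def assert_secret_is_masked(host="localhost"):
        client = Client(host=host)
        try:
            # The query fails, but it must still reach system.query_log with the secret masked.
            client.execute(
                "SELECT 1 FROM system.numbers "
                "WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET'")
        except Exception:
            pass
        client.execute("SYSTEM FLUSH LOGS")
        leaked = client.execute(
            "SELECT count() FROM system.query_log "
            "WHERE event_date >= yesterday() AND query LIKE '%TOPSECRET%'")
        assert leaked[0][0] == 0, "unmasked secret leaked into system.query_log"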
@ -5,4 +5,4 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CUR_DIR"/../shell_config.sh

# Checks that these functions are working inside clickhouse-local. Does not check specific values.
$CLICKHOUSE_LOCAL --query "SELECT filesystemAvailable() > 0, filesystemFree() <= filesystemCapacity()"
$CLICKHOUSE_LOCAL --query "SELECT filesystemAvailable() > 0, filesystemUnreserved() <= filesystemCapacity()"
@ -322,7 +322,7 @@ farmHash64
file
filesystemAvailable
filesystemCapacity
filesystemFree
filesystemUnreserved
finalizeAggregation
firstSignificantSubdomainCustom
firstSignificantSubdomainCustomRFC
@ -0,0 +1,2 @@
1
1
tests/queries/0_stateless/02457_filesystem_function.sql (new file)
@ -0,0 +1,6 @@
-- Tags: no-fasttest

select filesystemCapacity('s3_disk') >= filesystemAvailable('s3_disk') and filesystemAvailable('s3_disk') >= filesystemUnreserved('s3_disk');
select filesystemCapacity('default') >= filesystemAvailable('default') and filesystemAvailable('default') >= 0 and filesystemUnreserved('default') >= 0;

select filesystemCapacity('__un_exists_disk'); -- { serverError UNKNOWN_DISK }
@ -0,0 +1,25 @@
1
22.8.5.29 10
22.8.6.71 10
1
22.8.5.29 52
22.8.6.71 52
1
22.8.5.29 0
22.8.6.71 0
46_OK 0123456789012345678901234567890123456789012345
46_KO 0123456789012345678901234567890123456789012345
47_OK 01234567890123456789012345678901234567890123456
47_KO 01234567890123456789012345678901234567890123456
48_OK 012345678901234567890123456789012345678901234567
48_KO 012345678901234567890123456789012345678901234567
63_OK 012345678901234567890123456789012345678901234567890123456789012
63_KO 012345678901234567890123456789012345678901234567890123456789012
64_OK 0123456789012345678901234567890123456789012345678901234567890123
64_KO 0123456789012345678901234567890123456789012345678901234567890123
-1 0
-2 0
-2^31 0
1M without 0 1048576
1M with 0 1048575
fuzz2 0123 4
@ -0,0 +1,109 @@

-- Context: https://github.com/ClickHouse/ClickHouse/issues/42916

-- STRING WITH 10 CHARACTERS
-- SELECT version() AS v, hex(argMaxState('0123456789', number)) AS state FROM numbers(1) FORMAT CSV

CREATE TABLE argmaxstate_hex_small
(
    `v` String,
    `state` String
)
ENGINE = TinyLog;

INSERT into argmaxstate_hex_small VALUES ('22.8.5.29','0B0000003031323334353637383900010000000000000000'), ('22.8.6.71','0A00000030313233343536373839010000000000000000');

-- Assert that the current version will write the same as 22.8.5 (last known good 22.8 minor)
SELECT
    (SELECT hex(argMaxState('0123456789', number)) FROM numbers(1)) = state
FROM argmaxstate_hex_small
WHERE v = '22.8.5.29';

-- Assert that the current version can correctly read both the old and the regression states
SELECT
    v,
    length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64))))
FROM argmaxstate_hex_small;

-- STRING WITH 54 characters
-- SELECT version() AS v, hex(argMaxState('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', number)) AS state FROM numbers(1) FORMAT CSV
CREATE TABLE argmaxstate_hex_large
(
    `v` String,
    `state` String
)
ENGINE = TinyLog;

INSERT into argmaxstate_hex_large VALUES ('22.8.5.29','350000004142434445464748494A4B4C4D4E4F505152535455565758595A6162636465666768696A6B6C6D6E6F707172737475767778797A00010000000000000000'), ('22.8.6.71','340000004142434445464748494A4B4C4D4E4F505152535455565758595A6162636465666768696A6B6C6D6E6F707172737475767778797A010000000000000000');

SELECT
    (SELECT hex(argMaxState('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', number)) FROM numbers(1)) = state
FROM argmaxstate_hex_large
WHERE v = '22.8.5.29';

SELECT
    v,
    length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64))))
FROM argmaxstate_hex_large;

-- STRING WITH 0 characters
-- SELECT version() AS v, hex(argMaxState('', number)) AS state FROM numbers(1) FORMAT CSV
CREATE TABLE argmaxstate_hex_empty
(
    `v` String,
    `state` String
)
ENGINE = TinyLog;

INSERT into argmaxstate_hex_empty VALUES ('22.8.5.29','0100000000010000000000000000'), ('22.8.6.71','00000000010000000000000000');

SELECT
    (SELECT hex(argMaxState('', number)) FROM numbers(1)) = state
FROM argmaxstate_hex_empty
WHERE v = '22.8.5.29';

SELECT v, length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64))))
FROM argmaxstate_hex_empty;

-- Right at the border of the small and large buffers
-- SELECT hex(argMaxState('0123456789012345678901234567890123456789012345' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '46_OK', finalizeAggregation(CAST(unhex('2F0000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343500010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '46_KO', finalizeAggregation(CAST(unhex('2E00000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));

-- SELECT hex(argMaxState('01234567890123456789012345678901234567890123456' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '47_OK', finalizeAggregation(CAST(unhex('30000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '47_KO', finalizeAggregation(CAST(unhex('2F0000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));

-- SELECT hex(argMaxState('012345678901234567890123456789012345678901234567' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '48_OK', finalizeAggregation(CAST(unhex('3100000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363700010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '48_KO', finalizeAggregation(CAST(unhex('30000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));

-- Right at the allocation limit (power of 2)
-- SELECT hex(argMaxState('012345678901234567890123456789012345678901234567890123456789012' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '63_OK', finalizeAggregation(CAST(unhex('4000000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313200010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '63_KO', finalizeAggregation(CAST(unhex('3F000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
-- SELECT hex(argMaxState('0123456789012345678901234567890123456789012345678901234567890123' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV
SELECT '64_OK', finalizeAggregation(CAST(unhex('410000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323300010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));
SELECT '64_KO', finalizeAggregation(CAST(unhex('4000000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233010000000000000000'), 'AggregateFunction(argMax, String, UInt64)'));

SELECT '-1', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('ffffffff') || randomString(100500), 'AggregateFunction(max, String)') as x);
SELECT '-2', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('fffffffe') || randomString(100500), 'AggregateFunction(max, String)') as x);
SELECT '-2^31', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('00000080') || randomString(100500), 'AggregateFunction(max, String)') as x);

SELECT '2^31-2', maxMerge(x) from (select CAST(unhex('feffff7f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }
SELECT '2^31-1', maxMerge(x) from (select CAST(unhex('ffffff7f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }

SELECT '2^30', maxMerge(x) from (select CAST(unhex('00000040') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }
SELECT '2^30+1', maxMerge(x) from (select CAST(unhex('01000040') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE }

SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError CANNOT_READ_ALL_DATA }
-- The following query works, but it's too long and consumes too much memory
-- SELECT '2^30-1', length(maxMerge(x)) from (select CAST(unhex('ffffff3f') || randomString(0x3FFFFFFF - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x);

SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x);
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA }
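The hex literals above follow a simple layout that a small Python helper can decode for inspection; this is a sketch based only on the states visible in this test (Int32 length plus bytes for the String result, then a has-value byte and a little-endian UInt64), not the server's actual deserialization code:

    import struct

    def decode_argmax_string_uint64_state(hex_state):
        raw = bytes.fromhex(hex_state)
        str_size = struct.unpack_from("<i", raw, 0)[0]      # negative means "no value"
        data = raw[4:4 + max(str_size, 0)]
        off = 4 + max(str_size, 0)
        has_value = raw[off] != 0
        value = struct.unpack_from("<Q", raw, off + 1)[0] if has_value else None
        return str_size, data, has_value, value

    # "Good" 22.8.5.29 state: length 11 includes the terminating '\0'.
    print(decode_argmax_string_uint64_state(
        "0B0000003031323334353637383900010000000000000000"))
    # Regression 22.8.6.71 state: length 10, no terminator.
    print(decode_argmax_string_uint64_state(
        "0A00000030313233343536373839010000000000000000"))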
@ -0,0 +1,16 @@
-- { echoOn }

SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5;
0 10
2 12
4 14
DROP POLICY IF EXISTS test_rlp_policy ON test_rlp;
CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default;
SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0;
0 10
2 12
4 14
SELECT a, c FROM test_rlp PREWHERE b < 5;
0 10
2 12
4 14
@ -0,0 +1,25 @@
DROP TABLE IF EXISTS test_rlp;

CREATE TABLE test_rlp (a Int32, b Int32) ENGINE=MergeTree() ORDER BY a SETTINGS index_granularity=5;

INSERT INTO test_rlp SELECT number, number FROM numbers(15);

ALTER TABLE test_rlp ADD COLUMN c Int32 DEFAULT b+10;

-- { echoOn }

SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5;

DROP POLICY IF EXISTS test_rlp_policy ON test_rlp;

CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default;

SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0;

SELECT a, c FROM test_rlp PREWHERE b < 5;

-- { echoOff }

DROP POLICY test_rlp_policy ON test_rlp;

DROP TABLE test_rlp;
@ -0,0 +1,2 @@
[] 0
[] 0
@ -0,0 +1,3 @@
select [] as arr, if(empty(arr), 0, arr[-1]);
select [] as arr, multiIf(empty(arr), 0, length(arr) > 1, arr[-1], 0);