diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index d2ad9b04993..cfc153629e0 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1148,6 +1148,9 @@ int Server::main(const std::vector & /*args*/) total_memory_tracker.setDescription("(total)"); total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); + bool allow_use_jemalloc_memory = config->getBool("allow_use_jemalloc_memory", true); + total_memory_tracker.setAllowUseJemallocMemory(allow_use_jemalloc_memory); + auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker(); total_memory_tracker.setOvercommitTracker(global_overcommit_tracker); diff --git a/src/AggregateFunctions/AggregateFunctionArgMinMax.h b/src/AggregateFunctions/AggregateFunctionArgMinMax.h index 516d33f42de..decb572b019 100644 --- a/src/AggregateFunctions/AggregateFunctionArgMinMax.h +++ b/src/AggregateFunctions/AggregateFunctionArgMinMax.h @@ -13,6 +13,7 @@ struct Settings; namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int CORRUPTED_DATA; } @@ -89,6 +90,13 @@ public: { this->data(place).result.read(buf, *serialization_res, arena); this->data(place).value.read(buf, *serialization_val, arena); + if (unlikely(this->data(place).value.has() != this->data(place).result.has())) + throw Exception( + ErrorCodes::CORRUPTED_DATA, + "Invalid state of the aggregate function {}: has_value ({}) != has_result ({})", + getName(), + this->data(place).value.has(), + this->data(place).result.has()); } bool allocatesMemoryInArena() const override diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h index 1f3c51c1c1c..6e20e91025f 100644 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -448,6 +448,34 @@ public: }; +struct Compatibility +{ + /// Old versions used to store terminating null-character in SingleValueDataString. + /// Then -WithTerminatingZero methods were removed from IColumn interface, + /// because these methods are quite dangerous and easy to misuse. It introduced incompatibility. + /// See https://github.com/ClickHouse/ClickHouse/pull/41431 and https://github.com/ClickHouse/ClickHouse/issues/42916 + /// Here we keep these functions for compatibility. + /// It's safe because there's no way unsanitized user input (without \0 at the end) can reach these functions. + + static StringRef getDataAtWithTerminatingZero(const ColumnString & column, size_t n) + { + auto res = column.getDataAt(n); + /// ColumnString always reserves extra byte for null-character after string. + /// But getDataAt returns StringRef without the null-character. Let's add it. + chassert(res.data[res.size] == '\0'); + ++res.size; + return res; + } + + static void insertDataWithTerminatingZero(ColumnString & column, const char * pos, size_t length) + { + /// String already has terminating null-character. + /// But insertData will add another one unconditionally. Trim existing null-character to avoid duplication. + chassert(0 < length); + chassert(pos[length - 1] == '\0'); + column.insertData(pos, length - 1); + } +}; /** For strings. Short strings are stored in the object itself, and long strings are allocated separately. * NOTE It could also be suitable for arrays of numbers. @@ -477,20 +505,31 @@ public: return size >= 0; } - const char * getData() const +private: + char * getDataMutable() { return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; } + const char * getData() const + { + const char * data_ptr = size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; + /// It must always be terminated with null-character + chassert(0 < size); + chassert(data_ptr[size - 1] == '\0'); + return data_ptr; + } + StringRef getStringRef() const { return StringRef(getData(), size); } +public: void insertResultInto(IColumn & to) const { if (has()) - assert_cast(to).insertData(getData(), size); + Compatibility::insertDataWithTerminatingZero(assert_cast(to), getData(), size); else assert_cast(to).insertDefault(); } @@ -502,44 +541,76 @@ public: buf.write(getData(), size); } + void allocateLargeDataIfNeeded(Int64 size_to_reserve, Arena * arena) + { + if (capacity < size_to_reserve) + { + capacity = static_cast(roundUpToPowerOfTwoOrZero(size_to_reserve)); + /// It might happen if the size was too big and the rounded value does not fit a size_t + if (unlikely(capacity < size_to_reserve)) + throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", size_to_reserve); + + /// Don't free large_data here. + large_data = arena->alloc(capacity); + } + } + void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena * arena) { Int32 rhs_size; readBinary(rhs_size, buf); - if (rhs_size >= 0) - { - if (rhs_size <= MAX_SMALL_STRING_SIZE) - { - /// Don't free large_data here. - - size = rhs_size; - - if (size > 0) - buf.readStrict(small_data, size); - } - else - { - if (capacity < rhs_size) - { - capacity = static_cast(roundUpToPowerOfTwoOrZero(rhs_size)); - /// It might happen if the size was too big and the rounded value does not fit a size_t - if (unlikely(capacity < rhs_size)) - throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", rhs_size); - - /// Don't free large_data here. - large_data = arena->alloc(capacity); - } - - size = rhs_size; - buf.readStrict(large_data, size); - } - } - else + if (rhs_size < 0) { /// Don't free large_data here. size = rhs_size; + return; } + + if (rhs_size <= MAX_SMALL_STRING_SIZE) + { + /// Don't free large_data here. + + size = rhs_size; + + if (size > 0) + buf.readStrict(small_data, size); + } + else + { + /// Reserve one byte more for null-character + Int64 rhs_size_to_reserve = rhs_size; + rhs_size_to_reserve += 1; /// Avoid overflow + allocateLargeDataIfNeeded(rhs_size_to_reserve, arena); + size = rhs_size; + buf.readStrict(large_data, size); + } + + /// Check if the string we read is null-terminated (getDataMutable does not have the assertion) + if (0 < size && getDataMutable()[size - 1] == '\0') + return; + + /// It's not null-terminated, but it must be (for historical reasons). There are two variants: + /// - The value was serialized by one of the incompatible versions of ClickHouse. We had some range of versions + /// that used to serialize SingleValueDataString without terminating '\0'. Let's just append it. + /// - An attacker sent crafted data. Sanitize it and append '\0'. + /// In all other cases the string must be already null-terminated. + + /// NOTE We cannot add '\0' unconditionally, because it will be duplicated. + /// NOTE It's possible that a string that actually ends with '\0' was written by one of the incompatible versions. + /// Unfortunately, we cannot distinguish it from normal string written by normal version. + /// So such strings will be trimmed. + + if (size == MAX_SMALL_STRING_SIZE) + { + /// Special case: We have to move value to large_data + allocateLargeDataIfNeeded(size + 1, arena); + memcpy(large_data, small_data, size); + } + + /// We have enough space to append + ++size; + getDataMutable()[size - 1] = '\0'; } /// Assuming to.has() @@ -557,13 +628,7 @@ public: } else { - if (capacity < value_size) - { - /// Don't free large_data here. - capacity = static_cast(roundUpToPowerOfTwoOrZero(value_size)); - large_data = arena->alloc(capacity); - } - + allocateLargeDataIfNeeded(value_size, arena); size = value_size; memcpy(large_data, value.data, size); } @@ -571,7 +636,7 @@ public: void change(const IColumn & column, size_t row_num, Arena * arena) { - changeImpl(assert_cast(column).getDataAt(row_num), arena); + changeImpl(Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num), arena); } void change(const Self & to, Arena * arena) @@ -620,7 +685,7 @@ public: bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) { - if (!has() || assert_cast(column).getDataAt(row_num) < getStringRef()) + if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) < getStringRef()) { change(column, row_num, arena); return true; @@ -642,7 +707,7 @@ public: bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) { - if (!has() || assert_cast(column).getDataAt(row_num) > getStringRef()) + if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) > getStringRef()) { change(column, row_num, arena); return true; @@ -669,7 +734,7 @@ public: bool isEqualTo(const IColumn & column, size_t row_num) const { - return has() && assert_cast(column).getDataAt(row_num) == getStringRef(); + return has() && Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) == getStringRef(); } static bool allocatesMemoryInArena() diff --git a/src/Common/EventRateMeter.h b/src/Common/EventRateMeter.h index f70258faa9e..3a21a80ce8b 100644 --- a/src/Common/EventRateMeter.h +++ b/src/Common/EventRateMeter.h @@ -27,6 +27,14 @@ public: /// NOTE: Adding events into distant past (further than `period`) must be avoided. void add(double now, double count) { + // Remove data for initial heating stage that can present at the beginning of a query. + // Otherwise it leads to wrong gradual increase of average value, turning algorithm into not very reactive. + if (count != 0.0 && ++data_points < 5) + { + start = events.time; + events = ExponentiallySmoothedAverage(); + } + if (now - period <= start) // precise counting mode events = ExponentiallySmoothedAverage(events.value + count, now); else // exponential smoothing mode @@ -51,6 +59,7 @@ public: { start = now; events = ExponentiallySmoothedAverage(); + data_points = 0; } private: @@ -58,6 +67,7 @@ private: const double half_decay_time; double start; // Instant in past without events before it; when measurement started or reset ExponentiallySmoothedAverage events; // Estimated number of events in the last `period` + size_t data_points = 0; }; } diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index 399ccecf000..35231354651 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -63,11 +64,18 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool ErrorCodes::increment(code, remote, msg, trace); } -Exception::Exception(const std::string & msg, int code, bool remote_) - : Poco::Exception(msg, code) +Exception::MessageMasked::MessageMasked(const std::string & msg_) + : msg(msg_) +{ + if (auto * masker = SensitiveDataMasker::getInstance()) + masker->wipeSensitiveData(msg); +} + +Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_) + : Poco::Exception(msg_masked.msg, code) , remote(remote_) { - handle_error_code(msg, code, remote, getStackFramePointers()); + handle_error_code(msg_masked.msg, code, remote, getStackFramePointers()); } Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc) diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 62121cc22e1..0f459a887f1 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -27,7 +27,19 @@ public: using FramePointers = std::vector; Exception() = default; - Exception(const std::string & msg, int code, bool remote_ = false); + + // used to remove the sensitive information from exceptions if query_masking_rules is configured + struct MessageMasked + { + std::string msg; + MessageMasked(const std::string & msg_); + }; + + Exception(const MessageMasked & msg_masked, int code, bool remote_); + + // delegating constructor to mask sensitive information from the message + Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_) + {} Exception(int code, const std::string & message) : Exception(message, code) @@ -54,12 +66,17 @@ public: template void addMessage(fmt::format_string format, Args &&... args) { - extendedMessage(fmt::format(format, std::forward(args)...)); + addMessage(fmt::format(format, std::forward(args)...)); } void addMessage(const std::string& message) { - extendedMessage(message); + addMessage(MessageMasked(message)); + } + + void addMessage(const MessageMasked & msg_masked) + { + extendedMessage(msg_masked.msg); } /// Used to distinguish local exceptions from the one that was received from remote node. diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index b530410ec63..f556b255fc2 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -220,7 +220,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT Int64 limit_to_check = current_hard_limit; #if USE_JEMALLOC - if (level == VariableContext::Global) + if (level == VariableContext::Global && allow_use_jemalloc_memory.load(std::memory_order_relaxed)) { /// Jemalloc arenas may keep some extra memory. /// This memory was substucted from RSS to decrease memory drift. diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 2d898935dcf..f6113d31423 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -55,6 +55,7 @@ private: std::atomic soft_limit {0}; std::atomic hard_limit {0}; std::atomic profiler_limit {0}; + std::atomic_bool allow_use_jemalloc_memory {true}; static std::atomic free_memory_in_allocator_arenas; @@ -125,6 +126,10 @@ public: { return soft_limit.load(std::memory_order_relaxed); } + void setAllowUseJemallocMemory(bool value) + { + allow_use_jemalloc_memory.store(value, std::memory_order_relaxed); + } /** Set limit if it was not set. * Otherwise, set limit to new value, if new value is greater than previous limit. diff --git a/src/Common/ProgressIndication.h b/src/Common/ProgressIndication.h index 294b7c9a493..717de5debb9 100644 --- a/src/Common/ProgressIndication.h +++ b/src/Common/ProgressIndication.h @@ -90,7 +90,7 @@ private: bool write_progress_on_update = false; - EventRateMeter cpu_usage_meter{static_cast(clock_gettime_ns()), 3'000'000'000 /*ns*/}; // average cpu utilization last 3 second + EventRateMeter cpu_usage_meter{static_cast(clock_gettime_ns()), 2'000'000'000 /*ns*/}; // average cpu utilization last 2 second HostToThreadTimesMap thread_data; /// In case of all of the above: /// - clickhouse-local diff --git a/src/Disks/IO/createReadBufferFromFileBase.cpp b/src/Disks/IO/createReadBufferFromFileBase.cpp index 98da89f81ed..b274786f162 100644 --- a/src/Disks/IO/createReadBufferFromFileBase.cpp +++ b/src/Disks/IO/createReadBufferFromFileBase.cpp @@ -42,7 +42,7 @@ std::unique_ptr createReadBufferFromFileBase( if (read_hint.has_value()) estimated_size = *read_hint; else if (file_size.has_value()) - estimated_size = file_size.has_value() ? *file_size : 0; + estimated_size = *file_size; if (!existing_memory && settings.local_fs_method == LocalFSReadMethod::mmap @@ -158,7 +158,15 @@ std::unique_ptr createReadBufferFromFileBase( #endif ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary); - return create(settings.local_fs_buffer_size, flags); + + size_t buffer_size = settings.local_fs_buffer_size; + /// Check if the buffer can be smaller than default + if (read_hint.has_value() && *read_hint > 0 && *read_hint < buffer_size) + buffer_size = *read_hint; + if (file_size.has_value() && *file_size < buffer_size) + buffer_size = *file_size; + + return create(buffer_size, flags); } } diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp index 010fc103254..ce5171fedee 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -62,7 +63,7 @@ UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk void UnlinkFileOperation::execute(std::unique_lock &) { - auto buf = disk.readFile(path); + auto buf = disk.readFile(path, ReadSettings{}, std::nullopt, disk.getFileSize(path)); readStringUntilEOF(prev_data, *buf); disk.removeFile(path); } diff --git a/src/Functions/filesystem.cpp b/src/Functions/filesystem.cpp index 12813c3d852..7af1c61d3b8 100644 --- a/src/Functions/filesystem.cpp +++ b/src/Functions/filesystem.cpp @@ -1,31 +1,40 @@ -#include -#include +#include +#include #include +#include +#include +#include #include -#include #include namespace DB { +namespace ErrorCodes +{ + extern const int ILLEGAL_COLUMN; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int UNKNOWN_DISK; +} namespace { struct FilesystemAvailable { static constexpr auto name = "filesystemAvailable"; - static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.available; } + static std::uintmax_t get(const DiskPtr & disk) { return disk->getAvailableSpace(); } }; -struct FilesystemFree +struct FilesystemUnreserved { - static constexpr auto name = "filesystemFree"; - static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.free; } + static constexpr auto name = "filesystemUnreserved"; + static std::uintmax_t get(const DiskPtr & disk) { return disk->getUnreservedSpace(); } }; struct FilesystemCapacity { static constexpr auto name = "filesystemCapacity"; - static std::uintmax_t get(const std::filesystem::space_info & spaceinfo) { return spaceinfo.capacity; } + static std::uintmax_t get(const DiskPtr & disk) { return disk->getTotalSpace(); } }; template @@ -34,34 +43,72 @@ class FilesystemImpl : public IFunction public: static constexpr auto name = Impl::name; - static FunctionPtr create(ContextPtr context) - { - return std::make_shared>(std::filesystem::space(context->getPath())); - } + static FunctionPtr create(ContextPtr context_) { return std::make_shared>(context_); } + + explicit FilesystemImpl(ContextPtr context_) : context(context_) { } + + bool useDefaultImplementationForConstants() const override { return true; } bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } - explicit FilesystemImpl(std::filesystem::space_info spaceinfo_) : spaceinfo(spaceinfo_) { } - String getName() const override { return name; } + + bool isVariadic() const override { return true; } + size_t getNumberOfArguments() const override { return 0; } bool isDeterministic() const override { return false; } - DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override + DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override { + if (arguments.size() > 1) + { + throw Exception("Arguments size of function " + getName() + " should be 0 or 1", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + } + if (arguments.size() == 1 && !isStringOrFixedString(arguments[0])) + { + throw Exception( + "Arguments of function " + getName() + " should be String or FixedString", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } return std::make_shared(); } - ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override { - return DataTypeUInt64().createColumnConst(input_rows_count, static_cast(Impl::get(spaceinfo))); + if (arguments.empty()) + { + auto disk = context->getDisk("default"); + return DataTypeUInt64().createColumnConst(input_rows_count, Impl::get(disk)); + } + else + { + auto col = arguments[0].column; + if (const ColumnString * col_str = checkAndGetColumn(col.get())) + { + auto disk_map = context->getDisksMap(); + + auto col_res = ColumnVector::create(col_str->size()); + auto & data = col_res->getData(); + for (size_t i = 0; i < col_str->size(); ++i) + { + auto disk_name = col_str->getDataAt(i).toString(); + if (auto it = disk_map.find(disk_name); it != disk_map.end()) + data[i] = Impl::get(it->second); + else + throw Exception( + "Unknown disk name " + disk_name + " while execute function " + getName(), ErrorCodes::UNKNOWN_DISK); + } + return col_res; + } + throw Exception( + "Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN); + } } private: - std::filesystem::space_info spaceinfo; + ContextPtr context; }; } @@ -70,7 +117,7 @@ REGISTER_FUNCTION(Filesystem) { factory.registerFunction>(); factory.registerFunction>(); - factory.registerFunction>(); + factory.registerFunction>(); } } diff --git a/src/Functions/if.cpp b/src/Functions/if.cpp index f3401713834..0baf64c83d9 100644 --- a/src/Functions/if.cpp +++ b/src/Functions/if.cpp @@ -1016,6 +1016,7 @@ public: size_t getNumberOfArguments() const override { return 3; } bool useDefaultImplementationForNulls() const override { return false; } + bool useDefaultImplementationForNothing() const override { return false; } bool isShortCircuit(ShortCircuitSettings & settings, size_t /*number_of_arguments*/) const override { settings.enable_lazy_execution_for_first_argument = false; diff --git a/src/Functions/multiIf.cpp b/src/Functions/multiIf.cpp index 6fc722e32f4..f658528a2a7 100644 --- a/src/Functions/multiIf.cpp +++ b/src/Functions/multiIf.cpp @@ -50,6 +50,7 @@ public: bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } size_t getNumberOfArguments() const override { return 0; } bool useDefaultImplementationForNulls() const override { return false; } + bool useDefaultImplementationForNothing() const override { return false; } ColumnNumbers getArgumentsThatDontImplyNullableReturnType(size_t number_of_arguments) const override { diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index c3f069498be..525d76d0f0f 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -315,7 +315,9 @@ MergeTreeReadTaskColumns getReadTaskColumns( /// 1. Columns for row level filter if (prewhere_info->row_level_filter) { - Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames(); + Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames(); + injectRequiredColumns( + data_part_info_for_reader, storage_snapshot, with_subcolumns, row_filter_column_names); result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names)); pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end()); } @@ -323,7 +325,7 @@ MergeTreeReadTaskColumns getReadTaskColumns( /// 2. Columns for prewhere Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames(); - const auto injected_pre_columns = injectRequiredColumns( + injectRequiredColumns( data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names); for (const auto & name : all_pre_column_names) diff --git a/src/Storages/StorageDelta.cpp b/src/Storages/StorageDeltaLake.cpp similarity index 80% rename from src/Storages/StorageDelta.cpp rename to src/Storages/StorageDeltaLake.cpp index c077b24c610..479a11b5eb4 100644 --- a/src/Storages/StorageDelta.cpp +++ b/src/Storages/StorageDeltaLake.cpp @@ -1,7 +1,7 @@ #include "config.h" #if USE_AWS_S3 -#include +#include #include #include @@ -47,7 +47,7 @@ void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename); } -std::vector DeltaLakeMetadata::ListCurrentFiles() && +std::vector DeltaLakeMetadata::listCurrentFiles() && { std::vector keys; keys.reserve(file_update_time.size()); @@ -61,10 +61,10 @@ std::vector DeltaLakeMetadata::ListCurrentFiles() && JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context) : base_configuration(configuration_), table_path(table_path_) { - Init(context); + init(context); } -void JsonMetadataGetter::Init(ContextPtr context) +void JsonMetadataGetter::init(ContextPtr context) { auto keys = getJsonLogFiles(); @@ -180,7 +180,53 @@ void JsonMetadataGetter::handleJSON(const JSON & json) } } -StorageDelta::StorageDelta( +namespace +{ + +StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration) +{ + return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers}; +} + +// DeltaLake stores data in parts in different files +// keys is vector of parts with latest version +// generateQueryFromKeys constructs query from parts filenames for +// underlying StorageS3 engine +String generateQueryFromKeys(const std::vector & keys) +{ + std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ",")); + return new_query; +} + + +StorageS3Configuration getAdjustedS3Configuration( + const ContextPtr & context, + StorageS3::S3Configuration & base_configuration, + const StorageS3Configuration & configuration, + const std::string & table_path, + Poco::Logger * log) +{ + JsonMetadataGetter getter{base_configuration, table_path, context}; + + auto keys = getter.getFiles(); + auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys); + + LOG_DEBUG(log, "New uri: {}", new_uri); + LOG_DEBUG(log, "Table path: {}", table_path); + + // set new url in configuration + StorageS3Configuration new_configuration; + new_configuration.url = new_uri; + new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id; + new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key; + new_configuration.format = configuration.format; + + return new_configuration; +} + +} + +StorageDeltaLake::StorageDeltaLake( const StorageS3Configuration & configuration_, const StorageID & table_id_, ColumnsDescription columns_, @@ -189,28 +235,14 @@ StorageDelta::StorageDelta( ContextPtr context_, std::optional format_settings_) : IStorage(table_id_) - , base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers} + , base_configuration{getBaseConfiguration(configuration_)} , log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")")) , table_path(base_configuration.uri.key) { StorageInMemoryMetadata storage_metadata; StorageS3::updateS3Configuration(context_, base_configuration); - JsonMetadataGetter getter{base_configuration, table_path, context_}; - - auto keys = getter.getFiles(); - auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys)); - - LOG_DEBUG(log, "New uri: {}", new_uri); - LOG_DEBUG(log, "Table path: {}", table_path); - - // set new url in configuration - StorageS3Configuration new_configuration; - new_configuration.url = new_uri; - new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id; - new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key; - new_configuration.format = configuration_.format; - + auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log); if (columns_.empty()) { @@ -238,7 +270,7 @@ StorageDelta::StorageDelta( nullptr); } -Pipe StorageDelta::read( +Pipe StorageDeltaLake::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, @@ -252,16 +284,18 @@ Pipe StorageDelta::read( return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); } -String StorageDelta::generateQueryFromKeys(std::vector && keys) +ColumnsDescription StorageDeltaLake::getTableStructureFromData( + const StorageS3Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) { - // DeltaLake store data parts in different files - // keys are filenames of parts - // for StorageS3 to read all parts we need format {key1,key2,key3,...keyn} - std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ",")); - return new_query; + auto base_configuration = getBaseConfiguration(configuration); + StorageS3::updateS3Configuration(ctx, base_configuration); + auto new_configuration = getAdjustedS3Configuration( + ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake")); + return StorageS3::getTableStructureFromData( + new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr); } -void registerStorageDelta(StorageFactory & factory) +void registerStorageDeltaLake(StorageFactory & factory) { factory.registerStorage( "DeltaLake", @@ -287,7 +321,7 @@ void registerStorageDelta(StorageFactory & factory) configuration.format = "Parquet"; } - return std::make_shared( + return std::make_shared( configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt); }, { diff --git a/src/Storages/StorageDelta.h b/src/Storages/StorageDeltaLake.h similarity index 83% rename from src/Storages/StorageDelta.h rename to src/Storages/StorageDeltaLake.h index e3bb4c0b416..5915d498a9f 100644 --- a/src/Storages/StorageDelta.h +++ b/src/Storages/StorageDeltaLake.h @@ -32,7 +32,7 @@ public: void setLastModifiedTime(const String & filename, uint64_t timestamp); void remove(const String & filename, uint64_t timestamp); - std::vector ListCurrentFiles() &&; + std::vector listCurrentFiles() &&; private: std::unordered_map file_update_time; @@ -44,10 +44,10 @@ class JsonMetadataGetter public: JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context); - std::vector getFiles() { return std::move(metadata).ListCurrentFiles(); } + std::vector getFiles() { return std::move(metadata).listCurrentFiles(); } private: - void Init(ContextPtr context); + void init(ContextPtr context); std::vector getJsonLogFiles(); @@ -60,13 +60,13 @@ private: DeltaLakeMetadata metadata; }; -class StorageDelta : public IStorage +class StorageDeltaLake : public IStorage { public: // 1. Parses internal file structure of table // 2. Finds out parts with latest version // 3. Creates url for underlying StorageS3 enigne to handle reads - StorageDelta( + StorageDeltaLake( const StorageS3Configuration & configuration_, const StorageID & table_id_, ColumnsDescription columns_, @@ -87,14 +87,12 @@ public: size_t max_block_size, size_t num_streams) override; + static ColumnsDescription getTableStructureFromData( + const StorageS3Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx); private: - void Init(); - - // DeltaLake stores data in parts in different files - // keys is vector of parts with latest version - // generateQueryFromKeys constructs query from parts filenames for - // underlying StorageS3 engine - static String generateQueryFromKeys(std::vector && keys); + void init(); StorageS3::S3Configuration base_configuration; std::shared_ptr s3engine; diff --git a/src/Storages/StorageHudi.cpp b/src/Storages/StorageHudi.cpp index 4b20e4cbd22..d5675ceb17c 100644 --- a/src/Storages/StorageHudi.cpp +++ b/src/Storages/StorageHudi.cpp @@ -28,115 +28,20 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -StorageHudi::StorageHudi( - const StorageS3Configuration & configuration_, - const StorageID & table_id_, - ColumnsDescription columns_, - const ConstraintsDescription & constraints_, - const String & comment, - ContextPtr context_, - std::optional format_settings_) - : IStorage(table_id_) - , base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers} - , log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")")) - , table_path(base_configuration.uri.key) +namespace { - StorageInMemoryMetadata storage_metadata; - StorageS3::updateS3Configuration(context_, base_configuration); - auto keys = getKeysFromS3(); - auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format); - - LOG_DEBUG(log, "New uri: {}", new_uri); - LOG_DEBUG(log, "Table path: {}", table_path); - - StorageS3Configuration new_configuration; - new_configuration.url = new_uri; - new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id; - new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key; - new_configuration.format = configuration_.format; - - if (columns_.empty()) - { - columns_ = StorageS3::getTableStructureFromData( - new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr); - storage_metadata.setColumns(columns_); - } - else - storage_metadata.setColumns(columns_); - - storage_metadata.setConstraints(constraints_); - storage_metadata.setComment(comment); - setInMemoryMetadata(storage_metadata); - - s3engine = std::make_shared( - new_configuration, - table_id_, - columns_, - constraints_, - comment, - context_, - format_settings_, - /* distributed_processing_ */ false, - nullptr); +StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration) +{ + return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers}; } -Pipe StorageHudi::read( - const Names & column_names, - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & query_info, - ContextPtr context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - size_t num_streams) -{ - StorageS3::updateS3Configuration(context, base_configuration); - return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); -} - -std::vector StorageHudi::getKeysFromS3() -{ - std::vector keys; - - const auto & client = base_configuration.client; - - Aws::S3::Model::ListObjectsV2Request request; - Aws::S3::Model::ListObjectsV2Outcome outcome; - - bool is_finished{false}; - const auto bucket{base_configuration.uri.bucket}; - - request.SetBucket(bucket); - request.SetPrefix(table_path); - - while (!is_finished) - { - outcome = client->ListObjectsV2(request); - if (!outcome.IsSuccess()) - throw Exception( - ErrorCodes::S3_ERROR, - "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}", - quoteString(bucket), - quoteString(table_path), - backQuote(outcome.GetError().GetExceptionName()), - quoteString(outcome.GetError().GetMessage())); - - const auto & result_batch = outcome.GetResult().GetContents(); - for (const auto & obj : result_batch) - { - const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix. - keys.push_back(filename); - LOG_DEBUG(log, "Found file: {}", filename); - } - - request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); - is_finished = !outcome.GetResult().GetIsTruncated(); - } - - return keys; -} - -String StorageHudi::generateQueryFromKeys(const std::vector & keys, const String & format) +/// Apache Hudi store parts of data in different files. +/// Every part file has timestamp in it. +/// Every partition(directory) in Apache Hudi has different versions of part. +/// To find needed parts we need to find out latest part file for every partition. +/// Part format is usually parquet, but can differ. +String generateQueryFromKeys(const std::vector & keys, const String & format) { /// For each partition path take only latest file. struct FileInfo @@ -187,6 +92,138 @@ String StorageHudi::generateQueryFromKeys(const std::vector & keys, return "{" + list_of_keys + "}"; } +std::vector getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log) +{ + std::vector keys; + + const auto & client = base_configuration.client; + + Aws::S3::Model::ListObjectsV2Request request; + Aws::S3::Model::ListObjectsV2Outcome outcome; + + bool is_finished{false}; + const auto bucket{base_configuration.uri.bucket}; + + request.SetBucket(bucket); + request.SetPrefix(table_path); + + while (!is_finished) + { + outcome = client->ListObjectsV2(request); + if (!outcome.IsSuccess()) + throw Exception( + ErrorCodes::S3_ERROR, + "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}", + quoteString(bucket), + quoteString(table_path), + backQuote(outcome.GetError().GetExceptionName()), + quoteString(outcome.GetError().GetMessage())); + + const auto & result_batch = outcome.GetResult().GetContents(); + for (const auto & obj : result_batch) + { + const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix. + keys.push_back(filename); + LOG_DEBUG(log, "Found file: {}", filename); + } + + request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); + is_finished = !outcome.GetResult().GetIsTruncated(); + } + + return keys; +} + + +StorageS3Configuration getAdjustedS3Configuration( + StorageS3::S3Configuration & base_configuration, + const StorageS3Configuration & configuration, + const std::string & table_path, + Poco::Logger * log) +{ + auto keys = getKeysFromS3(base_configuration, table_path, log); + auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format); + + LOG_DEBUG(log, "New uri: {}", new_uri); + LOG_DEBUG(log, "Table path: {}", table_path); + + StorageS3Configuration new_configuration; + new_configuration.url = new_uri; + new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id; + new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key; + new_configuration.format = configuration.format; + + return new_configuration; +} + +} + +StorageHudi::StorageHudi( + const StorageS3Configuration & configuration_, + const StorageID & table_id_, + ColumnsDescription columns_, + const ConstraintsDescription & constraints_, + const String & comment, + ContextPtr context_, + std::optional format_settings_) + : IStorage(table_id_) + , base_configuration{getBaseConfiguration(configuration_)} + , log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")")) + , table_path(base_configuration.uri.key) +{ + StorageInMemoryMetadata storage_metadata; + StorageS3::updateS3Configuration(context_, base_configuration); + + auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log); + + if (columns_.empty()) + { + columns_ = StorageS3::getTableStructureFromData( + new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr); + storage_metadata.setColumns(columns_); + } + else + storage_metadata.setColumns(columns_); + + storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); + + s3engine = std::make_shared( + new_configuration, + table_id_, + columns_, + constraints_, + comment, + context_, + format_settings_, + /* distributed_processing_ */ false, + nullptr); +} + +Pipe StorageHudi::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) +{ + StorageS3::updateS3Configuration(context, base_configuration); + return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); +} + +ColumnsDescription StorageHudi::getTableStructureFromData( + const StorageS3Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) +{ + auto base_configuration = getBaseConfiguration(configuration); + StorageS3::updateS3Configuration(ctx, base_configuration); + auto new_configuration = getAdjustedS3Configuration( + base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake")); + return StorageS3::getTableStructureFromData( + new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr); +} void registerStorageHudi(StorageFactory & factory) { diff --git a/src/Storages/StorageHudi.h b/src/Storages/StorageHudi.h index bebda4cd4f6..00b8c01a46d 100644 --- a/src/Storages/StorageHudi.h +++ b/src/Storages/StorageHudi.h @@ -48,16 +48,11 @@ public: size_t max_block_size, size_t num_streams) override; + static ColumnsDescription getTableStructureFromData( + const StorageS3Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx); private: - std::vector getKeysFromS3(); - - /// Apache Hudi store parts of data in different files. - /// Every part file has timestamp in it. - /// Every partition(directory) in Apache Hudi has different versions of part. - /// To find needed parts we need to find out latest part file for every partition. - /// Part format is usually parquet, but can differ. - static String generateQueryFromKeys(const std::vector & keys, const String & format); - StorageS3::S3Configuration base_configuration; std::shared_ptr s3engine; Poco::Logger * log; diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 47ac26abccb..aa558ddc0de 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -214,7 +214,7 @@ private: friend class StorageS3Cluster; friend class TableFunctionS3Cluster; friend class StorageHudi; - friend class StorageDelta; + friend class StorageDeltaLake; S3Configuration s3_configuration; std::vector keys; diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 200b8e637da..164b125a192 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -34,7 +34,7 @@ void registerStorageS3(StorageFactory & factory); void registerStorageCOS(StorageFactory & factory); void registerStorageOSS(StorageFactory & factory); void registerStorageHudi(StorageFactory & factory); -void registerStorageDelta(StorageFactory & factory); +void registerStorageDeltaLake(StorageFactory & factory); #endif #if USE_HDFS @@ -123,7 +123,7 @@ void registerStorages() registerStorageCOS(factory); registerStorageOSS(factory); registerStorageHudi(factory); - registerStorageDelta(factory); + registerStorageDeltaLake(factory); #endif #if USE_HDFS diff --git a/src/TableFunctions/TableFunctionDeltaLake.cpp b/src/TableFunctions/TableFunctionDeltaLake.cpp index 0f5801d57ac..aa602e59d78 100644 --- a/src/TableFunctions/TableFunctionDeltaLake.cpp +++ b/src/TableFunctions/TableFunctionDeltaLake.cpp @@ -10,7 +10,7 @@ # include # include # include -# include +# include # include # include # include @@ -27,7 +27,7 @@ namespace ErrorCodes } -void TableFunctionDelta::parseArgumentsImpl( +void TableFunctionDeltaLake::parseArgumentsImpl( const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & base_configuration) { if (args.empty() || args.size() > 6) @@ -100,7 +100,7 @@ void TableFunctionDelta::parseArgumentsImpl( = checkAndGetLiteralArgument(args[args_to_idx["secret_access_key"]], "secret_access_key"); } -void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr context) +void TableFunctionDeltaLake::parseArguments(const ASTPtr & ast_function, ContextPtr context) { /// Parse args ASTs & args_func = ast_function->children; @@ -125,18 +125,18 @@ void TableFunctionDelta::parseArguments(const ASTPtr & ast_function, ContextPtr parseArgumentsImpl(message, args, context, configuration); } -ColumnsDescription TableFunctionDelta::getActualTableStructure(ContextPtr context) const +ColumnsDescription TableFunctionDeltaLake::getActualTableStructure(ContextPtr context) const { if (configuration.structure == "auto") { context->checkAccess(getSourceAccessType()); - return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context); + return StorageDeltaLake::getTableStructureFromData(configuration, std::nullopt, context); } return parseColumnsListFromString(configuration.structure, context); } -StoragePtr TableFunctionDelta::executeImpl( +StoragePtr TableFunctionDeltaLake::executeImpl( const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const { Poco::URI uri(configuration.url); @@ -146,7 +146,7 @@ StoragePtr TableFunctionDelta::executeImpl( if (configuration.structure != "auto") columns = parseColumnsListFromString(configuration.structure, context); - StoragePtr storage = std::make_shared( + StoragePtr storage = std::make_shared( configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt); storage->startup(); @@ -155,9 +155,9 @@ StoragePtr TableFunctionDelta::executeImpl( } -void registerTableFunctionDelta(TableFunctionFactory & factory) +void registerTableFunctionDeltaLake(TableFunctionFactory & factory) { - factory.registerFunction( + factory.registerFunction( {.documentation = {R"(The table function can be used to read the DeltaLake table stored on object store.)", Documentation::Examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)"}}, diff --git a/src/TableFunctions/TableFunctionDeltaLake.h b/src/TableFunctions/TableFunctionDeltaLake.h index badfd63f431..e36ffc3847f 100644 --- a/src/TableFunctions/TableFunctionDeltaLake.h +++ b/src/TableFunctions/TableFunctionDeltaLake.h @@ -16,7 +16,7 @@ class TableFunctionS3Cluster; /* deltaLake(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary DeltaLake table on S3. */ -class TableFunctionDelta : public ITableFunction +class TableFunctionDeltaLake : public ITableFunction { public: static constexpr auto name = "deltaLake"; diff --git a/src/TableFunctions/TableFunctionHudi.cpp b/src/TableFunctions/TableFunctionHudi.cpp index 2e27d192b58..996be2359a6 100644 --- a/src/TableFunctions/TableFunctionHudi.cpp +++ b/src/TableFunctions/TableFunctionHudi.cpp @@ -130,7 +130,7 @@ ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context if (configuration.structure == "auto") { context->checkAccess(getSourceAccessType()); - return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context); + return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context); } return parseColumnsListFromString(configuration.structure, context); diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index e6c32766559..95892d36b18 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -28,7 +28,7 @@ void registerTableFunctions() registerTableFunctionS3Cluster(factory); registerTableFunctionCOS(factory); registerTableFunctionHudi(factory); - registerTableFunctionDelta(factory); + registerTableFunctionDeltaLake(factory); registerTableFunctionOSS(factory); #endif diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index 12a26bec70a..5f91205474e 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -25,7 +25,7 @@ void registerTableFunctionS3(TableFunctionFactory & factory); void registerTableFunctionS3Cluster(TableFunctionFactory & factory); void registerTableFunctionCOS(TableFunctionFactory & factory); void registerTableFunctionHudi(TableFunctionFactory & factory); -void registerTableFunctionDelta(TableFunctionFactory & factory); +void registerTableFunctionDeltaLake(TableFunctionFactory & factory); void registerTableFunctionOSS(TableFunctionFactory & factory); #endif diff --git a/tests/ci/get_robot_token.py b/tests/ci/get_robot_token.py index 4fb8cb8f49f..163e1ce071e 100644 --- a/tests/ci/get_robot_token.py +++ b/tests/ci/get_robot_token.py @@ -1,4 +1,6 @@ #!/usr/bin/env python3 +import logging + import boto3 # type: ignore from github import Github # type: ignore @@ -9,14 +11,30 @@ def get_parameter_from_ssm(name, decrypt=True, client=None): return client.get_parameter(Name=name, WithDecryption=decrypt)["Parameter"]["Value"] -def get_best_robot_token(token_prefix_env_name="github_robot_token_", total_tokens=4): +def get_best_robot_token(token_prefix_env_name="github_robot_token_"): client = boto3.client("ssm", region_name="us-east-1") - tokens = {} - for i in range(1, total_tokens + 1): - token_name = token_prefix_env_name + str(i) - token = get_parameter_from_ssm(token_name, True, client) - gh = Github(token, per_page=100) - rest, _ = gh.rate_limiting - tokens[token] = rest + parameters = client.describe_parameters( + ParameterFilters=[ + {"Key": "Name", "Option": "BeginsWith", "Values": [token_prefix_env_name]} + ] + )["Parameters"] + assert parameters + token = {"login": "", "value": "", "rest": 0} - return max(tokens.items(), key=lambda x: x[1])[0] + for token_name in [p["Name"] for p in parameters]: + value = get_parameter_from_ssm(token_name, True, client) + gh = Github(value, per_page=100) + # Do not spend additional request to API by accessin user.login unless + # the token is chosen by the remaining requests number + user = gh.get_user() + rest, _ = gh.rate_limiting + logging.info("Get token with %s remaining requests", rest) + if token["rest"] < rest: + token = {"user": user, "value": value, "rest": rest} + + assert token["value"] + logging.info( + "User %s with %s remaining requests is used", token["user"].login, token["rest"] + ) + + return token["value"] diff --git a/tests/fuzz/dictionaries/functions.dict b/tests/fuzz/dictionaries/functions.dict index e4f347babf8..e2668d7d093 100644 --- a/tests/fuzz/dictionaries/functions.dict +++ b/tests/fuzz/dictionaries/functions.dict @@ -249,7 +249,7 @@ "cosh" "basename" "evalMLMethod" -"filesystemFree" +"filesystemUnreserved" "filesystemCapacity" "reinterpretAsDate" "filesystemAvailable" diff --git a/tests/integration/test_backward_compatibility/test_functions.py b/tests/integration/test_backward_compatibility/test_functions.py index fe1c0ea7108..afb19901e74 100644 --- a/tests/integration/test_backward_compatibility/test_functions.py +++ b/tests/integration/test_backward_compatibility/test_functions.py @@ -13,7 +13,11 @@ upstream = cluster.add_instance("upstream") backward = cluster.add_instance( "backward", image="clickhouse/clickhouse-server", - tag="22.9", + # Note that a bug changed the string representation of several aggregations in 22.9 and 22.10 and some minor + # releases of 22.8, 22.7 and 22.3 + # See https://github.com/ClickHouse/ClickHouse/issues/42916 + # Affected at least: singleValueOrNull, last_value, min, max, any, anyLast, anyHeavy, first_value, argMin, argMax + tag="22.6", with_installed_binary=True, ) @@ -139,6 +143,9 @@ def test_string_functions(start_cluster): "substring", "CAST", # NOTE: no need to ignore now()/now64() since they will fail because they don't accept any argument + # 22.8 Backward Incompatible Change: Extended range of Date32 + "toDate32OrZero", + "toDate32OrDefault", ] functions = filter(lambda x: x not in excludes, functions) @@ -149,14 +156,15 @@ def test_string_functions(start_cluster): failed = 0 passed = 0 - def get_function_value(node, function_name, value="foo"): + def get_function_value(node, function_name, value): return node.query(f"select {function_name}('{value}')").strip() + v = "foo" for function in functions: - logging.info("Checking %s", function) + logging.info("Checking %s('%s')", function, v) try: - backward_value = get_function_value(backward, function) + backward_value = get_function_value(backward, function, v) except QueryRuntimeException as e: error_message = str(e) allowed_errors = [ @@ -199,11 +207,12 @@ def test_string_functions(start_cluster): failed += 1 continue - upstream_value = get_function_value(upstream, function) + upstream_value = get_function_value(upstream, function, v) if upstream_value != backward_value: - logging.info( - "Failed %s, %s (backward) != %s (upstream)", + logging.warning( + "Failed %s('%s') %s (backward) != %s (upstream)", function, + v, backward_value, upstream_value, ) diff --git a/tests/integration/test_global_overcommit_tracker/configs/global_overcommit_tracker.xml b/tests/integration/test_global_overcommit_tracker/configs/global_overcommit_tracker.xml index 6f83a570ccc..a51009542a3 100644 --- a/tests/integration/test_global_overcommit_tracker/configs/global_overcommit_tracker.xml +++ b/tests/integration/test_global_overcommit_tracker/configs/global_overcommit_tracker.xml @@ -1,3 +1,4 @@ 2000000000 + false \ No newline at end of file diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index a63244df814..3f9da071281 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -1,7 +1,6 @@ import logging import os import json - import helpers.client import pytest from helpers.cluster import ClickHouseCluster @@ -143,3 +142,25 @@ def test_select_query(started_cluster): ), ).splitlines() assert len(result) > 0 + + +def test_describe_query(started_cluster): + instance = started_cluster.instances["main_server"] + bucket = started_cluster.minio_bucket + result = instance.query( + f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV", + ) + + assert result == TSV( + [ + ["begin_lat", "Nullable(Float64)"], + ["begin_lon", "Nullable(Float64)"], + ["driver", "Nullable(String)"], + ["end_lat", "Nullable(Float64)"], + ["end_lon", "Nullable(Float64)"], + ["fare", "Nullable(Float64)"], + ["rider", "Nullable(String)"], + ["ts", "Nullable(Int64)"], + ["uuid", "Nullable(String)"], + ] + ) diff --git a/tests/integration/test_storage_hudi/test.py b/tests/integration/test_storage_hudi/test.py index dd870aae42e..3328f859406 100644 --- a/tests/integration/test_storage_hudi/test.py +++ b/tests/integration/test_storage_hudi/test.py @@ -161,7 +161,7 @@ def test_select_query(started_cluster): result = run_query(instance, distinct_select_query) result_table_function = run_query( instance, - distinct_select_query.format( + distinct_select_table_function_query.format( ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket ), ) @@ -173,3 +173,31 @@ def test_select_query(started_cluster): assert TSV(result) == TSV(expected) assert TSV(result_table_function) == TSV(expected) + + +def test_describe_query(started_cluster): + instance = started_cluster.instances["main_server"] + bucket = started_cluster.minio_bucket + result = instance.query( + f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV", + ) + + assert result == TSV( + [ + ["_hoodie_commit_time", "Nullable(String)"], + ["_hoodie_commit_seqno", "Nullable(String)"], + ["_hoodie_record_key", "Nullable(String)"], + ["_hoodie_partition_path", "Nullable(String)"], + ["_hoodie_file_name", "Nullable(String)"], + ["begin_lat", "Nullable(Float64)"], + ["begin_lon", "Nullable(Float64)"], + ["driver", "Nullable(String)"], + ["end_lat", "Nullable(Float64)"], + ["end_lon", "Nullable(Float64)"], + ["fare", "Nullable(Float64)"], + ["partitionpath", "Nullable(String)"], + ["rider", "Nullable(String)"], + ["ts", "Nullable(Int64)"], + ["uuid", "Nullable(String)"], + ] + ) diff --git a/tests/queries/0_stateless/00824_filesystem.sql b/tests/queries/0_stateless/00824_filesystem.sql index cd4d69a703e..c8ac9179d42 100644 --- a/tests/queries/0_stateless/00824_filesystem.sql +++ b/tests/queries/0_stateless/00824_filesystem.sql @@ -1 +1 @@ -SELECT filesystemCapacity() >= filesystemFree() AND filesystemFree() >= filesystemAvailable() AND filesystemAvailable() >= 0; +SELECT filesystemCapacity() >= filesystemAvailable() AND filesystemAvailable() >= 0 AND filesystemUnreserved() >= 0; diff --git a/tests/queries/0_stateless/00956_sensitive_data_masking.reference b/tests/queries/0_stateless/00956_sensitive_data_masking.reference index 86323ec45e8..457ab9118f1 100644 --- a/tests/queries/0_stateless/00956_sensitive_data_masking.reference +++ b/tests/queries/0_stateless/00956_sensitive_data_masking.reference @@ -1,11 +1,14 @@ 1 2 3 +3.1 4 5 5.1 6 7 +7.1 +7.2 8 9 text_log non empty diff --git a/tests/queries/0_stateless/00956_sensitive_data_masking.sh b/tests/queries/0_stateless/00956_sensitive_data_masking.sh index e36031c54be..ccd9bbcf10e 100755 --- a/tests/queries/0_stateless/00956_sensitive_data_masking.sh +++ b/tests/queries/0_stateless/00956_sensitive_data_masking.sh @@ -37,12 +37,20 @@ rm -f "$tmp_file" >/dev/null 2>&1 echo 3 # failure at before query start $CLICKHOUSE_CLIENT \ - --query="SELECT 'find_me_TOPSECRET=TOPSECRET' FROM non_existing_table FORMAT Null" \ + --query="SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" \ --log_queries=1 --ignore-error --multiquery |& grep -v '^(query: ' > "$tmp_file" grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3a' grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3b' +echo '3.1' +echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @- >"$tmp_file" 2>&1 + +grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3.1a' +grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3.1b' + +#echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | curl -sSg http://172.17.0.3:8123/ -d @- + rm -f "$tmp_file" >/dev/null 2>&1 echo 4 # failure at the end of query @@ -100,6 +108,21 @@ $CLICKHOUSE_CLIENT \ --server_logs_file=/dev/null \ --query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';" +echo '7.1' +# query_log exceptions +$CLICKHOUSE_CLIENT \ + --server_logs_file=/dev/null \ + --query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and exception like '%TOPSECRET%'" + +echo '7.2' + +# not perfect: when run in parallel with other tests that check can give false-negative result +# because other tests can overwrite the last_error_message, where we check the absence of sensitive data. +# But it's still good enough for CI - in case of regressions it will start flapping (normally it shouldn't) +$CLICKHOUSE_CLIENT \ + --server_logs_file=/dev/null \ + --query="select * from system.errors where last_error_message like '%TOPSECRET%';" + rm -f "$tmp_file" >/dev/null 2>&1 echo 8 diff --git a/tests/queries/0_stateless/02345_filesystem_local.sh b/tests/queries/0_stateless/02345_filesystem_local.sh index 6771df2ae2d..aac66f9f7b9 100755 --- a/tests/queries/0_stateless/02345_filesystem_local.sh +++ b/tests/queries/0_stateless/02345_filesystem_local.sh @@ -5,4 +5,4 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . "$CUR_DIR"/../shell_config.sh # Checks that these functions are working inside clickhouse-local. Does not check specific values. -$CLICKHOUSE_LOCAL --query "SELECT filesystemAvailable() > 0, filesystemFree() <= filesystemCapacity()" +$CLICKHOUSE_LOCAL --query "SELECT filesystemAvailable() > 0, filesystemUnreserved() <= filesystemCapacity()" diff --git a/tests/queries/0_stateless/02415_all_new_functions_must_be_documented.reference b/tests/queries/0_stateless/02415_all_new_functions_must_be_documented.reference index 040a8c8d317..34180020680 100644 --- a/tests/queries/0_stateless/02415_all_new_functions_must_be_documented.reference +++ b/tests/queries/0_stateless/02415_all_new_functions_must_be_documented.reference @@ -322,7 +322,7 @@ farmHash64 file filesystemAvailable filesystemCapacity -filesystemFree +filesystemUnreserved finalizeAggregation firstSignificantSubdomainCustom firstSignificantSubdomainCustomRFC diff --git a/tests/queries/0_stateless/02457_filesystem_function.reference b/tests/queries/0_stateless/02457_filesystem_function.reference new file mode 100644 index 00000000000..6ed281c757a --- /dev/null +++ b/tests/queries/0_stateless/02457_filesystem_function.reference @@ -0,0 +1,2 @@ +1 +1 diff --git a/tests/queries/0_stateless/02457_filesystem_function.sql b/tests/queries/0_stateless/02457_filesystem_function.sql new file mode 100644 index 00000000000..d8322bc65b5 --- /dev/null +++ b/tests/queries/0_stateless/02457_filesystem_function.sql @@ -0,0 +1,6 @@ +-- Tags: no-fasttest + +select filesystemCapacity('s3_disk') >= filesystemAvailable('s3_disk') and filesystemAvailable('s3_disk') >= filesystemUnreserved('s3_disk'); +select filesystemCapacity('default') >= filesystemAvailable('default') and filesystemAvailable('default') >= 0 and filesystemUnreserved('default') >= 0; + +select filesystemCapacity('__un_exists_disk'); -- { serverError UNKNOWN_DISK } diff --git a/tests/queries/0_stateless/02477_single_value_data_string_regression.reference b/tests/queries/0_stateless/02477_single_value_data_string_regression.reference new file mode 100644 index 00000000000..e89b8ff7d99 --- /dev/null +++ b/tests/queries/0_stateless/02477_single_value_data_string_regression.reference @@ -0,0 +1,25 @@ +1 +22.8.5.29 10 +22.8.6.71 10 +1 +22.8.5.29 52 +22.8.6.71 52 +1 +22.8.5.29 0 +22.8.6.71 0 +46_OK 0123456789012345678901234567890123456789012345 +46_KO 0123456789012345678901234567890123456789012345 +47_OK 01234567890123456789012345678901234567890123456 +47_KO 01234567890123456789012345678901234567890123456 +48_OK 012345678901234567890123456789012345678901234567 +48_KO 012345678901234567890123456789012345678901234567 +63_OK 012345678901234567890123456789012345678901234567890123456789012 +63_KO 012345678901234567890123456789012345678901234567890123456789012 +64_OK 0123456789012345678901234567890123456789012345678901234567890123 +64_KO 0123456789012345678901234567890123456789012345678901234567890123 +-1 0 +-2 0 +-2^31 0 +1M without 0 1048576 +1M with 0 1048575 +fuzz2 0123 4 diff --git a/tests/queries/0_stateless/02477_single_value_data_string_regression.sql b/tests/queries/0_stateless/02477_single_value_data_string_regression.sql new file mode 100644 index 00000000000..c8030733e34 --- /dev/null +++ b/tests/queries/0_stateless/02477_single_value_data_string_regression.sql @@ -0,0 +1,109 @@ + +-- Context: https://github.com/ClickHouse/ClickHouse/issues/42916 + +-- STRING WITH 10 CHARACTERS +-- SELECT version() AS v, hex(argMaxState('0123456789', number)) AS state FROM numbers(1) FORMAT CSV + +CREATE TABLE argmaxstate_hex_small +( + `v` String, + `state` String +) +ENGINE = TinyLog; + +INSERT into argmaxstate_hex_small VALUES ('22.8.5.29','0B0000003031323334353637383900010000000000000000'), ('22.8.6.71','0A00000030313233343536373839010000000000000000'); + +-- Assert that the current version will write the same as 22.8.5 (last known good 22.8 minor) +SELECT + (SELECT hex(argMaxState('0123456789', number)) FROM numbers(1)) = state +FROM argmaxstate_hex_small +WHERE v = '22.8.5.29'; + +-- Assert that the current version can read correctly both the old and the regression states +SELECT + v, + length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64)))) +FROM argmaxstate_hex_small; + +-- STRING WITH 54 characters +-- SELECT version() AS v, hex(argMaxState('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', number)) AS state FROM numbers(1) FORMAT CSV +CREATE TABLE argmaxstate_hex_large +( + `v` String, + `state` String +) +ENGINE = TinyLog; + +INSERT into argmaxstate_hex_large VALUES ('22.8.5.29','350000004142434445464748494A4B4C4D4E4F505152535455565758595A6162636465666768696A6B6C6D6E6F707172737475767778797A00010000000000000000'), ('22.8.6.71','340000004142434445464748494A4B4C4D4E4F505152535455565758595A6162636465666768696A6B6C6D6E6F707172737475767778797A010000000000000000'); + +SELECT + (SELECT hex(argMaxState('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', number)) FROM numbers(1)) = state +FROM argmaxstate_hex_large +WHERE v = '22.8.5.29'; + +SELECT + v, + length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64)))) +FROM argmaxstate_hex_large; + +-- STRING WITH 0 characters +-- SELECT version() AS v, hex(argMaxState('', number)) AS state FROM numbers(1) FORMAT CSV +CREATE TABLE argmaxstate_hex_empty +( + `v` String, + `state` String +) +ENGINE = TinyLog; + +INSERT into argmaxstate_hex_empty VALUES ('22.8.5.29','0100000000010000000000000000'), ('22.8.6.71','00000000010000000000000000'); + +SELECT + (SELECT hex(argMaxState('', number)) FROM numbers(1)) = state +FROM argmaxstate_hex_empty +WHERE v = '22.8.5.29'; + +SELECT v, length(finalizeAggregation(CAST(unhex(state) AS AggregateFunction(argMax, String, UInt64)))) +FROM argmaxstate_hex_empty; + +-- Right in the border of small and large buffers +-- SELECT hex(argMaxState('0123456789012345678901234567890123456789012345' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV +SELECT '46_OK', finalizeAggregation(CAST(unhex('2F0000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343500010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); +SELECT '46_KO', finalizeAggregation(CAST(unhex('2E00000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); + +-- SELECT hex(argMaxState('01234567890123456789012345678901234567890123456' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV +SELECT '47_OK', finalizeAggregation(CAST(unhex('30000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); +SELECT '47_KO', finalizeAggregation(CAST(unhex('2F0000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); + +-- SELECT hex(argMaxState('012345678901234567890123456789012345678901234567' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV +SELECT '48_OK', finalizeAggregation(CAST(unhex('3100000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363700010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); +SELECT '48_KO', finalizeAggregation(CAST(unhex('30000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); + +-- Right in the allocation limit (power of 2) +-- SELECT hex(argMaxState('012345678901234567890123456789012345678901234567890123456789012' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV +SELECT '63_OK', finalizeAggregation(CAST(unhex('4000000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313200010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); +SELECT '63_KO', finalizeAggregation(CAST(unhex('3F000000303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); +-- SELECT hex(argMaxState('0123456789012345678901234567890123456789012345678901234567890123' as a, number)) AS state, length(a) FROM numbers(1) FORMAT CSV +SELECT '64_OK', finalizeAggregation(CAST(unhex('410000003031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323300010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); +SELECT '64_KO', finalizeAggregation(CAST(unhex('4000000030313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); + +SELECT '-1', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('ffffffff') || randomString(100500), 'AggregateFunction(max, String)') as x); +SELECT '-2', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('fffffffe') || randomString(100500), 'AggregateFunction(max, String)') as x); +SELECT '-2^31', maxMerge(x), length(maxMerge(x)) from (select CAST(unhex('00000080') || randomString(100500), 'AggregateFunction(max, String)') as x); + +SELECT '2^31-2', maxMerge(x) from (select CAST(unhex('feffff7f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE } +SELECT '2^31-1', maxMerge(x) from (select CAST(unhex('ffffff7f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE } + +SELECT '2^30', maxMerge(x) from (select CAST(unhex('00000040') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE } +SELECT '2^30+1', maxMerge(x) from (select CAST(unhex('01000040') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError TOO_LARGE_STRING_SIZE } + +SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString(100500), 'AggregateFunction(max, String)') as x); -- { serverError CANNOT_READ_ALL_DATA } +-- The following query works, but it's too long and consumes to much memory +-- SELECT '2^30-1', length(maxMerge(x)) from (select CAST(unhex('ffffff3f') || randomString(0x3FFFFFFF - 1) || 'x', 'AggregateFunction(max, String)') as x); +SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x); +SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x); + +SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA } +SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); +SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA } +SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA } +SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA } diff --git a/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.reference b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.reference new file mode 100644 index 00000000000..c8e17be819a --- /dev/null +++ b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.reference @@ -0,0 +1,16 @@ +-- { echoOn } + +SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5; +0 10 +2 12 +4 14 +DROP POLICY IF EXISTS test_rlp_policy ON test_rlp; +CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default; +SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0; +0 10 +2 12 +4 14 +SELECT a, c FROM test_rlp PREWHERE b < 5; +0 10 +2 12 +4 14 diff --git a/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.sql b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.sql new file mode 100644 index 00000000000..6835a3a57ea --- /dev/null +++ b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.sql @@ -0,0 +1,25 @@ +DROP TABLE IF EXISTS test_rlp; + +CREATE TABLE test_rlp (a Int32, b Int32) ENGINE=MergeTree() ORDER BY a SETTINGS index_granularity=5; + +INSERT INTO test_rlp SELECT number, number FROM numbers(15); + +ALTER TABLE test_rlp ADD COLUMN c Int32 DEFAULT b+10; + +-- { echoOn } + +SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5; + +DROP POLICY IF EXISTS test_rlp_policy ON test_rlp; + +CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default; + +SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0; + +SELECT a, c FROM test_rlp PREWHERE b < 5; + +-- { echoOff } + +DROP POLICY test_rlp_policy ON test_rlp; + +DROP TABLE test_rlp; diff --git a/tests/queries/0_stateless/02482_if_with_nothing_argument.reference b/tests/queries/0_stateless/02482_if_with_nothing_argument.reference new file mode 100644 index 00000000000..484c9fb68b5 --- /dev/null +++ b/tests/queries/0_stateless/02482_if_with_nothing_argument.reference @@ -0,0 +1,2 @@ +[] 0 +[] 0 diff --git a/tests/queries/0_stateless/02482_if_with_nothing_argument.sql b/tests/queries/0_stateless/02482_if_with_nothing_argument.sql new file mode 100644 index 00000000000..af46ef30d02 --- /dev/null +++ b/tests/queries/0_stateless/02482_if_with_nothing_argument.sql @@ -0,0 +1,3 @@ +select [] as arr, if(empty(arr), 0, arr[-1]); +select [] as arr, multiIf(empty(arr), 0, length(arr) > 1, arr[-1], 0); +