diff --git a/contrib/libarchive-cmake/CMakeLists.txt b/contrib/libarchive-cmake/CMakeLists.txt index cd5658b7086..e89770da5f6 100644 --- a/contrib/libarchive-cmake/CMakeLists.txt +++ b/contrib/libarchive-cmake/CMakeLists.txt @@ -157,7 +157,7 @@ if (TARGET ch_contrib::zlib) endif() if (TARGET ch_contrib::zstd) - target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1) + target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1 HAVE_LIBZSTD_COMPRESSOR=1) target_link_libraries(_libarchive PRIVATE ch_contrib::zstd) endif() diff --git a/contrib/libmetrohash/src/metrohash128.h b/contrib/libmetrohash/src/metrohash128.h index 639a4fa97e3..2dbb6ca5a8a 100644 --- a/contrib/libmetrohash/src/metrohash128.h +++ b/contrib/libmetrohash/src/metrohash128.h @@ -25,21 +25,21 @@ public: static const uint32_t bits = 128; // Constructor initializes the same as Initialize() - MetroHash128(const uint64_t seed=0); - + explicit MetroHash128(const uint64_t seed=0); + // Initializes internal state for new hash with optional seed void Initialize(const uint64_t seed=0); - + // Update the hash state with a string of bytes. If the length // is sufficiently long, the implementation switches to a bulk // hashing algorithm directly on the argument buffer for speed. void Update(const uint8_t * buffer, const uint64_t length); - + // Constructs the final hash and writes it to the argument buffer. // After a hash is finalized, this instance must be Initialized()-ed // again or the behavior of Update() and Finalize() is undefined. void Finalize(uint8_t * const hash); - + // A non-incremental function implementation. This can be significantly // faster than the incremental implementation for some usage patterns. 
static void Hash(const uint8_t * buffer, const uint64_t length, uint8_t * const hash, const uint64_t seed=0); @@ -57,7 +57,7 @@ private: static const uint64_t k1 = 0x8648DBDB; static const uint64_t k2 = 0x7BDEC03B; static const uint64_t k3 = 0x2F5870A5; - + struct { uint64_t v[4]; } state; struct { uint8_t b[32]; } input; uint64_t bytes; diff --git a/contrib/libmetrohash/src/metrohash64.h b/contrib/libmetrohash/src/metrohash64.h index d58898b117d..911e54e6863 100644 --- a/contrib/libmetrohash/src/metrohash64.h +++ b/contrib/libmetrohash/src/metrohash64.h @@ -25,21 +25,21 @@ public: static const uint32_t bits = 64; // Constructor initializes the same as Initialize() - MetroHash64(const uint64_t seed=0); - + explicit MetroHash64(const uint64_t seed=0); + // Initializes internal state for new hash with optional seed void Initialize(const uint64_t seed=0); - + // Update the hash state with a string of bytes. If the length // is sufficiently long, the implementation switches to a bulk // hashing algorithm directly on the argument buffer for speed. void Update(const uint8_t * buffer, const uint64_t length); - + // Constructs the final hash and writes it to the argument buffer. // After a hash is finalized, this instance must be Initialized()-ed // again or the behavior of Update() and Finalize() is undefined. void Finalize(uint8_t * const hash); - + // A non-incremental function implementation. This can be significantly // faster than the incremental implementation for some usage patterns. 
static void Hash(const uint8_t * buffer, const uint64_t length, uint8_t * const hash, const uint64_t seed=0); @@ -57,7 +57,7 @@ private: static const uint64_t k1 = 0xA2AA033B; static const uint64_t k2 = 0x62992FC1; static const uint64_t k3 = 0x30BC5B29; - + struct { uint64_t v[4]; } state; struct { uint8_t b[32]; } input; uint64_t bytes; diff --git a/docs/en/engines/table-engines/integrations/azureBlobStorage.md b/docs/en/engines/table-engines/integrations/azureBlobStorage.md index c6525121667..0843ff1ac47 100644 --- a/docs/en/engines/table-engines/integrations/azureBlobStorage.md +++ b/docs/en/engines/table-engines/integrations/azureBlobStorage.md @@ -19,6 +19,8 @@ CREATE TABLE azure_blob_storage_table (name String, value UInt32) ### Engine parameters +- `endpoint` — AzureBlobStorage endpoint URL with container & prefix. Optionally can contain account_name if the authentication method used needs it. (http://azurite1:{port}/[account_name]{container_name}/{data_prefix}) or these parameters can be provided separately using storage_account_url, account_name & container. For specifying prefix, endpoint should be used. +- `endpoint_contains_account_name` - This flag is used to specify if endpoint contains account_name as it is only needed for certain authentication methods. (Default : true) - `connection_string|storage_account_url` — connection_string includes account name & key ([Create connection string](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#configure-a-connection-string-for-an-azure-storage-account)) or you could also provide the storage account url here and account name & account key as separate parameters (see parameters account_name & account_key) - `container_name` - Container name - `blobpath` - file path. 
Supports following wildcards in readonly mode: `*`, `**`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. diff --git a/docs/en/engines/table-engines/mergetree-family/mergetree.md b/docs/en/engines/table-engines/mergetree-family/mergetree.md index 228b2c8884f..58717b33aef 100644 --- a/docs/en/engines/table-engines/mergetree-family/mergetree.md +++ b/docs/en/engines/table-engines/mergetree-family/mergetree.md @@ -1242,7 +1242,9 @@ Configuration markup: ``` Connection parameters: -* `storage_account_url` - **Required**, Azure Blob Storage account URL, like `http://account.blob.core.windows.net` or `http://azurite1:10000/devstoreaccount1`. +* `endpoint` — AzureBlobStorage endpoint URL with container & prefix. Optionally can contain account_name if the authentication method used needs it. (`http://account.blob.core.windows.net:{port}/[account_name]{container_name}/{data_prefix}`) or these parameters can be provided separately using storage_account_url, account_name & container. For specifying prefix, endpoint should be used. +* `endpoint_contains_account_name` - This flag is used to specify if endpoint contains account_name as it is only needed for certain authentication methods. (Default : true) +* `storage_account_url` - Required if endpoint is not specified, Azure Blob Storage account URL, like `http://account.blob.core.windows.net` or `http://azurite1:10000/devstoreaccount1`. * `container_name` - Target container name, defaults to `default-container`. * `container_already_exists` - If set to `false`, a new container `container_name` is created in the storage account, if set to `true`, disk connects to the container directly, and if left unset, disk connects to the account, checks if the container `container_name` exists, and creates it if it doesn't exist yet. 
diff --git a/docs/en/operations/backup.md b/docs/en/operations/backup.md index b1f2135c476..2d9bf2a2ee8 100644 --- a/docs/en/operations/backup.md +++ b/docs/en/operations/backup.md @@ -168,6 +168,28 @@ RESTORE TABLE test.table PARTITIONS '2', '3' FROM Disk('backups', 'filename.zip') ``` +### Backups as tar archives + +Backups can also be stored as tar archives. The functionality is the same as for zip, except that a password is not supported. + +Write a backup as a tar: +``` +BACKUP TABLE test.table TO Disk('backups', '1.tar') +``` + +Corresponding restore: +``` +RESTORE TABLE test.table FROM Disk('backups', '1.tar') +``` + +To change the compression method, the correct file suffix should be appended to the backup name. For example, to compress the tar archive using gzip: +``` +BACKUP TABLE test.table TO Disk('backups', '1.tar.gz') +``` + +The supported compression file suffixes are `.tar.gz`, `.tgz`, `.tar.bz2`, `.tar.lzma`, `.tar.zst`, `.tzst` and `.tar.xz`. + + ### Check the status of backups The backup command returns an `id` and `status`, and that `id` can be used to get the status of the backup. This is very useful to check the progress of long ASYNC backups. The example below shows a failure that happened when trying to overwrite an existing backup file: diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index 0805cc81f6e..07c9a2b88ab 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -200,17 +200,13 @@ Type: Bool Default: 0 -## dns_cache_max_size +## dns_cache_max_entries -Internal DNS cache max size in bytes. - -:::note -ClickHouse also has a reverse cache, so the actual memory usage could be twice as much. -::: +Internal DNS cache max entries. 
Type: UInt64 -Default: 1024 +Default: 10000 ## dns_cache_update_period diff --git a/docs/en/operations/system-tables/dns_cache.md b/docs/en/operations/system-tables/dns_cache.md index 824ce016a70..befeb9298aa 100644 --- a/docs/en/operations/system-tables/dns_cache.md +++ b/docs/en/operations/system-tables/dns_cache.md @@ -33,6 +33,6 @@ Result: **See also** - [disable_internal_dns_cache setting](../../operations/server-configuration-parameters/settings.md#disable_internal_dns_cache) -- [dns_cache_max_size setting](../../operations/server-configuration-parameters/settings.md#dns_cache_max_size) +- [dns_cache_max_entries setting](../../operations/server-configuration-parameters/settings.md#dns_cache_max_entries) - [dns_cache_update_period setting](../../operations/server-configuration-parameters/settings.md#dns_cache_update_period) - [dns_max_consecutive_failures setting](../../operations/server-configuration-parameters/settings.md#dns_max_consecutive_failures) diff --git a/docs/en/sql-reference/functions/bit-functions.md b/docs/en/sql-reference/functions/bit-functions.md index 3c07fe8bcbe..0951c783aae 100644 --- a/docs/en/sql-reference/functions/bit-functions.md +++ b/docs/en/sql-reference/functions/bit-functions.md @@ -167,6 +167,10 @@ Result: └──────────────────────────────────────────┴───────────────────────────────┘ ``` +## byteSlice(s, offset, length) + +See function [substring](string-functions.md#substring). + ## bitTest Takes any integer and converts it into [binary form](https://en.wikipedia.org/wiki/Binary_number), returns the value of a bit at specified position. The countdown starts from 0 from the right to the left. 
diff --git a/docs/en/sql-reference/functions/date-time-functions.md b/docs/en/sql-reference/functions/date-time-functions.md index c5b3b4cc3ae..83a3bd77cdb 100644 --- a/docs/en/sql-reference/functions/date-time-functions.md +++ b/docs/en/sql-reference/functions/date-time-functions.md @@ -394,8 +394,7 @@ Result: ## toYear -Converts a date or date with time to the year number (AD) as `UInt16` value. - +Returns the year component (AD) of a date or date with time. **Syntax** @@ -431,7 +430,7 @@ Result: ## toQuarter -Converts a date or date with time to the quarter number (1-4) as `UInt8` value. +Returns the quarter (1-4) of a date or date with time. **Syntax** @@ -465,10 +464,9 @@ Result: └──────────────────────────────────────────────┘ ``` - ## toMonth -Converts a date or date with time to the month number (1-12) as `UInt8` value. +Returns the month component (1-12) of a date or date with time. **Syntax** @@ -504,7 +502,7 @@ Result: ## toDayOfYear -Converts a date or date with time to the number of the day of the year (1-366) as `UInt16` value. +Returns the number of the day within the year (1-366) of a date or date with time. **Syntax** @@ -540,7 +538,7 @@ Result: ## toDayOfMonth -Converts a date or date with time to the number of the day in the month (1-31) as `UInt8` value. +Returns the number of the day within the month (1-31) of a date or date with time. **Syntax** @@ -576,7 +574,7 @@ Result: ## toDayOfWeek -Converts a date or date with time to the number of the day in the week as `UInt8` value. +Returns the number of the day within the week of a date or date with time. The two-argument form of `toDayOfWeek()` enables you to specify whether the week starts on Monday or Sunday, and whether the return value should be in the range from 0 to 6 or 1 to 7. If the mode argument is omitted, the default mode is 0. The time zone of the date can be specified as the third argument. 
@@ -627,7 +625,7 @@ Result: ## toHour -Converts a date with time to the number of the hour in 24-hour time (0-23) as `UInt8` value. +Returns the hour component (0-23) of a date with time. Assumes that if clocks are moved ahead, it is by one hour and occurs at 2 a.m., and if clocks are moved back, it is by one hour and occurs at 3 a.m. (which is not always exactly when it occurs - it depends on the timezone). @@ -641,7 +639,7 @@ Alias: `HOUR` **Arguments** -- `value` - a [Date](../data-types/date.md), [Date32](../data-types/date32.md), [DateTime](../data-types/datetime.md) or [DateTime64](../data-types/datetime64.md) +- `value` - a [DateTime](../data-types/datetime.md) or [DateTime64](../data-types/datetime64.md) **Returned value** @@ -665,7 +663,7 @@ Result: ## toMinute -Converts a date with time to the number of the minute of the hour (0-59) as `UInt8` value. +Returns the minute component (0-59) of a date with time. **Syntax** @@ -677,7 +675,7 @@ Alias: `MINUTE` **Arguments** -- `value` - a [Date](../data-types/date.md), [Date32](../data-types/date32.md), [DateTime](../data-types/datetime.md) or [DateTime64](../data-types/datetime64.md) +- `value` - a [DateTime](../data-types/datetime.md) or [DateTime64](../data-types/datetime64.md) **Returned value** @@ -701,7 +699,7 @@ Result: ## toSecond -Converts a date with time to the second in the minute (0-59) as `UInt8` value. Leap seconds are not considered. +Returns the second component (0-59) of a date with time. Leap seconds are not considered. 
**Syntax** @@ -713,7 +711,7 @@ Alias: `SECOND` **Arguments** -- `value` - a [Date](../data-types/date.md), [Date32](../data-types/date32.md), [DateTime](../data-types/datetime.md) or [DateTime64](../data-types/datetime64.md) +- `value` - a [DateTime](../data-types/datetime.md) or [DateTime64](../data-types/datetime64.md) **Returned value** @@ -735,6 +733,40 @@ Result: └─────────────────────────────────────────────┘ ``` +## toMillisecond + +Returns the millisecond component (0-999) of a date with time. + +**Syntax** + +```sql +toMillisecond(value) +``` + +**Arguments** + +- `value` - [DateTime](../data-types/datetime.md) or [DateTime64](../data-types/datetime64.md) + +Alias: `MILLISECOND` + +```sql +SELECT toMillisecond(toDateTime64('2023-04-21 10:20:30.456', 3)) +``` + +Result: + +```response +┌──toMillisecond(toDateTime64('2023-04-21 10:20:30.456', 3))─┐ +│ 456 │ +└────────────────────────────────────────────────────────────┘ +``` + +**Returned value** + +- The millisecond in the second (0 - 999) of the given date/time + +Type: `UInt16` + ## toUnixTimestamp Converts a string, a date or a date with time to the [Unix Timestamp](https://en.wikipedia.org/wiki/Unix_time) in `UInt32` representation. diff --git a/docs/en/sql-reference/functions/string-functions.md b/docs/en/sql-reference/functions/string-functions.md index 9ae403be524..3b49e4954ed 100644 --- a/docs/en/sql-reference/functions/string-functions.md +++ b/docs/en/sql-reference/functions/string-functions.md @@ -558,6 +558,7 @@ substring(s, offset[, length]) Alias: - `substr` - `mid` +- `byteSlice` **Arguments** diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md index 868571f3bb2..a128814f072 100644 --- a/docs/en/sql-reference/statements/system.md +++ b/docs/en/sql-reference/statements/system.md @@ -68,7 +68,7 @@ RELOAD FUNCTION [ON CLUSTER cluster_name] function_name Clears ClickHouse’s internal DNS cache. 
Sometimes (for old ClickHouse versions) it is necessary to use this command when changing the infrastructure (changing the IP address of another ClickHouse server or the server used by dictionaries). -For more convenient (automatic) cache management, see disable_internal_dns_cache, dns_cache_max_size, dns_cache_update_period parameters. +For more convenient (automatic) cache management, see disable_internal_dns_cache, dns_cache_max_entries, dns_cache_update_period parameters. ## DROP MARK CACHE diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 6dc33042a05..a10f47be0b8 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1774,7 +1774,7 @@ try } else { - DNSResolver::instance().setCacheMaxSize(server_settings.dns_cache_max_size); + DNSResolver::instance().setCacheMaxEntries(server_settings.dns_cache_max_entries); /// Initialize a watcher periodically updating DNS cache dns_cache_updater = std::make_unique( diff --git a/src/Access/HTTPAuthClient.h b/src/Access/HTTPAuthClient.h index 5a8a3031a84..caefe869005 100644 --- a/src/Access/HTTPAuthClient.h +++ b/src/Access/HTTPAuthClient.h @@ -24,7 +24,7 @@ class HTTPAuthClient public: using Result = TResponseParser::Result; - HTTPAuthClient(const HTTPAuthClientParams & params, const TResponseParser & parser_ = TResponseParser{}) + explicit HTTPAuthClient(const HTTPAuthClientParams & params, const TResponseParser & parser_ = TResponseParser{}) : timeouts{params.timeouts} , max_tries{params.max_tries} , retry_initial_backoff_ms{params.retry_initial_backoff_ms} diff --git a/src/AggregateFunctions/AggregateFunctionAny.cpp b/src/AggregateFunctions/AggregateFunctionAny.cpp index a6010ff07c3..f727ab04aa9 100644 --- a/src/AggregateFunctions/AggregateFunctionAny.cpp +++ b/src/AggregateFunctions/AggregateFunctionAny.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include @@ -11,219 +11,347 @@ struct Settings; namespace ErrorCodes { - extern const int INCORRECT_DATA; - 
extern const int LOGICAL_ERROR; +extern const int NOT_IMPLEMENTED; } namespace { -struct AggregateFunctionAnyRespectNullsData + +template +class AggregateFunctionAny final : public IAggregateFunctionDataHelper> { - enum Status : UInt8 - { - NotSet = 1, - SetNull = 2, - SetOther = 3 - }; - - Status status = Status::NotSet; - Field value; - - bool isSet() const { return status != Status::NotSet; } - void setNull() { status = Status::SetNull; } - void setOther() { status = Status::SetOther; } -}; - -template -class AggregateFunctionAnyRespectNulls final - : public IAggregateFunctionDataHelper> -{ -public: - using Data = AggregateFunctionAnyRespectNullsData; - +private: SerializationPtr serialization; - const bool returns_nullable_type = false; - explicit AggregateFunctionAnyRespectNulls(const DataTypePtr & type) - : IAggregateFunctionDataHelper>({type}, {}, type) - , serialization(type->getDefaultSerialization()) - , returns_nullable_type(type->isNullable()) +public: + explicit AggregateFunctionAny(const DataTypes & argument_types_) + : IAggregateFunctionDataHelper>(argument_types_, {}, argument_types_[0]) + , serialization(this->result_type->getDefaultSerialization()) { } - String getName() const override - { - if constexpr (First) - return "any_respect_nulls"; - else - return "anyLast_respect_nulls"; - } + String getName() const override { return "any"; } - bool allocatesMemoryInArena() const override { return false; } - - void addNull(AggregateDataPtr __restrict place) const + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override { - chassert(returns_nullable_type); - auto & d = this->data(place); - if (First && d.isSet()) - return; - d.setNull(); - } - - void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override - { - if (columns[0]->isNullable()) - { - if (columns[0]->isNullAt(row_num)) - return addNull(place); - } - auto & d = this->data(place); - if 
(First && d.isSet()) - return; - d.setOther(); - columns[0]->get(row_num, d.value); - } - - void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override - { - if (columns[0]->isNullable()) - addNull(place); - else - add(place, columns, 0, arena); + if (!this->data(place).has()) + this->data(place).set(*columns[0], row_num, arena); } void addBatchSinglePlace( - size_t row_begin, size_t row_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) - const override + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + Arena * arena, + ssize_t if_argument_pos) const override { + if (this->data(place).has() || row_begin >= row_end) + return; + if (if_argument_pos >= 0) { - const auto & flags = assert_cast(*columns[if_argument_pos]).getData(); - size_t size = row_end - row_begin; - for (size_t i = 0; i < size; ++i) + const auto & if_map = assert_cast(*columns[if_argument_pos]).getData(); + for (size_t i = row_begin; i < row_end; i++) { - size_t pos = First ? row_begin + i : row_end - 1 - i; - if (flags[pos]) + if (if_map.data()[i] != 0) { - add(place, columns, pos, arena); - break; + this->data(place).set(*columns[0], i, arena); + return; } } } - else if (row_begin < row_end) + else { - size_t pos = First ? 
row_begin : row_end - 1; - add(place, columns, pos, arena); + this->data(place).set(*columns[0], row_begin, arena); } } void addBatchSinglePlaceNotNull( - size_t, size_t, AggregateDataPtr __restrict, const IColumn **, const UInt8 *, Arena *, ssize_t) const override + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + const UInt8 * __restrict null_map, + Arena * arena, + ssize_t if_argument_pos) const override { - /// This should not happen since it means somebody else has preprocessed the data (NULLs or IFs) and might - /// have discarded values that we need (NULLs) - throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionAnyRespectNulls::addBatchSinglePlaceNotNull called"); - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override - { - auto & d = this->data(place); - if (First && d.isSet()) + if (this->data(place).has() || row_begin >= row_end) return; - auto & other = this->data(rhs); - if (other.isSet()) + if (if_argument_pos >= 0) { - d.status = other.status; - d.value = other.value; + const auto & if_map = assert_cast(*columns[if_argument_pos]).getData(); + for (size_t i = row_begin; i < row_end; i++) + { + if (if_map.data()[i] != 0 && null_map[i] == 0) + { + this->data(place).set(*columns[0], i, arena); + return; + } + } } + else + { + for (size_t i = row_begin; i < row_end; i++) + { + if (null_map[i] == 0) + { + this->data(place).set(*columns[0], i, arena); + return; + } + } + } + } + + void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override + { + if (!this->data(place).has()) + this->data(place).set(*columns[0], 0, arena); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + if (!this->data(place).has()) + this->data(place).set(this->data(rhs), arena); } void serialize(ConstAggregateDataPtr __restrict place, 
WriteBuffer & buf, std::optional /* version */) const override { - auto & d = this->data(place); - UInt8 k = d.status; - - writeBinaryLittleEndian(k, buf); - if (k == Data::Status::SetOther) - serialization->serializeBinary(d.value, buf, {}); + this->data(place).write(buf, *serialization); } - void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena *) const override + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override { - auto & d = this->data(place); - UInt8 k = Data::Status::NotSet; - readBinaryLittleEndian(k, buf); - d.status = static_cast(k); - if (d.status == Data::Status::NotSet) - return; - else if (d.status == Data::Status::SetNull) - { - if (!returns_nullable_type) - throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect type (NULL) in non-nullable {}State", getName()); - return; - } - else if (d.status == Data::Status::SetOther) - serialization->deserializeBinary(d.value, buf, {}); - else - throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect type ({}) in {}State", static_cast(k), getName()); + this->data(place).read(buf, *serialization, arena); } + bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); } + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override { - auto & d = this->data(place); - if (d.status == Data::Status::SetOther) - to.insert(d.value); - else - to.insertDefault(); + this->data(place).insertResultInto(to); } - AggregateFunctionPtr getOwnNullAdapter( - const AggregateFunctionPtr & original_function, - const DataTypes & /*arguments*/, - const Array & /*params*/, - const AggregateFunctionProperties & /*properties*/) const override +#if USE_EMBEDDED_COMPILER + bool isCompilable() const override { - return original_function; + if constexpr (!Data::is_compilable) + return false; + else + return Data::isCompilable(*this->argument_types[0]); } + + void 
compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + Data::compileCreate(builder, aggregate_data_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override + { + if constexpr (Data::is_compilable) + Data::compileAny(builder, aggregate_data_ptr, arguments[0].value); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + void + compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override + { + if constexpr (Data::is_compilable) + Data::compileAnyMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + return Data::compileGetResult(builder, aggregate_data_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } +#endif }; - -template -IAggregateFunction * createAggregateFunctionSingleValueRespectNulls( - const String & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +AggregateFunctionPtr +createAggregateFunctionAny(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) { - assertNoParameters(name, parameters); - assertUnary(name, argument_types); - - return new AggregateFunctionAnyRespectNulls(argument_types[0]); + return AggregateFunctionPtr( + createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); } -AggregateFunctionPtr createAggregateFunctionAny(const std::string & 
name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) -{ - return AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); -} -AggregateFunctionPtr createAggregateFunctionAnyRespectNulls( +template +class AggregateFunctionAnyLast final : public IAggregateFunctionDataHelper> +{ +private: + SerializationPtr serialization; + +public: + explicit AggregateFunctionAnyLast(const DataTypes & argument_types_) + : IAggregateFunctionDataHelper>(argument_types_, {}, argument_types_[0]) + , serialization(this->result_type->getDefaultSerialization()) + { + } + + String getName() const override { return "anyLast"; } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + this->data(place).set(*columns[0], row_num, arena); + } + + void addBatchSinglePlace( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + Arena * arena, + ssize_t if_argument_pos) const override + { + if (row_begin >= row_end) + return; + + size_t batch_size = row_end - row_begin; + if (if_argument_pos >= 0) + { + const auto & if_map = assert_cast(*columns[if_argument_pos]).getData(); + for (size_t i = 0; i < batch_size; i++) + { + size_t pos = (row_end - 1) - i; + if (if_map.data()[pos] != 0) + { + this->data(place).set(*columns[0], pos, arena); + return; + } + } + } + else + { + this->data(place).set(*columns[0], row_end - 1, arena); + } + } + + void addBatchSinglePlaceNotNull( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + const UInt8 * __restrict null_map, + Arena * arena, + ssize_t if_argument_pos) const override + { + if (row_begin >= row_end) + return; + + size_t batch_size = row_end - row_begin; + if (if_argument_pos >= 0) + { + const auto & if_map = assert_cast(*columns[if_argument_pos]).getData(); + for (size_t i = 0; i < 
batch_size; i++) + { + size_t pos = (row_end - 1) - i; + if (if_map.data()[pos] != 0 && null_map[pos] == 0) + { + this->data(place).set(*columns[0], pos, arena); + return; + } + } + } + else + { + for (size_t i = 0; i < batch_size; i++) + { + size_t pos = (row_end - 1) - i; + if (null_map[pos] == 0) + { + this->data(place).set(*columns[0], pos, arena); + return; + } + } + } + } + + void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override + { + this->data(place).set(*columns[0], 0, arena); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).set(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + this->data(place).write(buf, *serialization); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override + { + this->data(place).read(buf, *serialization, arena); + } + + bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + this->data(place).insertResultInto(to); + } + +#if USE_EMBEDDED_COMPILER + bool isCompilable() const override + { + if constexpr (!Data::is_compilable) + return false; + else + return Data::isCompilable(*this->argument_types[0]); + } + + void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + Data::compileCreate(builder, aggregate_data_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override + { + if constexpr (Data::is_compilable) + 
Data::compileAnyLast(builder, aggregate_data_ptr, arguments[0].value); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + void + compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override + { + if constexpr (Data::is_compilable) + Data::compileAnyLastMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + return Data::compileGetResult(builder, aggregate_data_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } +#endif +}; + +AggregateFunctionPtr createAggregateFunctionAnyLast( const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) { - return AggregateFunctionPtr(createAggregateFunctionSingleValueRespectNulls(name, argument_types, parameters, settings)); -} - -AggregateFunctionPtr createAggregateFunctionAnyLast(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) -{ - return AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); -} - -AggregateFunctionPtr createAggregateFunctionAnyLastRespectNulls( - const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) -{ - return AggregateFunctionPtr(createAggregateFunctionSingleValueRespectNulls(name, argument_types, parameters, settings)); -} - -AggregateFunctionPtr createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) -{ - return 
AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); + return AggregateFunctionPtr( + createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); } } @@ -231,27 +359,11 @@ AggregateFunctionPtr createAggregateFunctionAnyHeavy(const std::string & name, c void registerAggregateFunctionsAny(AggregateFunctionFactory & factory) { AggregateFunctionProperties default_properties = {.returns_default_when_only_null = false, .is_order_dependent = true}; - AggregateFunctionProperties default_properties_for_respect_nulls - = {.returns_default_when_only_null = false, .is_order_dependent = true, .is_window_function = true}; factory.registerFunction("any", {createAggregateFunctionAny, default_properties}); factory.registerAlias("any_value", "any", AggregateFunctionFactory::CaseInsensitive); factory.registerAlias("first_value", "any", AggregateFunctionFactory::CaseInsensitive); - - factory.registerFunction("any_respect_nulls", {createAggregateFunctionAnyRespectNulls, default_properties_for_respect_nulls}); - factory.registerAlias("any_value_respect_nulls", "any_respect_nulls", AggregateFunctionFactory::CaseInsensitive); - factory.registerAlias("first_value_respect_nulls", "any_respect_nulls", AggregateFunctionFactory::CaseInsensitive); - factory.registerFunction("anyLast", {createAggregateFunctionAnyLast, default_properties}); factory.registerAlias("last_value", "anyLast", AggregateFunctionFactory::CaseInsensitive); - - factory.registerFunction("anyLast_respect_nulls", {createAggregateFunctionAnyLastRespectNulls, default_properties_for_respect_nulls}); - factory.registerAlias("last_value_respect_nulls", "anyLast_respect_nulls", AggregateFunctionFactory::CaseInsensitive); - - factory.registerFunction("anyHeavy", {createAggregateFunctionAnyHeavy, default_properties}); - - factory.registerNullsActionTransformation("any", "any_respect_nulls"); - factory.registerNullsActionTransformation("anyLast", 
"anyLast_respect_nulls"); } - } diff --git a/src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp b/src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp new file mode 100644 index 00000000000..4f4d4a19cba --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp @@ -0,0 +1,168 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ +struct Settings; + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} + +namespace +{ + +/** Implement 'heavy hitters' algorithm. + * Selects most frequent value if its frequency is more than 50% in each thread of execution. + * Otherwise, selects some arbitrary value. + * http://www.cs.umd.edu/~samir/498/karp.pdf + */ +struct AggregateFunctionAnyHeavyData +{ + using Self = AggregateFunctionAnyHeavyData; + +private: + SingleValueDataBaseMemoryBlock v_data; + UInt64 counter = 0; + +public: + [[noreturn]] explicit AggregateFunctionAnyHeavyData() + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionAnyHeavyData initialized empty"); + } + + explicit AggregateFunctionAnyHeavyData(TypeIndex value_type) { generateSingleValueFromTypeIndex(value_type, v_data); } + + ~AggregateFunctionAnyHeavyData() { data().~SingleValueDataBase(); } + + SingleValueDataBase & data() { return v_data.get(); } + const SingleValueDataBase & data() const { return v_data.get(); } + + void add(const IColumn & column, size_t row_num, Arena * arena) + { + if (data().isEqualTo(column, row_num)) + { + ++counter; + } + else if (counter == 0) + { + data().set(column, row_num, arena); + ++counter; + } + else + { + --counter; + } + } + + void add(const Self & to, Arena * arena) + { + if (!to.data().has()) + return; + + if (data().isEqualTo(to.data())) + counter += to.counter; + else if (!data().has() || counter < to.counter) + data().set(to.data(), arena); + else + counter -= to.counter; + } + + void addManyDefaults(const IColumn & column, size_t length, Arena * arena) + { + for (size_t i = 0; i < length; ++i) + 
add(column, 0, arena); + } + + void write(WriteBuffer & buf, const ISerialization & serialization) const + { + data().write(buf, serialization); + writeBinaryLittleEndian(counter, buf); + } + + void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena) + { + data().read(buf, serialization, arena); + readBinaryLittleEndian(counter, buf); + } + + void insertResultInto(IColumn & to) const { data().insertResultInto(to); } +}; + + +class AggregateFunctionAnyHeavy final : public IAggregateFunctionDataHelper +{ +private: + SerializationPtr serialization; + const TypeIndex value_type_index; + +public: + explicit AggregateFunctionAnyHeavy(const DataTypePtr & type) + : IAggregateFunctionDataHelper({type}, {}, type) + , serialization(type->getDefaultSerialization()) + , value_type_index(WhichDataType(type).idx) + { + } + + void create(AggregateDataPtr __restrict place) const override { new (place) AggregateFunctionAnyHeavyData(value_type_index); } + + String getName() const override { return "anyHeavy"; } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + this->data(place).add(*columns[0], row_num, arena); + } + + void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const override + { + this->data(place).addManyDefaults(*columns[0], length, arena); /* forward the real row count: the data-level helper loops `length` times over row 0, so a literal 0 here made the call a no-op */ + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).add(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + this->data(place).write(buf, *serialization); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override + { + this->data(place).read(buf, *serialization, arena); + } + + bool allocatesMemoryInArena() const override { return 
singleValueTypeAllocatesMemoryInArena(value_type_index); } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + this->data(place).insertResultInto(to); + } +}; + + +AggregateFunctionPtr +createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + + const DataTypePtr & res_type = argument_types[0]; + return AggregateFunctionPtr(new AggregateFunctionAnyHeavy(res_type)); +} + +} + +void registerAggregateFunctionAnyHeavy(AggregateFunctionFactory & factory) +{ + AggregateFunctionProperties default_properties = {.returns_default_when_only_null = false, .is_order_dependent = true}; + factory.registerFunction("anyHeavy", {createAggregateFunctionAnyHeavy, default_properties}); +} + +} diff --git a/src/AggregateFunctions/AggregateFunctionAnyRespectNulls.cpp b/src/AggregateFunctions/AggregateFunctionAnyRespectNulls.cpp new file mode 100644 index 00000000000..7275409c151 --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionAnyRespectNulls.cpp @@ -0,0 +1,235 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ +struct Settings; + +namespace ErrorCodes +{ +extern const int INCORRECT_DATA; +extern const int LOGICAL_ERROR; +} + +namespace +{ +struct AggregateFunctionAnyRespectNullsData +{ + enum class Status : UInt8 + { + NotSet = 1, + SetNull = 2, + SetOther = 3 + }; + + Status status = Status::NotSet; + Field value; + + bool isSet() const { return status != Status::NotSet; } + void setNull() { status = Status::SetNull; } + void setOther() { status = Status::SetOther; } +}; + +template +class AggregateFunctionAnyRespectNulls final + : public IAggregateFunctionDataHelper> +{ +public: + using Data = AggregateFunctionAnyRespectNullsData; + + SerializationPtr serialization; + const bool returns_nullable_type = false; + + explicit 
AggregateFunctionAnyRespectNulls(const DataTypePtr & type) + : IAggregateFunctionDataHelper>({type}, {}, type) + , serialization(type->getDefaultSerialization()) + , returns_nullable_type(type->isNullable()) + { + } + + String getName() const override + { + if constexpr (First) + return "any_respect_nulls"; + else + return "anyLast_respect_nulls"; + } + + bool allocatesMemoryInArena() const override { return false; } + + void addNull(AggregateDataPtr __restrict place) const + { + chassert(returns_nullable_type); + auto & d = this->data(place); + if (First && d.isSet()) + return; + d.setNull(); + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + if (columns[0]->isNullable()) + { + if (columns[0]->isNullAt(row_num)) + return addNull(place); + } + auto & d = this->data(place); + if (First && d.isSet()) + return; + d.setOther(); + columns[0]->get(row_num, d.value); + } + + void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override + { + if (columns[0]->isNullable()) + addNull(place); + else + add(place, columns, 0, arena); + } + + void addBatchSinglePlace( + size_t row_begin, size_t row_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) + const override + { + if (if_argument_pos >= 0) + { + const auto & flags = assert_cast(*columns[if_argument_pos]).getData(); + size_t size = row_end - row_begin; + for (size_t i = 0; i < size; ++i) + { + size_t pos = First ? row_begin + i : row_end - 1 - i; + if (flags[pos]) + { + add(place, columns, pos, arena); + break; + } + } + } + else if (row_begin < row_end) + { + size_t pos = First ? 
row_begin : row_end - 1; + add(place, columns, pos, arena); + } + } + + void addBatchSinglePlaceNotNull( + size_t, size_t, AggregateDataPtr __restrict, const IColumn **, const UInt8 *, Arena *, ssize_t) const override + { + /// This should not happen since it means somebody else has preprocessed the data (NULLs or IFs) and might + /// have discarded values that we need (NULLs) + throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionAnyRespectNulls::addBatchSinglePlaceNotNull called"); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override + { + auto & d = this->data(place); + if (First && d.isSet()) + return; + + auto & other = this->data(rhs); + if (other.isSet()) + { + d.status = other.status; + d.value = other.value; + } + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + auto & d = this->data(place); + UInt8 k = static_cast(d.status); + + writeBinaryLittleEndian(k, buf); + if (d.status == Data::Status::SetOther) + serialization->serializeBinary(d.value, buf, {}); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena *) const override + { + auto & d = this->data(place); + UInt8 k = 0; + readBinaryLittleEndian(k, buf); + d.status = static_cast(k); + if (d.status == Data::Status::NotSet) + return; + else if (d.status == Data::Status::SetNull) + { + if (!returns_nullable_type) + throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect type (NULL) in non-nullable {}State", getName()); + return; + } + else if (d.status == Data::Status::SetOther) + { + serialization->deserializeBinary(d.value, buf, {}); + return; + } + throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect type ({}) in {}State", static_cast(k), getName()); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + auto & d = this->data(place); + if (d.status == 
Data::Status::SetOther) + to.insert(d.value); + else + to.insertDefault(); + } + + AggregateFunctionPtr getOwnNullAdapter( + const AggregateFunctionPtr & original_function, + const DataTypes & /*arguments*/, + const Array & /*params*/, + const AggregateFunctionProperties & /*properties*/) const override + { + return original_function; + } +}; + + +template +IAggregateFunction * createAggregateFunctionSingleValueRespectNulls( + const String & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + + return new AggregateFunctionAnyRespectNulls(argument_types[0]); +} + +AggregateFunctionPtr createAggregateFunctionAnyRespectNulls( + const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) +{ + return AggregateFunctionPtr(createAggregateFunctionSingleValueRespectNulls(name, argument_types, parameters, settings)); +} + +AggregateFunctionPtr createAggregateFunctionAnyLastRespectNulls( + const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) +{ + return AggregateFunctionPtr(createAggregateFunctionSingleValueRespectNulls(name, argument_types, parameters, settings)); +} + +} + +void registerAggregateFunctionsAnyRespectNulls(AggregateFunctionFactory & factory) +{ + AggregateFunctionProperties default_properties_for_respect_nulls + = {.returns_default_when_only_null = false, .is_order_dependent = true, .is_window_function = true}; + + factory.registerFunction("any_respect_nulls", {createAggregateFunctionAnyRespectNulls, default_properties_for_respect_nulls}); + factory.registerAlias("any_value_respect_nulls", "any_respect_nulls", AggregateFunctionFactory::CaseInsensitive); + factory.registerAlias("first_value_respect_nulls", "any_respect_nulls", AggregateFunctionFactory::CaseInsensitive); + + factory.registerFunction("anyLast_respect_nulls", 
{createAggregateFunctionAnyLastRespectNulls, default_properties_for_respect_nulls}); + factory.registerAlias("last_value_respect_nulls", "anyLast_respect_nulls", AggregateFunctionFactory::CaseInsensitive); + + /// Must happen after registering any and anyLast + factory.registerNullsActionTransformation("any", "any_respect_nulls"); + factory.registerNullsActionTransformation("anyLast", "anyLast_respect_nulls"); +} + +} diff --git a/src/AggregateFunctions/AggregateFunctionArgMinMax.h b/src/AggregateFunctions/AggregateFunctionArgMinMax.h deleted file mode 100644 index e1ba2d10abc..00000000000 --- a/src/AggregateFunctions/AggregateFunctionArgMinMax.h +++ /dev/null @@ -1,107 +0,0 @@ -#pragma once - -#include -#include -#include -#include // SingleValueDataString used in embedded compiler - - -namespace DB -{ -struct Settings; - -namespace ErrorCodes -{ - extern const int ILLEGAL_TYPE_OF_ARGUMENT; - extern const int CORRUPTED_DATA; -} - - -/// For possible values for template parameters, see 'AggregateFunctionMinMaxAny.h'. -template -struct AggregateFunctionArgMinMaxData -{ - using ResultData_t = ResultData; - using ValueData_t = ValueData; - - ResultData result; // the argument at which the minimum/maximum value is reached. - ValueData value; // value for which the minimum/maximum is calculated. - - static bool allocatesMemoryInArena() - { - return ResultData::allocatesMemoryInArena() || ValueData::allocatesMemoryInArena(); - } -}; - -/// Returns the first arg value found for the minimum/maximum value. Example: argMax(arg, value). 
-template -class AggregateFunctionArgMinMax final : public IAggregateFunctionDataHelper> -{ -private: - const DataTypePtr & type_val; - const SerializationPtr serialization_res; - const SerializationPtr serialization_val; - - using Base = IAggregateFunctionDataHelper>; - -public: - AggregateFunctionArgMinMax(const DataTypePtr & type_res_, const DataTypePtr & type_val_) - : Base({type_res_, type_val_}, {}, type_res_) - , type_val(this->argument_types[1]) - , serialization_res(type_res_->getDefaultSerialization()) - , serialization_val(type_val->getDefaultSerialization()) - { - if (!type_val->isComparable()) - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of second argument of " - "aggregate function {} because the values of that data type are not comparable", - type_val->getName(), getName()); - } - - String getName() const override - { - return StringRef(Data::ValueData_t::name()) == StringRef("min") ? "argMin" : "argMax"; - } - - void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override - { - if (this->data(place).value.changeIfBetter(*columns[1], row_num, arena)) - this->data(place).result.change(*columns[0], row_num, arena); - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - if (this->data(place).value.changeIfBetter(this->data(rhs).value, arena)) - this->data(place).result.change(this->data(rhs).result, arena); - } - - void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override - { - this->data(place).result.write(buf, *serialization_res); - this->data(place).value.write(buf, *serialization_val); - } - - void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override - { - this->data(place).result.read(buf, *serialization_res, arena); - this->data(place).value.read(buf, *serialization_val, 
arena); - if (unlikely(this->data(place).value.has() != this->data(place).result.has())) - throw Exception( - ErrorCodes::CORRUPTED_DATA, - "Invalid state of the aggregate function {}: has_value ({}) != has_result ({})", - getName(), - this->data(place).value.has(), - this->data(place).result.has()); - } - - bool allocatesMemoryInArena() const override - { - return Data::allocatesMemoryInArena(); - } - - void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override - { - this->data(place).result.insertResultInto(to); - } -}; - -} diff --git a/src/AggregateFunctions/AggregateFunctionMLMethod.h b/src/AggregateFunctions/AggregateFunctionMLMethod.h index d2200243012..fd50fe4b28c 100644 --- a/src/AggregateFunctions/AggregateFunctionMLMethod.h +++ b/src/AggregateFunctions/AggregateFunctionMLMethod.h @@ -204,7 +204,7 @@ private: class Adam : public IWeightsUpdater { public: - Adam(size_t num_params) + explicit Adam(size_t num_params) { beta1_powered = beta1; beta2_powered = beta2; diff --git a/src/AggregateFunctions/AggregateFunctionMax.cpp b/src/AggregateFunctions/AggregateFunctionMax.cpp deleted file mode 100644 index c20df0bc81e..00000000000 --- a/src/AggregateFunctions/AggregateFunctionMax.cpp +++ /dev/null @@ -1,238 +0,0 @@ -#include -#include -#include -#include -#include - -namespace DB -{ -struct Settings; - -namespace -{ - -template -class AggregateFunctionsSingleValueMax final : public AggregateFunctionsSingleValue -{ - using Parent = AggregateFunctionsSingleValue; - -public: - explicit AggregateFunctionsSingleValueMax(const DataTypePtr & type) : Parent(type) { } - - /// Specializations for native numeric types - void addBatchSinglePlace( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const IColumn ** __restrict columns, - Arena * arena, - ssize_t if_argument_pos) const override; - - void addBatchSinglePlaceNotNull( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const 
IColumn ** __restrict columns, - const UInt8 * __restrict null_map, - Arena * arena, - ssize_t if_argument_pos) const override; -}; - -// NOLINTBEGIN(bugprone-macro-parentheses) -#define SPECIALIZE(TYPE) \ -template <> \ -void AggregateFunctionsSingleValueMax>>::addBatchSinglePlace( \ - size_t row_begin, \ - size_t row_end, \ - AggregateDataPtr __restrict place, \ - const IColumn ** __restrict columns, \ - Arena *, \ - ssize_t if_argument_pos) const \ -{ \ - const auto & column = assert_cast>::ColVecType &>(*columns[0]); \ - std::optional opt; \ - if (if_argument_pos >= 0) \ - { \ - const auto & flags = assert_cast(*columns[if_argument_pos]).getData(); \ - opt = findExtremeMaxIf(column.getData().data(), flags.data(), row_begin, row_end); \ - } \ - else \ - opt = findExtremeMax(column.getData().data(), row_begin, row_end); \ - if (opt.has_value()) \ - this->data(place).changeIfGreater(opt.value()); \ -} -// NOLINTEND(bugprone-macro-parentheses) - -FOR_BASIC_NUMERIC_TYPES(SPECIALIZE) -#undef SPECIALIZE - -template -void AggregateFunctionsSingleValueMax::addBatchSinglePlace( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const IColumn ** __restrict columns, - Arena * arena, - ssize_t if_argument_pos) const -{ - if constexpr (!is_any_of) - { - /// Leave other numeric types (large integers, decimals, etc) to keep doing the comparison as it's - /// faster than doing a permutation - return Parent::addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos); - } - - constexpr int nan_null_direction_hint = -1; - auto const & column = *columns[0]; - if (if_argument_pos >= 0) - { - size_t index = row_begin; - const auto & if_flags = assert_cast(*columns[if_argument_pos]).getData(); - while (if_flags[index] == 0 && index < row_end) - index++; - if (index >= row_end) - return; - - for (size_t i = index + 1; i < row_end; i++) - { - if ((if_flags[i] != 0) && (column.compareAt(i, index, column, nan_null_direction_hint) > 0)) - 
index = i; - } - this->data(place).changeIfGreater(column, index, arena); - } - else - { - if (row_begin >= row_end) - return; - - /// TODO: Introduce row_begin and row_end to getPermutation - if (row_begin != 0 || row_end != column.size()) - { - size_t index = row_begin; - for (size_t i = index + 1; i < row_end; i++) - { - if (column.compareAt(i, index, column, nan_null_direction_hint) > 0) - index = i; - } - this->data(place).changeIfGreater(column, index, arena); - } - else - { - constexpr IColumn::PermutationSortDirection direction = IColumn::PermutationSortDirection::Descending; - constexpr IColumn::PermutationSortStability stability = IColumn::PermutationSortStability::Unstable; - IColumn::Permutation permutation; - constexpr UInt64 limit = 1; - column.getPermutation(direction, stability, limit, nan_null_direction_hint, permutation); - this->data(place).changeIfGreater(column, permutation[0], arena); - } - } -} - -// NOLINTBEGIN(bugprone-macro-parentheses) -#define SPECIALIZE(TYPE) \ -template <> \ -void AggregateFunctionsSingleValueMax>>::addBatchSinglePlaceNotNull( \ - size_t row_begin, \ - size_t row_end, \ - AggregateDataPtr __restrict place, \ - const IColumn ** __restrict columns, \ - const UInt8 * __restrict null_map, \ - Arena *, \ - ssize_t if_argument_pos) const \ -{ \ - const auto & column = assert_cast>::ColVecType &>(*columns[0]); \ - std::optional opt; \ - if (if_argument_pos >= 0) \ - { \ - const auto * if_flags = assert_cast(*columns[if_argument_pos]).getData().data(); \ - auto final_flags = std::make_unique(row_end); \ - for (size_t i = row_begin; i < row_end; ++i) \ - final_flags[i] = (!null_map[i]) & !!if_flags[i]; \ - opt = findExtremeMaxIf(column.getData().data(), final_flags.get(), row_begin, row_end); \ - } \ - else \ - opt = findExtremeMaxNotNull(column.getData().data(), null_map, row_begin, row_end); \ - if (opt.has_value()) \ - this->data(place).changeIfGreater(opt.value()); \ -} -// NOLINTEND(bugprone-macro-parentheses) - 
-FOR_BASIC_NUMERIC_TYPES(SPECIALIZE) -#undef SPECIALIZE - -template -void AggregateFunctionsSingleValueMax::addBatchSinglePlaceNotNull( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const IColumn ** __restrict columns, - const UInt8 * __restrict null_map, - Arena * arena, - ssize_t if_argument_pos) const -{ - if constexpr (!is_any_of) - { - /// Leave other numeric types (large integers, decimals, etc) to keep doing the comparison as it's - /// faster than doing a permutation - return Parent::addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, if_argument_pos); - } - - constexpr int nan_null_direction_hint = -1; - auto const & column = *columns[0]; - if (if_argument_pos >= 0) - { - size_t index = row_begin; - const auto & if_flags = assert_cast(*columns[if_argument_pos]).getData(); - while ((if_flags[index] == 0 || null_map[index] != 0) && (index < row_end)) - index++; - if (index >= row_end) - return; - - for (size_t i = index + 1; i < row_end; i++) - { - if ((if_flags[i] != 0) && (null_map[i] == 0) && (column.compareAt(i, index, column, nan_null_direction_hint) > 0)) - index = i; - } - this->data(place).changeIfGreater(column, index, arena); - } - else - { - size_t index = row_begin; - while ((null_map[index] != 0) && (index < row_end)) - index++; - if (index >= row_end) - return; - - for (size_t i = index + 1; i < row_end; i++) - { - if ((null_map[i] == 0) && (column.compareAt(i, index, column, nan_null_direction_hint) > 0)) - index = i; - } - this->data(place).changeIfGreater(column, index, arena); - } -} - -AggregateFunctionPtr createAggregateFunctionMax( - const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) -{ - return AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); -} - -AggregateFunctionPtr createAggregateFunctionArgMax( - const std::string & name, const DataTypes & argument_types, const 
Array & parameters, const Settings * settings) -{ - return AggregateFunctionPtr(createAggregateFunctionArgMinMax(name, argument_types, parameters, settings)); -} - -} - -void registerAggregateFunctionsMax(AggregateFunctionFactory & factory) -{ - factory.registerFunction("max", createAggregateFunctionMax, AggregateFunctionFactory::CaseInsensitive); - - /// The functions below depend on the order of data. - AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true }; - factory.registerFunction("argMax", { createAggregateFunctionArgMax, properties }); -} - -} diff --git a/src/AggregateFunctions/AggregateFunctionMin.cpp b/src/AggregateFunctions/AggregateFunctionMin.cpp deleted file mode 100644 index 7941f3af0de..00000000000 --- a/src/AggregateFunctions/AggregateFunctionMin.cpp +++ /dev/null @@ -1,240 +0,0 @@ -#include -#include -#include -#include -#include - - -namespace DB -{ -struct Settings; - -namespace -{ - -template -class AggregateFunctionsSingleValueMin final : public AggregateFunctionsSingleValue -{ - using Parent = AggregateFunctionsSingleValue; - -public: - explicit AggregateFunctionsSingleValueMin(const DataTypePtr & type) : Parent(type) { } - - /// Specializations for native numeric types - void addBatchSinglePlace( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const IColumn ** __restrict columns, - Arena * arena, - ssize_t if_argument_pos) const override; - - void addBatchSinglePlaceNotNull( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const IColumn ** __restrict columns, - const UInt8 * __restrict null_map, - Arena * arena, - ssize_t if_argument_pos) const override; -}; - -// NOLINTBEGIN(bugprone-macro-parentheses) -#define SPECIALIZE(TYPE) \ - template <> \ - void AggregateFunctionsSingleValueMin>>::addBatchSinglePlace( \ - size_t row_begin, \ - size_t row_end, \ - AggregateDataPtr __restrict place, \ - const IColumn ** __restrict 
columns, \ - Arena *, \ - ssize_t if_argument_pos) const \ - { \ - const auto & column = assert_cast>::ColVecType &>(*columns[0]); \ - std::optional opt; \ - if (if_argument_pos >= 0) \ - { \ - const auto & flags = assert_cast(*columns[if_argument_pos]).getData(); \ - opt = findExtremeMinIf(column.getData().data(), flags.data(), row_begin, row_end); \ - } \ - else \ - opt = findExtremeMin(column.getData().data(), row_begin, row_end); \ - if (opt.has_value()) \ - this->data(place).changeIfLess(opt.value()); \ - } -// NOLINTEND(bugprone-macro-parentheses) - -FOR_BASIC_NUMERIC_TYPES(SPECIALIZE) -#undef SPECIALIZE - -template -void AggregateFunctionsSingleValueMin::addBatchSinglePlace( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const IColumn ** __restrict columns, - Arena * arena, - ssize_t if_argument_pos) const -{ - if constexpr (!is_any_of) - { - /// Leave other numeric types (large integers, decimals, etc) to keep doing the comparison as it's - /// faster than doing a permutation - return Parent::addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos); - } - - constexpr int nan_null_direction_hint = 1; - auto const & column = *columns[0]; - if (if_argument_pos >= 0) - { - size_t index = row_begin; - const auto & if_flags = assert_cast(*columns[if_argument_pos]).getData(); - while (if_flags[index] == 0 && index < row_end) - index++; - if (index >= row_end) - return; - - for (size_t i = index + 1; i < row_end; i++) - { - if ((if_flags[i] != 0) && (column.compareAt(i, index, column, nan_null_direction_hint) < 0)) - index = i; - } - this->data(place).changeIfLess(column, index, arena); - } - else - { - if (row_begin >= row_end) - return; - - /// TODO: Introduce row_begin and row_end to getPermutation - if (row_begin != 0 || row_end != column.size()) - { - size_t index = row_begin; - for (size_t i = index + 1; i < row_end; i++) - { - if (column.compareAt(i, index, column, nan_null_direction_hint) < 0) - index = 
i; - } - this->data(place).changeIfLess(column, index, arena); - } - else - { - constexpr IColumn::PermutationSortDirection direction = IColumn::PermutationSortDirection::Ascending; - constexpr IColumn::PermutationSortStability stability = IColumn::PermutationSortStability::Unstable; - IColumn::Permutation permutation; - constexpr UInt64 limit = 1; - column.getPermutation(direction, stability, limit, nan_null_direction_hint, permutation); - this->data(place).changeIfLess(column, permutation[0], arena); - } - } -} - -// NOLINTBEGIN(bugprone-macro-parentheses) -#define SPECIALIZE(TYPE) \ - template <> \ - void AggregateFunctionsSingleValueMin>>::addBatchSinglePlaceNotNull( \ - size_t row_begin, \ - size_t row_end, \ - AggregateDataPtr __restrict place, \ - const IColumn ** __restrict columns, \ - const UInt8 * __restrict null_map, \ - Arena *, \ - ssize_t if_argument_pos) const \ - { \ - const auto & column = assert_cast>::ColVecType &>(*columns[0]); \ - std::optional opt; \ - if (if_argument_pos >= 0) \ - { \ - const auto * if_flags = assert_cast(*columns[if_argument_pos]).getData().data(); \ - auto final_flags = std::make_unique(row_end); \ - for (size_t i = row_begin; i < row_end; ++i) \ - final_flags[i] = (!null_map[i]) & !!if_flags[i]; \ - opt = findExtremeMinIf(column.getData().data(), final_flags.get(), row_begin, row_end); \ - } \ - else \ - opt = findExtremeMinNotNull(column.getData().data(), null_map, row_begin, row_end); \ - if (opt.has_value()) \ - this->data(place).changeIfLess(opt.value()); \ - } -// NOLINTEND(bugprone-macro-parentheses) - -FOR_BASIC_NUMERIC_TYPES(SPECIALIZE) -#undef SPECIALIZE - -template -void AggregateFunctionsSingleValueMin::addBatchSinglePlaceNotNull( - size_t row_begin, - size_t row_end, - AggregateDataPtr __restrict place, - const IColumn ** __restrict columns, - const UInt8 * __restrict null_map, - Arena * arena, - ssize_t if_argument_pos) const -{ - if constexpr (!is_any_of) - { - /// Leave other numeric types (large integers, 
decimals, etc) to keep doing the comparison as it's - /// faster than doing a permutation - return Parent::addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, if_argument_pos); - } - - constexpr int nan_null_direction_hint = 1; - auto const & column = *columns[0]; - if (if_argument_pos >= 0) - { - size_t index = row_begin; - const auto & if_flags = assert_cast(*columns[if_argument_pos]).getData(); - while ((if_flags[index] == 0 || null_map[index] != 0) && (index < row_end)) - index++; - if (index >= row_end) - return; - - for (size_t i = index + 1; i < row_end; i++) - { - if ((if_flags[i] != 0) && (null_map[index] == 0) && (column.compareAt(i, index, column, nan_null_direction_hint) < 0)) - index = i; - } - this->data(place).changeIfLess(column, index, arena); - } - else - { - size_t index = row_begin; - while ((null_map[index] != 0) && (index < row_end)) - index++; - if (index >= row_end) - return; - - for (size_t i = index + 1; i < row_end; i++) - { - if ((null_map[i] == 0) && (column.compareAt(i, index, column, nan_null_direction_hint) < 0)) - index = i; - } - this->data(place).changeIfLess(column, index, arena); - } -} - -AggregateFunctionPtr createAggregateFunctionMin( - const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) -{ - return AggregateFunctionPtr(createAggregateFunctionSingleValue( - name, argument_types, parameters, settings)); -} - -AggregateFunctionPtr createAggregateFunctionArgMin( - const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) -{ - return AggregateFunctionPtr(createAggregateFunctionArgMinMax(name, argument_types, parameters, settings)); -} - -} - -void registerAggregateFunctionsMin(AggregateFunctionFactory & factory) -{ - factory.registerFunction("min", createAggregateFunctionMin, AggregateFunctionFactory::CaseInsensitive); - - /// The functions below depend on the order of data. 
- AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true }; - factory.registerFunction("argMin", { createAggregateFunctionArgMin, properties }); -} - -} diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h deleted file mode 100644 index dec70861543..00000000000 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ /dev/null @@ -1,1449 +0,0 @@ -#pragma once - -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "config.h" - -#if USE_EMBEDDED_COMPILER -# include -# include -#endif - -namespace DB -{ -struct Settings; - -namespace ErrorCodes -{ - extern const int ILLEGAL_TYPE_OF_ARGUMENT; - extern const int NOT_IMPLEMENTED; - extern const int TOO_LARGE_STRING_SIZE; - extern const int LOGICAL_ERROR; -} - -/** Aggregate functions that store one of passed values. - * For example: min, max, any, anyLast. - */ - - -/// For numeric values. -template -struct SingleValueDataFixed -{ - using Self = SingleValueDataFixed; - using ColVecType = ColumnVectorOrDecimal; - - bool has_value = false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf. 
- T value = T{}; - - static constexpr bool result_is_nullable = false; - static constexpr bool should_skip_null_arguments = true; - static constexpr bool is_any = false; - - bool has() const - { - return has_value; - } - - void insertResultInto(IColumn & to) const - { - if (has()) - assert_cast(to).getData().push_back(value); - else - assert_cast(to).insertDefault(); - } - - void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const - { - writeBinary(has(), buf); - if (has()) - writeBinaryLittleEndian(value, buf); - } - - void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *) - { - readBinary(has_value, buf); - if (has()) - readBinaryLittleEndian(value, buf); - } - - - void change(const IColumn & column, size_t row_num, Arena *) - { - has_value = true; - value = assert_cast(column).getData()[row_num]; - } - - /// Assuming to.has() - void change(const Self & to, Arena *) - { - has_value = true; - value = to.value; - } - - bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has()) - { - change(column, row_num, arena); - return true; - } - else - return false; - } - - bool changeFirstTime(const Self & to, Arena * arena) - { - if (!has() && to.has()) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) - { - change(column, row_num, arena); - return true; - } - - bool changeEveryTime(const Self & to, Arena * arena) - { - if (to.has()) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has() || assert_cast(column).getData()[row_num] < value) - { - change(column, row_num, arena); - return true; - } - else - return false; - } - - bool changeIfLess(const Self & to, Arena * arena) - { - if (to.has() && (!has() || to.value < value)) - { - change(to, arena); - return true; - } - else - return 
false; - } - - void changeIfLess(T from) - { - if (!has() || from < value) - { - has_value = true; - value = from; - } - } - - bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has() || assert_cast(column).getData()[row_num] > value) - { - change(column, row_num, arena); - return true; - } - else - return false; - } - - bool changeIfGreater(const Self & to, Arena * arena) - { - if (to.has() && (!has() || to.value > value)) - { - change(to, arena); - return true; - } - else - return false; - } - - void changeIfGreater(T & from) - { - if (!has() || from > value) - { - has_value = true; - value = from; - } - } - - bool isEqualTo(const Self & to) const - { - return has() && to.value == value; - } - - bool isEqualTo(const IColumn & column, size_t row_num) const - { - return has() && assert_cast(column).getData()[row_num] == value; - } - - static bool allocatesMemoryInArena() - { - return false; - } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = true; - - static llvm::Value * getValuePtrFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - static constexpr size_t value_offset_from_structure = offsetof(SingleValueDataFixed, value); - auto * value_ptr = b.CreateConstInBoundsGEP1_64(b.getInt8Ty(), aggregate_data_ptr, value_offset_from_structure); - - return value_ptr; - } - - static llvm::Value * getValueFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * type = toNativeType(builder); - auto * value_ptr = getValuePtrFromAggregateDataPtr(builder, aggregate_data_ptr); - - return b.CreateLoad(type, value_ptr); - } - - static void compileChange(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_ptr = 
aggregate_data_ptr; - b.CreateStore(b.getInt1(true), has_value_ptr); - - auto * value_ptr = getValuePtrFromAggregateDataPtr(b, aggregate_data_ptr); - b.CreateStore(value_to_check, value_ptr); - } - - static void compileChangeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - auto * value_src = getValueFromAggregateDataPtr(builder, aggregate_data_src_ptr); - - compileChange(builder, aggregate_data_dst_ptr, value_src); - } - - static void compileChangeFirstTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_ptr = aggregate_data_ptr; - auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - b.CreateCondBr(has_value_value, if_should_not_change, if_should_change); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_change); - compileChange(builder, aggregate_data_ptr, value_to_check); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - static void compileChangeFirstTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_dst_ptr = aggregate_data_dst_ptr; - auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); - - auto * has_value_src_ptr = aggregate_data_src_ptr; - auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); - - auto * head = b.GetInsertBlock(); - - auto * 
join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - b.CreateCondBr(b.CreateAnd(b.CreateNot(has_value_dst), has_value_src), if_should_change, if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - static void compileChangeEveryTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - compileChange(builder, aggregate_data_ptr, value_to_check); - } - - static void compileChangeEveryTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_src_ptr = aggregate_data_src_ptr; - auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - b.CreateCondBr(has_value_src, if_should_change, if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - template - static void 
compileChangeComparison(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_ptr = aggregate_data_ptr; - auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); - - auto * value = getValueFromAggregateDataPtr(b, aggregate_data_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - auto is_signed = std::numeric_limits::is_signed; - - llvm::Value * should_change_after_comparison = nullptr; - - if constexpr (is_less) - { - if (value_to_check->getType()->isIntegerTy()) - should_change_after_comparison = is_signed ? b.CreateICmpSLT(value_to_check, value) : b.CreateICmpULT(value_to_check, value); - else - should_change_after_comparison = b.CreateFCmpOLT(value_to_check, value); - } - else - { - if (value_to_check->getType()->isIntegerTy()) - should_change_after_comparison = is_signed ? 
b.CreateICmpSGT(value_to_check, value) : b.CreateICmpUGT(value_to_check, value); - else - should_change_after_comparison = b.CreateFCmpOGT(value_to_check, value); - } - - b.CreateCondBr(b.CreateOr(b.CreateNot(has_value_value), should_change_after_comparison), if_should_change, if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChange(builder, aggregate_data_ptr, value_to_check); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - template - static void compileChangeComparisonMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_dst_ptr = aggregate_data_dst_ptr; - auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); - - auto * value_dst = getValueFromAggregateDataPtr(b, aggregate_data_dst_ptr); - - auto * has_value_src_ptr = aggregate_data_src_ptr; - auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); - - auto * value_src = getValueFromAggregateDataPtr(b, aggregate_data_src_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - auto is_signed = std::numeric_limits::is_signed; - - llvm::Value * should_change_after_comparison = nullptr; - - if constexpr (is_less) - { - if (value_src->getType()->isIntegerTy()) - should_change_after_comparison = is_signed ? 
b.CreateICmpSLT(value_src, value_dst) : b.CreateICmpULT(value_src, value_dst); - else - should_change_after_comparison = b.CreateFCmpOLT(value_src, value_dst); - } - else - { - if (value_src->getType()->isIntegerTy()) - should_change_after_comparison = is_signed ? b.CreateICmpSGT(value_src, value_dst) : b.CreateICmpUGT(value_src, value_dst); - else - should_change_after_comparison = b.CreateFCmpOGT(value_src, value_dst); - } - - b.CreateCondBr(b.CreateAnd(has_value_src, b.CreateOr(b.CreateNot(has_value_dst), should_change_after_comparison)), if_should_change, if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - static void compileChangeIfLess(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - static constexpr bool is_less = true; - compileChangeComparison(builder, aggregate_data_ptr, value_to_check); - } - - static void compileChangeIfLessMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - static constexpr bool is_less = true; - compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - - static void compileChangeIfGreater(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - static constexpr bool is_less = false; - compileChangeComparison(builder, aggregate_data_ptr, value_to_check); - } - - static void compileChangeIfGreaterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - static constexpr bool is_less = false; - compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - - static llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, 
llvm::Value * aggregate_data_ptr) - { - return getValueFromAggregateDataPtr(builder, aggregate_data_ptr); - } - -#endif -}; - -struct Compatibility -{ - /// Old versions used to store terminating null-character in SingleValueDataString. - /// Then -WithTerminatingZero methods were removed from IColumn interface, - /// because these methods are quite dangerous and easy to misuse. It introduced incompatibility. - /// See https://github.com/ClickHouse/ClickHouse/pull/41431 and https://github.com/ClickHouse/ClickHouse/issues/42916 - /// Here we keep these functions for compatibility. - /// It's safe because there's no way unsanitized user input (without \0 at the end) can reach these functions. - - static StringRef getDataAtWithTerminatingZero(const ColumnString & column, size_t n) - { - auto res = column.getDataAt(n); - /// ColumnString always reserves extra byte for null-character after string. - /// But getDataAt returns StringRef without the null-character. Let's add it. - chassert(res.data[res.size] == '\0'); - ++res.size; - return res; - } - - static void insertDataWithTerminatingZero(ColumnString & column, const char * pos, size_t length) - { - /// String already has terminating null-character. - /// But insertData will add another one unconditionally. Trim existing null-character to avoid duplication. - chassert(0 < length); - chassert(pos[length - 1] == '\0'); - column.insertData(pos, length - 1); - } -}; - -/** For strings. Short strings are stored in the object itself, and long strings are allocated separately. - * NOTE It could also be suitable for arrays of numbers. - */ -struct SingleValueDataString -{ -private: - using Self = SingleValueDataString; - - /// 0 size indicates that there is no value. 
Empty string must has terminating '\0' and, therefore, size of empty string is 1 - UInt32 size = 0; - UInt32 capacity = 0; /// power of two or zero - char * large_data; - -public: - static constexpr UInt32 AUTOMATIC_STORAGE_SIZE = 64; - static constexpr UInt32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size) - sizeof(capacity) - sizeof(large_data); - static constexpr UInt32 MAX_STRING_SIZE = std::numeric_limits::max(); - -private: - char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero. - -public: - static constexpr bool result_is_nullable = false; - static constexpr bool should_skip_null_arguments = true; - static constexpr bool is_any = false; - - bool has() const - { - return size; - } - -private: - char * getDataMutable() - { - return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; - } - - const char * getData() const - { - const char * data_ptr = size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; - /// It must always be terminated with null-character - chassert(0 < size); - chassert(data_ptr[size - 1] == '\0'); - return data_ptr; - } - - StringRef getStringRef() const - { - return StringRef(getData(), size); - } - -public: - void insertResultInto(IColumn & to) const - { - if (has()) - Compatibility::insertDataWithTerminatingZero(assert_cast(to), getData(), size); - else - assert_cast(to).insertDefault(); - } - - void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const - { - if (unlikely(MAX_STRING_SIZE < size)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "String size is too big ({}), it's a bug", size); - - /// For serialization we use signed Int32 (for historical reasons), -1 means "no value" - Int32 size_to_write = size ? 
size : -1; - writeBinaryLittleEndian(size_to_write, buf); - if (has()) - buf.write(getData(), size); - } - - void allocateLargeDataIfNeeded(UInt32 size_to_reserve, Arena * arena) - { - if (capacity < size_to_reserve) - { - if (unlikely(MAX_STRING_SIZE < size_to_reserve)) - throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}", - size_to_reserve, MAX_STRING_SIZE); - - size_t rounded_capacity = roundUpToPowerOfTwoOrZero(size_to_reserve); - chassert(rounded_capacity <= MAX_STRING_SIZE + 1); /// rounded_capacity <= 2^31 - capacity = static_cast(rounded_capacity); - - /// Don't free large_data here. - large_data = arena->alloc(capacity); - } - } - - void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena * arena) - { - /// For serialization we use signed Int32 (for historical reasons), -1 means "no value" - Int32 rhs_size_signed; - readBinaryLittleEndian(rhs_size_signed, buf); - - if (rhs_size_signed < 0) - { - /// Don't free large_data here. - size = 0; - return; - } - - UInt32 rhs_size = rhs_size_signed; - if (rhs_size <= MAX_SMALL_STRING_SIZE) - { - /// Don't free large_data here. - size = rhs_size; - buf.readStrict(small_data, size); - } - else - { - /// Reserve one byte more for null-character - allocateLargeDataIfNeeded(rhs_size + 1, arena); - size = rhs_size; - buf.readStrict(large_data, size); - } - - /// Check if the string we read is null-terminated (getDataMutable does not have the assertion) - if (0 < size && getDataMutable()[size - 1] == '\0') - return; - - /// It's not null-terminated, but it must be (for historical reasons). There are two variants: - /// - The value was serialized by one of the incompatible versions of ClickHouse. We had some range of versions - /// that used to serialize SingleValueDataString without terminating '\0'. Let's just append it. - /// - An attacker sent crafted data. Sanitize it and append '\0'. - /// In all other cases the string must be already null-terminated. 
- - /// NOTE We cannot add '\0' unconditionally, because it will be duplicated. - /// NOTE It's possible that a string that actually ends with '\0' was written by one of the incompatible versions. - /// Unfortunately, we cannot distinguish it from normal string written by normal version. - /// So such strings will be trimmed. - - if (size == MAX_SMALL_STRING_SIZE) - { - /// Special case: We have to move value to large_data - allocateLargeDataIfNeeded(size + 1, arena); - memcpy(large_data, small_data, size); - } - - /// We have enough space to append - ++size; - getDataMutable()[size - 1] = '\0'; - } - - /// Assuming to.has() - void changeImpl(StringRef value, Arena * arena) - { - if (unlikely(MAX_STRING_SIZE < value.size)) - throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}", - value.size, MAX_STRING_SIZE); - - UInt32 value_size = static_cast(value.size); - - if (value_size <= MAX_SMALL_STRING_SIZE) - { - /// Don't free large_data here. - size = value_size; - - if (size > 0) - memcpy(small_data, value.data, size); - } - else - { - allocateLargeDataIfNeeded(value_size, arena); - size = value_size; - memcpy(large_data, value.data, size); - } - } - - void change(const IColumn & column, size_t row_num, Arena * arena) - { - changeImpl(Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num), arena); - } - - void change(const Self & to, Arena * arena) - { - changeImpl(to.getStringRef(), arena); - } - - bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has()) - { - change(column, row_num, arena); - return true; - } - else - return false; - } - - bool changeFirstTime(const Self & to, Arena * arena) - { - if (!has() && to.has()) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) - { - change(column, row_num, arena); - return true; - } - - bool changeEveryTime(const Self & to, 
Arena * arena) - { - if (to.has()) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) < getStringRef()) - { - change(column, row_num, arena); - return true; - } - else - return false; - } - - bool changeIfLess(const Self & to, Arena * arena) - { - if (to.has() && (!has() || to.getStringRef() < getStringRef())) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) > getStringRef()) - { - change(column, row_num, arena); - return true; - } - else - return false; - } - - bool changeIfGreater(const Self & to, Arena * arena) - { - if (to.has() && (!has() || to.getStringRef() > getStringRef())) - { - change(to, arena); - return true; - } - else - return false; - } - - bool isEqualTo(const Self & to) const - { - return has() && to.getStringRef() == getStringRef(); - } - - bool isEqualTo(const IColumn & column, size_t row_num) const - { - return has() && Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) == getStringRef(); - } - - static bool allocatesMemoryInArena() - { - return true; - } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = false; - -#endif - -}; - -static_assert( - sizeof(SingleValueDataString) == SingleValueDataString::AUTOMATIC_STORAGE_SIZE, - "Incorrect size of SingleValueDataString struct"); - - -/// For any other value types. 
-struct SingleValueDataGeneric -{ -private: - using Self = SingleValueDataGeneric; - Field value; - -public: - static constexpr bool result_is_nullable = false; - static constexpr bool should_skip_null_arguments = true; - static constexpr bool is_any = false; - - bool has() const { return !value.isNull(); } - - void insertResultInto(IColumn & to) const - { - if (has()) - to.insert(value); - else - to.insertDefault(); - } - - void write(WriteBuffer & buf, const ISerialization & serialization) const - { - if (!value.isNull()) - { - writeBinary(true, buf); - serialization.serializeBinary(value, buf, {}); - } - else - writeBinary(false, buf); - } - - void read(ReadBuffer & buf, const ISerialization & serialization, Arena *) - { - bool is_not_null; - readBinary(is_not_null, buf); - - if (is_not_null) - serialization.deserializeBinary(value, buf, {}); - } - - void change(const IColumn & column, size_t row_num, Arena *) { column.get(row_num, value); } - - void change(const Self & to, Arena *) { value = to.value; } - - bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has()) - { - change(column, row_num, arena); - return true; - } - else - return false; - } - - bool changeFirstTime(const Self & to, Arena * arena) - { - if (!has() && to.has()) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) - { - change(column, row_num, arena); - return true; - } - - bool changeEveryTime(const Self & to, Arena * arena) - { - if (to.has()) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has()) - { - change(column, row_num, arena); - return true; - } - else - { - Field new_value; - column.get(row_num, new_value); - if (new_value < value) - { - value = new_value; - return true; - } - else - return false; - } - } - - bool changeIfLess(const Self & 
to, Arena * arena) - { - if (!to.has()) - return false; - if (!has() || to.value < value) - { - change(to, arena); - return true; - } - else - return false; - } - - bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) - { - if (!has()) - { - change(column, row_num, arena); - return true; - } - else - { - Field new_value; - column.get(row_num, new_value); - if (new_value > value) - { - value = new_value; - return true; - } - else - return false; - } - } - - bool changeIfGreater(const Self & to, Arena * arena) - { - if (!to.has()) - return false; - if (!has() || to.value > value) - { - change(to, arena); - return true; - } - else - return false; - } - - bool isEqualTo(const IColumn & column, size_t row_num) const { return has() && value == column[row_num]; } - - bool isEqualTo(const Self & to) const { return has() && to.value == value; } - - static bool allocatesMemoryInArena() - { - return false; - } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = false; - -#endif - -}; - - -/** What is the difference between the aggregate functions min, max, any, anyLast - * (the condition that the stored value is replaced by a new one, - * as well as, of course, the name). 
- */ - -template -struct AggregateFunctionMinData : Data -{ - using Self = AggregateFunctionMinData; - using Impl = Data; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfLess(column, row_num, arena); } - bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfLess(to, arena); } - void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeIfLess(column, 0, arena); } - - static const char * name() { return "min"; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeIfLess(builder, aggregate_data_ptr, value_to_check); - } - - static void compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeIfLessMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -#endif -}; - -template -struct AggregateFunctionMaxData : Data -{ - using Self = AggregateFunctionMaxData; - using Impl = Data; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfGreater(column, row_num, arena); } - bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfGreater(to, arena); } - void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeIfGreater(column, 0, arena); } - - static const char * name() { return "max"; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeIfGreater(builder, aggregate_data_ptr, value_to_check); - } - - static void compileChangeIfBetterMerge(llvm::IRBuilderBase & 
builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeIfGreaterMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -#endif -}; - -template -struct AggregateFunctionAnyData : Data -{ - using Self = AggregateFunctionAnyData; - static constexpr bool is_any = true; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); } - bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); } - void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeFirstTime(column, 0, arena); } - - static const char * name() { return "any"; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeFirstTime(builder, aggregate_data_ptr, value_to_check); - } - - static void compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeFirstTimeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -#endif -}; - -template -struct AggregateFunctionAnyLastData : Data -{ - using Self = AggregateFunctionAnyLastData; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); } - bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); } - void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeEveryTime(column, 0, arena); } - - static const char * name() { return "anyLast"; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & 
builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeEveryTime(builder, aggregate_data_ptr, value_to_check); - } - - static void compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeEveryTimeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -#endif -}; - - -/** The aggregate function 'singleValueOrNull' is used to implement subquery operators, - * such as x = ALL (SELECT ...) - * It checks if there is only one unique non-NULL value in the data. - * If there is only one unique value - returns it. - * If there are zero or at least two distinct values - returns NULL. - */ -template -struct AggregateFunctionSingleValueOrNullData : Data -{ - using Self = AggregateFunctionSingleValueOrNullData; - - static constexpr bool result_is_nullable = true; - - bool first_value = true; - bool is_null = false; - - void changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) - { - if (first_value) - { - first_value = false; - this->change(column, row_num, arena); - } - else if (!this->isEqualTo(column, row_num)) - { - is_null = true; - } - } - - void changeIfBetter(const Self & to, Arena * arena) - { - if (!to.has()) - return; - - if (first_value && !to.first_value) - { - first_value = false; - this->change(to, arena); - } - else if (!this->isEqualTo(to)) - { - is_null = true; - } - } - - void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeIfBetter(column, 0, arena); } - - void insertResultInto(IColumn & to) const - { - if (is_null || first_value) - { - to.insertDefault(); - } - else - { - ColumnNullable & col = typeid_cast(to); - col.getNullMapColumn().insertDefault(); - this->Data::insertResultInto(col.getNestedColumn()); - } - } - - static const char * name() { return "singleValueOrNull"; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable 
= false; - -#endif -}; - -/** Implement 'heavy hitters' algorithm. - * Selects most frequent value if its frequency is more than 50% in each thread of execution. - * Otherwise, selects some arbitrary value. - * http://www.cs.umd.edu/~samir/498/karp.pdf - */ -template -struct AggregateFunctionAnyHeavyData : Data -{ - UInt64 counter = 0; - - using Self = AggregateFunctionAnyHeavyData; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) - { - if (this->isEqualTo(column, row_num)) - { - ++counter; - } - else - { - if (counter == 0) - { - this->change(column, row_num, arena); - ++counter; - return true; - } - else - --counter; - } - return false; - } - - bool changeIfBetter(const Self & to, Arena * arena) - { - if (!to.has()) - return false; - - if (this->isEqualTo(to)) - { - counter += to.counter; - } - else - { - if ((!this->has() && to.has()) || counter < to.counter) - { - this->change(to, arena); - return true; - } - else - counter -= to.counter; - } - return false; - } - - void addManyDefaults(const IColumn & column, size_t length, Arena * arena) - { - for (size_t i = 0; i < length; ++i) - changeIfBetter(column, 0, arena); - } - - void write(WriteBuffer & buf, const ISerialization & serialization) const - { - Data::write(buf, serialization); - writeBinaryLittleEndian(counter, buf); - } - - void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena) - { - Data::read(buf, serialization, arena); - readBinaryLittleEndian(counter, buf); - } - - static const char * name() { return "anyHeavy"; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = false; - -#endif - -}; - - -template -class AggregateFunctionsSingleValue : public IAggregateFunctionDataHelper> -{ - static constexpr bool is_any = Data::is_any; - -private: - SerializationPtr serialization; - -public: - explicit AggregateFunctionsSingleValue(const DataTypePtr & type) - : IAggregateFunctionDataHelper>({type}, {}, createResultType(type)) - , 
serialization(type->getDefaultSerialization()) - { - if (StringRef(Data::name()) == StringRef("min") - || StringRef(Data::name()) == StringRef("max")) - { - if (!type->isComparable()) - throw Exception( - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, - "Illegal type {} of argument of aggregate function {} because the values of that data type are not comparable", - type->getName(), - Data::name()); - } - } - - String getName() const override { return Data::name(); } - - static DataTypePtr createResultType(const DataTypePtr & type_) - { - if constexpr (Data::result_is_nullable) - return makeNullable(type_); - return type_; - } - - void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override - { - this->data(place).changeIfBetter(*columns[0], row_num, arena); - } - - void addManyDefaults( - AggregateDataPtr __restrict place, - const IColumn ** columns, - size_t length, - Arena * arena) const override - { - this->data(place).addManyDefaults(*columns[0], length, arena); - } - - void addBatchSinglePlace( - size_t row_begin, - size_t row_end, - AggregateDataPtr place, - const IColumn ** columns, - Arena * arena, - ssize_t if_argument_pos) const override - { - if constexpr (is_any) - if (this->data(place).has()) - return; - if (if_argument_pos >= 0) - { - const auto & flags = assert_cast(*columns[if_argument_pos]).getData(); - for (size_t i = row_begin; i < row_end; ++i) - { - if (flags[i]) - { - this->data(place).changeIfBetter(*columns[0], i, arena); - if constexpr (is_any) - break; - } - } - } - else - { - for (size_t i = row_begin; i < row_end; ++i) - { - this->data(place).changeIfBetter(*columns[0], i, arena); - if constexpr (is_any) - break; - } - } - } - - void addBatchSinglePlaceNotNull( /// NOLINT - size_t row_begin, - size_t row_end, - AggregateDataPtr place, - const IColumn ** columns, - const UInt8 * null_map, - Arena * arena, - ssize_t if_argument_pos = -1) const override - { - if constexpr (is_any) - if 
(this->data(place).has()) - return; - - if (if_argument_pos >= 0) - { - const auto & flags = assert_cast(*columns[if_argument_pos]).getData(); - for (size_t i = row_begin; i < row_end; ++i) - { - if (!null_map[i] && flags[i]) - { - this->data(place).changeIfBetter(*columns[0], i, arena); - if constexpr (is_any) - break; - } - } - } - else - { - for (size_t i = row_begin; i < row_end; ++i) - { - if (!null_map[i]) - { - this->data(place).changeIfBetter(*columns[0], i, arena); - if constexpr (is_any) - break; - } - } - } - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - this->data(place).changeIfBetter(this->data(rhs), arena); - } - - void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override - { - this->data(place).write(buf, *serialization); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override - { - this->data(place).read(buf, *serialization, arena); - } - - bool allocatesMemoryInArena() const override - { - return Data::allocatesMemoryInArena(); - } - - void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override - { - this->data(place).insertResultInto(to); - } - - AggregateFunctionPtr getOwnNullAdapter( - const AggregateFunctionPtr & original_function, - const DataTypes & /*arguments*/, - const Array & /*params*/, - const AggregateFunctionProperties & /*properties*/) const override - { - if (Data::result_is_nullable && !Data::should_skip_null_arguments) - return original_function; - return nullptr; - } - -#if USE_EMBEDDED_COMPILER - - bool isCompilable() const override - { - if constexpr (!Data::is_compilable) - return false; - - return canBeNativeType(*this->argument_types[0]); - } - - - void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override - { - llvm::IRBuilder<> & b = static_cast 
&>(builder); - - b.CreateMemSet(aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), this->sizeOfData(), llvm::assumeAligned(this->alignOfData())); - } - - void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override - { - if constexpr (Data::is_compilable) - { - Data::compileChangeIfBetter(builder, aggregate_data_ptr, arguments[0].value); - } - else - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); - } - } - - void compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override - { - if constexpr (Data::is_compilable) - { - Data::compileChangeIfBetterMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - else - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); - } - } - - llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override - { - if constexpr (Data::is_compilable) - { - return Data::compileGetResult(builder, aggregate_data_ptr); - } - else - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); - } - } - -#endif -}; - -} diff --git a/src/AggregateFunctions/AggregateFunctionSingleValueOrNull.cpp b/src/AggregateFunctions/AggregateFunctionSingleValueOrNull.cpp index cd897dfcf6e..b14af34c5fc 100644 --- a/src/AggregateFunctions/AggregateFunctionSingleValueOrNull.cpp +++ b/src/AggregateFunctions/AggregateFunctionSingleValueOrNull.cpp @@ -1,19 +1,193 @@ #include -#include #include -#include "registerAggregateFunctions.h" +#include +#include +#include namespace DB { struct Settings; +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} + namespace { +/** The aggregate function 'singleValueOrNull' is used to implement subquery operators, + * such as x = ALL (SELECT ...) 
+ * It checks if there is only one unique non-NULL value in the data. + * If there is only one unique value - returns it. + * If there are zero or at least two distinct values - returns NULL. + */ -AggregateFunctionPtr createAggregateFunctionSingleValueOrNull(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) +struct AggregateFunctionSingleValueOrNullData { - return AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); + using Self = AggregateFunctionSingleValueOrNullData; + +private: + SingleValueDataBaseMemoryBlock v_data; + bool first_value = true; + bool is_null = false; + +public: + [[noreturn]] explicit AggregateFunctionSingleValueOrNullData() + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionSingleValueOrNullData initialized empty"); + } + + explicit AggregateFunctionSingleValueOrNullData(TypeIndex value_type) { generateSingleValueFromTypeIndex(value_type, v_data); } + + ~AggregateFunctionSingleValueOrNullData() { data().~SingleValueDataBase(); } + + SingleValueDataBase & data() { return v_data.get(); } + const SingleValueDataBase & data() const { return v_data.get(); } + + bool isNull() const { return is_null; } + + void add(const IColumn & column, size_t row_num, Arena * arena) + { + if (first_value) + { + first_value = false; + data().set(column, row_num, arena); + } + else if (!data().isEqualTo(column, row_num)) + { + is_null = true; + } + } + + void add(const Self & to, Arena * arena) + { + if (!to.data().has()) + return; + + if (first_value && !to.first_value) + { + first_value = false; + data().set(to.data(), arena); + } + else if (!data().isEqualTo(to.data())) + { + is_null = true; + } + } + + /// TODO: Methods write and read lose data (first_value and is_null) + /// Fixing it requires a breaking change (but it's probably necessary) + void write(WriteBuffer & buf, const ISerialization & serialization) const { 
data().write(buf, serialization); } + + void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena) { data().read(buf, serialization, arena); } + + void insertResultInto(IColumn & to) const + { + if (is_null || first_value) + { + to.insertDefault(); + } + else + { + ColumnNullable & col = typeid_cast(to); + col.getNullMapColumn().insertDefault(); + data().insertResultInto(col.getNestedColumn()); + } + } +}; + + +class AggregateFunctionSingleValueOrNull final + : public IAggregateFunctionDataHelper +{ +private: + SerializationPtr serialization; + const TypeIndex value_type_index; + +public: + explicit AggregateFunctionSingleValueOrNull(const DataTypePtr & type) + : IAggregateFunctionDataHelper( + {type}, {}, makeNullable(type)) + , serialization(type->getDefaultSerialization()) + , value_type_index(WhichDataType(type).idx) + { + } + + void create(AggregateDataPtr __restrict place) const override { new (place) AggregateFunctionSingleValueOrNullData(value_type_index); } + + String getName() const override { return "singleValueOrNull"; } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + this->data(place).add(*columns[0], row_num, arena); + } + + void addBatchSinglePlace( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + Arena * arena, + ssize_t if_argument_pos) const override + { + if (this->data(place).isNull()) + return; + IAggregateFunctionDataHelper::addBatchSinglePlace( + row_begin, row_end, place, columns, arena, if_argument_pos); + } + + void addBatchSinglePlaceNotNull( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + const UInt8 * __restrict null_map, + Arena * arena, + ssize_t if_argument_pos) const override + { + if (this->data(place).isNull()) + return; + IAggregateFunctionDataHelper::addBatchSinglePlaceNotNull( + row_begin, 
row_end, place, columns, null_map, arena, if_argument_pos); + } + + void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override + { + this->data(place).add(*columns[0], 0, arena); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).add(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + this->data(place).write(buf, *serialization); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override + { + this->data(place).read(buf, *serialization, arena); + } + + bool allocatesMemoryInArena() const override { return singleValueTypeAllocatesMemoryInArena(value_type_index); } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + this->data(place).insertResultInto(to); + } +}; + +AggregateFunctionPtr createAggregateFunctionSingleValueOrNull( + const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + + const DataTypePtr & res_type = argument_types[0]; + return AggregateFunctionPtr(new AggregateFunctionSingleValueOrNull(res_type)); } } @@ -22,6 +196,4 @@ void registerAggregateFunctionSingleValueOrNull(AggregateFunctionFactory & facto { factory.registerFunction("singleValueOrNull", createAggregateFunctionSingleValueOrNull); } - - } diff --git a/src/AggregateFunctions/AggregateFunctionsArgMinArgMax.cpp b/src/AggregateFunctions/AggregateFunctionsArgMinArgMax.cpp new file mode 100644 index 00000000000..e8f40120152 --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionsArgMinArgMax.cpp @@ -0,0 +1,236 @@ +#include +#include +#include +#include +#include +#include +#include +#include + 
+namespace DB +{ +struct Settings; + +namespace ErrorCodes +{ + +extern const int CORRUPTED_DATA; +extern const int ILLEGAL_TYPE_OF_ARGUMENT; +extern const int LOGICAL_ERROR; +} + +namespace +{ + +template +struct AggregateFunctionArgMinMaxData +{ +private: + SingleValueDataBaseMemoryBlock result_data; + ValueType value_data; + +public: + SingleValueDataBase & result() { return result_data.get(); } + const SingleValueDataBase & result() const { return result_data.get(); } + ValueType & value() { return value_data; } + const ValueType & value() const { return value_data; } + + [[noreturn]] explicit AggregateFunctionArgMinMaxData() + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "AggregateFunctionArgMinMaxData initialized empty"); + } + + explicit AggregateFunctionArgMinMaxData(TypeIndex result_type) : value_data() + { + generateSingleValueFromTypeIndex(result_type, result_data); + } + + ~AggregateFunctionArgMinMaxData() { result().~SingleValueDataBase(); } +}; + +static_assert( + sizeof(AggregateFunctionArgMinMaxData) <= 2 * SingleValueDataBase::MAX_STORAGE_SIZE, + "Incorrect size of AggregateFunctionArgMinMaxData struct"); + +/// Returns the first arg value found for the minimum/maximum value. Example: argMin(arg, value). 
+template +class AggregateFunctionArgMinMax final + : public IAggregateFunctionDataHelper, AggregateFunctionArgMinMax> +{ +private: + const DataTypePtr & type_val; + const SerializationPtr serialization_res; + const SerializationPtr serialization_val; + const TypeIndex result_type_index; + + using Base = IAggregateFunctionDataHelper, AggregateFunctionArgMinMax>; + +public: + explicit AggregateFunctionArgMinMax(const DataTypes & argument_types_) + : Base(argument_types_, {}, argument_types_[0]) + , type_val(this->argument_types[1]) + , serialization_res(this->argument_types[0]->getDefaultSerialization()) + , serialization_val(this->argument_types[1]->getDefaultSerialization()) + , result_type_index(WhichDataType(this->argument_types[0]).idx) + { + if (!type_val->isComparable()) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal type {} of second argument of aggregate function {} because the values of that data type are not comparable", + type_val->getName(), + getName()); + } + + void create(AggregateDataPtr __restrict place) const override /// NOLINT + { + new (place) AggregateFunctionArgMinMaxData(result_type_index); + } + + String getName() const override + { + if constexpr (isMin) + return "argMin"; + else + return "argMax"; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + if constexpr (isMin) + { + if (this->data(place).value().setIfSmaller(*columns[1], row_num, arena)) + this->data(place).result().set(*columns[0], row_num, arena); + } + else + { + if (this->data(place).value().setIfGreater(*columns[1], row_num, arena)) + this->data(place).result().set(*columns[0], row_num, arena); + } + } + + void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override + { + add(place, columns, 0, arena); + } + + void addBatchSinglePlace( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const 
IColumn ** __restrict columns, + Arena * arena, + ssize_t if_argument_pos) const override + { + std::optional idx; + if (if_argument_pos >= 0) + { + const auto & if_map = assert_cast(*columns[if_argument_pos]).getData(); + if constexpr (isMin) + idx = this->data(place).value().getSmallestIndexNotNullIf(*columns[1], nullptr, if_map.data(), row_begin, row_end); + else + idx = this->data(place).value().getGreatestIndexNotNullIf(*columns[1], nullptr, if_map.data(), row_begin, row_end); + } + else + { + if constexpr (isMin) + idx = this->data(place).value().getSmallestIndex(*columns[1], row_begin, row_end); + else + idx = this->data(place).value().getGreatestIndex(*columns[1], row_begin, row_end); + } + + if (idx) + add(place, columns, *idx, arena); + } + + void addBatchSinglePlaceNotNull( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + const UInt8 * __restrict null_map, + Arena * arena, + ssize_t if_argument_pos) const override + { + std::optional idx; + if (if_argument_pos >= 0) + { + const auto & if_map = assert_cast(*columns[if_argument_pos]).getData(); + if constexpr (isMin) + idx = this->data(place).value().getSmallestIndexNotNullIf(*columns[1], null_map, if_map.data(), row_begin, row_end); + else + idx = this->data(place).value().getGreatestIndexNotNullIf(*columns[1], null_map, if_map.data(), row_begin, row_end); + } + else + { + if constexpr (isMin) + idx = this->data(place).value().getSmallestIndexNotNullIf(*columns[1], null_map, nullptr, row_begin, row_end); + else + idx = this->data(place).value().getGreatestIndexNotNullIf(*columns[1], null_map, nullptr, row_begin, row_end); + } + + if (idx) + add(place, columns, *idx, arena); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + if constexpr (isMin) + { + if (this->data(place).value().setIfSmaller(this->data(rhs).value(), arena)) + 
this->data(place).result().set(this->data(rhs).result(), arena); + } + else + { + if (this->data(place).value().setIfGreater(this->data(rhs).value(), arena)) + this->data(place).result().set(this->data(rhs).result(), arena); + } + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + this->data(place).result().write(buf, *serialization_res); + this->data(place).value().write(buf, *serialization_val); + } + + void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override + { + this->data(place).result().read(buf, *serialization_res, arena); + this->data(place).value().read(buf, *serialization_val, arena); + if (unlikely(this->data(place).value().has() != this->data(place).result().has())) + throw Exception( + ErrorCodes::CORRUPTED_DATA, + "Invalid state of the aggregate function {}: has_value ({}) != has_result ({})", + getName(), + this->data(place).value().has(), + this->data(place).result().has()); + } + + bool allocatesMemoryInArena() const override + { + return singleValueTypeAllocatesMemoryInArena(result_type_index) || ValueData::allocatesMemoryInArena(); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + this->data(place).result().insertResultInto(to); + } +}; + +template +AggregateFunctionPtr createAggregateFunctionArgMinMax( + const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) +{ + return AggregateFunctionPtr(createAggregateFunctionSingleValue( + name, argument_types, parameters, settings)); +} + +} + +void registerAggregateFunctionsArgMinArgMax(AggregateFunctionFactory & factory) +{ + AggregateFunctionProperties properties = {.returns_default_when_only_null = false, .is_order_dependent = true}; + factory.registerFunction("argMin", {createAggregateFunctionArgMinMax, properties}); + 
factory.registerFunction("argMax", {createAggregateFunctionArgMinMax, properties}); +} + +} diff --git a/src/AggregateFunctions/AggregateFunctionsMinMax.cpp b/src/AggregateFunctions/AggregateFunctionsMinMax.cpp new file mode 100644 index 00000000000..03e21c15a75 --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionsMinMax.cpp @@ -0,0 +1,202 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ +struct Settings; + +namespace ErrorCodes +{ +extern const int ILLEGAL_TYPE_OF_ARGUMENT; +extern const int NOT_IMPLEMENTED; +} + +namespace +{ + +template +class AggregateFunctionMinMax final : public IAggregateFunctionDataHelper> +{ +private: + SerializationPtr serialization; + +public: + explicit AggregateFunctionMinMax(const DataTypes & argument_types_) + : IAggregateFunctionDataHelper>(argument_types_, {}, argument_types_[0]) + , serialization(this->result_type->getDefaultSerialization()) + { + if (!this->result_type->isComparable()) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal type {} of argument of aggregate function {} because the values of that data type are not comparable", + this->result_type->getName(), + getName()); + } + + String getName() const override + { + if constexpr (isMin) + return "min"; + else + return "max"; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + if constexpr (isMin) + this->data(place).setIfSmaller(*columns[0], row_num, arena); + else + this->data(place).setIfGreater(*columns[0], row_num, arena); + } + + void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t, Arena * arena) const override + { + add(place, columns, 0, arena); + } + + void addBatchSinglePlace( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + Arena * arena, + ssize_t if_argument_pos) const override + { + if (if_argument_pos >= 0) + { + const auto & 
if_map = assert_cast(*columns[if_argument_pos]).getData(); + if constexpr (isMin) + this->data(place).setSmallestNotNullIf(*columns[0], nullptr, if_map.data(), row_begin, row_end, arena); + else + this->data(place).setGreatestNotNullIf(*columns[0], nullptr, if_map.data(), row_begin, row_end, arena); + } + else + { + if constexpr (isMin) + this->data(place).setSmallest(*columns[0], row_begin, row_end, arena); + else + this->data(place).setGreatest(*columns[0], row_begin, row_end, arena); + } + } + + void addBatchSinglePlaceNotNull( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** __restrict columns, + const UInt8 * __restrict null_map, + Arena * arena, + ssize_t if_argument_pos) const override + { + if (if_argument_pos >= 0) + { + const auto & if_map = assert_cast(*columns[if_argument_pos]).getData(); + if constexpr (isMin) + this->data(place).setSmallestNotNullIf(*columns[0], null_map, if_map.data(), row_begin, row_end, arena); + else + this->data(place).setGreatestNotNullIf(*columns[0], null_map, if_map.data(), row_begin, row_end, arena); + } + else + { + if constexpr (isMin) + this->data(place).setSmallestNotNullIf(*columns[0], null_map, nullptr, row_begin, row_end, arena); + else + this->data(place).setGreatestNotNullIf(*columns[0], null_map, nullptr, row_begin, row_end, arena); + } + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + if constexpr (isMin) + this->data(place).setIfSmaller(this->data(rhs), arena); + else + this->data(place).setIfGreater(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + this->data(place).write(buf, *serialization); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override + { + this->data(place).read(buf, *serialization, arena); + } + + bool 
allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + this->data(place).insertResultInto(to); + } + +#if USE_EMBEDDED_COMPILER + bool isCompilable() const override + { + if constexpr (!Data::is_compilable) + return false; + else + return Data::isCompilable(*this->argument_types[0]); + } + + void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + Data::compileCreate(builder, aggregate_data_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const ValuesWithType & arguments) const override + { + if constexpr (Data::is_compilable) + if constexpr (isMin) + Data::compileMin(builder, aggregate_data_ptr, arguments[0].value); + else + Data::compileMax(builder, aggregate_data_ptr, arguments[0].value); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + void + compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override + { + if constexpr (Data::is_compilable) + if constexpr (isMin) + Data::compileMinMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + else + Data::compileMaxMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } + + llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + return Data::compileGetResult(builder, aggregate_data_ptr); + else + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not JIT-compilable", getName()); + } +#endif +}; + +template +AggregateFunctionPtr 
createAggregateFunctionMinMax( + const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) +{ + return AggregateFunctionPtr( + createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); +} +} + +void registerAggregateFunctionsMinMax(AggregateFunctionFactory & factory) +{ + factory.registerFunction("min", createAggregateFunctionMinMax, AggregateFunctionFactory::CaseInsensitive); + factory.registerFunction("max", createAggregateFunctionMinMax, AggregateFunctionFactory::CaseInsensitive); +} + +} diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionArgMinMax.cpp b/src/AggregateFunctions/Combinators/AggregateFunctionArgMinMax.cpp deleted file mode 100644 index 38e19a0b8da..00000000000 --- a/src/AggregateFunctions/Combinators/AggregateFunctionArgMinMax.cpp +++ /dev/null @@ -1,93 +0,0 @@ -#include "AggregateFunctionArgMinMax.h" -#include "AggregateFunctionCombinatorFactory.h" - -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ -extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; -} - -namespace -{ -template