mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge branch 'master' into sort_mode_rename
This commit is contained in:
commit
8fece1e2d2
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit bdba298189e29995892de78dcecf64d127444e81
|
||||
Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc
|
@ -54,9 +54,8 @@ set(SRCS
|
||||
add_library(cxx ${SRCS})
|
||||
set_target_properties(cxx PROPERTIES FOLDER "contrib/libcxx-cmake")
|
||||
|
||||
target_include_directories(cxx SYSTEM BEFORE PUBLIC
|
||||
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>
|
||||
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}>/src)
|
||||
target_include_directories(cxx SYSTEM BEFORE PRIVATE $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/src>)
|
||||
target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
|
||||
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
|
||||
|
||||
# Enable capturing stack traces for all exceptions.
|
||||
|
@ -83,5 +83,8 @@ RUN export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
|
||||
--yes --no-install-recommends \
|
||||
&& apt-get clean
|
||||
|
||||
# for external_symbolizer_path
|
||||
RUN ln -s /usr/bin/llvm-symbolizer-15 /usr/bin/llvm-symbolizer
|
||||
|
||||
COPY build.sh /
|
||||
CMD ["bash", "-c", "/build.sh 2>&1"]
|
||||
|
@ -37,7 +37,7 @@ sudo xcode-select --install
|
||||
|
||||
``` bash
|
||||
brew update
|
||||
brew install cmake ninja libtool gettext llvm gcc binutils grep findutils
|
||||
brew install ccache cmake ninja libtool gettext llvm gcc binutils grep findutils
|
||||
```
|
||||
|
||||
## Checkout ClickHouse Sources {#checkout-clickhouse-sources}
|
||||
|
@ -15,7 +15,7 @@ Usage examples:
|
||||
## Usage in ClickHouse Server {#usage-in-clickhouse-server}
|
||||
|
||||
``` sql
|
||||
ENGINE = GenerateRandom(random_seed, max_string_length, max_array_length)
|
||||
ENGINE = GenerateRandom([random_seed] [,max_string_length] [,max_array_length])
|
||||
```
|
||||
|
||||
The `max_array_length` and `max_string_length` parameters specify maximum length of all
|
||||
|
@ -6,18 +6,28 @@ title: "UK Property Price Paid"
|
||||
---
|
||||
|
||||
The dataset contains data about prices paid for real-estate property in England and Wales. The data is available since year 1995.
|
||||
The size of the dataset in uncompressed form is about 4 GiB and it will take about 278 MiB in ClickHouse.
|
||||
|
||||
Source: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads <br/>
|
||||
Source: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads
|
||||
Description of the fields: https://www.gov.uk/guidance/about-the-price-paid-data
|
||||
|
||||
Contains HM Land Registry data © Crown copyright and database right 2021. This data is licensed under the Open Government Licence v3.0.
|
||||
|
||||
## Download the Dataset {#download-dataset}
|
||||
|
||||
Run the command:
|
||||
|
||||
```bash
|
||||
wget http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv
|
||||
```
|
||||
|
||||
Download will take about 2 minutes with good internet connection.
|
||||
|
||||
## Create the Table {#create-table}
|
||||
|
||||
```sql
|
||||
CREATE TABLE uk_price_paid
|
||||
(
|
||||
uuid UUID,
|
||||
price UInt32,
|
||||
date Date,
|
||||
postcode1 LowCardinality(String),
|
||||
@ -32,67 +42,66 @@ CREATE TABLE uk_price_paid
|
||||
town LowCardinality(String),
|
||||
district LowCardinality(String),
|
||||
county LowCardinality(String),
|
||||
category UInt8,
|
||||
category2 UInt8
|
||||
) ORDER BY (postcode1, postcode2, addr1, addr2);
|
||||
category UInt8
|
||||
) ENGINE = MergeTree ORDER BY (postcode1, postcode2, addr1, addr2);
|
||||
```
|
||||
|
||||
## Preprocess and Import Data {#preprocess-import-data}
|
||||
|
||||
In this example, we define the structure of source data from the CSV file and specify a query to preprocess the data with either `clickhouse-client` or the web based Play UI.
|
||||
We will use `clickhouse-local` tool for data preprocessing and `clickhouse-client` to upload it.
|
||||
|
||||
In this example, we define the structure of source data from the CSV file and specify a query to preprocess the data with `clickhouse-local`.
|
||||
|
||||
The preprocessing is:
|
||||
- splitting the postcode to two different columns `postcode1` and `postcode2` that are better for storage and queries;
|
||||
- splitting the postcode to two different columns `postcode1` and `postcode2` that is better for storage and queries;
|
||||
- coverting the `time` field to date as it only contains 00:00 time;
|
||||
- ignoring the [UUid](../../sql-reference/data-types/uuid.md) field because we don't need it for analysis;
|
||||
- transforming `type` and `duration` to more readable Enum fields with function [transform](../../sql-reference/functions/other-functions.md#transform);
|
||||
- transforming `is_new` and `category` fields from single-character string (`Y`/`N` and `A`/`B`) to [UInt8](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-uint256-int8-int16-int32-int64-int128-int256) field with 0 and 1.
|
||||
|
||||
Preprocessed data is piped directly to `clickhouse-client` to be inserted into ClickHouse table in streaming fashion.
|
||||
|
||||
```bash
|
||||
INSERT INTO uk_price_paid
|
||||
WITH
|
||||
splitByChar(' ', postcode) AS p
|
||||
SELECT
|
||||
replaceRegexpAll(uuid_string, '{|}','') AS uuid,
|
||||
toUInt32(price_string) AS price,
|
||||
parseDateTimeBestEffortUS(time) AS date,
|
||||
p[1] AS postcode1,
|
||||
p[2] AS postcode2,
|
||||
transform(a, ['T', 'S', 'D', 'F', 'O'], ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type,
|
||||
b = 'Y' AS is_new,
|
||||
transform(c, ['F', 'L', 'U'], ['freehold', 'leasehold', 'unknown']) AS duration,
|
||||
addr1,
|
||||
addr2,
|
||||
street,
|
||||
locality,
|
||||
town,
|
||||
district,
|
||||
county,
|
||||
d = 'B' AS category,
|
||||
e = 'B' AS category2
|
||||
FROM url(
|
||||
'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv',
|
||||
'CSV',
|
||||
'uuid_string String,
|
||||
price_string String,
|
||||
time String,
|
||||
postcode String,
|
||||
a String,
|
||||
b String,
|
||||
c String,
|
||||
addr1 String,
|
||||
addr2 String,
|
||||
street String,
|
||||
locality String,
|
||||
town String,
|
||||
district String,
|
||||
county String,
|
||||
d String,
|
||||
e String'
|
||||
)
|
||||
SETTINGS max_http_get_redirects=1;
|
||||
clickhouse-local --input-format CSV --structure '
|
||||
uuid String,
|
||||
price UInt32,
|
||||
time DateTime,
|
||||
postcode String,
|
||||
a String,
|
||||
b String,
|
||||
c String,
|
||||
addr1 String,
|
||||
addr2 String,
|
||||
street String,
|
||||
locality String,
|
||||
town String,
|
||||
district String,
|
||||
county String,
|
||||
d String,
|
||||
e String
|
||||
' --query "
|
||||
WITH splitByChar(' ', postcode) AS p
|
||||
SELECT
|
||||
price,
|
||||
toDate(time) AS date,
|
||||
p[1] AS postcode1,
|
||||
p[2] AS postcode2,
|
||||
transform(a, ['T', 'S', 'D', 'F', 'O'], ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type,
|
||||
b = 'Y' AS is_new,
|
||||
transform(c, ['F', 'L', 'U'], ['freehold', 'leasehold', 'unknown']) AS duration,
|
||||
addr1,
|
||||
addr2,
|
||||
street,
|
||||
locality,
|
||||
town,
|
||||
district,
|
||||
county,
|
||||
d = 'B' AS category
|
||||
FROM table" --date_time_input_format best_effort < pp-complete.csv | clickhouse-client --query "INSERT INTO uk_price_paid FORMAT TSV"
|
||||
```
|
||||
|
||||
It will take about 40 seconds.
|
||||
|
||||
## Validate the Data {#validate-data}
|
||||
|
||||
Query:
|
||||
@ -103,12 +112,28 @@ SELECT count() FROM uk_price_paid;
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
```text
|
||||
┌──count()─┐
|
||||
│ 27450499 │
|
||||
│ 26321785 │
|
||||
└──────────┘
|
||||
```
|
||||
|
||||
The size of dataset in ClickHouse is just 278 MiB, check it.
|
||||
|
||||
Query:
|
||||
|
||||
```sql
|
||||
SELECT formatReadableSize(total_bytes) FROM system.tables WHERE name = 'uk_price_paid';
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```text
|
||||
┌─formatReadableSize(total_bytes)─┐
|
||||
│ 278.80 MiB │
|
||||
└─────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Run Some Queries {#run-queries}
|
||||
|
||||
### Query 1. Average Price Per Year {#average-price}
|
||||
@ -121,7 +146,7 @@ SELECT toYear(date) AS year, round(avg(price)) AS price, bar(price, 0, 1000000,
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
```text
|
||||
┌─year─┬──price─┬─bar(round(avg(price)), 0, 1000000, 80)─┐
|
||||
│ 1995 │ 67932 │ █████▍ │
|
||||
│ 1996 │ 71505 │ █████▋ │
|
||||
|
@ -175,6 +175,10 @@ You can also choose to use [HTTP compression](https://en.wikipedia.org/wiki/HTTP
|
||||
- `br`
|
||||
- `deflate`
|
||||
- `xz`
|
||||
- `zstd`
|
||||
- `lz4`
|
||||
- `bz2`
|
||||
- `snappy`
|
||||
|
||||
To send a compressed `POST` request, append the request header `Content-Encoding: compression_method`.
|
||||
In order for ClickHouse to compress the response, enable compression with [enable_http_compression](../operations/settings/settings.md#settings-enable_http_compression) setting and append `Accept-Encoding: compression_method` header to the request. You can configure the data compression level in the [http_zlib_compression_level](../operations/settings/settings.md#settings-http_zlib_compression_level) setting for all compression methods.
|
||||
|
@ -94,6 +94,21 @@ It is also possible for `Flat`, `Hashed`, `ComplexKeyHashed` dictionaries to onl
|
||||
- If the source is HTTP then `update_field` will be added as a query parameter with the last update time as the parameter value.
|
||||
- If the source is Executable then `update_field` will be added as an executable script argument with the last update time as the argument value.
|
||||
- If the source is ClickHouse, MySQL, PostgreSQL, ODBC there will be an additional part of `WHERE`, where `update_field` is compared as greater or equal with the last update time.
|
||||
- Per default, this `WHERE`-condition is checked at the highest level of the SQL-Query. Alternatively, the condition can be checked in any other `WHERE`-clause within the query using the `{condition}`-keyword. Example:
|
||||
```sql
|
||||
...
|
||||
SOURCE(CLICKHOUSE(...
|
||||
update_field 'added_time'
|
||||
QUERY '
|
||||
SELECT my_arr.1 AS x, my_arr.2 AS y, creation_time
|
||||
FROM (
|
||||
SELECT arrayZip(x_arr, y_arr) AS my_arr, creation_time
|
||||
FROM dictionary_source
|
||||
WHERE {condition}
|
||||
)'
|
||||
))
|
||||
...
|
||||
```
|
||||
|
||||
If `update_field` option is set, additional option `update_lag` can be set. Value of `update_lag` option is subtracted from previous update time before request updated data.
|
||||
|
||||
|
@ -267,7 +267,7 @@ Result:
|
||||
└────────────────┘
|
||||
```
|
||||
|
||||
:::Attention
|
||||
:::note
|
||||
The return type of `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is `Date` or `DateTime`.
|
||||
Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results.
|
||||
In case argument is out of normal range:
|
||||
|
@ -430,5 +430,119 @@ Result:
|
||||
└────────────────────────────┘
|
||||
```
|
||||
|
||||
## mapApply
|
||||
|
||||
**Syntax**
|
||||
|
||||
```sql
|
||||
mapApply(func, map)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `func` - [Lamda function](../../sql-reference/functions/index.md#higher-order-functions---operator-and-lambdaparams-expr-function).
|
||||
- `map` — [Map](../../sql-reference/data-types/map.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Returns a map obtained from the original map by application of `func(map1[i], …, mapN[i])` for each element.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
```sql
|
||||
SELECT mapApply((k, v) -> (k, v * 10), _map) AS r
|
||||
FROM
|
||||
(
|
||||
SELECT map('key1', number, 'key2', number * 2) AS _map
|
||||
FROM numbers(3)
|
||||
)
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```text
|
||||
┌─r─────────────────────┐
|
||||
│ {'key1':0,'key2':0} │
|
||||
│ {'key1':10,'key2':20} │
|
||||
│ {'key1':20,'key2':40} │
|
||||
└───────────────────────┘
|
||||
```
|
||||
|
||||
## mapFilter
|
||||
|
||||
**Syntax**
|
||||
|
||||
```sql
|
||||
mapFilter(func, map)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `func` - [Lamda function](../../sql-reference/functions/index.md#higher-order-functions---operator-and-lambdaparams-expr-function).
|
||||
- `map` — [Map](../../sql-reference/data-types/map.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Returns a map containing only the elements in `map` for which `func(map1[i], …, mapN[i])` returns something other than 0.
|
||||
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
```sql
|
||||
SELECT mapFilter((k, v) -> ((v % 2) = 0), _map) AS r
|
||||
FROM
|
||||
(
|
||||
SELECT map('key1', number, 'key2', number * 2) AS _map
|
||||
FROM numbers(3)
|
||||
)
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```text
|
||||
┌─r───────────────────┐
|
||||
│ {'key1':0,'key2':0} │
|
||||
│ {'key2':2} │
|
||||
│ {'key1':2,'key2':4} │
|
||||
└─────────────────────┘
|
||||
```
|
||||
|
||||
|
||||
## mapUpdate
|
||||
|
||||
**Syntax**
|
||||
|
||||
```sql
|
||||
mapUpdate(map1, map2)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `map1` [Map](../../sql-reference/data-types/map.md).
|
||||
- `map2` [Map](../../sql-reference/data-types/map.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Returns a map1 with values updated of values for the corresponding keys in map2.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
```sql
|
||||
SELECT mapUpdate(map('key1', 0, 'key3', 0), map('key1', 10, 'key2', 10)) AS map;
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```text
|
||||
┌─map────────────────────────────┐
|
||||
│ {'key3':0,'key1':10,'key2':10} │
|
||||
└────────────────────────────────┘
|
||||
```
|
||||
|
||||
[Original article](https://clickhouse.com/docs/en/sql-reference/functions/tuple-map-functions/) <!--hide-->
|
||||
|
@ -303,7 +303,7 @@ SHOW USERS
|
||||
|
||||
## SHOW ROLES
|
||||
|
||||
Returns a list of [roles](../../operations/access-rights.md#role-management). To view another parameters, see system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role-grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
|
||||
Returns a list of [roles](../../operations/access-rights.md#role-management). To view another parameters, see system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role_grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
|
||||
|
||||
### Syntax
|
||||
|
||||
|
@ -267,7 +267,7 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
|
||||
└────────────────┘
|
||||
```
|
||||
|
||||
:::Attention
|
||||
:::note
|
||||
Тип возвращаемого описанными далее функциями `toStartOf*`, `toMonday` значения - `Date` или `DateTime`.
|
||||
Хотя эти функции могут принимать значения типа `Date32` или `DateTime64` в качестве аргумента, при обработке аргумента вне нормального диапазона значений (`1970` - `2148` для `Date` и `1970-01-01 00:00:00`-`2106-02-07 08:28:15` для `DateTime`) будет получен некорректный результат.
|
||||
Возвращаемые значения для значений вне нормального диапазона:
|
||||
@ -277,7 +277,7 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
|
||||
* `2149-05-31` будет результатом функции `toLastDayOfMonth` при обработке аргумента больше `2149-05-31`.
|
||||
:::
|
||||
|
||||
:::Attention
|
||||
:::note
|
||||
Тип возвращаемого описанными далее функциями `toStartOf*`, `toLastDayOfMonth`, `toMonday` значения - `Date` или `DateTime`.
|
||||
Хотя эти функции могут принимать значения типа `Date32` или `DateTime64` в качестве аргумента, при обработке аргумента вне нормального диапазона значений (`1970` - `2148` для `Date` и `1970-01-01 00:00:00`-`2106-02-07 08:28:15` для `DateTime`) будет получен некорректный результат.
|
||||
Возвращаемые значения для значений вне нормального диапазона:
|
||||
|
@ -305,7 +305,7 @@ SHOW USERS
|
||||
|
||||
## SHOW ROLES {#show-roles-statement}
|
||||
|
||||
Выводит список [ролей](../../operations/access-rights.md#role-management). Для просмотра параметров ролей, см. системные таблицы [system.roles](../../operations/system-tables/roles.md#system_tables-roles) и [system.role-grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
|
||||
Выводит список [ролей](../../operations/access-rights.md#role-management). Для просмотра параметров ролей, см. системные таблицы [system.roles](../../operations/system-tables/roles.md#system_tables-roles) и [system.role_grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
|
||||
|
||||
### Синтаксис {#show-roles-syntax}
|
||||
|
||||
|
67
programs/disks/CommandMkDir.cpp
Normal file
67
programs/disks/CommandMkDir.cpp
Normal file
@ -0,0 +1,67 @@
|
||||
#pragma once
|
||||
|
||||
#include "ICommand.h"
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
class CommandMkDir : public ICommand
|
||||
{
|
||||
public:
|
||||
CommandMkDir()
|
||||
{
|
||||
command_name = "mkdir";
|
||||
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
|
||||
description = "Create directory or directories recursively";
|
||||
usage = "mkdir [OPTION]... <PATH>";
|
||||
command_option_description->add_options()
|
||||
("recursive", "recursively create directories")
|
||||
;
|
||||
}
|
||||
|
||||
void processOptions(
|
||||
Poco::Util::LayeredConfiguration & config,
|
||||
po::variables_map & options) const override
|
||||
{
|
||||
if (options.count("recursive"))
|
||||
config.setBool("recursive", true);
|
||||
}
|
||||
|
||||
void execute(
|
||||
const std::vector<String> & command_arguments,
|
||||
DB::ContextMutablePtr & global_context,
|
||||
Poco::Util::LayeredConfiguration & config) override
|
||||
{
|
||||
if (command_arguments.size() != 1)
|
||||
{
|
||||
printHelpMessage();
|
||||
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
String disk_name = config.getString("disk", "default");
|
||||
|
||||
String path = command_arguments[0];
|
||||
|
||||
DiskPtr disk = global_context->getDisk(disk_name);
|
||||
|
||||
String full_path = fullPathWithValidate(disk, path);
|
||||
bool recursive = config.getBool("recursive", false);
|
||||
|
||||
if (recursive)
|
||||
disk->createDirectories(full_path);
|
||||
else
|
||||
disk->createDirectory(full_path);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
std::unique_ptr <DB::ICommand> makeCommandMkDir()
|
||||
{
|
||||
return std::make_unique<DB::CommandMkDir>();
|
||||
}
|
@ -63,7 +63,7 @@ void DisksApp::addOptions(
|
||||
|
||||
positional_options_description.add("command_name", 1);
|
||||
|
||||
supported_commands = {"list-disks", "list", "move", "remove", "link", "copy", "write", "read"};
|
||||
supported_commands = {"list-disks", "list", "move", "remove", "link", "copy", "write", "read", "mkdir"};
|
||||
|
||||
command_descriptions.emplace("list-disks", makeCommandListDisks());
|
||||
command_descriptions.emplace("list", makeCommandList());
|
||||
@ -73,6 +73,7 @@ void DisksApp::addOptions(
|
||||
command_descriptions.emplace("copy", makeCommandCopy());
|
||||
command_descriptions.emplace("write", makeCommandWrite());
|
||||
command_descriptions.emplace("read", makeCommandRead());
|
||||
command_descriptions.emplace("mkdir", makeCommandMkDir());
|
||||
}
|
||||
|
||||
void DisksApp::processOptions()
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include "CommandLink.cpp"
|
||||
#include "CommandList.cpp"
|
||||
#include "CommandListDisks.cpp"
|
||||
#include "CommandMkDir.cpp"
|
||||
#include "CommandMove.cpp"
|
||||
#include "CommandRead.cpp"
|
||||
#include "CommandRemove.cpp"
|
||||
|
@ -65,3 +65,4 @@ std::unique_ptr <DB::ICommand> makeCommandMove();
|
||||
std::unique_ptr <DB::ICommand> makeCommandRead();
|
||||
std::unique_ptr <DB::ICommand> makeCommandRemove();
|
||||
std::unique_ptr <DB::ICommand> makeCommandWrite();
|
||||
std::unique_ptr <DB::ICommand> makeCommandMkDir();
|
||||
|
@ -52,15 +52,10 @@ void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
|
||||
if (current_thread)
|
||||
{
|
||||
Int64 will_be = current_thread->untracked_memory + size;
|
||||
Int64 limit = current_thread->untracked_memory_limit + current_thread->untracked_memory_limit_increase;
|
||||
|
||||
if (will_be > limit)
|
||||
if (will_be > current_thread->untracked_memory_limit)
|
||||
{
|
||||
/// Increase limit before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
|
||||
/// more. It could be useful to enlarge Exception message in rethrow logic.
|
||||
current_thread->untracked_memory_limit_increase = current_thread->untracked_memory_limit;
|
||||
memory_tracker->allocImpl(will_be, throw_if_memory_exceeded);
|
||||
current_thread->untracked_memory_limit_increase = 0;
|
||||
current_thread->untracked_memory = 0;
|
||||
}
|
||||
else
|
||||
|
@ -133,8 +133,6 @@ public:
|
||||
Int64 untracked_memory = 0;
|
||||
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
|
||||
Int64 untracked_memory_limit = 4 * 1024 * 1024;
|
||||
/// Increase limit in case of exception.
|
||||
Int64 untracked_memory_limit_increase = 0;
|
||||
|
||||
/// Statistics of read and write rows/bytes
|
||||
Progress progress_in;
|
||||
|
@ -431,7 +431,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
|
||||
{
|
||||
if (schedule)
|
||||
{
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
std::unique_lock lock(bg_tasks_mutex);
|
||||
{
|
||||
while (!upload_object_tasks.empty() && upload_object_tasks.front().is_finised)
|
||||
{
|
||||
@ -442,7 +442,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
|
||||
|
||||
if (exception)
|
||||
{
|
||||
waitForAllBackGroundTasks();
|
||||
waitForAllBackGroundTasksUnlocked(lock);
|
||||
std::rethrow_exception(exception);
|
||||
}
|
||||
|
||||
@ -457,7 +457,15 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
|
||||
if (schedule)
|
||||
{
|
||||
std::unique_lock lock(bg_tasks_mutex);
|
||||
bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
|
||||
waitForAllBackGroundTasksUnlocked(lock);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::waitForAllBackGroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock)
|
||||
{
|
||||
if (schedule)
|
||||
{
|
||||
bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
|
||||
|
||||
while (!upload_object_tasks.empty())
|
||||
{
|
||||
@ -472,7 +480,7 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
|
||||
|
||||
if (put_object_task)
|
||||
{
|
||||
bg_tasks_condvar.wait(lock, [this]() { return put_object_task->is_finised; });
|
||||
bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return put_object_task->is_finised; });
|
||||
if (put_object_task->exception)
|
||||
std::rethrow_exception(put_object_task->exception);
|
||||
}
|
||||
|
@ -84,6 +84,7 @@ private:
|
||||
|
||||
void waitForReadyBackGroundTasks();
|
||||
void waitForAllBackGroundTasks();
|
||||
void waitForAllBackGroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock);
|
||||
|
||||
String bucket;
|
||||
String key;
|
||||
|
113
src/Interpreters/AggregationUtils.cpp
Normal file
113
src/Interpreters/AggregationUtils.cpp
Normal file
@ -0,0 +1,113 @@
|
||||
#include <Interpreters/AggregationUtils.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
OutputBlockColumns prepareOutputBlockColumns(
|
||||
const Aggregator::Params & params,
|
||||
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
|
||||
const Block & res_header,
|
||||
Arenas & aggregates_pools,
|
||||
bool final,
|
||||
size_t rows)
|
||||
{
|
||||
MutableColumns key_columns(params.keys_size);
|
||||
MutableColumns aggregate_columns(params.aggregates_size);
|
||||
MutableColumns final_aggregate_columns(params.aggregates_size);
|
||||
Aggregator::AggregateColumnsData aggregate_columns_data(params.aggregates_size);
|
||||
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
{
|
||||
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
|
||||
key_columns[i]->reserve(rows);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
{
|
||||
if (!final)
|
||||
{
|
||||
const auto & aggregate_column_name = params.aggregates[i].column_name;
|
||||
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();
|
||||
|
||||
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
|
||||
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
|
||||
|
||||
for (auto & pool : aggregates_pools)
|
||||
column_aggregate_func.addArena(pool);
|
||||
|
||||
aggregate_columns_data[i] = &column_aggregate_func.getData();
|
||||
aggregate_columns_data[i]->reserve(rows);
|
||||
}
|
||||
else
|
||||
{
|
||||
final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
|
||||
final_aggregate_columns[i]->reserve(rows);
|
||||
|
||||
if (aggregate_functions[i]->isState())
|
||||
{
|
||||
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
|
||||
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
|
||||
for (auto & pool : aggregates_pools)
|
||||
column_aggregate_func->addArena(pool);
|
||||
|
||||
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
|
||||
final_aggregate_columns[i]->forEachSubcolumn(
|
||||
[&aggregates_pools](auto & subcolumn)
|
||||
{
|
||||
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
|
||||
for (auto & pool : aggregates_pools)
|
||||
column_aggregate_func->addArena(pool);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (key_columns.size() != params.keys_size)
|
||||
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
|
||||
|
||||
std::vector<IColumn *> raw_key_columns;
|
||||
raw_key_columns.reserve(key_columns.size());
|
||||
for (auto & column : key_columns)
|
||||
raw_key_columns.push_back(column.get());
|
||||
|
||||
return {
|
||||
.key_columns = std::move(key_columns),
|
||||
.raw_key_columns = std::move(raw_key_columns),
|
||||
.aggregate_columns = std::move(aggregate_columns),
|
||||
.final_aggregate_columns = std::move(final_aggregate_columns),
|
||||
.aggregate_columns_data = std::move(aggregate_columns_data),
|
||||
};
|
||||
}
|
||||
|
||||
Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows)
|
||||
{
|
||||
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
|
||||
|
||||
Block res = res_header.cloneEmpty();
|
||||
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
res.getByPosition(i).column = std::move(key_columns[i]);
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
{
|
||||
const auto & aggregate_column_name = params.aggregates[i].column_name;
|
||||
if (final)
|
||||
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
|
||||
else
|
||||
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
|
||||
}
|
||||
|
||||
/// Change the size of the columns-constants in the block.
|
||||
size_t columns = res_header.columns();
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
if (isColumnConst(*res.getByPosition(i).column))
|
||||
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
|
||||
|
||||
return res;
|
||||
}
|
||||
}
|
27
src/Interpreters/AggregationUtils.h
Normal file
27
src/Interpreters/AggregationUtils.h
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Aggregator.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct OutputBlockColumns
|
||||
{
|
||||
MutableColumns key_columns;
|
||||
std::vector<IColumn *> raw_key_columns;
|
||||
MutableColumns aggregate_columns;
|
||||
MutableColumns final_aggregate_columns;
|
||||
Aggregator::AggregateColumnsData aggregate_columns_data;
|
||||
};
|
||||
|
||||
|
||||
OutputBlockColumns prepareOutputBlockColumns(
|
||||
const Aggregator::Params & params,
|
||||
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
|
||||
const Block & res_header,
|
||||
Arenas & aggregates_pools,
|
||||
bool final,
|
||||
size_t rows);
|
||||
|
||||
Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows);
|
||||
}
|
@ -34,6 +34,8 @@
|
||||
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
|
||||
#include <Interpreters/AggregationUtils.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event ExternalAggregationWritePart;
|
||||
@ -1587,16 +1589,10 @@ Block Aggregator::convertOneBucketToBlock(
|
||||
bool final,
|
||||
size_t bucket) const
|
||||
{
|
||||
Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
|
||||
[bucket, &method, arena, this] (
|
||||
MutableColumns & key_columns,
|
||||
AggregateColumnsData & aggregate_columns,
|
||||
MutableColumns & final_aggregate_columns,
|
||||
bool final_)
|
||||
{
|
||||
convertToBlockImpl(method, method.data.impls[bucket],
|
||||
key_columns, aggregate_columns, final_aggregate_columns, arena, final_);
|
||||
});
|
||||
// Used in ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform (expects single chunk for each bucket_id).
|
||||
constexpr bool return_single_block = true;
|
||||
Block block = convertToBlockImpl<return_single_block>(
|
||||
method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size());
|
||||
|
||||
block.info.bucket_num = bucket;
|
||||
return block;
|
||||
@ -1702,26 +1698,17 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
|
||||
}
|
||||
|
||||
|
||||
template <typename Method, typename Table>
|
||||
void Aggregator::convertToBlockImpl(
|
||||
Method & method,
|
||||
Table & data,
|
||||
MutableColumns & key_columns,
|
||||
AggregateColumnsData & aggregate_columns,
|
||||
MutableColumns & final_aggregate_columns,
|
||||
Arena * arena,
|
||||
bool final) const
|
||||
template <bool return_single_block, typename Method, typename Table>
|
||||
Aggregator::ConvertToBlockRes<return_single_block>
|
||||
Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const
|
||||
{
|
||||
if (data.empty())
|
||||
return;
|
||||
{
|
||||
auto && out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, rows);
|
||||
return {finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows)};
|
||||
}
|
||||
|
||||
if (key_columns.size() != params.keys_size)
|
||||
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
|
||||
|
||||
std::vector<IColumn *> raw_key_columns;
|
||||
raw_key_columns.reserve(key_columns.size());
|
||||
for (auto & column : key_columns)
|
||||
raw_key_columns.push_back(column.get());
|
||||
ConvertToBlockRes<return_single_block> res;
|
||||
|
||||
if (final)
|
||||
{
|
||||
@ -1729,20 +1716,23 @@ void Aggregator::convertToBlockImpl(
|
||||
if (compiled_aggregate_functions_holder)
|
||||
{
|
||||
static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization;
|
||||
convertToBlockImplFinal<Method, use_compiled_functions>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
|
||||
res = convertToBlockImplFinal<Method, use_compiled_functions, return_single_block>(method, data, arena, aggregates_pools, rows);
|
||||
}
|
||||
else
|
||||
#endif
|
||||
{
|
||||
convertToBlockImplFinal<Method, false>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
|
||||
res = convertToBlockImplFinal<Method, false, return_single_block>(method, data, arena, aggregates_pools, rows);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
convertToBlockImplNotFinal(method, data, std::move(raw_key_columns), aggregate_columns);
|
||||
res = convertToBlockImplNotFinal<return_single_block>(method, data, aggregates_pools, rows);
|
||||
}
|
||||
|
||||
/// In order to release memory early.
|
||||
data.clearAndShrink();
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
@ -1811,38 +1801,9 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
|
||||
}
|
||||
|
||||
|
||||
template <typename Method, bool use_compiled_functions, typename Table>
|
||||
void NO_INLINE Aggregator::convertToBlockImplFinal(
|
||||
Method & method,
|
||||
Table & data,
|
||||
std::vector<IColumn *> key_columns,
|
||||
MutableColumns & final_aggregate_columns,
|
||||
Arena * arena) const
|
||||
template <bool use_compiled_functions>
|
||||
Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena) const
|
||||
{
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
{
|
||||
if (data.hasNullKeyData())
|
||||
{
|
||||
key_columns[0]->insertDefault();
|
||||
insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns, arena);
|
||||
}
|
||||
}
|
||||
|
||||
auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
|
||||
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
|
||||
|
||||
PaddedPODArray<AggregateDataPtr> places;
|
||||
places.reserve(data.size());
|
||||
|
||||
data.forEachValue([&](const auto & key, auto & mapped)
|
||||
{
|
||||
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
|
||||
places.emplace_back(mapped);
|
||||
|
||||
/// Mark the cell as destroyed so it will not be destroyed in destructor.
|
||||
mapped = nullptr;
|
||||
});
|
||||
|
||||
std::exception_ptr exception;
|
||||
size_t aggregate_functions_destroy_index = 0;
|
||||
|
||||
@ -1863,7 +1824,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
|
||||
if (!is_aggregate_function_compiled[i])
|
||||
continue;
|
||||
|
||||
auto & final_aggregate_column = final_aggregate_columns[i];
|
||||
auto & final_aggregate_column = out_cols.final_aggregate_columns[i];
|
||||
final_aggregate_column = final_aggregate_column->cloneResized(places.size());
|
||||
columns_data.emplace_back(getColumnData(final_aggregate_column.get()));
|
||||
}
|
||||
@ -1884,7 +1845,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
|
||||
}
|
||||
}
|
||||
|
||||
auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index];
|
||||
auto & final_aggregate_column = out_cols.final_aggregate_columns[aggregate_functions_destroy_index];
|
||||
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
|
||||
|
||||
/** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoBatch
|
||||
@ -1898,7 +1859,8 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
|
||||
bool is_state = aggregate_functions[destroy_index]->isState();
|
||||
bool destroy_place_after_insert = !is_state;
|
||||
|
||||
aggregate_functions[destroy_index]->insertResultIntoBatch(0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
|
||||
aggregate_functions[destroy_index]->insertResultIntoBatch(
|
||||
0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
@ -1923,125 +1885,155 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
|
||||
|
||||
if (exception)
|
||||
std::rethrow_exception(exception);
|
||||
|
||||
return finalizeBlock(params, getHeader(/* final */ true), std::move(out_cols), /* final */ true, places.size());
|
||||
}
|
||||
|
||||
template <typename Method, typename Table>
|
||||
void NO_INLINE Aggregator::convertToBlockImplNotFinal(
|
||||
Method & method,
|
||||
Table & data,
|
||||
std::vector<IColumn *> key_columns,
|
||||
AggregateColumnsData & aggregate_columns) const
|
||||
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
|
||||
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
|
||||
Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t) const
|
||||
{
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
const size_t max_block_size = params.max_block_size;
|
||||
const bool final = true;
|
||||
ConvertToBlockRes<return_single_block> res;
|
||||
|
||||
std::optional<OutputBlockColumns> out_cols;
|
||||
std::optional<Sizes> shuffled_key_sizes;
|
||||
PaddedPODArray<AggregateDataPtr> places;
|
||||
|
||||
auto init_out_cols = [&]()
|
||||
{
|
||||
if (data.hasNullKeyData())
|
||||
out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, max_block_size);
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
{
|
||||
key_columns[0]->insertDefault();
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
|
||||
|
||||
data.getNullKeyData() = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
|
||||
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
|
||||
|
||||
data.forEachValue([&](const auto & key, auto & mapped)
|
||||
{
|
||||
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
|
||||
|
||||
/// reserved, so push_back does not throw exceptions
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);
|
||||
|
||||
mapped = nullptr;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
template <typename Filler>
|
||||
Block Aggregator::prepareBlockAndFill(
|
||||
AggregatedDataVariants & data_variants,
|
||||
bool final,
|
||||
size_t rows,
|
||||
Filler && filler) const
|
||||
{
|
||||
MutableColumns key_columns(params.keys_size);
|
||||
MutableColumns aggregate_columns(params.aggregates_size);
|
||||
MutableColumns final_aggregate_columns(params.aggregates_size);
|
||||
AggregateColumnsData aggregate_columns_data(params.aggregates_size);
|
||||
|
||||
Block res_header = getHeader(final);
|
||||
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
{
|
||||
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
|
||||
key_columns[i]->reserve(rows);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
{
|
||||
if (!final)
|
||||
{
|
||||
const auto & aggregate_column_name = params.aggregates[i].column_name;
|
||||
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();
|
||||
|
||||
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
|
||||
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
|
||||
|
||||
for (auto & pool : data_variants.aggregates_pools)
|
||||
column_aggregate_func.addArena(pool);
|
||||
|
||||
aggregate_columns_data[i] = &column_aggregate_func.getData();
|
||||
aggregate_columns_data[i]->reserve(rows);
|
||||
}
|
||||
else
|
||||
{
|
||||
final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
|
||||
final_aggregate_columns[i]->reserve(rows);
|
||||
|
||||
if (aggregate_functions[i]->isState())
|
||||
if (data.hasNullKeyData())
|
||||
{
|
||||
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
|
||||
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
|
||||
for (auto & pool : data_variants.aggregates_pools)
|
||||
column_aggregate_func->addArena(pool);
|
||||
|
||||
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
|
||||
final_aggregate_columns[i]->forEachSubcolumn([&data_variants](auto & subcolumn)
|
||||
{
|
||||
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
|
||||
for (auto & pool : data_variants.aggregates_pools)
|
||||
column_aggregate_func->addArena(pool);
|
||||
});
|
||||
out_cols->key_columns[0]->insertDefault();
|
||||
insertAggregatesIntoColumns(data.getNullKeyData(), out_cols->final_aggregate_columns, arena);
|
||||
data.hasNullKeyData() = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);
|
||||
shuffled_key_sizes = method.shuffleKeyColumns(out_cols->raw_key_columns, key_sizes);
|
||||
|
||||
Block res = res_header.cloneEmpty();
|
||||
places.reserve(max_block_size);
|
||||
};
|
||||
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
res.getByPosition(i).column = std::move(key_columns[i]);
|
||||
// should be invoked at least once, because null data might be the only content of the `data`
|
||||
init_out_cols();
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
data.forEachValue(
|
||||
[&](const auto & key, auto & mapped)
|
||||
{
|
||||
if (!out_cols.has_value())
|
||||
init_out_cols();
|
||||
|
||||
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
|
||||
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
|
||||
places.emplace_back(mapped);
|
||||
|
||||
/// Mark the cell as destroyed so it will not be destroyed in destructor.
|
||||
mapped = nullptr;
|
||||
|
||||
if constexpr (!return_single_block)
|
||||
{
|
||||
if (places.size() >= max_block_size)
|
||||
{
|
||||
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena));
|
||||
places.clear();
|
||||
out_cols.reset();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if constexpr (return_single_block)
|
||||
{
|
||||
const auto & aggregate_column_name = params.aggregates[i].column_name;
|
||||
if (final)
|
||||
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
|
||||
else
|
||||
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
|
||||
return insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (out_cols.has_value())
|
||||
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena));
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
/// Change the size of the columns-constants in the block.
|
||||
size_t columns = res_header.columns();
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
if (isColumnConst(*res.getByPosition(i).column))
|
||||
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
|
||||
template <bool return_single_block, typename Method, typename Table>
|
||||
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
|
||||
Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t) const
|
||||
{
|
||||
const size_t max_block_size = params.max_block_size;
|
||||
const bool final = false;
|
||||
ConvertToBlockRes<return_single_block> res;
|
||||
|
||||
std::optional<OutputBlockColumns> out_cols;
|
||||
std::optional<Sizes> shuffled_key_sizes;
|
||||
|
||||
auto init_out_cols = [&]()
|
||||
{
|
||||
out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, max_block_size);
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
{
|
||||
if (data.hasNullKeyData())
|
||||
{
|
||||
out_cols->raw_key_columns[0]->insertDefault();
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
out_cols->aggregate_columns_data[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
|
||||
|
||||
data.getNullKeyData() = nullptr;
|
||||
data.hasNullKeyData() = false;
|
||||
}
|
||||
}
|
||||
|
||||
shuffled_key_sizes = method.shuffleKeyColumns(out_cols->raw_key_columns, key_sizes);
|
||||
};
|
||||
|
||||
// should be invoked at least once, because null data might be the only content of the `data`
|
||||
init_out_cols();
|
||||
|
||||
size_t rows_in_current_block = 0;
|
||||
|
||||
data.forEachValue(
|
||||
[&](const auto & key, auto & mapped)
|
||||
{
|
||||
if (!out_cols.has_value())
|
||||
init_out_cols();
|
||||
|
||||
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
|
||||
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
|
||||
|
||||
/// reserved, so push_back does not throw exceptions
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
out_cols->aggregate_columns_data[i]->push_back(mapped + offsets_of_aggregate_states[i]);
|
||||
|
||||
mapped = nullptr;
|
||||
|
||||
++rows_in_current_block;
|
||||
|
||||
if constexpr (!return_single_block)
|
||||
{
|
||||
if (rows_in_current_block >= max_block_size)
|
||||
{
|
||||
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols.value()), final, rows_in_current_block));
|
||||
out_cols.reset();
|
||||
rows_in_current_block = 0;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if constexpr (return_single_block)
|
||||
{
|
||||
return finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (rows_in_current_block)
|
||||
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block));
|
||||
return res;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -2105,39 +2097,35 @@ void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
|
||||
Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
|
||||
{
|
||||
size_t rows = 1;
|
||||
auto && out_cols
|
||||
= prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), data_variants.aggregates_pools, final, rows);
|
||||
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
|
||||
|
||||
auto filler = [&data_variants, this](
|
||||
MutableColumns & key_columns,
|
||||
AggregateColumnsData & aggregate_columns,
|
||||
MutableColumns & final_aggregate_columns,
|
||||
bool final_)
|
||||
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
|
||||
{
|
||||
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
|
||||
AggregatedDataWithoutKey & data = data_variants.without_key;
|
||||
|
||||
if (!data)
|
||||
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!final)
|
||||
{
|
||||
AggregatedDataWithoutKey & data = data_variants.without_key;
|
||||
|
||||
if (!data)
|
||||
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!final_)
|
||||
{
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
|
||||
data = nullptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
|
||||
insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool);
|
||||
}
|
||||
|
||||
if (params.overflow_row)
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
key_columns[i]->insertDefault();
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_columns_data[i]->push_back(data + offsets_of_aggregate_states[i]);
|
||||
data = nullptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
|
||||
insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool);
|
||||
}
|
||||
};
|
||||
|
||||
Block block = prepareBlockAndFill(data_variants, final, rows, filler);
|
||||
if (params.overflow_row)
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
key_columns[i]->insertDefault();
|
||||
}
|
||||
|
||||
Block block = finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows);
|
||||
|
||||
if (is_overflows)
|
||||
block.info.is_overflows = true;
|
||||
@ -2148,29 +2136,22 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
|
||||
return block;
|
||||
}
|
||||
|
||||
Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
|
||||
template <bool return_single_block>
|
||||
Aggregator::ConvertToBlockRes<return_single_block>
|
||||
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
|
||||
{
|
||||
size_t rows = data_variants.sizeWithoutOverflowRow();
|
||||
const size_t rows = data_variants.sizeWithoutOverflowRow();
|
||||
#define M(NAME) \
|
||||
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
|
||||
{ \
|
||||
return convertToBlockImpl<return_single_block>( \
|
||||
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows); \
|
||||
}
|
||||
|
||||
auto filler = [&data_variants, this](
|
||||
MutableColumns & key_columns,
|
||||
AggregateColumnsData & aggregate_columns,
|
||||
MutableColumns & final_aggregate_columns,
|
||||
bool final_)
|
||||
{
|
||||
#define M(NAME) \
|
||||
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
|
||||
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
|
||||
key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
|
||||
|
||||
if (false) {} // NOLINT
|
||||
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
|
||||
#undef M
|
||||
else
|
||||
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
||||
};
|
||||
|
||||
return prepareBlockAndFill(data_variants, final, rows, filler);
|
||||
if (false) {} // NOLINT
|
||||
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
|
||||
#undef M
|
||||
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
||||
}
|
||||
|
||||
|
||||
@ -2292,7 +2273,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
|
||||
if (data_variants.type != AggregatedDataVariants::Type::without_key)
|
||||
{
|
||||
if (!data_variants.isTwoLevel())
|
||||
blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final));
|
||||
blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel</* return_single_block */ false>(data_variants, final));
|
||||
else
|
||||
blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
|
||||
}
|
||||
@ -3044,9 +3025,15 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
|
||||
|
||||
Block block;
|
||||
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
|
||||
{
|
||||
block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
|
||||
}
|
||||
else
|
||||
block = prepareBlockAndFillSingleLevel(result, final);
|
||||
{
|
||||
// Used during memory efficient merging (SortingAggregatedTransform expects single chunk for each bucket_id).
|
||||
constexpr bool return_single_block = true;
|
||||
block = prepareBlockAndFillSingleLevel<return_single_block>(result, final);
|
||||
}
|
||||
/// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods.
|
||||
|
||||
if (!final)
|
||||
@ -3247,4 +3234,6 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) cons
|
||||
}
|
||||
|
||||
|
||||
template Aggregator::ConvertToBlockRes<false>
|
||||
Aggregator::prepareBlockAndFillSingleLevel<false>(AggregatedDataVariants & data_variants, bool final) const;
|
||||
}
|
||||
|
@ -1,8 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <type_traits>
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
@ -872,6 +873,7 @@ using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants
|
||||
|
||||
class CompiledAggregateFunctionsHolder;
|
||||
class NativeWriter;
|
||||
struct OutputBlockColumns;
|
||||
|
||||
/** How are "total" values calculated with WITH TOTALS?
|
||||
* (For more details, see TotalsHavingTransform.)
|
||||
@ -933,6 +935,8 @@ public:
|
||||
bool compile_aggregate_expressions;
|
||||
size_t min_count_to_compile_aggregate_expression;
|
||||
|
||||
size_t max_block_size;
|
||||
|
||||
bool only_merge;
|
||||
|
||||
struct StatsCollectingParams
|
||||
@ -969,6 +973,7 @@ public:
|
||||
size_t min_free_disk_space_,
|
||||
bool compile_aggregate_expressions_,
|
||||
size_t min_count_to_compile_aggregate_expression_,
|
||||
size_t max_block_size_,
|
||||
bool only_merge_ = false, // true for projections
|
||||
const StatsCollectingParams & stats_collecting_params_ = {})
|
||||
: keys(keys_)
|
||||
@ -987,15 +992,16 @@ public:
|
||||
, min_free_disk_space(min_free_disk_space_)
|
||||
, compile_aggregate_expressions(compile_aggregate_expressions_)
|
||||
, min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
|
||||
, max_block_size(max_block_size_)
|
||||
, only_merge(only_merge_)
|
||||
, stats_collecting_params(stats_collecting_params_)
|
||||
{
|
||||
}
|
||||
|
||||
/// Only parameters that matter during merge.
|
||||
Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
|
||||
Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_, size_t max_block_size_)
|
||||
: Params(
|
||||
keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, true, {})
|
||||
keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, true, {})
|
||||
{
|
||||
}
|
||||
|
||||
@ -1277,15 +1283,12 @@ private:
|
||||
void mergeSingleLevelDataImpl(
|
||||
ManyAggregatedDataVariants & non_empty_data) const;
|
||||
|
||||
template <typename Method, typename Table>
|
||||
void convertToBlockImpl(
|
||||
Method & method,
|
||||
Table & data,
|
||||
MutableColumns & key_columns,
|
||||
AggregateColumnsData & aggregate_columns,
|
||||
MutableColumns & final_aggregate_columns,
|
||||
Arena * arena,
|
||||
bool final) const;
|
||||
template <bool return_single_block>
|
||||
using ConvertToBlockRes = std::conditional_t<return_single_block, Block, BlocksList>;
|
||||
|
||||
template <bool return_single_block, typename Method, typename Table>
|
||||
ConvertToBlockRes<return_single_block>
|
||||
convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const;
|
||||
|
||||
template <typename Mapped>
|
||||
void insertAggregatesIntoColumns(
|
||||
@ -1293,27 +1296,16 @@ private:
|
||||
MutableColumns & final_aggregate_columns,
|
||||
Arena * arena) const;
|
||||
|
||||
template <typename Method, bool use_compiled_functions, typename Table>
|
||||
void convertToBlockImplFinal(
|
||||
Method & method,
|
||||
Table & data,
|
||||
std::vector<IColumn *> key_columns,
|
||||
MutableColumns & final_aggregate_columns,
|
||||
Arena * arena) const;
|
||||
template <bool use_compiled_functions>
|
||||
Block insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena) const;
|
||||
|
||||
template <typename Method, typename Table>
|
||||
void convertToBlockImplNotFinal(
|
||||
Method & method,
|
||||
Table & data,
|
||||
std::vector<IColumn *> key_columns,
|
||||
AggregateColumnsData & aggregate_columns) const;
|
||||
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
|
||||
ConvertToBlockRes<return_single_block>
|
||||
convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const;
|
||||
|
||||
template <typename Filler>
|
||||
Block prepareBlockAndFill(
|
||||
AggregatedDataVariants & data_variants,
|
||||
bool final,
|
||||
size_t rows,
|
||||
Filler && filler) const;
|
||||
template <bool return_single_block, typename Method, typename Table>
|
||||
ConvertToBlockRes<return_single_block>
|
||||
convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const;
|
||||
|
||||
template <typename Method>
|
||||
Block convertOneBucketToBlock(
|
||||
@ -1331,9 +1323,11 @@ private:
|
||||
std::atomic<bool> * is_cancelled = nullptr) const;
|
||||
|
||||
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
|
||||
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
|
||||
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
|
||||
|
||||
template <bool return_single_block>
|
||||
ConvertToBlockRes<return_single_block> prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
|
||||
|
||||
template <typename Method>
|
||||
BlocksList prepareBlocksAndFillTwoLevelImpl(
|
||||
AggregatedDataVariants & data_variants,
|
||||
|
@ -1763,7 +1763,7 @@ static void executeMergeAggregatedImpl(
|
||||
* but it can work more slowly.
|
||||
*/
|
||||
|
||||
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads);
|
||||
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size);
|
||||
|
||||
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
@ -2359,6 +2359,7 @@ static Aggregator::Params getAggregatorParams(
|
||||
settings.min_free_disk_space_for_temporary_data,
|
||||
settings.compile_aggregate_expressions,
|
||||
settings.min_count_to_compile_aggregate_expression,
|
||||
settings.max_block_size,
|
||||
/* only_merge */ false,
|
||||
stats_collecting_params
|
||||
};
|
||||
|
@ -1,16 +1,17 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTQueryParameter.h>
|
||||
#include <Interpreters/IdentifierSemantic.h>
|
||||
#include <Interpreters/ReplaceQueryParameterVisitor.h>
|
||||
#include <Interpreters/addTypeConversionToAST.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTQueryParameter.h>
|
||||
#include <Parsers/TablePropertiesQueriesASTs.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -30,7 +31,12 @@ void ReplaceQueryParameterVisitor::visit(ASTPtr & ast)
|
||||
else if (ast->as<ASTIdentifier>() || ast->as<ASTTableIdentifier>())
|
||||
visitIdentifier(ast);
|
||||
else
|
||||
visitChildren(ast);
|
||||
{
|
||||
if (auto * describe_query = dynamic_cast<ASTDescribeQuery *>(ast.get()); describe_query && describe_query->table_expression)
|
||||
visitChildren(describe_query->table_expression);
|
||||
else
|
||||
visitChildren(ast);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -182,6 +182,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
|
||||
transform_params->params.min_free_disk_space,
|
||||
transform_params->params.compile_aggregate_expressions,
|
||||
transform_params->params.min_count_to_compile_aggregate_expression,
|
||||
transform_params->params.max_block_size,
|
||||
/* only_merge */ false,
|
||||
transform_params->params.stats_collecting_params};
|
||||
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);
|
||||
@ -376,16 +377,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
|
||||
});
|
||||
|
||||
/// We add the explicit resize here, but not in case of aggregating in order, since AIO don't use two-level hash tables and thus returns only buckets with bucket_number = -1.
|
||||
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : pipeline.getNumStreams(), true /* force */);
|
||||
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
|
||||
|
||||
aggregating = collector.detachProcessors(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
pipeline.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<AggregatingTransform>(header, transform_params);
|
||||
});
|
||||
pipeline.addSimpleTransform([&](const Block & header) { return std::make_shared<AggregatingTransform>(header, transform_params); });
|
||||
|
||||
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, false /* force */);
|
||||
|
||||
aggregating = collector.detachProcessors(0);
|
||||
}
|
||||
|
@ -38,7 +38,8 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
|
||||
settings.max_threads,
|
||||
settings.min_free_disk_space_for_temporary_data,
|
||||
settings.compile_aggregate_expressions,
|
||||
settings.min_count_to_compile_aggregate_expression);
|
||||
settings.min_count_to_compile_aggregate_expression,
|
||||
settings.max_block_size);
|
||||
|
||||
aggregator = std::make_unique<Aggregator>(header, params);
|
||||
|
||||
|
@ -182,7 +182,8 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
|
||||
if (cur_block_size >= max_block_size || cur_block_bytes + current_memory_usage >= max_block_bytes)
|
||||
{
|
||||
if (group_by_key)
|
||||
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false);
|
||||
group_by_block
|
||||
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
|
||||
cur_block_bytes += current_memory_usage;
|
||||
finalizeCurrentChunk(std::move(chunk), key_end);
|
||||
return;
|
||||
@ -293,7 +294,8 @@ void AggregatingInOrderTransform::generate()
|
||||
if (cur_block_size && is_consume_finished)
|
||||
{
|
||||
if (group_by_key)
|
||||
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false);
|
||||
group_by_block
|
||||
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
|
||||
else
|
||||
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
|
||||
variants.invalidate();
|
||||
|
@ -203,7 +203,7 @@ public:
|
||||
{
|
||||
auto & output = outputs.front();
|
||||
|
||||
if (finished && !has_input)
|
||||
if (finished && single_level_chunks.empty())
|
||||
{
|
||||
output.finish();
|
||||
return Status::Finished;
|
||||
@ -230,7 +230,7 @@ public:
|
||||
if (!processors.empty())
|
||||
return Status::ExpandPipeline;
|
||||
|
||||
if (has_input)
|
||||
if (!single_level_chunks.empty())
|
||||
return preparePushToOutput();
|
||||
|
||||
/// Single level case.
|
||||
@ -244,11 +244,14 @@ public:
|
||||
private:
|
||||
IProcessor::Status preparePushToOutput()
|
||||
{
|
||||
auto & output = outputs.front();
|
||||
output.push(std::move(current_chunk));
|
||||
has_input = false;
|
||||
if (single_level_chunks.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Some ready chunks expected");
|
||||
|
||||
if (finished)
|
||||
auto & output = outputs.front();
|
||||
output.push(std::move(single_level_chunks.back()));
|
||||
single_level_chunks.pop_back();
|
||||
|
||||
if (finished && single_level_chunks.empty())
|
||||
{
|
||||
output.finish();
|
||||
return Status::Finished;
|
||||
@ -268,17 +271,17 @@ private:
|
||||
{
|
||||
auto chunk = input.pull();
|
||||
auto bucket = getInfoFromChunk(chunk)->bucket_num;
|
||||
chunks[bucket] = std::move(chunk);
|
||||
two_level_chunks[bucket] = std::move(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
if (!shared_data->is_bucket_processed[current_bucket_num])
|
||||
return Status::NeedData;
|
||||
|
||||
if (!chunks[current_bucket_num])
|
||||
if (!two_level_chunks[current_bucket_num])
|
||||
return Status::NeedData;
|
||||
|
||||
output.push(std::move(chunks[current_bucket_num]));
|
||||
output.push(std::move(two_level_chunks[current_bucket_num]));
|
||||
|
||||
++current_bucket_num;
|
||||
if (current_bucket_num == NUM_BUCKETS)
|
||||
@ -298,27 +301,16 @@ private:
|
||||
size_t num_threads;
|
||||
|
||||
bool is_initialized = false;
|
||||
bool has_input = false;
|
||||
bool finished = false;
|
||||
|
||||
Chunk current_chunk;
|
||||
Chunks single_level_chunks;
|
||||
|
||||
UInt32 current_bucket_num = 0;
|
||||
static constexpr Int32 NUM_BUCKETS = 256;
|
||||
std::array<Chunk, NUM_BUCKETS> chunks;
|
||||
std::array<Chunk, NUM_BUCKETS> two_level_chunks;
|
||||
|
||||
Processors processors;
|
||||
|
||||
void setCurrentChunk(Chunk chunk)
|
||||
{
|
||||
if (has_input)
|
||||
throw Exception("Current chunk was already set in "
|
||||
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
has_input = true;
|
||||
current_chunk = std::move(chunk);
|
||||
}
|
||||
|
||||
void initialize()
|
||||
{
|
||||
is_initialized = true;
|
||||
@ -339,7 +331,7 @@ private:
|
||||
auto block = params->aggregator.prepareBlockAndFillWithoutKey(
|
||||
*first, params->final, first->type != AggregatedDataVariants::Type::without_key);
|
||||
|
||||
setCurrentChunk(convertToChunk(block));
|
||||
single_level_chunks.emplace_back(convertToChunk(block));
|
||||
}
|
||||
}
|
||||
|
||||
@ -364,9 +356,10 @@ private:
|
||||
else
|
||||
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
||||
|
||||
auto block = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final);
|
||||
auto blocks = params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ false>(*first, params->final);
|
||||
for (auto & block : blocks)
|
||||
single_level_chunks.emplace_back(convertToChunk(block));
|
||||
|
||||
setCurrentChunk(convertToChunk(block));
|
||||
finished = true;
|
||||
}
|
||||
|
||||
|
@ -5555,6 +5555,10 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
||||
if (select_query->interpolate() && !select_query->interpolate()->children.empty())
|
||||
return std::nullopt;
|
||||
|
||||
// Currently projections don't support GROUPING SET yet.
|
||||
if (select_query->group_by_with_grouping_sets)
|
||||
return std::nullopt;
|
||||
|
||||
auto query_options = SelectQueryOptions(
|
||||
QueryProcessingStage::WithMergeableState,
|
||||
/* depth */ 1,
|
||||
|
@ -313,6 +313,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
|
||||
settings.min_free_disk_space_for_temporary_data,
|
||||
settings.compile_aggregate_expressions,
|
||||
settings.min_count_to_compile_aggregate_expression,
|
||||
settings.max_block_size,
|
||||
only_merge);
|
||||
|
||||
return std::make_pair(params, only_merge);
|
||||
|
@ -1,4 +1,8 @@
|
||||
<test>
|
||||
<query>select sipHash64(number) from numbers(1e7) group by number format Null</query>
|
||||
<query>select * from (select * from numbers(1e7) group by number) group by number format Null</query>
|
||||
<query>select * from (select * from numbers(1e7) group by number) order by number format Null</query>
|
||||
|
||||
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null</query>
|
||||
<query>select * from (select * from numbers_mt(1e7) group by number) order by number format Null</query>
|
||||
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null settings max_bytes_before_external_group_by = 1</query>
|
||||
|
@ -1,4 +1,5 @@
|
||||
SET joined_subquery_requires_alias = 0;
|
||||
SET max_threads = 1;
|
||||
|
||||
-- incremental streaming usecase
|
||||
-- that has sense only if data filling order has guarantees of chronological order
|
||||
|
@ -28,7 +28,7 @@ WITH
|
||||
ORDER BY event_time DESC
|
||||
LIMIT 1
|
||||
) AS id
|
||||
SELECT uniqExact(thread_id)
|
||||
SELECT uniqExact(thread_id) > 2
|
||||
FROM system.query_thread_log
|
||||
WHERE (event_date >= (today() - 1)) AND (query_id = id) AND (thread_id != master_thread_id);
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-replicated-database, no-parallel, no-fasttest, no-tsan, no-asan, no-random-settings, no-s3-storage
|
||||
# Tags: no-replicated-database, no-parallel, no-fasttest, no-tsan, no-asan, no-random-settings, no-s3-storage, no-msan
|
||||
# Tag no-fasttest: max_memory_usage_for_user can interfere another queries running concurrently
|
||||
|
||||
# Regression for MemoryTracker that had been incorrectly accounted
|
||||
|
@ -6,4 +6,4 @@
|
||||
2020-01-01 00:00:00 2
|
||||
1
|
||||
499999
|
||||
5
|
||||
18
|
||||
|
@ -1,7 +1,7 @@
|
||||
DROP TABLE IF EXISTS select_final;
|
||||
|
||||
SET do_not_merge_across_partitions_select_final = 1;
|
||||
SET max_threads = 0;
|
||||
SET max_threads = 16;
|
||||
|
||||
CREATE TABLE select_final (t DateTime, x Int32, string String) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY (x, t);
|
||||
|
||||
|
@ -1,27 +1,12 @@
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
@ -29,32 +14,47 @@
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 50 1 0 49
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
1 50 51 0 1 51
|
||||
|
@ -52,6 +52,7 @@ ALL LEFT JOIN
|
||||
FROM group_bitmap_data_test
|
||||
WHERE pickup_date = '2019-01-01'
|
||||
GROUP BY city_id
|
||||
) AS js2 USING (city_id);
|
||||
) AS js2 USING (city_id)
|
||||
ORDER BY today_users, before_users, ll_users, old_users, new_users, diff_users;
|
||||
|
||||
DROP TABLE IF EXISTS group_bitmap_data_test;
|
||||
|
@ -0,0 +1,28 @@
|
||||
a 2
|
||||
a x 1
|
||||
a y 1
|
||||
b 2
|
||||
b x 1
|
||||
b y 1
|
||||
4
|
||||
a 2
|
||||
a x 1
|
||||
a y 1
|
||||
b 2
|
||||
b x 1
|
||||
b y 1
|
||||
4
|
||||
x 2
|
||||
y 2
|
||||
a 2
|
||||
a x 1
|
||||
a y 1
|
||||
b 2
|
||||
b x 1
|
||||
b y 1
|
||||
a x 1
|
||||
a y 1
|
||||
b x 1
|
||||
b y 1
|
||||
|
||||
4
|
@ -0,0 +1,15 @@
|
||||
drop table if exists test;
|
||||
|
||||
create table test(dim1 String, dim2 String, projection p1 (select dim1, dim2, count() group by dim1, dim2)) engine MergeTree order by dim1;
|
||||
|
||||
insert into test values ('a', 'x') ('a', 'y') ('b', 'x') ('b', 'y');
|
||||
|
||||
select dim1, dim2, count() from test group by grouping sets ((dim1, dim2), dim1) order by dim1, dim2, count();
|
||||
|
||||
select dim1, dim2, count() from test group by dim1, dim2 with rollup order by dim1, dim2, count();
|
||||
|
||||
select dim1, dim2, count() from test group by dim1, dim2 with cube order by dim1, dim2, count();
|
||||
|
||||
select dim1, dim2, count() from test group by dim1, dim2 with totals order by dim1, dim2, count();
|
||||
|
||||
drop table test;
|
@ -12,7 +12,7 @@ select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by n
|
||||
-- and the query with GROUP BY on remote servers will first do GROUP BY and then send the block,
|
||||
-- so the initiator will first receive all blocks from remotes and only after start merging,
|
||||
-- and will hit the memory limit.
|
||||
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi'; -- { serverError 241 }
|
||||
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi', max_block_size=1e12; -- { serverError 241 }
|
||||
|
||||
-- with optimize_aggregation_in_order=1 remote servers will produce blocks more frequently,
|
||||
-- since they don't need to wait until the aggregation will be finished,
|
||||
|
@ -1,24 +1,24 @@
|
||||
SET join_algorithm = 'full_sorting_merge';
|
||||
|
||||
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 ON t1.key = t2.key;
|
||||
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 ON t1.key = t2.key ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 USING key;
|
||||
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 USING key ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT 1 :: Nullable(UInt32) as key) t2 USING (key);
|
||||
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT 1 :: Nullable(UInt32) as key) t2 USING (key) ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT NULL :: Nullable(UInt32) as key) t2 USING (key);
|
||||
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT NULL :: Nullable(UInt32) as key) t2 USING (key) ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 1 :: Int32 as key) AS t1 JOIN (SELECT 1 :: UInt32 as key) t2 ON t1.key = t2.key;
|
||||
SELECT * FROM (SELECT 1 :: Int32 as key) AS t1 JOIN (SELECT 1 :: UInt32 as key) t2 ON t1.key = t2.key ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT -1 :: Nullable(Int32) as key) AS t1 FULL JOIN (SELECT 4294967295 :: UInt32 as key) t2 ON t1.key = t2.key;
|
||||
SELECT * FROM (SELECT -1 :: Nullable(Int32) as key) AS t1 FULL JOIN (SELECT 4294967295 :: UInt32 as key) t2 ON t1.key = t2.key ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key;
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key;
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: Nullable(String) AS key) AS t2 ON t1.key = t2.key;
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: Nullable(String) AS key) AS t2 ON t1.key = t2.key ORDER BY key;
|
||||
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: LowCardinality(String) AS key) AS t2 ON t1.key = t2.key;
|
||||
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: LowCardinality(String) AS key) AS t2 ON t1.key = t2.key ORDER BY key;
|
||||
|
||||
SELECT 5 == count() FROM (SELECT number as a from numbers(5)) as t1 LEFT JOIN (SELECT number as b from numbers(5) WHERE number > 100) as t2 ON t1.a = t2.b;
|
||||
SELECT 5 == count() FROM (SELECT number as a from numbers(5) WHERE number > 100) as t1 RIGHT JOIN (SELECT number as b from numbers(5)) as t2 ON t1.a = t2.b;
|
||||
SELECT 5 == count() FROM (SELECT number as a from numbers(5)) as t1 LEFT JOIN (SELECT number as b from numbers(5) WHERE number > 100) as t2 ON t1.a = t2.b ORDER BY 1;
|
||||
SELECT 5 == count() FROM (SELECT number as a from numbers(5) WHERE number > 100) as t1 RIGHT JOIN (SELECT number as b from numbers(5)) as t2 ON t1.a = t2.b ORDER BY 1;
|
||||
|
@ -1,5 +1,22 @@
|
||||
-- { echoOn }
|
||||
|
||||
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
|
||||
(Expression)
|
||||
ExpressionTransform × 16
|
||||
(Aggregating)
|
||||
Resize 16 → 16
|
||||
AggregatingTransform × 16
|
||||
StrictResize 16 → 16
|
||||
(Expression)
|
||||
ExpressionTransform × 16
|
||||
(Aggregating)
|
||||
Resize 1 → 16
|
||||
AggregatingTransform
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(ReadFromStorage)
|
||||
Limit
|
||||
Numbers 0 → 1
|
||||
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
|
||||
(Expression)
|
||||
ExpressionTransform × 16
|
||||
|
@ -1,9 +1,12 @@
|
||||
set max_threads = 16;
|
||||
set prefer_localhost_replica = 1;
|
||||
set optimize_aggregation_in_order = 0;
|
||||
set max_block_size = 65505;
|
||||
|
||||
-- { echoOn }
|
||||
|
||||
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
|
||||
|
||||
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
|
||||
|
||||
explain pipeline select * from (select * from numbers_mt(1e8) group by number) order by number;
|
||||
|
@ -0,0 +1 @@
|
||||
1
|
@ -0,0 +1,9 @@
|
||||
SET max_block_size = 4213;
|
||||
|
||||
SELECT DISTINCT (blockSize() <= 4213)
|
||||
FROM
|
||||
(
|
||||
SELECT number
|
||||
FROM numbers(100000)
|
||||
GROUP BY number
|
||||
);
|
@ -7,3 +7,10 @@ UInt64 String DateTime Map(UUID, Array(Float32))
|
||||
13 str 2022-08-04 18:30:53 {'10':[11,12],'13':[14,15]}
|
||||
1
|
||||
1
|
||||
_CAST(42, \'Int64\') Int64
|
||||
_CAST([1, 2, 3], \'Array(UInt8)\') Array(UInt8)
|
||||
_CAST(((\'abc\', 22), (\'def\', 33)), \'Map(String, UInt8)\') Map(String, UInt8)
|
||||
_CAST([[4, 5, 6], [7], [8, 9]], \'Array(Array(UInt8))\') Array(Array(UInt8))
|
||||
_CAST(((10, [11, 12]), (13, [14, 15])), \'Map(UInt8, Array(UInt8))\') Map(UInt8, Array(UInt8))
|
||||
_CAST(((\'ghj\', ((\'klm\', [16, 17]))), (\'nop\', ((\'rst\', [18])))), \'Map(String, Map(String, Array(UInt8)))\') Map(String, Map(String, Array(UInt8)))
|
||||
a Int8
|
||||
|
@ -68,13 +68,27 @@ $CLICKHOUSE_CLIENT -n -q "select {n: UInt8} -- { serverError 456 }"
|
||||
$CLICKHOUSE_CLIENT -n -q "set param_n = 12; set param_n = 13; select {n: UInt8}"
|
||||
|
||||
|
||||
# but multiple different parameters could be defined within each session
|
||||
# multiple different parameters could be defined within each session
|
||||
$CLICKHOUSE_CLIENT -n -q "
|
||||
set param_a = 13, param_b = 'str';
|
||||
set param_c = '2022-08-04 18:30:53';
|
||||
set param_d = '{\'10\': [11, 12], \'13\': [14, 15]}';
|
||||
select {a: UInt32}, {b: String}, {c: DateTime}, {d: Map(String, Array(UInt8))}"
|
||||
|
||||
|
||||
# empty parameter name is not allowed
|
||||
$CLICKHOUSE_CLIENT --param_="" -q "select 1" 2>&1 | grep -c 'Code: 36'
|
||||
$CLICKHOUSE_CLIENT -q "set param_ = ''" 2>&1 | grep -c 'Code: 36'
|
||||
|
||||
|
||||
# parameters are also supported for DESCRIBE TABLE queries
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--param_id="42" \
|
||||
--param_arr="[1, 2, 3]" \
|
||||
--param_map="{'abc': 22, 'def': 33}" \
|
||||
--param_mul_arr="[[4, 5, 6], [7], [8, 9]]" \
|
||||
--param_map_arr="{10: [11, 12], 13: [14, 15]}" \
|
||||
--param_map_map_arr="{'ghj': {'klm': [16, 17]}, 'nop': {'rst': [18]}}" \
|
||||
-q "describe table(select {id: Int64}, {arr: Array(UInt8)}, {map: Map(String, UInt8)}, {mul_arr: Array(Array(UInt8))}, {map_arr: Map(UInt8, Array(UInt8))}, {map_map_arr: Map(String, Map(String, Array(UInt8)))})"
|
||||
|
||||
$CLICKHOUSE_CLIENT --param_p=42 -q "describe table (select * from (select {p:Int8} as a group by a) order by a)"
|
||||
|
Loading…
Reference in New Issue
Block a user