Merge branch 'master' into test-repeat-last-command

Author: Alexey Milovidov, 2022-09-04 21:41:30 +03:00 (committed by GitHub)
Commit: fc59e557a8
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
61 changed files with 1218 additions and 745 deletions

contrib/NuRaft vendored

@@ -1 +1 @@
-Subproject commit bdba298189e29995892de78dcecf64d127444e81
+Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc


@@ -54,9 +54,8 @@ set(SRCS
 add_library(cxx ${SRCS})
 set_target_properties(cxx PROPERTIES FOLDER "contrib/libcxx-cmake")
-target_include_directories(cxx SYSTEM BEFORE PUBLIC
-    $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>
-    $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}>/src)
+target_include_directories(cxx SYSTEM BEFORE PRIVATE $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/src>)
+target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
 target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
 # Enable capturing stack traces for all exceptions.


@@ -83,5 +83,8 @@ RUN export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
     --yes --no-install-recommends \
     && apt-get clean
+# for external_symbolizer_path
+RUN ln -s /usr/bin/llvm-symbolizer-15 /usr/bin/llvm-symbolizer
 COPY build.sh /
 CMD ["bash", "-c", "/build.sh 2>&1"]


@@ -37,7 +37,7 @@ sudo xcode-select --install
 ``` bash
 brew update
-brew install cmake ninja libtool gettext llvm gcc binutils grep findutils
+brew install ccache cmake ninja libtool gettext llvm gcc binutils grep findutils
 ```
 ## Checkout ClickHouse Sources {#checkout-clickhouse-sources}


@@ -15,7 +15,7 @@ Usage examples:
 ## Usage in ClickHouse Server {#usage-in-clickhouse-server}
 ``` sql
-ENGINE = GenerateRandom(random_seed, max_string_length, max_array_length)
+ENGINE = GenerateRandom([random_seed] [,max_string_length] [,max_array_length])
 ```
 The `max_array_length` and `max_string_length` parameters specify maximum length of all
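Not part of the commit, but as a quick illustration of the optional-argument form documented above (the table and column names below are invented for the example):

```sql
-- Hypothetical demo table: seed 1, strings up to 5 characters, arrays up to 3 elements.
CREATE TABLE generate_random_demo (id UInt32, name String, tags Array(UInt8))
ENGINE = GenerateRandom(1, 5, 3);

-- The engine produces endless random rows, so always bound the read.
SELECT * FROM generate_random_demo LIMIT 3;
```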


@@ -6,19 +6,28 @@ title: "UK Property Price Paid"
 ---
 The dataset contains data about prices paid for real-estate property in England and Wales. The data is available since year 1995.
-The size of the dataset in uncompressed form is about 4 GiB and it will take about 270 MiB in ClickHouse.
-Source: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads <br/>
+The size of the dataset in uncompressed form is about 4 GiB and it will take about 278 MiB in ClickHouse.
+Source: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads
 Description of the fields: https://www.gov.uk/guidance/about-the-price-paid-data
 Contains HM Land Registry data © Crown copyright and database right 2021. This data is licensed under the Open Government Licence v3.0.
+## Download the Dataset {#download-dataset}
+Run the command:
+```bash
+wget http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv
+```
+Download will take about 2 minutes with good internet connection.
 ## Create the Table {#create-table}
 ```sql
 CREATE TABLE uk_price_paid
 (
-    uuid UUID,
     price UInt32,
     date Date,
     postcode1 LowCardinality(String),
@@ -33,68 +42,65 @@ CREATE TABLE uk_price_paid
     town LowCardinality(String),
     district LowCardinality(String),
     county LowCardinality(String),
-    category UInt8,
-    category2 UInt8
-) ORDER BY (postcode1, postcode2, addr1, addr2);
+    category UInt8
+) ENGINE = MergeTree ORDER BY (postcode1, postcode2, addr1, addr2);
 ```
 ## Preprocess and Import Data {#preprocess-import-data}
-In this example, we define the structure of source data from the CSV file and specify a query to preprocess the data with either `clickhouse-client` or the web based Play UI.
+We will use `clickhouse-local` tool for data preprocessing and `clickhouse-client` to upload it.
+In this example, we define the structure of source data from the CSV file and specify a query to preprocess the data with `clickhouse-local`.
 The preprocessing is:
-- splitting the postcode to two different columns `postcode1` and `postcode2` that are better for storage and queries;
+- splitting the postcode to two different columns `postcode1` and `postcode2` that is better for storage and queries;
 - coverting the `time` field to date as it only contains 00:00 time;
 - ignoring the [UUid](../../sql-reference/data-types/uuid.md) field because we don't need it for analysis;
 - transforming `type` and `duration` to more readable Enum fields with function [transform](../../sql-reference/functions/other-functions.md#transform);
 - transforming `is_new` and `category` fields from single-character string (`Y`/`N` and `A`/`B`) to [UInt8](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-uint256-int8-int16-int32-int64-int128-int256) field with 0 and 1.
+Preprocessed data is piped directly to `clickhouse-client` to be inserted into ClickHouse table in streaming fashion.
 ```bash
-INSERT INTO uk_price_paid
-WITH
-    splitByChar(' ', postcode) AS p
-SELECT
-    replaceRegexpAll(uuid_string, '{|}','') AS uuid,
-    toUInt32(price_string) AS price,
-    parseDateTimeBestEffortUS(time) AS date,
-    p[1] AS postcode1,
-    p[2] AS postcode2,
-    transform(a, ['T', 'S', 'D', 'F', 'O'], ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type,
-    b = 'Y' AS is_new,
-    transform(c, ['F', 'L', 'U'], ['freehold', 'leasehold', 'unknown']) AS duration,
-    addr1,
-    addr2,
-    street,
-    locality,
-    town,
-    district,
-    county,
-    d = 'B' AS category,
-    e = 'B' AS category2
-FROM url(
-    'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv',
-    'CSV',
-    'uuid_string String,
-    price_string String,
-    time String,
-    postcode String,
-    a String,
-    b String,
-    c String,
-    addr1 String,
-    addr2 String,
-    street String,
-    locality String,
-    town String,
-    district String,
-    county String,
-    d String,
-    e String'
-)
-SETTINGS max_http_get_redirects=1;
+clickhouse-local --input-format CSV --structure '
+    uuid String,
+    price UInt32,
+    time DateTime,
+    postcode String,
+    a String,
+    b String,
+    c String,
+    addr1 String,
+    addr2 String,
+    street String,
+    locality String,
+    town String,
+    district String,
+    county String,
+    d String,
+    e String
+' --query "
+    WITH splitByChar(' ', postcode) AS p
+    SELECT
+        price,
+        toDate(time) AS date,
+        p[1] AS postcode1,
+        p[2] AS postcode2,
+        transform(a, ['T', 'S', 'D', 'F', 'O'], ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type,
+        b = 'Y' AS is_new,
+        transform(c, ['F', 'L', 'U'], ['freehold', 'leasehold', 'unknown']) AS duration,
+        addr1,
+        addr2,
+        street,
+        locality,
+        town,
+        district,
+        county,
+        d = 'B' AS category
+    FROM table" --date_time_input_format best_effort < pp-complete.csv | clickhouse-client --query "INSERT INTO uk_price_paid FORMAT TSV"
 ```
-It will take about 2 minutes depending on where you are in the world, and where your ClickHouse servers are. Almost all of the time is the download time of the CSV file from the UK government server.
+It will take about 40 seconds.
## Validate the Data {#validate-data} ## Validate the Data {#validate-data}
@@ -106,13 +112,13 @@ SELECT count() FROM uk_price_paid;
 Result:
-```response
+```text
 ┌──count()─┐
-│ 27450499 │
+│ 26321785 │
 └──────────┘
 ```
-The size of dataset in ClickHouse is just 540 MiB, check it.
+The size of dataset in ClickHouse is just 278 MiB, check it.
 Query:
@@ -124,14 +130,10 @@ Result:
 ```text
 ┌─formatReadableSize(total_bytes)─┐
-│ 545.04 MiB │
+│ 278.80 MiB │
 └─────────────────────────────────┘
 ```
-:::note
-The above size is for a replicated table, if you are using this dataset with a single instance the size will be half.
-:::
 ## Run Some Queries {#run-queries}
 ### Query 1. Average Price Per Year {#average-price}
@@ -144,7 +146,7 @@ SELECT toYear(date) AS year, round(avg(price)) AS price, bar(price, 0, 1000000,
 Result:
-```response
+```text
 ┌─year─┬──price─┬─bar(round(avg(price)), 0, 1000000, 80)─┐
 │ 1995 │ 67932 │ █████▍ │
 │ 1996 │ 71505 │ █████▋ │


@@ -175,6 +175,10 @@ You can also choose to use [HTTP compression](https://en.wikipedia.org/wiki/HTTP
 - `br`
 - `deflate`
 - `xz`
+- `zstd`
+- `lz4`
+- `bz2`
+- `snappy`
 To send a compressed `POST` request, append the request header `Content-Encoding: compression_method`.
 In order for ClickHouse to compress the response, enable compression with [enable_http_compression](../operations/settings/settings.md#settings-enable_http_compression) setting and append `Accept-Encoding: compression_method` header to the request. You can configure the data compression level in the [http_zlib_compression_level](../operations/settings/settings.md#settings-http_zlib_compression_level) setting for all compression methods.


@@ -94,6 +94,21 @@ It is also possible for `Flat`, `Hashed`, `ComplexKeyHashed` dictionaries to onl
- If the source is HTTP then `update_field` will be added as a query parameter with the last update time as the parameter value.
- If the source is Executable then `update_field` will be added as an executable script argument with the last update time as the argument value.
- If the source is ClickHouse, MySQL, PostgreSQL, ODBC there will be an additional part of `WHERE`, where `update_field` is compared as greater or equal with the last update time.
- Per default, this `WHERE`-condition is checked at the highest level of the SQL-Query. Alternatively, the condition can be checked in any other `WHERE`-clause within the query using the `{condition}`-keyword. Example:
```sql
...
SOURCE(CLICKHOUSE(...
update_field 'added_time'
QUERY '
SELECT my_arr.1 AS x, my_arr.2 AS y, creation_time
FROM (
SELECT arrayZip(x_arr, y_arr) AS my_arr, creation_time
FROM dictionary_source
WHERE {condition}
)'
))
...
```
If `update_field` option is set, additional option `update_lag` can be set. Value of `update_lag` option is subtracted from previous update time before request updated data.
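Not part of the commit, but for context: a sketch of how `update_field` and `update_lag` might look together in DDL form (the dictionary, source table, and column names are invented for the example):

```sql
CREATE DICTIONARY my_dict
(
    key UInt64,
    value String
)
PRIMARY KEY key
-- Only rows whose added_time is newer than (last update time - 15 seconds) are re-fetched.
SOURCE(CLICKHOUSE(TABLE 'dictionary_source' UPDATE_FIELD 'added_time' UPDATE_LAG 15))
LAYOUT(FLAT())
LIFETIME(300);
```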


@@ -267,7 +267,7 @@ Result:
 └────────────────┘
 ```
-:::Attention
+:::note
 The return type of `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is `Date` or `DateTime`.
 Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results.
 In case argument is out of normal range:


@@ -430,5 +430,119 @@ Result:
└────────────────────────────┘
```
## mapApply
**Syntax**
```sql
mapApply(func, map)
```
**Parameters**
- `func` - [Lambda function](../../sql-reference/functions/index.md#higher-order-functions---operator-and-lambdaparams-expr-function).
- `map` — [Map](../../sql-reference/data-types/map.md).
**Returned value**
- Returns a map obtained from the original map by application of `func(map1[i], …, mapN[i])` for each element.
**Example**
Query:
```sql
SELECT mapApply((k, v) -> (k, v * 10), _map) AS r
FROM
(
SELECT map('key1', number, 'key2', number * 2) AS _map
FROM numbers(3)
)
```
Result:
```text
┌─r─────────────────────┐
│ {'key1':0,'key2':0} │
│ {'key1':10,'key2':20} │
│ {'key1':20,'key2':40} │
└───────────────────────┘
```
## mapFilter
**Syntax**
```sql
mapFilter(func, map)
```
**Parameters**
- `func` - [Lambda function](../../sql-reference/functions/index.md#higher-order-functions---operator-and-lambdaparams-expr-function).
- `map` — [Map](../../sql-reference/data-types/map.md).
**Returned value**
- Returns a map containing only the elements in `map` for which `func(map1[i], …, mapN[i])` returns something other than 0.
**Example**
Query:
```sql
SELECT mapFilter((k, v) -> ((v % 2) = 0), _map) AS r
FROM
(
SELECT map('key1', number, 'key2', number * 2) AS _map
FROM numbers(3)
)
```
Result:
```text
┌─r───────────────────┐
│ {'key1':0,'key2':0} │
│ {'key2':2} │
│ {'key1':2,'key2':4} │
└─────────────────────┘
```
## mapUpdate
**Syntax**
```sql
mapUpdate(map1, map2)
```
**Parameters**
- `map1` [Map](../../sql-reference/data-types/map.md).
- `map2` [Map](../../sql-reference/data-types/map.md).
**Returned value**
- Returns `map1` with values updated by the values for the corresponding keys in `map2`.
**Example**
Query:
```sql
SELECT mapUpdate(map('key1', 0, 'key3', 0), map('key1', 10, 'key2', 10)) AS map;
```
Result:
```text
┌─map────────────────────────────┐
│ {'key3':0,'key1':10,'key2':10} │
└────────────────────────────────┘
```
[Original article](https://clickhouse.com/docs/en/sql-reference/functions/tuple-map-functions/) <!--hide-->


@@ -303,7 +303,7 @@ SHOW USERS
 ## SHOW ROLES
-Returns a list of [roles](../../operations/access-rights.md#role-management). To view another parameters, see system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role-grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
+Returns a list of [roles](../../operations/access-rights.md#role-management). To view another parameters, see system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role_grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
 ### Syntax


@@ -267,7 +267,7 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
 └────────────────┘
 ```
-:::Attention
+:::note
 The return type of the `toStartOf*` and `toMonday` functions described below is `Date` or `DateTime`.
 Although these functions can take `Date32` and `DateTime64` values as an argument, passing an argument outside the normal range (`1970` - `2148` for `Date` and `1970-01-01 00:00:00`-`2106-02-07 08:28:15` for `DateTime`) produces an incorrect result.
 Return values for arguments outside the normal range:
@@ -277,7 +277,7 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
 * `2149-05-31` is the result of `toLastDayOfMonth` for an argument greater than `2149-05-31`.
 :::
-:::Attention
+:::note
 The return type of the `toStartOf*`, `toLastDayOfMonth`, and `toMonday` functions described below is `Date` or `DateTime`.
 Although these functions can take `Date32` and `DateTime64` values as an argument, passing an argument outside the normal range (`1970` - `2148` for `Date` and `1970-01-01 00:00:00`-`2106-02-07 08:28:15` for `DateTime`) produces an incorrect result.
 Return values for arguments outside the normal range:


@@ -305,7 +305,7 @@ SHOW USERS
 ## SHOW ROLES {#show-roles-statement}
-Returns a list of [roles](../../operations/access-rights.md#role-management). To view role parameters, see the system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role-grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
+Returns a list of [roles](../../operations/access-rights.md#role-management). To view role parameters, see the system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role_grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
 ### Syntax {#show-roles-syntax}


@ -0,0 +1,67 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandMkDir : public ICommand
{
public:
CommandMkDir()
{
command_name = "mkdir";
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
description = "Create directory or directories recursively";
usage = "mkdir [OPTION]... <PATH>";
command_option_description->add_options()
("recursive", "recursively create directories")
;
}
void processOptions(
Poco::Util::LayeredConfiguration & config,
po::variables_map & options) const override
{
if (options.count("recursive"))
config.setBool("recursive", true);
}
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 1)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
bool recursive = config.getBool("recursive", false);
if (recursive)
disk->createDirectories(full_path);
else
disk->createDirectory(full_path);
}
};
}
std::unique_ptr <DB::ICommand> makeCommandMkDir()
{
return std::make_unique<DB::CommandMkDir>();
}


@@ -63,7 +63,7 @@ void DisksApp::addOptions(
     positional_options_description.add("command_name", 1);
-    supported_commands = {"list-disks", "list", "move", "remove", "link", "copy", "write", "read"};
+    supported_commands = {"list-disks", "list", "move", "remove", "link", "copy", "write", "read", "mkdir"};
     command_descriptions.emplace("list-disks", makeCommandListDisks());
     command_descriptions.emplace("list", makeCommandList());
@@ -73,6 +73,7 @@ void DisksApp::addOptions(
     command_descriptions.emplace("copy", makeCommandCopy());
     command_descriptions.emplace("write", makeCommandWrite());
     command_descriptions.emplace("read", makeCommandRead());
+    command_descriptions.emplace("mkdir", makeCommandMkDir());
 }
 void DisksApp::processOptions()


@@ -4,6 +4,7 @@
 #include "CommandLink.cpp"
 #include "CommandList.cpp"
 #include "CommandListDisks.cpp"
+#include "CommandMkDir.cpp"
 #include "CommandMove.cpp"
 #include "CommandRead.cpp"
 #include "CommandRemove.cpp"


@@ -65,3 +65,4 @@ std::unique_ptr <DB::ICommand> makeCommandMove();
 std::unique_ptr <DB::ICommand> makeCommandRead();
 std::unique_ptr <DB::ICommand> makeCommandRemove();
 std::unique_ptr <DB::ICommand> makeCommandWrite();
+std::unique_ptr <DB::ICommand> makeCommandMkDir();


@@ -52,15 +52,10 @@ void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
     if (current_thread)
     {
         Int64 will_be = current_thread->untracked_memory + size;
-        Int64 limit = current_thread->untracked_memory_limit + current_thread->untracked_memory_limit_increase;
-        if (will_be > limit)
+        if (will_be > current_thread->untracked_memory_limit)
         {
-            /// Increase limit before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
-            /// more. It could be useful to enlarge Exception message in rethrow logic.
-            current_thread->untracked_memory_limit_increase = current_thread->untracked_memory_limit;
             memory_tracker->allocImpl(will_be, throw_if_memory_exceeded);
-            current_thread->untracked_memory_limit_increase = 0;
             current_thread->untracked_memory = 0;
         }
         else


@@ -133,8 +133,6 @@ public:
     Int64 untracked_memory = 0;
     /// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
     Int64 untracked_memory_limit = 4 * 1024 * 1024;
-    /// Increase limit in case of exception.
-    Int64 untracked_memory_limit_increase = 0;
     /// Statistics of read and write rows/bytes
     Progress progress_in;


@@ -431,7 +431,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
 {
     if (schedule)
     {
-        std::lock_guard lock(bg_tasks_mutex);
+        std::unique_lock lock(bg_tasks_mutex);
         {
             while (!upload_object_tasks.empty() && upload_object_tasks.front().is_finised)
             {
@@ -442,7 +442,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
                 if (exception)
                 {
-                    waitForAllBackGroundTasks();
+                    waitForAllBackGroundTasksUnlocked(lock);
                     std::rethrow_exception(exception);
                 }
@@ -457,7 +457,15 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
     if (schedule)
     {
         std::unique_lock lock(bg_tasks_mutex);
-        bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
+        waitForAllBackGroundTasksUnlocked(lock);
+    }
+}
+void WriteBufferFromS3::waitForAllBackGroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock)
+{
+    if (schedule)
+    {
+        bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
         while (!upload_object_tasks.empty())
         {
@@ -472,7 +480,7 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
         if (put_object_task)
         {
-            bg_tasks_condvar.wait(lock, [this]() { return put_object_task->is_finised; });
+            bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return put_object_task->is_finised; });
             if (put_object_task->exception)
                 std::rethrow_exception(put_object_task->exception);
         }


@@ -84,6 +84,7 @@ private:
     void waitForReadyBackGroundTasks();
     void waitForAllBackGroundTasks();
+    void waitForAllBackGroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock);
     String bucket;
     String key;


@ -0,0 +1,113 @@
#include <Interpreters/AggregationUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
OutputBlockColumns prepareOutputBlockColumns(
const Aggregator::Params & params,
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
const Block & res_header,
Arenas & aggregates_pools,
bool final,
size_t rows)
{
MutableColumns key_columns(params.keys_size);
MutableColumns aggregate_columns(params.aggregates_size);
MutableColumns final_aggregate_columns(params.aggregates_size);
Aggregator::AggregateColumnsData aggregate_columns_data(params.aggregates_size);
for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
key_columns[i]->reserve(rows);
}
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
for (auto & pool : aggregates_pools)
column_aggregate_func.addArena(pool);
aggregate_columns_data[i] = &column_aggregate_func.getData();
aggregate_columns_data[i]->reserve(rows);
}
else
{
final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
final_aggregate_columns[i]->reserve(rows);
if (aggregate_functions[i]->isState())
{
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
final_aggregate_columns[i]->forEachSubcolumn(
[&aggregates_pools](auto & subcolumn)
{
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
});
}
}
}
if (key_columns.size() != params.keys_size)
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
std::vector<IColumn *> raw_key_columns;
raw_key_columns.reserve(key_columns.size());
for (auto & column : key_columns)
raw_key_columns.push_back(column.get());
return {
.key_columns = std::move(key_columns),
.raw_key_columns = std::move(raw_key_columns),
.aggregate_columns = std::move(aggregate_columns),
.final_aggregate_columns = std::move(final_aggregate_columns),
.aggregate_columns_data = std::move(aggregate_columns_data),
};
}
Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows)
{
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
Block res = res_header.cloneEmpty();
for (size_t i = 0; i < params.keys_size; ++i)
res.getByPosition(i).column = std::move(key_columns[i]);
for (size_t i = 0; i < params.aggregates_size; ++i)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
if (final)
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
else
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
}
/// Change the size of the columns-constants in the block.
size_t columns = res_header.columns();
for (size_t i = 0; i < columns; ++i)
if (isColumnConst(*res.getByPosition(i).column))
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
return res;
}
}


@ -0,0 +1,27 @@
#pragma once
#include <Interpreters/Aggregator.h>
namespace DB
{
struct OutputBlockColumns
{
MutableColumns key_columns;
std::vector<IColumn *> raw_key_columns;
MutableColumns aggregate_columns;
MutableColumns final_aggregate_columns;
Aggregator::AggregateColumnsData aggregate_columns_data;
};
OutputBlockColumns prepareOutputBlockColumns(
const Aggregator::Params & params,
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
const Block & res_header,
Arenas & aggregates_pools,
bool final,
size_t rows);
Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows);
}


@@ -34,6 +34,8 @@
 #include <Parsers/ASTSelectQuery.h>
+#include <Interpreters/AggregationUtils.h>
 namespace ProfileEvents
 {
     extern const Event ExternalAggregationWritePart;
@@ -1587,16 +1589,10 @@ Block Aggregator::convertOneBucketToBlock(
     bool final,
     size_t bucket) const
 {
-    Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
-        [bucket, &method, arena, this] (
-            MutableColumns & key_columns,
-            AggregateColumnsData & aggregate_columns,
-            MutableColumns & final_aggregate_columns,
-            bool final_)
-        {
-            convertToBlockImpl(method, method.data.impls[bucket],
-                key_columns, aggregate_columns, final_aggregate_columns, arena, final_);
-        });
+    // Used in ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform (expects single chunk for each bucket_id).
+    constexpr bool return_single_block = true;
+    Block block = convertToBlockImpl<return_single_block>(
+        method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size());
     block.info.bucket_num = bucket;
     return block;
@ -1702,26 +1698,17 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
} }
template <typename Method, typename Table> template <bool return_single_block, typename Method, typename Table>
void Aggregator::convertToBlockImpl( Aggregator::ConvertToBlockRes<return_single_block>
Method & method, Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const
Table & data,
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final) const
{ {
if (data.empty()) if (data.empty())
return; {
auto && out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, rows);
return {finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows)};
}
if (key_columns.size() != params.keys_size) ConvertToBlockRes<return_single_block> res;
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
std::vector<IColumn *> raw_key_columns;
raw_key_columns.reserve(key_columns.size());
for (auto & column : key_columns)
raw_key_columns.push_back(column.get());
if (final) if (final)
{ {
@ -1729,20 +1716,23 @@ void Aggregator::convertToBlockImpl(
if (compiled_aggregate_functions_holder) if (compiled_aggregate_functions_holder)
{ {
static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization; static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization;
convertToBlockImplFinal<Method, use_compiled_functions>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena); res = convertToBlockImplFinal<Method, use_compiled_functions, return_single_block>(method, data, arena, aggregates_pools, rows);
} }
else else
#endif #endif
{ {
convertToBlockImplFinal<Method, false>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena); res = convertToBlockImplFinal<Method, false, return_single_block>(method, data, arena, aggregates_pools, rows);
} }
} }
else else
{ {
convertToBlockImplNotFinal(method, data, std::move(raw_key_columns), aggregate_columns); res = convertToBlockImplNotFinal<return_single_block>(method, data, aggregates_pools, rows);
} }
/// In order to release memory early. /// In order to release memory early.
data.clearAndShrink(); data.clearAndShrink();
return res;
} }
@ -1811,38 +1801,9 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
} }
template <typename Method, bool use_compiled_functions, typename Table> template <bool use_compiled_functions>
void NO_INLINE Aggregator::convertToBlockImplFinal( Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena) const
Method & method,
Table & data,
std::vector<IColumn *> key_columns,
MutableColumns & final_aggregate_columns,
Arena * arena) const
{ {
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
key_columns[0]->insertDefault();
insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns, arena);
}
}
auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
PaddedPODArray<AggregateDataPtr> places;
places.reserve(data.size());
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
});
std::exception_ptr exception; std::exception_ptr exception;
size_t aggregate_functions_destroy_index = 0; size_t aggregate_functions_destroy_index = 0;
@ -1863,7 +1824,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
if (!is_aggregate_function_compiled[i]) if (!is_aggregate_function_compiled[i])
continue; continue;
auto & final_aggregate_column = final_aggregate_columns[i]; auto & final_aggregate_column = out_cols.final_aggregate_columns[i];
final_aggregate_column = final_aggregate_column->cloneResized(places.size()); final_aggregate_column = final_aggregate_column->cloneResized(places.size());
columns_data.emplace_back(getColumnData(final_aggregate_column.get())); columns_data.emplace_back(getColumnData(final_aggregate_column.get()));
} }
@ -1884,7 +1845,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
} }
} }
auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index]; auto & final_aggregate_column = out_cols.final_aggregate_columns[aggregate_functions_destroy_index];
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index]; size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
/** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoBatch /** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoBatch
@ -1898,7 +1859,8 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
bool is_state = aggregate_functions[destroy_index]->isState(); bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place_after_insert = !is_state; bool destroy_place_after_insert = !is_state;
aggregate_functions[destroy_index]->insertResultIntoBatch(0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert); aggregate_functions[destroy_index]->insertResultIntoBatch(
0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
} }
} }
catch (...) catch (...)
@ -1923,125 +1885,155 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
if (exception) if (exception)
std::rethrow_exception(exception); std::rethrow_exception(exception);
return finalizeBlock(params, getHeader(/* final */ true), std::move(out_cols), /* final */ true, places.size());
} }
template <typename Method, typename Table> template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
void NO_INLINE Aggregator::convertToBlockImplNotFinal( Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Method & method, Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t) const
Table & data,
std::vector<IColumn *> key_columns,
AggregateColumnsData & aggregate_columns) const
{ {
if constexpr (Method::low_cardinality_optimization) const size_t max_block_size = params.max_block_size;
const bool final = true;
ConvertToBlockRes<return_single_block> res;
std::optional<OutputBlockColumns> out_cols;
std::optional<Sizes> shuffled_key_sizes;
PaddedPODArray<AggregateDataPtr> places;
auto init_out_cols = [&]()
{ {
if (data.hasNullKeyData()) out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, max_block_size);
if constexpr (Method::low_cardinality_optimization)
{ {
key_columns[0]->insertDefault(); if (data.hasNullKeyData())
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
data.getNullKeyData() = nullptr;
}
}
auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);
mapped = nullptr;
});
}
template <typename Filler>
Block Aggregator::prepareBlockAndFill(
AggregatedDataVariants & data_variants,
bool final,
size_t rows,
Filler && filler) const
{
MutableColumns key_columns(params.keys_size);
MutableColumns aggregate_columns(params.aggregates_size);
MutableColumns final_aggregate_columns(params.aggregates_size);
AggregateColumnsData aggregate_columns_data(params.aggregates_size);
Block res_header = getHeader(final);
for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
key_columns[i]->reserve(rows);
}
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
for (auto & pool : data_variants.aggregates_pools)
column_aggregate_func.addArena(pool);
aggregate_columns_data[i] = &column_aggregate_func.getData();
aggregate_columns_data[i]->reserve(rows);
}
else
{
final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
final_aggregate_columns[i]->reserve(rows);
if (aggregate_functions[i]->isState())
{ {
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states. out_cols->key_columns[0]->insertDefault();
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get())) insertAggregatesIntoColumns(data.getNullKeyData(), out_cols->final_aggregate_columns, arena);
for (auto & pool : data_variants.aggregates_pools) data.hasNullKeyData() = false;
column_aggregate_func->addArena(pool);
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
final_aggregate_columns[i]->forEachSubcolumn([&data_variants](auto & subcolumn)
{
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
for (auto & pool : data_variants.aggregates_pools)
column_aggregate_func->addArena(pool);
});
} }
} }
}
filler(key_columns, aggregate_columns_data, final_aggregate_columns, final); shuffled_key_sizes = method.shuffleKeyColumns(out_cols->raw_key_columns, key_sizes);
Block res = res_header.cloneEmpty(); places.reserve(max_block_size);
};
for (size_t i = 0; i < params.keys_size; ++i) // should be invoked at least once, because null data might be the only content of the `data`
res.getByPosition(i).column = std::move(key_columns[i]); init_out_cols();
for (size_t i = 0; i < params.aggregates_size; ++i) data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
if constexpr (!return_single_block)
{
if (places.size() >= max_block_size)
{
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena));
places.clear();
out_cols.reset();
}
}
});
if constexpr (return_single_block)
{ {
const auto & aggregate_column_name = params.aggregates[i].column_name; return insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena);
if (final)
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
else
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
} }
else
{
if (out_cols.has_value())
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena));
return res;
}
}
/// Change the size of the columns-constants in the block. template <bool return_single_block, typename Method, typename Table>
size_t columns = res_header.columns(); Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
for (size_t i = 0; i < columns; ++i) Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t) const
if (isColumnConst(*res.getByPosition(i).column)) {
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows); const size_t max_block_size = params.max_block_size;
const bool final = false;
ConvertToBlockRes<return_single_block> res;
std::optional<OutputBlockColumns> out_cols;
std::optional<Sizes> shuffled_key_sizes;
auto init_out_cols = [&]()
{
out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, max_block_size);
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
out_cols->raw_key_columns[0]->insertDefault();
for (size_t i = 0; i < params.aggregates_size; ++i)
out_cols->aggregate_columns_data[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
data.getNullKeyData() = nullptr;
data.hasNullKeyData() = false;
}
}
shuffled_key_sizes = method.shuffleKeyColumns(out_cols->raw_key_columns, key_sizes);
};
// should be invoked at least once, because null data might be the only content of the `data`
init_out_cols();
size_t rows_in_current_block = 0;
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
out_cols->aggregate_columns_data[i]->push_back(mapped + offsets_of_aggregate_states[i]);
mapped = nullptr;
++rows_in_current_block;
if constexpr (!return_single_block)
{
if (rows_in_current_block >= max_block_size)
{
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols.value()), final, rows_in_current_block));
out_cols.reset();
rows_in_current_block = 0;
}
}
});
if constexpr (return_single_block)
{
return finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block);
}
else
{
if (rows_in_current_block)
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block));
return res;
}
return res; return res;
} }
@ -2105,39 +2097,35 @@ void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
{ {
size_t rows = 1; size_t rows = 1;
auto && out_cols
= prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), data_variants.aggregates_pools, final, rows);
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
auto filler = [&data_variants, this]( if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
bool final_)
{ {
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row) AggregatedDataWithoutKey & data = data_variants.without_key;
if (!data)
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
if (!final)
{ {
AggregatedDataWithoutKey & data = data_variants.without_key; for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns_data[i]->push_back(data + offsets_of_aggregate_states[i]);
if (!data) data = nullptr;
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR); }
else
if (!final_) {
{ /// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
for (size_t i = 0; i < params.aggregates_size; ++i) insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool);
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
data = nullptr;
}
else
{
/// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool);
}
if (params.overflow_row)
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i]->insertDefault();
} }
};
Block block = prepareBlockAndFill(data_variants, final, rows, filler); if (params.overflow_row)
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i]->insertDefault();
}
Block block = finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows);
if (is_overflows) if (is_overflows)
block.info.is_overflows = true; block.info.is_overflows = true;
@ -2148,29 +2136,22 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
return block; return block;
} }
Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const template <bool return_single_block>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{ {
size_t rows = data_variants.sizeWithoutOverflowRow(); const size_t rows = data_variants.sizeWithoutOverflowRow();
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
{ \
return convertToBlockImpl<return_single_block>( \
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows); \
}
auto filler = [&data_variants, this]( if (false) {} // NOLINT
MutableColumns & key_columns, APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
AggregateColumnsData & aggregate_columns, #undef M
MutableColumns & final_aggregate_columns, else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
bool final_)
{
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
};
return prepareBlockAndFill(data_variants, final, rows, filler);
} }
@ -2292,7 +2273,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
if (data_variants.type != AggregatedDataVariants::Type::without_key) if (data_variants.type != AggregatedDataVariants::Type::without_key)
{ {
if (!data_variants.isTwoLevel()) if (!data_variants.isTwoLevel())
blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final)); blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel</* return_single_block */ false>(data_variants, final));
else else
blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get())); blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
} }
@ -3044,9 +3025,15 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
Block block; Block block;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows) if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
{
block = prepareBlockAndFillWithoutKey(result, final, is_overflows); block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
}
else else
block = prepareBlockAndFillSingleLevel(result, final); {
// Used during memory efficient merging (SortingAggregatedTransform expects single chunk for each bucket_id).
constexpr bool return_single_block = true;
block = prepareBlockAndFillSingleLevel<return_single_block>(result, final);
}
/// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods. /// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods.
if (!final) if (!final)
@ -3247,4 +3234,6 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) cons
} }
template Aggregator::ConvertToBlockRes<false>
Aggregator::prepareBlockAndFillSingleLevel<false>(AggregatedDataVariants & data_variants, bool final) const;
} }


@@ -1,8 +1,9 @@
 #pragma once
-#include <mutex>
-#include <memory>
 #include <functional>
+#include <memory>
+#include <mutex>
+#include <type_traits>
 #include <Common/logger_useful.h>
@@ -872,6 +873,7 @@ using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants
 class CompiledAggregateFunctionsHolder;
 class NativeWriter;
+struct OutputBlockColumns;
 /** How are "total" values calculated with WITH TOTALS?
   * (For more details, see TotalsHavingTransform.)
@@ -933,6 +935,8 @@ public:
         bool compile_aggregate_expressions;
         size_t min_count_to_compile_aggregate_expression;
+        size_t max_block_size;
         bool only_merge;
         struct StatsCollectingParams
@@ -969,6 +973,7 @@ public:
             size_t min_free_disk_space_,
             bool compile_aggregate_expressions_,
             size_t min_count_to_compile_aggregate_expression_,
+            size_t max_block_size_,
             bool only_merge_ = false, // true for projections
             const StatsCollectingParams & stats_collecting_params_ = {})
             : keys(keys_)
@@ -987,15 +992,16 @@ public:
             , min_free_disk_space(min_free_disk_space_)
             , compile_aggregate_expressions(compile_aggregate_expressions_)
             , min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
+            , max_block_size(max_block_size_)
             , only_merge(only_merge_)
             , stats_collecting_params(stats_collecting_params_)
         {
         }
         /// Only parameters that matter during merge.
-        Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
+        Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_, size_t max_block_size_)
             : Params(
-                keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, true, {})
+                keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, true, {})
         {
         }
@@ -1277,15 +1283,12 @@ private:
     void mergeSingleLevelDataImpl(
         ManyAggregatedDataVariants & non_empty_data) const;
-    template <typename Method, typename Table>
-    void convertToBlockImpl(
-        Method & method,
-        Table & data,
-        MutableColumns & key_columns,
-        AggregateColumnsData & aggregate_columns,
-        MutableColumns & final_aggregate_columns,
-        Arena * arena,
-        bool final) const;
+    template <bool return_single_block>
+    using ConvertToBlockRes = std::conditional_t<return_single_block, Block, BlocksList>;
+    template <bool return_single_block, typename Method, typename Table>
+    ConvertToBlockRes<return_single_block>
+    convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const;
     template <typename Mapped>
     void insertAggregatesIntoColumns(
@ -1293,27 +1296,16 @@ private:
MutableColumns & final_aggregate_columns, MutableColumns & final_aggregate_columns,
Arena * arena) const; Arena * arena) const;
template <typename Method, bool use_compiled_functions, typename Table> template <bool use_compiled_functions>
void convertToBlockImplFinal( Block insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena) const;
Method & method,
Table & data,
std::vector<IColumn *> key_columns,
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, typename Table> template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
void convertToBlockImplNotFinal( ConvertToBlockRes<return_single_block>
Method & method, convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const;
Table & data,
std::vector<IColumn *> key_columns,
AggregateColumnsData & aggregate_columns) const;
template <typename Filler> template <bool return_single_block, typename Method, typename Table>
Block prepareBlockAndFill( ConvertToBlockRes<return_single_block>
AggregatedDataVariants & data_variants, convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const;
bool final,
size_t rows,
Filler && filler) const;
template <typename Method> template <typename Method>
Block convertOneBucketToBlock( Block convertOneBucketToBlock(
@ -1331,9 +1323,11 @@ private:
std::atomic<bool> * is_cancelled = nullptr) const; std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const; Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const; BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
template <bool return_single_block>
ConvertToBlockRes<return_single_block> prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
template <typename Method> template <typename Method>
BlocksList prepareBlocksAndFillTwoLevelImpl( BlocksList prepareBlocksAndFillTwoLevelImpl(
AggregatedDataVariants & data_variants, AggregatedDataVariants & data_variants,

View File

@ -122,6 +122,7 @@ void FileCache::initialize()
fs::create_directories(cache_base_path); fs::create_directories(cache_base_path);
} }
status_file = make_unique<StatusFile>(fs::path(cache_base_path) / "status", StatusFile::write_full_info);
is_initialized = true; is_initialized = true;
} }
} }
@ -963,12 +964,19 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
fs::directory_iterator key_prefix_it{cache_base_path}; fs::directory_iterator key_prefix_it{cache_base_path};
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it)
{ {
if (!key_prefix_it->is_directory())
{
if (key_prefix_it->path().filename() != "status")
LOG_DEBUG(log, "Unexpected file {} (not a directory), will skip it", key_prefix_it->path().string());
continue;
}
fs::directory_iterator key_it{key_prefix_it->path()}; fs::directory_iterator key_it{key_prefix_it->path()};
for (; key_it != fs::directory_iterator(); ++key_it) for (; key_it != fs::directory_iterator(); ++key_it)
{ {
if (!key_it->is_directory()) if (!key_it->is_directory())
{ {
LOG_WARNING(log, "Unexpected file: {}. Expected a directory", key_it->path().string()); LOG_DEBUG(log, "Unexpected file {} (not a directory), will skip it", key_it->path().string());
continue; continue;
} }

View File

@ -18,6 +18,7 @@
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/IFileCachePriority.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Interpreters/Cache/FileCacheKey.h> #include <Interpreters/Cache/FileCacheKey.h>
#include <Common/StatusFile.h>
namespace DB namespace DB
{ {
@ -143,6 +144,7 @@ private:
bool is_initialized = false; bool is_initialized = false;
std::exception_ptr initialization_exception; std::exception_ptr initialization_exception;
std::unique_ptr<StatusFile> status_file;
mutable std::mutex mutex; mutable std::mutex mutex;

View File

@ -1763,7 +1763,7 @@ static void executeMergeAggregatedImpl(
* but it can work more slowly. * but it can work more slowly.
*/ */
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads); Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>( auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(), query_plan.getCurrentDataStream(),
@ -2359,6 +2359,7 @@ static Aggregator::Params getAggregatorParams(
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression, settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
/* only_merge */ false, /* only_merge */ false,
stats_collecting_params stats_collecting_params
}; };

View File

@ -1,16 +1,17 @@
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <DataTypes/IDataType.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQueryParameter.h>
#include <Interpreters/IdentifierSemantic.h> #include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h> #include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/addTypeConversionToAST.h> #include <Interpreters/addTypeConversionToAST.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQueryParameter.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
namespace DB namespace DB
@ -30,7 +31,12 @@ void ReplaceQueryParameterVisitor::visit(ASTPtr & ast)
else if (ast->as<ASTIdentifier>() || ast->as<ASTTableIdentifier>()) else if (ast->as<ASTIdentifier>() || ast->as<ASTTableIdentifier>())
visitIdentifier(ast); visitIdentifier(ast);
else else
visitChildren(ast); {
if (auto * describe_query = dynamic_cast<ASTDescribeQuery *>(ast.get()); describe_query && describe_query->table_expression)
visitChildren(describe_query->table_expression);
else
visitChildren(ast);
}
} }

View File

@ -403,7 +403,13 @@ void Set::checkColumnsNumber(size_t num_key_columns) const
bool Set::areTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const bool Set::areTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const
{ {
return removeNullable(recursiveRemoveLowCardinality(data_types[set_type_idx]))->equals(*removeNullable(recursiveRemoveLowCardinality(other_type))); /// Out-of-bound access can happen when same set expression built with different columns.
/// Caller may call this method to make sure that the set is indeed the one they want
/// without awaring data_types.size().
if (set_type_idx >= data_types.size())
return false;
return removeNullable(recursiveRemoveLowCardinality(data_types[set_type_idx]))
->equals(*removeNullable(recursiveRemoveLowCardinality(other_type)));
} }
void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const

View File

@ -521,10 +521,15 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
++new_elements_size; ++new_elements_size;
} }
/// removing aggregation can change number of rows, so `count()` result in outer sub-query would be wrong /// removing aggregation can change number of rows, so `count()` result in outer sub-query would be wrong
if (func && AggregateUtils::isAggregateFunction(*func) && !select_query->groupBy()) if (func && !select_query->groupBy())
{ {
new_elements[result_index] = elem; GetAggregatesVisitor::Data data = {};
++new_elements_size; GetAggregatesVisitor(data).visit(elem);
if (!data.aggregates.empty())
{
new_elements[result_index] = elem;
++new_elements_size;
}
} }
} }
} }

View File

@ -89,7 +89,6 @@ TEST(FileCache, get)
{ {
if (fs::exists(cache_base_path)) if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path); fs::remove_all(cache_base_path);
fs::create_directories(cache_base_path);
DB::ThreadStatus thread_status; DB::ThreadStatus thread_status;
@ -103,373 +102,376 @@ TEST(FileCache, get)
DB::FileCacheSettings settings; DB::FileCacheSettings settings;
settings.max_size = 30; settings.max_size = 30;
settings.max_elements = 5; settings.max_elements = 5;
auto cache = DB::FileCache(cache_base_path, settings);
cache.initialize();
auto key = cache.hash("key1");
{ {
auto holder = cache.getOrSet(key, 0, 10, false); /// Add range [0, 9] auto cache = DB::FileCache(cache_base_path, settings);
auto segments = fromHolder(holder); cache.initialize();
/// Range was not present in cache. It should be added in cache as one while file segment. auto key = cache.hash("key1");
ASSERT_EQ(segments.size(), 1);
assertRange(1, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::EMPTY); {
auto holder = cache.getOrSet(key, 0, 10, false); /// Add range [0, 9]
auto segments = fromHolder(holder);
/// Range was not present in cache. It should be added in cache as one while file segment.
ASSERT_EQ(segments.size(), 1);
/// Exception because space not reserved. assertRange(1, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::EMPTY);
/// EXPECT_THROW(download(segments[0]), DB::Exception);
/// Exception because space can be reserved only by downloader
/// EXPECT_THROW(segments[0]->reserve(segments[0]->range().size()), DB::Exception);
ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId()); /// Exception because space not reserved.
ASSERT_TRUE(segments[0]->reserve(segments[0]->range().size())); /// EXPECT_THROW(download(segments[0]), DB::Exception);
assertRange(2, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADING); /// Exception because space can be reserved only by downloader
/// EXPECT_THROW(segments[0]->reserve(segments[0]->range().size()), DB::Exception);
download(segments[0]); ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED); ASSERT_TRUE(segments[0]->reserve(segments[0]->range().size()));
assertRange(3, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED); assertRange(2, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADING);
}
/// Current cache: [__________] download(segments[0]);
/// ^ ^ segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
/// 0 9 assertRange(3, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
ASSERT_EQ(cache.getFileSegmentsNum(), 1); }
ASSERT_EQ(cache.getUsedCacheSize(), 10);
{ /// Current cache: [__________]
/// Want range [5, 14], but [0, 9] already in cache, so only [10, 14] will be put in cache. /// ^ ^
auto holder = cache.getOrSet(key, 5, 10, false); /// 0 9
auto segments = fromHolder(holder); ASSERT_EQ(cache.getFileSegmentsNum(), 1);
ASSERT_EQ(segments.size(), 2); ASSERT_EQ(cache.getUsedCacheSize(), 10);
assertRange(4, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED); {
assertRange(5, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::EMPTY); /// Want range [5, 14], but [0, 9] already in cache, so only [10, 14] will be put in cache.
auto holder = cache.getOrSet(key, 5, 10, false);
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 2);
ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId()); assertRange(4, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
prepareAndDownload(segments[1]); assertRange(5, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::EMPTY);
segments[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
assertRange(6, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
}
/// Current cache: [__________][_____] ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
/// ^ ^^ ^ prepareAndDownload(segments[1]);
/// 0 910 14 segments[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_EQ(cache.getFileSegmentsNum(), 2); assertRange(6, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
ASSERT_EQ(cache.getUsedCacheSize(), 15); }
{ /// Current cache: [__________][_____]
auto holder = cache.getOrSet(key, 9, 1, false); /// Get [9, 9] /// ^ ^^ ^
auto segments = fromHolder(holder); /// 0 910 14
ASSERT_EQ(segments.size(), 1); ASSERT_EQ(cache.getFileSegmentsNum(), 2);
assertRange(7, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED); ASSERT_EQ(cache.getUsedCacheSize(), 15);
}
{ {
auto holder = cache.getOrSet(key, 9, 2, false); /// Get [9, 10] auto holder = cache.getOrSet(key, 9, 1, false); /// Get [9, 9]
auto segments = fromHolder(holder); auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 2); ASSERT_EQ(segments.size(), 1);
assertRange(8, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED); assertRange(7, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
assertRange(9, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED); }
}
{ {
auto holder = cache.getOrSet(key, 10, 1, false); /// Get [10, 10] auto holder = cache.getOrSet(key, 9, 2, false); /// Get [9, 10]
auto segments = fromHolder(holder); auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1); ASSERT_EQ(segments.size(), 2);
assertRange(10, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED); assertRange(8, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
} assertRange(9, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
}
complete(cache.getOrSet(key, 17, 4, false)); /// Get [17, 20] {
complete(cache.getOrSet(key, 24, 3, false)); /// Get [24, 26] auto holder = cache.getOrSet(key, 10, 1, false); /// Get [10, 10]
/// complete(cache.getOrSet(key, 27, 1, false)); /// Get [27, 27] auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
assertRange(10, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
}
/// Current cache: [__________][_____] [____] [___][] complete(cache.getOrSet(key, 17, 4, false)); /// Get [17, 20]
/// ^ ^^ ^ ^ ^ ^ ^^^ complete(cache.getOrSet(key, 24, 3, false)); /// Get [24, 26]
/// 0 910 14 17 20 24 2627 /// complete(cache.getOrSet(key, 27, 1, false)); /// Get [27, 27]
///
ASSERT_EQ(cache.getFileSegmentsNum(), 4);
ASSERT_EQ(cache.getUsedCacheSize(), 22);
{ /// Current cache: [__________][_____] [____] [___][]
auto holder = cache.getOrSet(key, 0, 26, false); /// Get [0, 25] /// ^ ^^ ^ ^ ^ ^ ^^^
auto segments = fromHolder(holder); /// 0 910 14 17 20 24 2627
ASSERT_EQ(segments.size(), 6);
assertRange(11, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
assertRange(12, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
/// Missing [15, 16] should be added in cache.
assertRange(13, segments[2], DB::FileSegment::Range(15, 16), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[2]);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
assertRange(14, segments[3], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
/// New [21, 23], but will not be added in cache because of elements limit (5)
assertRange(15, segments[4], DB::FileSegment::Range(21, 23), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[4]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_FALSE(segments[4]->reserve(1));
assertRange(16, segments[5], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
/// Current cache: [__________][_____][ ][____] [___]
/// ^ ^ ^
/// 0 20 24
/// ///
ASSERT_EQ(cache.getFileSegmentsNum(), 4);
ASSERT_EQ(cache.getUsedCacheSize(), 22);
/// Range [27, 27] must be evicted in previous getOrSet [0, 25]. {
/// Let's not invalidate pointers to returned segments from range [0, 25] and auto holder = cache.getOrSet(key, 0, 26, false); /// Get [0, 25]
/// as max elements size is reached, next attempt to put something in cache should fail. auto segments = fromHolder(holder);
/// This will also check that [27, 27] was indeed evicted. ASSERT_EQ(segments.size(), 6);
auto holder1 = cache.getOrSet(key, 27, 1, false); assertRange(11, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
auto segments_1 = fromHolder(holder1); /// Get [27, 27] assertRange(12, segments[1], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
ASSERT_EQ(segments_1.size(), 1);
assertRange(17, segments_1[0], DB::FileSegment::Range(27, 27), DB::FileSegment::State::EMPTY);
}
{ /// Missing [15, 16] should be added in cache.
auto holder = cache.getOrSet(key, 12, 10, false); /// Get [12, 21] assertRange(13, segments[2], DB::FileSegment::Range(15, 16), DB::FileSegment::State::EMPTY);
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 4);
assertRange(18, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED); ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
assertRange(19, segments[1], DB::FileSegment::Range(15, 16), DB::FileSegment::State::DOWNLOADED); prepareAndDownload(segments[2]);
assertRange(20, segments[2], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
assertRange(21, segments[3], DB::FileSegment::Range(21, 21), DB::FileSegment::State::EMPTY); segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments[3]->getOrSetDownloader() == DB::FileSegment::getCallerId()); assertRange(14, segments[3], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
prepareAndDownload(segments[3]);
segments[3]->completeWithState(DB::FileSegment::State::DOWNLOADED); /// New [21, 23], but will not be added in cache because of elements limit (5)
ASSERT_TRUE(segments[3]->state() == DB::FileSegment::State::DOWNLOADED); assertRange(15, segments[4], DB::FileSegment::Range(21, 23), DB::FileSegment::State::EMPTY);
} ASSERT_TRUE(segments[4]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_FALSE(segments[4]->reserve(1));
/// Current cache: [_____][__][____][_] [___] assertRange(16, segments[5], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
/// ^ ^ ^ ^ ^
/// 10 17 21 24 26
ASSERT_EQ(cache.getFileSegmentsNum(), 5); /// Current cache: [__________][_____][ ][____] [___]
/// ^ ^ ^
/// 0 20 24
///
{ /// Range [27, 27] must be evicted in previous getOrSet [0, 25].
auto holder = cache.getOrSet(key, 23, 5, false); /// Get [23, 28] /// Let's not invalidate pointers to returned segments from range [0, 25] and
auto segments = fromHolder(holder); /// as max elements size is reached, next attempt to put something in cache should fail.
ASSERT_EQ(segments.size(), 3); /// This will also check that [27, 27] was indeed evicted.
assertRange(22, segments[0], DB::FileSegment::Range(23, 23), DB::FileSegment::State::EMPTY); auto holder1 = cache.getOrSet(key, 27, 1, false);
assertRange(23, segments[1], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED); auto segments_1 = fromHolder(holder1); /// Get [27, 27]
assertRange(24, segments[2], DB::FileSegment::Range(27, 27), DB::FileSegment::State::EMPTY); ASSERT_EQ(segments_1.size(), 1);
assertRange(17, segments_1[0], DB::FileSegment::Range(27, 27), DB::FileSegment::State::EMPTY);
}
ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId()); {
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId()); auto holder = cache.getOrSet(key, 12, 10, false); /// Get [12, 21]
prepareAndDownload(segments[0]); auto segments = fromHolder(holder);
prepareAndDownload(segments[2]); ASSERT_EQ(segments.size(), 4);
segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
}
/// Current cache: [____][_] [][___][__] assertRange(18, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
/// ^ ^ ^^^ ^^ ^ assertRange(19, segments[1], DB::FileSegment::Range(15, 16), DB::FileSegment::State::DOWNLOADED);
/// 17 21 2324 26 28 assertRange(20, segments[2], DB::FileSegment::Range(17, 20), DB::FileSegment::State::DOWNLOADED);
{ assertRange(21, segments[3], DB::FileSegment::Range(21, 21), DB::FileSegment::State::EMPTY);
auto holder5 = cache.getOrSet(key, 2, 3,false); /// Get [2, 4]
auto s5 = fromHolder(holder5);
ASSERT_EQ(s5.size(), 1);
assertRange(25, s5[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::EMPTY);
auto holder1 = cache.getOrSet(key, 30, 2, false); /// Get [30, 31] ASSERT_TRUE(segments[3]->getOrSetDownloader() == DB::FileSegment::getCallerId());
auto s1 = fromHolder(holder1); prepareAndDownload(segments[3]);
ASSERT_EQ(s1.size(), 1);
assertRange(26, s1[0], DB::FileSegment::Range(30, 31), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(s5[0]->getOrSetDownloader() == DB::FileSegment::getCallerId()); segments[3]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(s1[0]->getOrSetDownloader() == DB::FileSegment::getCallerId()); ASSERT_TRUE(segments[3]->state() == DB::FileSegment::State::DOWNLOADED);
prepareAndDownload(s5[0]); }
prepareAndDownload(s1[0]);
s5[0]->completeWithState(DB::FileSegment::State::DOWNLOADED); /// Current cache: [_____][__][____][_] [___]
s1[0]->completeWithState(DB::FileSegment::State::DOWNLOADED); /// ^ ^ ^ ^ ^
/// 10 17 21 24 26
ASSERT_EQ(cache.getFileSegmentsNum(), 5);
{
auto holder = cache.getOrSet(key, 23, 5, false); /// Get [23, 28]
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 3);
assertRange(22, segments[0], DB::FileSegment::Range(23, 23), DB::FileSegment::State::EMPTY);
assertRange(23, segments[1], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
assertRange(24, segments[2], DB::FileSegment::Range(27, 27), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments[0]);
prepareAndDownload(segments[2]);
segments[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
}
/// Current cache: [____][_] [][___][__]
/// ^ ^ ^^^ ^^ ^
/// 17 21 2324 26 28
{
auto holder5 = cache.getOrSet(key, 2, 3,false); /// Get [2, 4]
auto s5 = fromHolder(holder5);
ASSERT_EQ(s5.size(), 1);
assertRange(25, s5[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::EMPTY);
auto holder1 = cache.getOrSet(key, 30, 2, false); /// Get [30, 31]
auto s1 = fromHolder(holder1);
ASSERT_EQ(s1.size(), 1);
assertRange(26, s1[0], DB::FileSegment::Range(30, 31), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(s5[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(s1[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(s5[0]);
prepareAndDownload(s1[0]);
s5[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
s1[0]->completeWithState(DB::FileSegment::State::DOWNLOADED);
/// Current cache: [___] [_][___][_] [__]
/// ^ ^ ^ ^ ^ ^ ^ ^
/// 2 4 23 24 26 27 30 31
auto holder2 = cache.getOrSet(key, 23, 1, false); /// Get [23, 23]
auto s2 = fromHolder(holder2);
ASSERT_EQ(s2.size(), 1);
auto holder3 = cache.getOrSet(key, 24, 3, false); /// Get [24, 26]
auto s3 = fromHolder(holder3);
ASSERT_EQ(s3.size(), 1);
auto holder4 = cache.getOrSet(key, 27, 1, false); /// Get [27, 27]
auto s4 = fromHolder(holder4);
ASSERT_EQ(s4.size(), 1);
/// All cache is now unreleasable because pointers are still hold
auto holder6 = cache.getOrSet(key, 0, 40, false);
auto f = fromHolder(holder6);
ASSERT_EQ(f.size(), 9);
assertRange(27, f[0], DB::FileSegment::Range(0, 1), DB::FileSegment::State::EMPTY);
assertRange(28, f[2], DB::FileSegment::Range(5, 22), DB::FileSegment::State::EMPTY);
assertRange(29, f[6], DB::FileSegment::Range(28, 29), DB::FileSegment::State::EMPTY);
assertRange(30, f[8], DB::FileSegment::Range(32, 39), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(f[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(f[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(f[6]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(f[8]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_FALSE(f[0]->reserve(1));
ASSERT_FALSE(f[2]->reserve(1));
ASSERT_FALSE(f[6]->reserve(1));
ASSERT_FALSE(f[8]->reserve(1));
}
{
auto holder = cache.getOrSet(key, 2, 3, false); /// Get [2, 4]
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
assertRange(31, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
}
/// Current cache: [___] [_][___][_] [__] /// Current cache: [___] [_][___][_] [__]
/// ^ ^ ^ ^ ^ ^ ^ ^ /// ^ ^ ^ ^ ^ ^ ^ ^
/// 2 4 23 24 26 27 30 31 /// 2 4 23 24 26 27 30 31
auto holder2 = cache.getOrSet(key, 23, 1, false); /// Get [23, 23]
auto s2 = fromHolder(holder2);
ASSERT_EQ(s2.size(), 1);
auto holder3 = cache.getOrSet(key, 24, 3, false); /// Get [24, 26]
auto s3 = fromHolder(holder3);
ASSERT_EQ(s3.size(), 1);
auto holder4 = cache.getOrSet(key, 27, 1, false); /// Get [27, 27]
auto s4 = fromHolder(holder4);
ASSERT_EQ(s4.size(), 1);
/// All cache is now unreleasable because pointers are still hold
auto holder6 = cache.getOrSet(key, 0, 40, false);
auto f = fromHolder(holder6);
ASSERT_EQ(f.size(), 9);
assertRange(27, f[0], DB::FileSegment::Range(0, 1), DB::FileSegment::State::EMPTY);
assertRange(28, f[2], DB::FileSegment::Range(5, 22), DB::FileSegment::State::EMPTY);
assertRange(29, f[6], DB::FileSegment::Range(28, 29), DB::FileSegment::State::EMPTY);
assertRange(30, f[8], DB::FileSegment::Range(32, 39), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(f[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(f[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(f[6]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(f[8]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_FALSE(f[0]->reserve(1));
ASSERT_FALSE(f[2]->reserve(1));
ASSERT_FALSE(f[6]->reserve(1));
ASSERT_FALSE(f[8]->reserve(1));
}
{
auto holder = cache.getOrSet(key, 2, 3, false); /// Get [2, 4]
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
assertRange(31, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
}
/// Current cache: [___] [_][___][_] [__]
/// ^ ^ ^ ^ ^ ^ ^ ^
/// 2 4 23 24 26 27 30 31
{
auto holder = cache.getOrSet(key, 25, 5, false); /// Get [25, 29]
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 3);
assertRange(32, segments[0], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
assertRange(33, segments[1], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED);
assertRange(34, segments[2], DB::FileSegment::Range(28, 29), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADING);
bool lets_start_download = false;
std::mutex mutex;
std::condition_variable cv;
std::thread other_1([&]
{ {
DB::ThreadStatus thread_status_1; auto holder = cache.getOrSet(key, 25, 5, false); /// Get [25, 29]
auto query_context_1 = DB::Context::createCopy(getContext().context); auto segments = fromHolder(holder);
query_context_1->makeQueryContext();
query_context_1->setCurrentQueryId("query_id_1");
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);
thread_status_1.attachQueryContext(query_context_1);
auto holder_2 = cache.getOrSet(key, 25, 5, false); /// Get [25, 29] once again.
auto segments_2 = fromHolder(holder_2);
ASSERT_EQ(segments.size(), 3); ASSERT_EQ(segments.size(), 3);
assertRange(35, segments_2[0], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED); assertRange(32, segments[0], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
assertRange(36, segments_2[1], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED); assertRange(33, segments[1], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED);
assertRange(37, segments_2[2], DB::FileSegment::Range(28, 29), DB::FileSegment::State::DOWNLOADING);
ASSERT_TRUE(segments[2]->getOrSetDownloader() != DB::FileSegment::getCallerId()); assertRange(34, segments[2], DB::FileSegment::Range(28, 29), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[2]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADING); ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADING);
bool lets_start_download = false;
std::mutex mutex;
std::condition_variable cv;
std::thread other_1([&]
{ {
std::lock_guard lock(mutex); DB::ThreadStatus thread_status_1;
lets_start_download = true; auto query_context_1 = DB::Context::createCopy(getContext().context);
} query_context_1->makeQueryContext();
cv.notify_one(); query_context_1->setCurrentQueryId("query_id_1");
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);
thread_status_1.attachQueryContext(query_context_1);
segments_2[2]->wait(); auto holder_2 = cache.getOrSet(key, 25, 5, false); /// Get [25, 29] once again.
ASSERT_TRUE(segments_2[2]->state() == DB::FileSegment::State::DOWNLOADED); auto segments_2 = fromHolder(holder_2);
}); ASSERT_EQ(segments.size(), 3);
{ assertRange(35, segments_2[0], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
std::unique_lock lock(mutex); assertRange(36, segments_2[1], DB::FileSegment::Range(27, 27), DB::FileSegment::State::DOWNLOADED);
cv.wait(lock, [&]{ return lets_start_download; }); assertRange(37, segments_2[2], DB::FileSegment::Range(28, 29), DB::FileSegment::State::DOWNLOADING);
}
prepareAndDownload(segments[2]); ASSERT_TRUE(segments[2]->getOrSetDownloader() != DB::FileSegment::getCallerId());
segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED); ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADING);
ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADED);
other_1.join(); {
} std::lock_guard lock(mutex);
lets_start_download = true;
}
cv.notify_one();
/// Current cache: [___] [___][_][__][__] segments_2[2]->wait();
/// ^ ^ ^ ^ ^^ ^^ ^ ASSERT_TRUE(segments_2[2]->state() == DB::FileSegment::State::DOWNLOADED);
/// 2 4 24 26 27 2930 31 });
{
/// Now let's check the similar case but getting ERROR state after segment->wait(), when
/// state is changed not manually via segment->complete(state) but from destructor of holder
/// and notify_all() is also called from destructor of holder.
std::optional<DB::FileSegmentsHolder> holder;
holder.emplace(cache.getOrSet(key, 3, 23, false)); /// Get [3, 25]
auto segments = fromHolder(*holder);
ASSERT_EQ(segments.size(), 3);
assertRange(38, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
assertRange(39, segments[1], DB::FileSegment::Range(5, 23), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::DOWNLOADING);
assertRange(40, segments[2], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
bool lets_start_download = false;
std::mutex mutex;
std::condition_variable cv;
std::thread other_1([&]
{
DB::ThreadStatus thread_status_1;
auto query_context_1 = DB::Context::createCopy(getContext().context);
query_context_1->makeQueryContext();
query_context_1->setCurrentQueryId("query_id_1");
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);
thread_status_1.attachQueryContext(query_context_1);
auto holder_2 = cache.getOrSet(key, 3, 23, false); /// Get [3, 25] once again
auto segments_2 = fromHolder(*holder);
ASSERT_EQ(segments_2.size(), 3);
assertRange(41, segments_2[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
assertRange(42, segments_2[1], DB::FileSegment::Range(5, 23), DB::FileSegment::State::DOWNLOADING);
assertRange(43, segments_2[2], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments_2[1]->getDownloader() != DB::FileSegment::getCallerId());
ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::DOWNLOADING);
{ {
std::lock_guard lock(mutex); std::unique_lock lock(mutex);
lets_start_download = true; cv.wait(lock, [&]{ return lets_start_download; });
} }
cv.notify_one();
segments_2[1]->wait(); prepareAndDownload(segments[2]);
printRanges(segments_2); segments[2]->completeWithState(DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::PARTIALLY_DOWNLOADED); ASSERT_TRUE(segments[2]->state() == DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments_2[1]->getOrSetDownloader() == DB::FileSegment::getCallerId()); other_1.join();
prepareAndDownload(segments_2[1]);
segments_2[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
});
{
std::unique_lock lock(mutex);
cv.wait(lock, [&]{ return lets_start_download; });
} }
holder.reset(); /// Current cache: [___] [___][_][__][__]
other_1.join(); /// ^ ^ ^ ^ ^^ ^^ ^
printRanges(segments); /// 2 4 24 26 27 2930 31
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::DOWNLOADED);
{
/// Now let's check the similar case but getting ERROR state after segment->wait(), when
/// state is changed not manually via segment->complete(state) but from destructor of holder
/// and notify_all() is also called from destructor of holder.
std::optional<DB::FileSegmentsHolder> holder;
holder.emplace(cache.getOrSet(key, 3, 23, false)); /// Get [3, 25]
auto segments = fromHolder(*holder);
ASSERT_EQ(segments.size(), 3);
assertRange(38, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
assertRange(39, segments[1], DB::FileSegment::Range(5, 23), DB::FileSegment::State::EMPTY);
ASSERT_TRUE(segments[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::DOWNLOADING);
assertRange(40, segments[2], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
bool lets_start_download = false;
std::mutex mutex;
std::condition_variable cv;
std::thread other_1([&]
{
DB::ThreadStatus thread_status_1;
auto query_context_1 = DB::Context::createCopy(getContext().context);
query_context_1->makeQueryContext();
query_context_1->setCurrentQueryId("query_id_1");
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);
thread_status_1.attachQueryContext(query_context_1);
auto holder_2 = cache.getOrSet(key, 3, 23, false); /// Get [3, 25] once again
auto segments_2 = fromHolder(*holder);
ASSERT_EQ(segments_2.size(), 3);
assertRange(41, segments_2[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
assertRange(42, segments_2[1], DB::FileSegment::Range(5, 23), DB::FileSegment::State::DOWNLOADING);
assertRange(43, segments_2[2], DB::FileSegment::Range(24, 26), DB::FileSegment::State::DOWNLOADED);
ASSERT_TRUE(segments_2[1]->getDownloader() != DB::FileSegment::getCallerId());
ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::DOWNLOADING);
{
std::lock_guard lock(mutex);
lets_start_download = true;
}
cv.notify_one();
segments_2[1]->wait();
printRanges(segments_2);
ASSERT_TRUE(segments_2[1]->state() == DB::FileSegment::State::PARTIALLY_DOWNLOADED);
ASSERT_TRUE(segments_2[1]->getOrSetDownloader() == DB::FileSegment::getCallerId());
prepareAndDownload(segments_2[1]);
segments_2[1]->completeWithState(DB::FileSegment::State::DOWNLOADED);
});
{
std::unique_lock lock(mutex);
cv.wait(lock, [&]{ return lets_start_download; });
}
holder.reset();
other_1.join();
printRanges(segments);
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::DOWNLOADED);
}
} }
/// Current cache: [___][ ][___][_][__] /// Current cache: [___][ ][___][_][__]
@ -481,6 +483,7 @@ TEST(FileCache, get)
auto cache2 = DB::FileCache(cache_base_path, settings); auto cache2 = DB::FileCache(cache_base_path, settings);
cache2.initialize(); cache2.initialize();
auto key = cache2.hash("key1");
auto holder1 = cache2.getOrSet(key, 2, 28, false); /// Get [2, 29] auto holder1 = cache2.getOrSet(key, 2, 28, false); /// Get [2, 29]
@ -501,6 +504,7 @@ TEST(FileCache, get)
settings2.max_file_segment_size = 10; settings2.max_file_segment_size = 10;
auto cache2 = DB::FileCache(caches_dir / "cache2", settings2); auto cache2 = DB::FileCache(caches_dir / "cache2", settings2);
cache2.initialize(); cache2.initialize();
auto key = cache2.hash("key1");
auto holder1 = cache2.getOrSet(key, 0, 25, false); /// Get [0, 24] auto holder1 = cache2.getOrSet(key, 0, 25, false); /// Get [0, 24]
auto segments1 = fromHolder(holder1); auto segments1 = fromHolder(holder1);

View File

@ -182,6 +182,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.min_free_disk_space, transform_params->params.min_free_disk_space,
transform_params->params.compile_aggregate_expressions, transform_params->params.compile_aggregate_expressions,
transform_params->params.min_count_to_compile_aggregate_expression, transform_params->params.min_count_to_compile_aggregate_expression,
transform_params->params.max_block_size,
/* only_merge */ false, /* only_merge */ false,
transform_params->params.stats_collecting_params}; transform_params->params.stats_collecting_params};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final); auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);
@ -376,16 +377,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
}); });
/// We add the explicit resize here, but not in case of aggregating in order, since AIO don't use two-level hash tables and thus returns only buckets with bucket_number = -1. /// We add the explicit resize here, but not in case of aggregating in order, since AIO don't use two-level hash tables and thus returns only buckets with bucket_number = -1.
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : pipeline.getNumStreams(), true /* force */); pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
aggregating = collector.detachProcessors(0); aggregating = collector.detachProcessors(0);
} }
else else
{ {
pipeline.addSimpleTransform([&](const Block & header) pipeline.addSimpleTransform([&](const Block & header) { return std::make_shared<AggregatingTransform>(header, transform_params); });
{
return std::make_shared<AggregatingTransform>(header, transform_params); pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, false /* force */);
});
aggregating = collector.detachProcessors(0); aggregating = collector.detachProcessors(0);
} }

View File

@ -38,7 +38,8 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
settings.max_threads, settings.max_threads,
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression); settings.min_count_to_compile_aggregate_expression,
settings.max_block_size);
aggregator = std::make_unique<Aggregator>(header, params); aggregator = std::make_unique<Aggregator>(header, params);

View File

@ -182,7 +182,8 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (cur_block_size >= max_block_size || cur_block_bytes + current_memory_usage >= max_block_bytes) if (cur_block_size >= max_block_size || cur_block_bytes + current_memory_usage >= max_block_bytes)
{ {
if (group_by_key) if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false); group_by_block
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
cur_block_bytes += current_memory_usage; cur_block_bytes += current_memory_usage;
finalizeCurrentChunk(std::move(chunk), key_end); finalizeCurrentChunk(std::move(chunk), key_end);
return; return;
@ -293,7 +294,8 @@ void AggregatingInOrderTransform::generate()
if (cur_block_size && is_consume_finished) if (cur_block_size && is_consume_finished)
{ {
if (group_by_key) if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false); group_by_block
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
else else
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns); params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
variants.invalidate(); variants.invalidate();

View File

@ -203,7 +203,7 @@ public:
{ {
auto & output = outputs.front(); auto & output = outputs.front();
if (finished && !has_input) if (finished && single_level_chunks.empty())
{ {
output.finish(); output.finish();
return Status::Finished; return Status::Finished;
@ -230,7 +230,7 @@ public:
if (!processors.empty()) if (!processors.empty())
return Status::ExpandPipeline; return Status::ExpandPipeline;
if (has_input) if (!single_level_chunks.empty())
return preparePushToOutput(); return preparePushToOutput();
/// Single level case. /// Single level case.
@ -244,11 +244,14 @@ public:
private: private:
IProcessor::Status preparePushToOutput() IProcessor::Status preparePushToOutput()
{ {
auto & output = outputs.front(); if (single_level_chunks.empty())
output.push(std::move(current_chunk)); throw Exception(ErrorCodes::LOGICAL_ERROR, "Some ready chunks expected");
has_input = false;
if (finished) auto & output = outputs.front();
output.push(std::move(single_level_chunks.back()));
single_level_chunks.pop_back();
if (finished && single_level_chunks.empty())
{ {
output.finish(); output.finish();
return Status::Finished; return Status::Finished;
@ -268,17 +271,17 @@ private:
{ {
auto chunk = input.pull(); auto chunk = input.pull();
auto bucket = getInfoFromChunk(chunk)->bucket_num; auto bucket = getInfoFromChunk(chunk)->bucket_num;
chunks[bucket] = std::move(chunk); two_level_chunks[bucket] = std::move(chunk);
} }
} }
if (!shared_data->is_bucket_processed[current_bucket_num]) if (!shared_data->is_bucket_processed[current_bucket_num])
return Status::NeedData; return Status::NeedData;
if (!chunks[current_bucket_num]) if (!two_level_chunks[current_bucket_num])
return Status::NeedData; return Status::NeedData;
output.push(std::move(chunks[current_bucket_num])); output.push(std::move(two_level_chunks[current_bucket_num]));
++current_bucket_num; ++current_bucket_num;
if (current_bucket_num == NUM_BUCKETS) if (current_bucket_num == NUM_BUCKETS)
@ -298,27 +301,16 @@ private:
size_t num_threads; size_t num_threads;
bool is_initialized = false; bool is_initialized = false;
bool has_input = false;
bool finished = false; bool finished = false;
Chunk current_chunk; Chunks single_level_chunks;
UInt32 current_bucket_num = 0; UInt32 current_bucket_num = 0;
static constexpr Int32 NUM_BUCKETS = 256; static constexpr Int32 NUM_BUCKETS = 256;
std::array<Chunk, NUM_BUCKETS> chunks; std::array<Chunk, NUM_BUCKETS> two_level_chunks;
Processors processors; Processors processors;
void setCurrentChunk(Chunk chunk)
{
if (has_input)
throw Exception("Current chunk was already set in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
has_input = true;
current_chunk = std::move(chunk);
}
void initialize() void initialize()
{ {
is_initialized = true; is_initialized = true;
@ -339,7 +331,7 @@ private:
auto block = params->aggregator.prepareBlockAndFillWithoutKey( auto block = params->aggregator.prepareBlockAndFillWithoutKey(
*first, params->final, first->type != AggregatedDataVariants::Type::without_key); *first, params->final, first->type != AggregatedDataVariants::Type::without_key);
setCurrentChunk(convertToChunk(block)); single_level_chunks.emplace_back(convertToChunk(block));
} }
} }
@ -364,9 +356,10 @@ private:
else else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
auto block = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final); auto blocks = params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ false>(*first, params->final);
for (auto & block : blocks)
single_level_chunks.emplace_back(convertToChunk(block));
setCurrentChunk(convertToChunk(block));
finished = true; finished = true;
} }

View File

@ -297,8 +297,10 @@ public:
assert(indexes_mapping.size() == data_types.size()); assert(indexes_mapping.size() == data_types.size());
for (size_t i = 0; i < indexes_mapping.size(); ++i) for (size_t i = 0; i < indexes_mapping.size(); ++i)
{
if (!candidate_set->areTypesEqual(indexes_mapping[i].tuple_index, data_types[i])) if (!candidate_set->areTypesEqual(indexes_mapping[i].tuple_index, data_types[i]))
return false; return false;
}
return true; return true;
}; };

View File

@ -5555,6 +5555,10 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
if (select_query->interpolate() && !select_query->interpolate()->children.empty()) if (select_query->interpolate() && !select_query->interpolate()->children.empty())
return std::nullopt; return std::nullopt;
// Currently projections don't support GROUPING SET yet.
if (select_query->group_by_with_grouping_sets)
return std::nullopt;
auto query_options = SelectQueryOptions( auto query_options = SelectQueryOptions(
QueryProcessingStage::WithMergeableState, QueryProcessingStage::WithMergeableState,
/* depth */ 1, /* depth */ 1,

View File

@ -313,6 +313,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression, settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
only_merge); only_merge);
return std::make_pair(params, only_merge); return std::make_pair(params, only_merge);

View File

@ -1,4 +1,8 @@
<test> <test>
<query>select sipHash64(number) from numbers(1e7) group by number format Null</query>
<query>select * from (select * from numbers(1e7) group by number) group by number format Null</query>
<query>select * from (select * from numbers(1e7) group by number) order by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null</query> <query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) order by number format Null</query> <query>select * from (select * from numbers_mt(1e7) group by number) order by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null settings max_bytes_before_external_group_by = 1</query> <query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null settings max_bytes_before_external_group_by = 1</query>

View File

@ -1,4 +1,5 @@
SET joined_subquery_requires_alias = 0; SET joined_subquery_requires_alias = 0;
SET max_threads = 1;
-- incremental streaming usecase -- incremental streaming usecase
-- that has sense only if data filling order has guarantees of chronological order -- that has sense only if data filling order has guarantees of chronological order

View File

@ -28,7 +28,7 @@ WITH
ORDER BY event_time DESC ORDER BY event_time DESC
LIMIT 1 LIMIT 1
) AS id ) AS id
SELECT uniqExact(thread_id) SELECT uniqExact(thread_id) > 2
FROM system.query_thread_log FROM system.query_thread_log
WHERE (event_date >= (today() - 1)) AND (query_id = id) AND (thread_id != master_thread_id); WHERE (event_date >= (today() - 1)) AND (query_id = id) AND (thread_id != master_thread_id);

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Tags: no-replicated-database, no-parallel, no-fasttest, no-tsan, no-asan, no-random-settings, no-s3-storage # Tags: no-replicated-database, no-parallel, no-fasttest, no-tsan, no-asan, no-random-settings, no-s3-storage, no-msan
# Tag no-fasttest: max_memory_usage_for_user can interfere another queries running concurrently # Tag no-fasttest: max_memory_usage_for_user can interfere another queries running concurrently
# Regression for MemoryTracker that had been incorrectly accounted # Regression for MemoryTracker that had been incorrectly accounted

View File

@ -6,4 +6,4 @@
2020-01-01 00:00:00 2 2020-01-01 00:00:00 2
1 1
499999 499999
5 18

View File

@ -1,7 +1,7 @@
DROP TABLE IF EXISTS select_final; DROP TABLE IF EXISTS select_final;
SET do_not_merge_across_partitions_select_final = 1; SET do_not_merge_across_partitions_select_final = 1;
SET max_threads = 0; SET max_threads = 16;
CREATE TABLE select_final (t DateTime, x Int32, string String) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY (x, t); CREATE TABLE select_final (t DateTime, x Int32, string String) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY (x, t);

View File

@ -1,27 +1,12 @@
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49 1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
@ -29,32 +14,47 @@
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 51 0 1 51 1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49 1 50 50 1 0 49
1 50 51 0 1 51 1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51 1 50 51 0 1 51
1 50 51 0 1 51 1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51 1 50 51 0 1 51
1 50 51 0 1 51 1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51 1 50 51 0 1 51
1 50 51 0 1 51 1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51 1 50 51 0 1 51
1 50 50 1 0 49 1 50 51 0 1 51
1 50 50 1 0 49 1 50 51 0 1 51
1 50 50 1 0 49 1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51 1 50 51 0 1 51

View File

@ -52,6 +52,7 @@ ALL LEFT JOIN
FROM group_bitmap_data_test FROM group_bitmap_data_test
WHERE pickup_date = '2019-01-01' WHERE pickup_date = '2019-01-01'
GROUP BY city_id GROUP BY city_id
) AS js2 USING (city_id); ) AS js2 USING (city_id)
ORDER BY today_users, before_users, ll_users, old_users, new_users, diff_users;
DROP TABLE IF EXISTS group_bitmap_data_test; DROP TABLE IF EXISTS group_bitmap_data_test;

View File

@ -0,0 +1,28 @@
a 2
a x 1
a y 1
b 2
b x 1
b y 1
4
a 2
a x 1
a y 1
b 2
b x 1
b y 1
4
x 2
y 2
a 2
a x 1
a y 1
b 2
b x 1
b y 1
a x 1
a y 1
b x 1
b y 1
4

View File

@ -0,0 +1,15 @@
drop table if exists test;
create table test(dim1 String, dim2 String, projection p1 (select dim1, dim2, count() group by dim1, dim2)) engine MergeTree order by dim1;
insert into test values ('a', 'x') ('a', 'y') ('b', 'x') ('b', 'y');
select dim1, dim2, count() from test group by grouping sets ((dim1, dim2), dim1) order by dim1, dim2, count();
select dim1, dim2, count() from test group by dim1, dim2 with rollup order by dim1, dim2, count();
select dim1, dim2, count() from test group by dim1, dim2 with cube order by dim1, dim2, count();
select dim1, dim2, count() from test group by dim1, dim2 with totals order by dim1, dim2, count();
drop table test;

View File

@ -12,7 +12,7 @@ select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by n
-- and the query with GROUP BY on remote servers will first do GROUP BY and then send the block, -- and the query with GROUP BY on remote servers will first do GROUP BY and then send the block,
-- so the initiator will first receive all blocks from remotes and only after start merging, -- so the initiator will first receive all blocks from remotes and only after start merging,
-- and will hit the memory limit. -- and will hit the memory limit.
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi'; -- { serverError 241 } select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi', max_block_size=1e12; -- { serverError 241 }
-- with optimize_aggregation_in_order=1 remote servers will produce blocks more frequently, -- with optimize_aggregation_in_order=1 remote servers will produce blocks more frequently,
-- since they don't need to wait until the aggregation will be finished, -- since they don't need to wait until the aggregation will be finished,

View File

@ -1 +1,2 @@
select count(1) from (SELECT 1 AS a, count(1) FROM numbers(5)) select count(1) from (SELECT 1 AS a, count(1) FROM numbers(5));
select count(1) from (SELECT 1 AS a, count(1) + 1 FROM numbers(5));

View File

@ -1,24 +1,24 @@
SET join_algorithm = 'full_sorting_merge';
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 ON t1.key = t2.key ORDER BY key;
SELECT * FROM (SELECT 1 as key) AS t1 JOIN (SELECT 1 as key) t2 USING key ORDER BY key;
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT 1 :: Nullable(UInt32) as key) t2 USING (key) ORDER BY key;
SELECT * FROM (SELECT 1 :: UInt32 as key) AS t1 FULL JOIN (SELECT NULL :: Nullable(UInt32) as key) t2 USING (key) ORDER BY key;
SELECT * FROM (SELECT 1 :: Int32 as key) AS t1 JOIN (SELECT 1 :: UInt32 as key) t2 ON t1.key = t2.key ORDER BY key;
SELECT * FROM (SELECT -1 :: Nullable(Int32) as key) AS t1 FULL JOIN (SELECT 4294967295 :: UInt32 as key) t2 ON t1.key = t2.key ORDER BY key;
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key ORDER BY key;
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: String AS key) AS t2 ON t1.key = t2.key ORDER BY key;
SELECT * FROM (SELECT 'a' :: LowCardinality(Nullable(String)) AS key) AS t1 JOIN (SELECT 'a' :: Nullable(String) AS key) AS t2 ON t1.key = t2.key ORDER BY key;
SELECT * FROM (SELECT 'a' :: LowCardinality(String) AS key) AS t1 JOIN (SELECT 'a' :: LowCardinality(String) AS key) AS t2 ON t1.key = t2.key ORDER BY key;
SELECT 5 == count() FROM (SELECT number as a from numbers(5)) as t1 LEFT JOIN (SELECT number as b from numbers(5) WHERE number > 100) as t2 ON t1.a = t2.b ORDER BY 1;
SELECT 5 == count() FROM (SELECT number as a from numbers(5) WHERE number > 100) as t1 RIGHT JOIN (SELECT number as b from numbers(5)) as t2 ON t1.a = t2.b ORDER BY 1;

View File

@ -1,5 +1,22 @@
-- { echoOn }
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 16 → 16
AggregatingTransform × 16
StrictResize 16 → 16
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 1 → 16
AggregatingTransform
(Expression)
ExpressionTransform
(ReadFromStorage)
Limit
Numbers 0 → 1
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
(Expression)
ExpressionTransform × 16

View File

@ -1,9 +1,12 @@
set max_threads = 16;
set prefer_localhost_replica = 1;
set optimize_aggregation_in_order = 0;
set max_block_size = 65505;
-- { echoOn }
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
explain pipeline select * from (select * from numbers_mt(1e8) group by number) order by number;

View File

@ -0,0 +1,9 @@
SET max_block_size = 4213;
SELECT DISTINCT (blockSize() <= 4213)
FROM
(
SELECT number
FROM numbers(100000)
GROUP BY number
);
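The new test above checks that, after a GROUP BY, no block emitted to the outer query exceeds the configured max_block_size. A sketch of the same invariant with a different, assumed block size (not part of the diff):

```sql
-- Sketch only: every block coming out of the aggregation should respect
-- max_block_size, so the DISTINCT over blockSize() yields the single row 1.
SET max_block_size = 65536;
SELECT DISTINCT (blockSize() <= 65536)
FROM
(
    SELECT number
    FROM numbers(1000000)
    GROUP BY number
);
```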

View File

@ -7,3 +7,10 @@ UInt64 String DateTime Map(UUID, Array(Float32))
13 str 2022-08-04 18:30:53 {'10':[11,12],'13':[14,15]}
1
1
_CAST(42, \'Int64\') Int64
_CAST([1, 2, 3], \'Array(UInt8)\') Array(UInt8)
_CAST(((\'abc\', 22), (\'def\', 33)), \'Map(String, UInt8)\') Map(String, UInt8)
_CAST([[4, 5, 6], [7], [8, 9]], \'Array(Array(UInt8))\') Array(Array(UInt8))
_CAST(((10, [11, 12]), (13, [14, 15])), \'Map(UInt8, Array(UInt8))\') Map(UInt8, Array(UInt8))
_CAST(((\'ghj\', ((\'klm\', [16, 17]))), (\'nop\', ((\'rst\', [18])))), \'Map(String, Map(String, Array(UInt8)))\') Map(String, Map(String, Array(UInt8)))
a Int8

View File

@ -68,13 +68,27 @@ $CLICKHOUSE_CLIENT -n -q "select {n: UInt8} -- { serverError 456 }"
$CLICKHOUSE_CLIENT -n -q "set param_n = 12; set param_n = 13; select {n: UInt8}" $CLICKHOUSE_CLIENT -n -q "set param_n = 12; set param_n = 13; select {n: UInt8}"
# but multiple different parameters could be defined within each session # multiple different parameters could be defined within each session
$CLICKHOUSE_CLIENT -n -q " $CLICKHOUSE_CLIENT -n -q "
set param_a = 13, param_b = 'str'; set param_a = 13, param_b = 'str';
set param_c = '2022-08-04 18:30:53'; set param_c = '2022-08-04 18:30:53';
set param_d = '{\'10\': [11, 12], \'13\': [14, 15]}'; set param_d = '{\'10\': [11, 12], \'13\': [14, 15]}';
select {a: UInt32}, {b: String}, {c: DateTime}, {d: Map(String, Array(UInt8))}" select {a: UInt32}, {b: String}, {c: DateTime}, {d: Map(String, Array(UInt8))}"
# empty parameter name is not allowed # empty parameter name is not allowed
$CLICKHOUSE_CLIENT --param_="" -q "select 1" 2>&1 | grep -c 'Code: 36' $CLICKHOUSE_CLIENT --param_="" -q "select 1" 2>&1 | grep -c 'Code: 36'
$CLICKHOUSE_CLIENT -q "set param_ = ''" 2>&1 | grep -c 'Code: 36' $CLICKHOUSE_CLIENT -q "set param_ = ''" 2>&1 | grep -c 'Code: 36'
# parameters are also supported for DESCRIBE TABLE queries
$CLICKHOUSE_CLIENT \
--param_id="42" \
--param_arr="[1, 2, 3]" \
--param_map="{'abc': 22, 'def': 33}" \
--param_mul_arr="[[4, 5, 6], [7], [8, 9]]" \
--param_map_arr="{10: [11, 12], 13: [14, 15]}" \
--param_map_map_arr="{'ghj': {'klm': [16, 17]}, 'nop': {'rst': [18]}}" \
-q "describe table(select {id: Int64}, {arr: Array(UInt8)}, {map: Map(String, UInt8)}, {mul_arr: Array(Array(UInt8))}, {map_arr: Map(UInt8, Array(UInt8))}, {map_map_arr: Map(String, Map(String, Array(UInt8)))})"
$CLICKHOUSE_CLIENT --param_p=42 -q "describe table (select * from (select {p:Int8} as a group by a) order by a)"
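The added shell lines exercise query parameters in DESCRIBE TABLE queries through the --param_* client flags. Presumably the same works with SET param_* inside a session, as the earlier part of this test does for SELECT; a hedged sketch with illustrative values, not part of the diff:

```sql
-- Sketch only (assumes SET param_* is honoured by DESCRIBE the same way the
-- --param_* client flags are; the values here are made up).
SET param_id = '42', param_arr = '[1, 2, 3]';
DESCRIBE TABLE (SELECT {id: Int64}, {arr: Array(UInt8)});
```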

View File

@ -0,0 +1,3 @@
CREATE TABLE set_crash (key1 Int32, id1 Int64, c1 Int64) ENGINE = MergeTree PARTITION BY id1 ORDER BY key1;
INSERT INTO set_crash VALUES (-1, 1, 0);
SELECT 1 in (-1, 1) FROM set_crash WHERE (key1, id1) in (-1, 1);