Merge branch 'master' into stress_s3

This commit is contained in:
alesapin 2022-09-04 13:00:28 +02:00 committed by GitHub
commit a67703b76b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
77 changed files with 2309 additions and 773 deletions

View File

@ -54,9 +54,8 @@ set(SRCS
add_library(cxx ${SRCS})
set_target_properties(cxx PROPERTIES FOLDER "contrib/libcxx-cmake")
target_include_directories(cxx SYSTEM BEFORE PUBLIC
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}>/src)
target_include_directories(cxx SYSTEM BEFORE PRIVATE $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/src>)
target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
# Enable capturing stack traces for all exceptions.

View File

@ -37,7 +37,7 @@ sudo xcode-select --install
``` bash
brew update
brew install cmake ninja libtool gettext llvm gcc binutils grep findutils
brew install ccache cmake ninja libtool gettext llvm gcc binutils grep findutils
```
## Checkout ClickHouse Sources {#checkout-clickhouse-sources}

View File

@ -15,7 +15,7 @@ Usage examples:
## Usage in ClickHouse Server {#usage-in-clickhouse-server}
``` sql
ENGINE = GenerateRandom(random_seed, max_string_length, max_array_length)
ENGINE = GenerateRandom([random_seed] [,max_string_length] [,max_array_length])
```
The `max_array_length` and `max_string_length` parameters specify maximum length of all

View File

@ -6,19 +6,28 @@ title: "UK Property Price Paid"
---
The dataset contains data about prices paid for real-estate property in England and Wales. The data is available since year 1995.
The size of the dataset in uncompressed form is about 4 GiB and it will take about 270 MiB in ClickHouse.
The size of the dataset in uncompressed form is about 4 GiB and it will take about 278 MiB in ClickHouse.
Source: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads <br/>
Source: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads
Description of the fields: https://www.gov.uk/guidance/about-the-price-paid-data
Contains HM Land Registry data © Crown copyright and database right 2021. This data is licensed under the Open Government Licence v3.0.
## Download the Dataset {#download-dataset}
Run the command:
```bash
wget http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv
```
Download will take about 2 minutes with good internet connection.
## Create the Table {#create-table}
```sql
CREATE TABLE uk_price_paid
(
uuid UUID,
price UInt32,
date Date,
postcode1 LowCardinality(String),
@ -33,50 +42,30 @@ CREATE TABLE uk_price_paid
town LowCardinality(String),
district LowCardinality(String),
county LowCardinality(String),
category UInt8,
category2 UInt8
) ORDER BY (postcode1, postcode2, addr1, addr2);
category UInt8
) ENGINE = MergeTree ORDER BY (postcode1, postcode2, addr1, addr2);
```
## Preprocess and Import Data {#preprocess-import-data}
In this example, we define the structure of source data from the CSV file and specify a query to preprocess the data with either `clickhouse-client` or the web based Play UI.
We will use `clickhouse-local` tool for data preprocessing and `clickhouse-client` to upload it.
In this example, we define the structure of source data from the CSV file and specify a query to preprocess the data with `clickhouse-local`.
The preprocessing is:
- splitting the postcode to two different columns `postcode1` and `postcode2` that are better for storage and queries;
- splitting the postcode to two different columns `postcode1` and `postcode2` that is better for storage and queries;
- coverting the `time` field to date as it only contains 00:00 time;
- ignoring the [UUid](../../sql-reference/data-types/uuid.md) field because we don't need it for analysis;
- transforming `type` and `duration` to more readable Enum fields with function [transform](../../sql-reference/functions/other-functions.md#transform);
- transforming `is_new` and `category` fields from single-character string (`Y`/`N` and `A`/`B`) to [UInt8](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-uint256-int8-int16-int32-int64-int128-int256) field with 0 and 1.
Preprocessed data is piped directly to `clickhouse-client` to be inserted into ClickHouse table in streaming fashion.
```bash
INSERT INTO uk_price_paid
WITH
splitByChar(' ', postcode) AS p
SELECT
replaceRegexpAll(uuid_string, '{|}','') AS uuid,
toUInt32(price_string) AS price,
parseDateTimeBestEffortUS(time) AS date,
p[1] AS postcode1,
p[2] AS postcode2,
transform(a, ['T', 'S', 'D', 'F', 'O'], ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type,
b = 'Y' AS is_new,
transform(c, ['F', 'L', 'U'], ['freehold', 'leasehold', 'unknown']) AS duration,
addr1,
addr2,
street,
locality,
town,
district,
county,
d = 'B' AS category,
e = 'B' AS category2
FROM url(
'http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv',
'CSV',
'uuid_string String,
price_string String,
time String,
clickhouse-local --input-format CSV --structure '
uuid String,
price UInt32,
time DateTime,
postcode String,
a String,
b String,
@ -89,12 +78,29 @@ FROM url(
district String,
county String,
d String,
e String'
)
SETTINGS max_http_get_redirects=1;
e String
' --query "
WITH splitByChar(' ', postcode) AS p
SELECT
price,
toDate(time) AS date,
p[1] AS postcode1,
p[2] AS postcode2,
transform(a, ['T', 'S', 'D', 'F', 'O'], ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type,
b = 'Y' AS is_new,
transform(c, ['F', 'L', 'U'], ['freehold', 'leasehold', 'unknown']) AS duration,
addr1,
addr2,
street,
locality,
town,
district,
county,
d = 'B' AS category
FROM table" --date_time_input_format best_effort < pp-complete.csv | clickhouse-client --query "INSERT INTO uk_price_paid FORMAT TSV"
```
It will take about 2 minutes depending on where you are in the world, and where your ClickHouse servers are. Almost all of the time is the download time of the CSV file from the UK government server.
It will take about 40 seconds.
## Validate the Data {#validate-data}
@ -106,13 +112,13 @@ SELECT count() FROM uk_price_paid;
Result:
```response
```text
┌──count()─┐
│ 27450499
│ 26321785
└──────────┘
```
The size of dataset in ClickHouse is just 540 MiB, check it.
The size of dataset in ClickHouse is just 278 MiB, check it.
Query:
@ -124,14 +130,10 @@ Result:
```text
┌─formatReadableSize(total_bytes)─┐
545.04 MiB │
278.80 MiB │
└─────────────────────────────────┘
```
:::note
The above size is for a replicated table, if you are using this dataset with a single instance the size will be half.
:::
## Run Some Queries {#run-queries}
### Query 1. Average Price Per Year {#average-price}
@ -144,7 +146,7 @@ SELECT toYear(date) AS year, round(avg(price)) AS price, bar(price, 0, 1000000,
Result:
```response
```text
┌─year─┬──price─┬─bar(round(avg(price)), 0, 1000000, 80)─┐
│ 1995 │ 67932 │ █████▍ │
│ 1996 │ 71505 │ █████▋ │

View File

@ -175,6 +175,10 @@ You can also choose to use [HTTP compression](https://en.wikipedia.org/wiki/HTTP
- `br`
- `deflate`
- `xz`
- `zstd`
- `lz4`
- `bz2`
- `snappy`
To send a compressed `POST` request, append the request header `Content-Encoding: compression_method`.
In order for ClickHouse to compress the response, enable compression with [enable_http_compression](../operations/settings/settings.md#settings-enable_http_compression) setting and append `Accept-Encoding: compression_method` header to the request. You can configure the data compression level in the [http_zlib_compression_level](../operations/settings/settings.md#settings-http_zlib_compression_level) setting for all compression methods.

View File

@ -94,6 +94,21 @@ It is also possible for `Flat`, `Hashed`, `ComplexKeyHashed` dictionaries to onl
- If the source is HTTP then `update_field` will be added as a query parameter with the last update time as the parameter value.
- If the source is Executable then `update_field` will be added as an executable script argument with the last update time as the argument value.
- If the source is ClickHouse, MySQL, PostgreSQL, ODBC there will be an additional part of `WHERE`, where `update_field` is compared as greater or equal with the last update time.
- Per default, this `WHERE`-condition is checked at the highest level of the SQL-Query. Alternatively, the condition can be checked in any other `WHERE`-clause within the query using the `{condition}`-keyword. Example:
```sql
...
SOURCE(CLICKHOUSE(...
update_field 'added_time'
QUERY '
SELECT my_arr.1 AS x, my_arr.2 AS y, creation_time
FROM (
SELECT arrayZip(x_arr, y_arr) AS my_arr, creation_time
FROM dictionary_source
WHERE {condition}
)'
))
...
```
If `update_field` option is set, additional option `update_lag` can be set. Value of `update_lag` option is subtracted from previous update time before request updated data.

View File

@ -267,7 +267,7 @@ Result:
└────────────────┘
```
:::Attention
:::note
The return type of `toStartOf*`, `toLastDayOfMonth`, `toMonday` functions described below is `Date` or `DateTime`.
Though these functions can take values of the extended types `Date32` and `DateTime64` as an argument, passing them a time outside the normal range (year 1970 to 2149 for `Date` / 2106 for `DateTime`) will produce wrong results.
In case argument is out of normal range:

View File

@ -430,5 +430,119 @@ Result:
└────────────────────────────┘
```
## mapApply
**Syntax**
```sql
mapApply(func, map)
```
**Parameters**
- `func` - [Lamda function](../../sql-reference/functions/index.md#higher-order-functions---operator-and-lambdaparams-expr-function).
- `map` — [Map](../../sql-reference/data-types/map.md).
**Returned value**
- Returns a map obtained from the original map by application of `func(map1[i], …, mapN[i])` for each element.
**Example**
Query:
```sql
SELECT mapApply((k, v) -> (k, v * 10), _map) AS r
FROM
(
SELECT map('key1', number, 'key2', number * 2) AS _map
FROM numbers(3)
)
```
Result:
```text
┌─r─────────────────────┐
│ {'key1':0,'key2':0} │
│ {'key1':10,'key2':20} │
│ {'key1':20,'key2':40} │
└───────────────────────┘
```
## mapFilter
**Syntax**
```sql
mapFilter(func, map)
```
**Parameters**
- `func` - [Lamda function](../../sql-reference/functions/index.md#higher-order-functions---operator-and-lambdaparams-expr-function).
- `map` — [Map](../../sql-reference/data-types/map.md).
**Returned value**
- Returns a map containing only the elements in `map` for which `func(map1[i], …, mapN[i])` returns something other than 0.
**Example**
Query:
```sql
SELECT mapFilter((k, v) -> ((v % 2) = 0), _map) AS r
FROM
(
SELECT map('key1', number, 'key2', number * 2) AS _map
FROM numbers(3)
)
```
Result:
```text
┌─r───────────────────┐
│ {'key1':0,'key2':0} │
│ {'key2':2} │
│ {'key1':2,'key2':4} │
└─────────────────────┘
```
## mapUpdate
**Syntax**
```sql
mapUpdate(map1, map2)
```
**Parameters**
- `map1` [Map](../../sql-reference/data-types/map.md).
- `map2` [Map](../../sql-reference/data-types/map.md).
**Returned value**
- Returns a map1 with values updated of values for the corresponding keys in map2.
**Example**
Query:
```sql
SELECT mapUpdate(map('key1', 0, 'key3', 0), map('key1', 10, 'key2', 10)) AS map;
```
Result:
```text
┌─map────────────────────────────┐
│ {'key3':0,'key1':10,'key2':10} │
└────────────────────────────────┘
```
[Original article](https://clickhouse.com/docs/en/sql-reference/functions/tuple-map-functions/) <!--hide-->

View File

@ -303,7 +303,7 @@ SHOW USERS
## SHOW ROLES
Returns a list of [roles](../../operations/access-rights.md#role-management). To view another parameters, see system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role-grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
Returns a list of [roles](../../operations/access-rights.md#role-management). To view another parameters, see system tables [system.roles](../../operations/system-tables/roles.md#system_tables-roles) and [system.role_grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
### Syntax

View File

@ -267,7 +267,7 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
└────────────────┘
```
:::Attention
:::note
Тип возвращаемого описанными далее функциями `toStartOf*`, `toMonday` значения - `Date` или `DateTime`.
Хотя эти функции могут принимать значения типа `Date32` или `DateTime64` в качестве аргумента, при обработке аргумента вне нормального диапазона значений (`1970` - `2148` для `Date` и `1970-01-01 00:00:00`-`2106-02-07 08:28:15` для `DateTime`) будет получен некорректный результат.
Возвращаемые значения для значений вне нормального диапазона:
@ -277,7 +277,7 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
* `2149-05-31` будет результатом функции `toLastDayOfMonth` при обработке аргумента больше `2149-05-31`.
:::
:::Attention
:::note
Тип возвращаемого описанными далее функциями `toStartOf*`, `toLastDayOfMonth`, `toMonday` значения - `Date` или `DateTime`.
Хотя эти функции могут принимать значения типа `Date32` или `DateTime64` в качестве аргумента, при обработке аргумента вне нормального диапазона значений (`1970` - `2148` для `Date` и `1970-01-01 00:00:00`-`2106-02-07 08:28:15` для `DateTime`) будет получен некорректный результат.
Возвращаемые значения для значений вне нормального диапазона:

View File

@ -305,7 +305,7 @@ SHOW USERS
## SHOW ROLES {#show-roles-statement}
Выводит список [ролей](../../operations/access-rights.md#role-management). Для просмотра параметров ролей, см. системные таблицы [system.roles](../../operations/system-tables/roles.md#system_tables-roles) и [system.role-grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
Выводит список [ролей](../../operations/access-rights.md#role-management). Для просмотра параметров ролей, см. системные таблицы [system.roles](../../operations/system-tables/roles.md#system_tables-roles) и [system.role_grants](../../operations/system-tables/role-grants.md#system_tables-role_grants).
### Синтаксис {#show-roles-syntax}

View File

@ -0,0 +1,67 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandMkDir : public ICommand
{
public:
CommandMkDir()
{
command_name = "mkdir";
command_option_description.emplace(createOptionsDescription("Allowed options", getTerminalWidth()));
description = "Create directory or directories recursively";
usage = "mkdir [OPTION]... <PATH>";
command_option_description->add_options()
("recursive", "recursively create directories")
;
}
void processOptions(
Poco::Util::LayeredConfiguration & config,
po::variables_map & options) const override
{
if (options.count("recursive"))
config.setBool("recursive", true);
}
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 1)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
bool recursive = config.getBool("recursive", false);
if (recursive)
disk->createDirectories(full_path);
else
disk->createDirectory(full_path);
}
};
}
std::unique_ptr <DB::ICommand> makeCommandMkDir()
{
return std::make_unique<DB::CommandMkDir>();
}

View File

@ -63,7 +63,7 @@ void DisksApp::addOptions(
positional_options_description.add("command_name", 1);
supported_commands = {"list-disks", "list", "move", "remove", "link", "copy", "write", "read"};
supported_commands = {"list-disks", "list", "move", "remove", "link", "copy", "write", "read", "mkdir"};
command_descriptions.emplace("list-disks", makeCommandListDisks());
command_descriptions.emplace("list", makeCommandList());
@ -73,6 +73,7 @@ void DisksApp::addOptions(
command_descriptions.emplace("copy", makeCommandCopy());
command_descriptions.emplace("write", makeCommandWrite());
command_descriptions.emplace("read", makeCommandRead());
command_descriptions.emplace("mkdir", makeCommandMkDir());
}
void DisksApp::processOptions()

View File

@ -4,6 +4,7 @@
#include "CommandLink.cpp"
#include "CommandList.cpp"
#include "CommandListDisks.cpp"
#include "CommandMkDir.cpp"
#include "CommandMove.cpp"
#include "CommandRead.cpp"
#include "CommandRemove.cpp"

View File

@ -65,3 +65,4 @@ std::unique_ptr <DB::ICommand> makeCommandMove();
std::unique_ptr <DB::ICommand> makeCommandRead();
std::unique_ptr <DB::ICommand> makeCommandRemove();
std::unique_ptr <DB::ICommand> makeCommandWrite();
std::unique_ptr <DB::ICommand> makeCommandMkDir();

View File

@ -52,15 +52,10 @@ void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (current_thread)
{
Int64 will_be = current_thread->untracked_memory + size;
Int64 limit = current_thread->untracked_memory_limit + current_thread->untracked_memory_limit_increase;
if (will_be > limit)
if (will_be > current_thread->untracked_memory_limit)
{
/// Increase limit before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
current_thread->untracked_memory_limit_increase = current_thread->untracked_memory_limit;
memory_tracker->allocImpl(will_be, throw_if_memory_exceeded);
current_thread->untracked_memory_limit_increase = 0;
current_thread->untracked_memory = 0;
}
else

View File

@ -133,8 +133,6 @@ public:
Int64 untracked_memory = 0;
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
Int64 untracked_memory_limit = 4 * 1024 * 1024;
/// Increase limit in case of exception.
Int64 untracked_memory_limit_increase = 0;
/// Statistics of read and write rows/bytes
Progress progress_in;

View File

@ -366,6 +366,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, partial_merge_join_left_table_buffer_bytes, 0, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread.", 0) \
M(UInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
M(UInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \
M(UInt64, max_rows_in_set_to_optimize_join, 100'000, "Maximal size of the set to filter joined tables by each other row sets before joining. 0 - disable.", 0) \
\
M(Bool, compatibility_ignore_collation_in_create_table, true, "Compatibility ignore collation in create table", 0) \
\
M(String, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \

View File

@ -0,0 +1,113 @@
#include <Interpreters/AggregationUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
OutputBlockColumns prepareOutputBlockColumns(
const Aggregator::Params & params,
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
const Block & res_header,
Arenas & aggregates_pools,
bool final,
size_t rows)
{
MutableColumns key_columns(params.keys_size);
MutableColumns aggregate_columns(params.aggregates_size);
MutableColumns final_aggregate_columns(params.aggregates_size);
Aggregator::AggregateColumnsData aggregate_columns_data(params.aggregates_size);
for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
key_columns[i]->reserve(rows);
}
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
for (auto & pool : aggregates_pools)
column_aggregate_func.addArena(pool);
aggregate_columns_data[i] = &column_aggregate_func.getData();
aggregate_columns_data[i]->reserve(rows);
}
else
{
final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
final_aggregate_columns[i]->reserve(rows);
if (aggregate_functions[i]->isState())
{
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
final_aggregate_columns[i]->forEachSubcolumn(
[&aggregates_pools](auto & subcolumn)
{
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
});
}
}
}
if (key_columns.size() != params.keys_size)
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
std::vector<IColumn *> raw_key_columns;
raw_key_columns.reserve(key_columns.size());
for (auto & column : key_columns)
raw_key_columns.push_back(column.get());
return {
.key_columns = std::move(key_columns),
.raw_key_columns = std::move(raw_key_columns),
.aggregate_columns = std::move(aggregate_columns),
.final_aggregate_columns = std::move(final_aggregate_columns),
.aggregate_columns_data = std::move(aggregate_columns_data),
};
}
Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows)
{
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
Block res = res_header.cloneEmpty();
for (size_t i = 0; i < params.keys_size; ++i)
res.getByPosition(i).column = std::move(key_columns[i]);
for (size_t i = 0; i < params.aggregates_size; ++i)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
if (final)
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
else
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
}
/// Change the size of the columns-constants in the block.
size_t columns = res_header.columns();
for (size_t i = 0; i < columns; ++i)
if (isColumnConst(*res.getByPosition(i).column))
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
return res;
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Interpreters/Aggregator.h>
namespace DB
{
struct OutputBlockColumns
{
MutableColumns key_columns;
std::vector<IColumn *> raw_key_columns;
MutableColumns aggregate_columns;
MutableColumns final_aggregate_columns;
Aggregator::AggregateColumnsData aggregate_columns_data;
};
OutputBlockColumns prepareOutputBlockColumns(
const Aggregator::Params & params,
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
const Block & res_header,
Arenas & aggregates_pools,
bool final,
size_t rows);
Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows);
}

View File

@ -34,6 +34,8 @@
#include <Parsers/ASTSelectQuery.h>
#include <Interpreters/AggregationUtils.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationWritePart;
@ -1587,16 +1589,10 @@ Block Aggregator::convertOneBucketToBlock(
bool final,
size_t bucket) const
{
Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
[bucket, &method, arena, this] (
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
bool final_)
{
convertToBlockImpl(method, method.data.impls[bucket],
key_columns, aggregate_columns, final_aggregate_columns, arena, final_);
});
// Used in ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform (expects single chunk for each bucket_id).
constexpr bool return_single_block = true;
Block block = convertToBlockImpl<return_single_block>(
method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size());
block.info.bucket_num = bucket;
return block;
@ -1702,26 +1698,17 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
}
template <typename Method, typename Table>
void Aggregator::convertToBlockImpl(
Method & method,
Table & data,
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final) const
template <bool return_single_block, typename Method, typename Table>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const
{
if (data.empty())
return;
{
auto && out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, rows);
return {finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows)};
}
if (key_columns.size() != params.keys_size)
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
std::vector<IColumn *> raw_key_columns;
raw_key_columns.reserve(key_columns.size());
for (auto & column : key_columns)
raw_key_columns.push_back(column.get());
ConvertToBlockRes<return_single_block> res;
if (final)
{
@ -1729,20 +1716,23 @@ void Aggregator::convertToBlockImpl(
if (compiled_aggregate_functions_holder)
{
static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization;
convertToBlockImplFinal<Method, use_compiled_functions>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
res = convertToBlockImplFinal<Method, use_compiled_functions, return_single_block>(method, data, arena, aggregates_pools, rows);
}
else
#endif
{
convertToBlockImplFinal<Method, false>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
res = convertToBlockImplFinal<Method, false, return_single_block>(method, data, arena, aggregates_pools, rows);
}
}
else
{
convertToBlockImplNotFinal(method, data, std::move(raw_key_columns), aggregate_columns);
res = convertToBlockImplNotFinal<return_single_block>(method, data, aggregates_pools, rows);
}
/// In order to release memory early.
data.clearAndShrink();
return res;
}
@ -1811,38 +1801,9 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
}
template <typename Method, bool use_compiled_functions, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
Method & method,
Table & data,
std::vector<IColumn *> key_columns,
MutableColumns & final_aggregate_columns,
Arena * arena) const
template <bool use_compiled_functions>
Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena) const
{
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
key_columns[0]->insertDefault();
insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns, arena);
}
}
auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
PaddedPODArray<AggregateDataPtr> places;
places.reserve(data.size());
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
});
std::exception_ptr exception;
size_t aggregate_functions_destroy_index = 0;
@ -1863,7 +1824,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
if (!is_aggregate_function_compiled[i])
continue;
auto & final_aggregate_column = final_aggregate_columns[i];
auto & final_aggregate_column = out_cols.final_aggregate_columns[i];
final_aggregate_column = final_aggregate_column->cloneResized(places.size());
columns_data.emplace_back(getColumnData(final_aggregate_column.get()));
}
@ -1884,7 +1845,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
}
}
auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index];
auto & final_aggregate_column = out_cols.final_aggregate_columns[aggregate_functions_destroy_index];
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
/** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoBatch
@ -1898,7 +1859,8 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place_after_insert = !is_state;
aggregate_functions[destroy_index]->insertResultIntoBatch(0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
aggregate_functions[destroy_index]->insertResultIntoBatch(
0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
}
}
catch (...)
@ -1923,125 +1885,155 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
if (exception)
std::rethrow_exception(exception);
return finalizeBlock(params, getHeader(/* final */ true), std::move(out_cols), /* final */ true, places.size());
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplNotFinal(
Method & method,
Table & data,
std::vector<IColumn *> key_columns,
AggregateColumnsData & aggregate_columns) const
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t) const
{
const size_t max_block_size = params.max_block_size;
const bool final = true;
ConvertToBlockRes<return_single_block> res;
std::optional<OutputBlockColumns> out_cols;
std::optional<Sizes> shuffled_key_sizes;
PaddedPODArray<AggregateDataPtr> places;
auto init_out_cols = [&]()
{
out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, max_block_size);
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
key_columns[0]->insertDefault();
out_cols->key_columns[0]->insertDefault();
insertAggregatesIntoColumns(data.getNullKeyData(), out_cols->final_aggregate_columns, arena);
data.hasNullKeyData() = false;
}
}
shuffled_key_sizes = method.shuffleKeyColumns(out_cols->raw_key_columns, key_sizes);
places.reserve(max_block_size);
};
// should be invoked at least once, because null data might be the only content of the `data`
init_out_cols();
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
if constexpr (!return_single_block)
{
if (places.size() >= max_block_size)
{
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena));
places.clear();
out_cols.reset();
}
}
});
if constexpr (return_single_block)
{
return insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena);
}
else
{
if (out_cols.has_value())
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena));
return res;
}
}
template <bool return_single_block, typename Method, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t) const
{
const size_t max_block_size = params.max_block_size;
const bool final = false;
ConvertToBlockRes<return_single_block> res;
std::optional<OutputBlockColumns> out_cols;
std::optional<Sizes> shuffled_key_sizes;
auto init_out_cols = [&]()
{
out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, max_block_size);
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
out_cols->raw_key_columns[0]->insertDefault();
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
out_cols->aggregate_columns_data[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
data.getNullKeyData() = nullptr;
data.hasNullKeyData() = false;
}
}
auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
shuffled_key_sizes = method.shuffleKeyColumns(out_cols->raw_key_columns, key_sizes);
};
data.forEachValue([&](const auto & key, auto & mapped)
// should be invoked at least once, because null data might be the only content of the `data`
init_out_cols();
size_t rows_in_current_block = 0;
data.forEachValue(
[&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);
out_cols->aggregate_columns_data[i]->push_back(mapped + offsets_of_aggregate_states[i]);
mapped = nullptr;
++rows_in_current_block;
if constexpr (!return_single_block)
{
if (rows_in_current_block >= max_block_size)
{
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols.value()), final, rows_in_current_block));
out_cols.reset();
rows_in_current_block = 0;
}
}
});
}
template <typename Filler>
Block Aggregator::prepareBlockAndFill(
AggregatedDataVariants & data_variants,
bool final,
size_t rows,
Filler && filler) const
if constexpr (return_single_block)
{
MutableColumns key_columns(params.keys_size);
MutableColumns aggregate_columns(params.aggregates_size);
MutableColumns final_aggregate_columns(params.aggregates_size);
AggregateColumnsData aggregate_columns_data(params.aggregates_size);
Block res_header = getHeader(final);
for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
key_columns[i]->reserve(rows);
}
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
for (auto & pool : data_variants.aggregates_pools)
column_aggregate_func.addArena(pool);
aggregate_columns_data[i] = &column_aggregate_func.getData();
aggregate_columns_data[i]->reserve(rows);
return finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block);
}
else
{
final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
final_aggregate_columns[i]->reserve(rows);
if (aggregate_functions[i]->isState())
{
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
for (auto & pool : data_variants.aggregates_pools)
column_aggregate_func->addArena(pool);
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
final_aggregate_columns[i]->forEachSubcolumn([&data_variants](auto & subcolumn)
{
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
for (auto & pool : data_variants.aggregates_pools)
column_aggregate_func->addArena(pool);
});
if (rows_in_current_block)
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block));
return res;
}
}
}
filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);
Block res = res_header.cloneEmpty();
for (size_t i = 0; i < params.keys_size; ++i)
res.getByPosition(i).column = std::move(key_columns[i]);
for (size_t i = 0; i < params.aggregates_size; ++i)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
if (final)
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
else
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
}
/// Change the size of the columns-constants in the block.
size_t columns = res_header.columns();
for (size_t i = 0; i < columns; ++i)
if (isColumnConst(*res.getByPosition(i).column))
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
return res;
}
@ -2105,13 +2097,10 @@ void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
{
size_t rows = 1;
auto && out_cols
= prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), data_variants.aggregates_pools, final, rows);
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
auto filler = [&data_variants, this](
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
bool final_)
{
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
{
AggregatedDataWithoutKey & data = data_variants.without_key;
@ -2119,10 +2108,10 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
if (!data)
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
if (!final_)
if (!final)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
aggregate_columns_data[i]->push_back(data + offsets_of_aggregate_states[i]);
data = nullptr;
}
else
@ -2135,9 +2124,8 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i]->insertDefault();
}
};
Block block = prepareBlockAndFill(data_variants, final, rows, filler);
Block block = finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows);
if (is_overflows)
block.info.is_overflows = true;
@ -2148,29 +2136,22 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
return block;
}
Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{
size_t rows = data_variants.sizeWithoutOverflowRow();
auto filler = [&data_variants, this](
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
bool final_)
template <bool return_single_block>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{
const size_t rows = data_variants.sizeWithoutOverflowRow();
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
{ \
return convertToBlockImpl<return_single_block>( \
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows); \
}
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
};
return prepareBlockAndFill(data_variants, final, rows, filler);
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
@ -2292,7 +2273,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
if (data_variants.type != AggregatedDataVariants::Type::without_key)
{
if (!data_variants.isTwoLevel())
blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final));
blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel</* return_single_block */ false>(data_variants, final));
else
blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
}
@ -3044,9 +3025,15 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
Block block;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
{
block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
}
else
block = prepareBlockAndFillSingleLevel(result, final);
{
// Used during memory efficient merging (SortingAggregatedTransform expects single chunk for each bucket_id).
constexpr bool return_single_block = true;
block = prepareBlockAndFillSingleLevel<return_single_block>(result, final);
}
/// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods.
if (!final)
@ -3247,4 +3234,6 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) cons
}
template Aggregator::ConvertToBlockRes<false>
Aggregator::prepareBlockAndFillSingleLevel<false>(AggregatedDataVariants & data_variants, bool final) const;
}

View File

@ -1,8 +1,9 @@
#pragma once
#include <mutex>
#include <memory>
#include <functional>
#include <memory>
#include <mutex>
#include <type_traits>
#include <Common/logger_useful.h>
@ -872,6 +873,7 @@ using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants
class CompiledAggregateFunctionsHolder;
class NativeWriter;
struct OutputBlockColumns;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingTransform.)
@ -933,6 +935,8 @@ public:
bool compile_aggregate_expressions;
size_t min_count_to_compile_aggregate_expression;
size_t max_block_size;
bool only_merge;
struct StatsCollectingParams
@ -969,6 +973,7 @@ public:
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
size_t max_block_size_,
bool only_merge_ = false, // true for projections
const StatsCollectingParams & stats_collecting_params_ = {})
: keys(keys_)
@ -987,15 +992,16 @@ public:
, min_free_disk_space(min_free_disk_space_)
, compile_aggregate_expressions(compile_aggregate_expressions_)
, min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
, max_block_size(max_block_size_)
, only_merge(only_merge_)
, stats_collecting_params(stats_collecting_params_)
{
}
/// Only parameters that matter during merge.
Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_, size_t max_block_size_)
: Params(
keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, true, {})
keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, true, {})
{
}
@ -1277,15 +1283,12 @@ private:
void mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
template <typename Method, typename Table>
void convertToBlockImpl(
Method & method,
Table & data,
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final) const;
template <bool return_single_block>
using ConvertToBlockRes = std::conditional_t<return_single_block, Block, BlocksList>;
template <bool return_single_block, typename Method, typename Table>
ConvertToBlockRes<return_single_block>
convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const;
template <typename Mapped>
void insertAggregatesIntoColumns(
@ -1293,27 +1296,16 @@ private:
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, bool use_compiled_functions, typename Table>
void convertToBlockImplFinal(
Method & method,
Table & data,
std::vector<IColumn *> key_columns,
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <bool use_compiled_functions>
Block insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena) const;
template <typename Method, typename Table>
void convertToBlockImplNotFinal(
Method & method,
Table & data,
std::vector<IColumn *> key_columns,
AggregateColumnsData & aggregate_columns) const;
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
ConvertToBlockRes<return_single_block>
convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const;
template <typename Filler>
Block prepareBlockAndFill(
AggregatedDataVariants & data_variants,
bool final,
size_t rows,
Filler && filler) const;
template <bool return_single_block, typename Method, typename Table>
ConvertToBlockRes<return_single_block>
convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const;
template <typename Method>
Block convertOneBucketToBlock(
@ -1331,9 +1323,11 @@ private:
std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
template <bool return_single_block>
ConvertToBlockRes<return_single_block> prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
template <typename Method>
BlocksList prepareBlocksAndFillTwoLevelImpl(
AggregatedDataVariants & data_variants,

View File

@ -122,6 +122,7 @@ void FileCache::initialize()
fs::create_directories(cache_base_path);
}
status_file = make_unique<StatusFile>(fs::path(cache_base_path) / "status", StatusFile::write_full_info);
is_initialized = true;
}
}
@ -963,12 +964,19 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
fs::directory_iterator key_prefix_it{cache_base_path};
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it)
{
if (!key_prefix_it->is_directory())
{
if (key_prefix_it->path().filename() != "status")
LOG_DEBUG(log, "Unexpected file {} (not a directory), will skip it", key_prefix_it->path().string());
continue;
}
fs::directory_iterator key_it{key_prefix_it->path()};
for (; key_it != fs::directory_iterator(); ++key_it)
{
if (!key_it->is_directory())
{
LOG_WARNING(log, "Unexpected file: {}. Expected a directory", key_it->path().string());
LOG_DEBUG(log, "Unexpected file {} (not a directory), will skip it", key_it->path().string());
continue;
}

View File

@ -18,6 +18,7 @@
#include <Interpreters/Cache/IFileCachePriority.h>
#include <Common/logger_useful.h>
#include <Interpreters/Cache/FileCacheKey.h>
#include <Common/StatusFile.h>
namespace DB
{
@ -143,6 +144,7 @@ private:
bool is_initialized = false;
std::exception_ptr initialization_exception;
std::unique_ptr<StatusFile> status_file;
mutable std::mutex mutex;

View File

@ -39,6 +39,7 @@
#include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
@ -1436,7 +1437,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, bool is_right)
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, JoinTableSide join_pos)
{
SortDescription order_descr;
order_descr.reserve(key_names.size());
@ -1455,15 +1456,43 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
this->context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", is_right ? "right" : "left"));
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_pos));
plan.addStep(std::move(sorting_step));
};
auto crosswise_connection = CreateSetAndFilterOnTheFlyStep::createCrossConnection();
auto add_create_set = [&settings, crosswise_connection](QueryPlan & plan, const Names & key_names, JoinTableSide join_pos)
{
auto creating_set_step = std::make_unique<CreateSetAndFilterOnTheFlyStep>(
plan.getCurrentDataStream(), key_names, settings.max_rows_in_set_to_optimize_join, crosswise_connection, join_pos);
creating_set_step->setStepDescription(fmt::format("Create set and filter {} joined stream", join_pos));
auto * step_raw_ptr = creating_set_step.get();
plan.addStep(std::move(creating_set_step));
return step_raw_ptr;
};
if (expressions.join->pipelineType() == JoinPipelineType::YShaped)
{
const auto & join_clause = expressions.join->getTableJoin().getOnlyClause();
add_sorting(query_plan, join_clause.key_names_left, false);
add_sorting(*joined_plan, join_clause.key_names_right, true);
const auto & table_join = expressions.join->getTableJoin();
const auto & join_clause = table_join.getOnlyClause();
auto join_kind = table_join.kind();
bool kind_allows_filtering = isInner(join_kind) || isLeft(join_kind) || isRight(join_kind);
if (settings.max_rows_in_set_to_optimize_join > 0 && kind_allows_filtering)
{
auto * left_set = add_create_set(query_plan, join_clause.key_names_left, JoinTableSide::Left);
auto * right_set = add_create_set(*joined_plan, join_clause.key_names_right, JoinTableSide::Right);
if (isInnerOrLeft(join_kind))
right_set->setFiltering(left_set->getSet());
if (isInnerOrRight(join_kind))
left_set->setFiltering(right_set->getSet());
}
add_sorting(query_plan, join_clause.key_names_left, JoinTableSide::Left);
add_sorting(*joined_plan, join_clause.key_names_right, JoinTableSide::Right);
}
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
@ -1734,7 +1763,7 @@ static void executeMergeAggregatedImpl(
* but it can work more slowly.
*/
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads);
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
@ -2330,6 +2359,7 @@ static Aggregator::Params getAggregatorParams(
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
/* only_merge */ false,
stats_collecting_params
};

View File

@ -22,6 +22,8 @@
#include <Interpreters/castColumn.h>
#include <Interpreters/Context.h>
#include <Processors/Chunk.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <base/range.h>
@ -162,8 +164,16 @@ void Set::setHeader(const ColumnsWithTypeAndName & header)
data.init(data.chooseMethod(key_columns, key_sizes));
}
bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns)
{
Columns cols;
cols.reserve(columns.size());
for (const auto & column : columns)
cols.emplace_back(column.column);
return insertFromBlock(cols);
}
bool Set::insertFromBlock(const Columns & columns)
{
std::lock_guard<std::shared_mutex> lock(rwlock);
@ -179,11 +189,11 @@ bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns)
/// Remember the columns we will work with
for (size_t i = 0; i < keys_size; ++i)
{
materialized_columns.emplace_back(columns.at(i).column->convertToFullIfNeeded());
materialized_columns.emplace_back(columns.at(i)->convertToFullIfNeeded());
key_columns.emplace_back(materialized_columns.back().get());
}
size_t rows = columns.at(0).column->size();
size_t rows = columns.at(0)->size();
/// We will insert to the Set only keys, where all components are not NULL.
ConstNullMapPtr null_map{};
@ -393,7 +403,13 @@ void Set::checkColumnsNumber(size_t num_key_columns) const
bool Set::areTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const
{
return removeNullable(recursiveRemoveLowCardinality(data_types[set_type_idx]))->equals(*removeNullable(recursiveRemoveLowCardinality(other_type)));
/// Out-of-bound access can happen when same set expression built with different columns.
/// Caller may call this method to make sure that the set is indeed the one they want
/// without awaring data_types.size().
if (set_type_idx >= data_types.size())
return false;
return removeNullable(recursiveRemoveLowCardinality(data_types[set_type_idx]))
->equals(*removeNullable(recursiveRemoveLowCardinality(other_type)));
}
void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const

View File

@ -20,6 +20,7 @@ class Context;
class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
class Chunk;
/** Data structure for implementation of IN expression.
*/
@ -45,11 +46,14 @@ public:
void setHeader(const ColumnsWithTypeAndName & header);
/// Returns false, if some limit was exceeded and no need to insert more data.
bool insertFromBlock(const Columns & columns);
bool insertFromBlock(const ColumnsWithTypeAndName & columns);
/// Call after all blocks were inserted. To get the information that set is already created.
void finishInsert() { is_created = true; }
bool isCreated() const { return is_created; }
/// finishInsert and isCreated are thread-safe
bool isCreated() const { return is_created.load(); }
/** For columns of 'block', check belonging of corresponding rows to the set.
* Return UInt8 column with the result.
@ -111,7 +115,7 @@ private:
bool transform_null_in;
/// Check if set contains all the data.
bool is_created = false;
std::atomic<bool> is_created = false;
/// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary(

View File

@ -73,16 +73,32 @@ public:
return key_names_right.size();
}
String formatDebug() const
String formatDebug(bool short_format = false) const
{
return fmt::format("Left keys: [{}] Right keys [{}] Condition columns: '{}', '{}'",
fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "),
condColumnNames().first, condColumnNames().second);
const auto & [left_cond, right_cond] = condColumnNames();
if (short_format)
{
return fmt::format("({}) = ({}){}{}", fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "),
!left_cond.empty() ? " AND " + left_cond : "", !right_cond.empty() ? " AND " + right_cond : "");
}
return fmt::format(
"Left keys: [{}] Right keys [{}] Condition columns: '{}', '{}'",
fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "), left_cond, right_cond);
}
};
using Clauses = std::vector<JoinOnClause>;
static std::string formatClauses(const Clauses & clauses, bool short_format = false)
{
std::vector<std::string> res;
for (const auto & clause : clauses)
res.push_back("[" + clause.formatDebug(short_format) + "]");
return fmt::format("{}", fmt::join(res, "; "));
}
private:
/** Query of the form `SELECT expr(x) AS k FROM t1 ANY LEFT JOIN (SELECT expr(x) AS k FROM t2) USING k`
* The join is made by column k.

View File

@ -521,13 +521,18 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
++new_elements_size;
}
/// removing aggregation can change number of rows, so `count()` result in outer sub-query would be wrong
if (func && AggregateUtils::isAggregateFunction(*func) && !select_query->groupBy())
if (func && !select_query->groupBy())
{
GetAggregatesVisitor::Data data = {};
GetAggregatesVisitor(data).visit(elem);
if (!data.aggregates.empty())
{
new_elements[result_index] = elem;
++new_elements_size;
}
}
}
}
/// Remove empty nodes.
std::erase(new_elements, ASTPtr{});

View File

@ -89,7 +89,6 @@ TEST(FileCache, get)
{
if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path);
fs::create_directories(cache_base_path);
DB::ThreadStatus thread_status;
@ -103,6 +102,8 @@ TEST(FileCache, get)
DB::FileCacheSettings settings;
settings.max_size = 30;
settings.max_elements = 5;
{
auto cache = DB::FileCache(cache_base_path, settings);
cache.initialize();
auto key = cache.hash("key1");
@ -471,6 +472,7 @@ TEST(FileCache, get)
printRanges(segments);
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::DOWNLOADED);
}
}
/// Current cache: [___][ ][___][_][__]
/// ^ ^^ ^ ^^ ^ ^
@ -481,6 +483,7 @@ TEST(FileCache, get)
auto cache2 = DB::FileCache(cache_base_path, settings);
cache2.initialize();
auto key = cache2.hash("key1");
auto holder1 = cache2.getOrSet(key, 2, 28, false); /// Get [2, 29]
@ -501,6 +504,7 @@ TEST(FileCache, get)
settings2.max_file_segment_size = 10;
auto cache2 = DB::FileCache(caches_dir / "cache2", settings2);
cache2.initialize();
auto key = cache2.hash("key1");
auto holder1 = cache2.getOrSet(key, 0, 25, false); /// Get [0, 24]
auto segments1 = fromHolder(holder1);

View File

@ -0,0 +1,198 @@
#include <Processors/PingPongProcessor.h>
namespace DB
{
/// Create list with `num_ports` of regular ports and 1 auxiliary port with empty header.
template <typename T> requires std::is_same_v<T, InputPorts> || std::is_same_v<T, OutputPorts>
static T createPortsWithSpecial(const Block & header, size_t num_ports)
{
T res(num_ports, header);
res.emplace_back(Block());
return res;
}
PingPongProcessor::PingPongProcessor(const Block & header, size_t num_ports, Order order_)
: IProcessor(createPortsWithSpecial<InputPorts>(header, num_ports),
createPortsWithSpecial<OutputPorts>(header, num_ports))
, aux_in_port(inputs.back())
, aux_out_port(outputs.back())
, order(order_)
{
assert(order == First || order == Second);
port_pairs.resize(num_ports);
auto input_it = inputs.begin();
auto output_it = outputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
port_pairs[i].input_port = &*input_it;
++input_it;
port_pairs[i].output_port = &*output_it;
++output_it;
}
}
void PingPongProcessor::finishPair(PortsPair & pair)
{
if (!pair.is_finished)
{
pair.output_port->finish();
pair.input_port->close();
pair.is_finished = true;
++num_finished_pairs;
}
}
bool PingPongProcessor::processPair(PortsPair & pair)
{
if (pair.output_port->isFinished())
{
finishPair(pair);
return false;
}
if (pair.input_port->isFinished())
{
finishPair(pair);
return false;
}
if (!pair.output_port->canPush())
{
pair.input_port->setNotNeeded();
return false;
}
pair.input_port->setNeeded();
if (pair.input_port->hasData())
{
Chunk chunk = pair.input_port->pull(true);
ready_to_send |= consume(chunk);
pair.output_port->push(std::move(chunk));
}
return true;
}
bool PingPongProcessor::isPairsFinished() const
{
return num_finished_pairs == port_pairs.size();
}
IProcessor::Status PingPongProcessor::processRegularPorts()
{
if (isPairsFinished())
return Status::Finished;
bool need_data = false;
for (auto & pair : port_pairs)
need_data = processPair(pair) || need_data;
if (isPairsFinished())
return Status::Finished;
if (need_data)
return Status::NeedData;
return Status::PortFull;
}
bool PingPongProcessor::sendPing()
{
if (aux_out_port.canPush())
{
Chunk chunk(aux_out_port.getHeader().cloneEmpty().getColumns(), 0);
aux_out_port.push(std::move(chunk));
is_send = true;
aux_out_port.finish();
return true;
}
return false;
}
bool PingPongProcessor::recievePing()
{
if (aux_in_port.hasData())
{
aux_in_port.pull();
is_received = true;
aux_in_port.close();
return true;
}
return false;
}
bool PingPongProcessor::canSend() const
{
return !is_send && (ready_to_send || isPairsFinished());
}
IProcessor::Status PingPongProcessor::prepare()
{
if (!set_needed_once && !is_received && !aux_in_port.isFinished())
{
set_needed_once = true;
aux_in_port.setNeeded();
}
if (order == First || is_send)
{
if (!is_received)
{
bool received = recievePing();
if (!received)
{
return Status::NeedData;
}
}
}
if (order == Second || is_received)
{
if (!is_send && canSend())
{
bool sent = sendPing();
if (!sent)
return Status::PortFull;
}
}
auto status = processRegularPorts();
if (status == Status::Finished)
{
if (order == First || is_send)
{
if (!is_received)
{
bool received = recievePing();
if (!received)
{
return Status::NeedData;
}
}
}
if (order == Second || is_received)
{
if (!is_send && canSend())
{
bool sent = sendPing();
if (!sent)
return Status::PortFull;
}
}
}
return status;
}
std::pair<InputPort *, OutputPort *> PingPongProcessor::getAuxPorts()
{
return std::make_pair(&aux_in_port, &aux_out_port);
}
}

View File

@ -0,0 +1,105 @@
#pragma once
#include <Processors/IProcessor.h>
#include <base/unit.h>
#include <Processors/Chunk.h>
#include <Common/logger_useful.h>
namespace DB
{
/*
* Processor with N inputs and N outputs. Moves data from i-th input to i-th output as is.
* It has a pair of auxiliary ports to notify another instance by sending empty chunk after some condition holds.
* You should use this processor in pair of instances and connect auxiliary ports crosswise.
*
*
* aux
* PingPongProcessor PingPongProcessor
* aux
*
*
* One of the processors starts processing data, and another waits for notification.
* When `consume` returns true, the first stops processing, sends a ping to another and waits for notification.
* After that, the second one also processes data until `consume`, then send a notification back to the first one.
* After this roundtrip, processors bypass data from regular inputs to outputs.
*/
class PingPongProcessor : public IProcessor
{
public:
enum class Order : uint8_t
{
/// Processor that starts processing data.
First,
/// Processor that waits for notification.
Second,
};
using enum Order;
PingPongProcessor(const Block & header, size_t num_ports, Order order_);
Status prepare() override;
std::pair<InputPort *, OutputPort *> getAuxPorts();
/// Returns `true` when enough data consumed
virtual bool consume(const Chunk & chunk) = 0;
protected:
struct PortsPair
{
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_finished = false;
};
bool sendPing();
bool recievePing();
bool canSend() const;
bool isPairsFinished() const;
bool processPair(PortsPair & pair);
void finishPair(PortsPair & pair);
Status processRegularPorts();
std::vector<PortsPair> port_pairs;
size_t num_finished_pairs = 0;
InputPort & aux_in_port;
OutputPort & aux_out_port;
bool is_send = false;
bool is_received = false;
bool ready_to_send = false;
/// Used to set 'needed' flag once for auxiliary input at first `prepare` call.
bool set_needed_once = false;
Order order;
};
/// Reads first N rows from two streams evenly.
class ReadHeadBalancedProcessor : public PingPongProcessor
{
public:
ReadHeadBalancedProcessor(const Block & header, size_t num_ports, size_t size_to_wait_, Order order_)
: PingPongProcessor(header, num_ports, order_) , data_consumed(0) , size_to_wait(size_to_wait_)
{
}
String getName() const override { return "ReadHeadBalancedProcessor"; }
bool consume(const Chunk & chunk) override
{
data_consumed += chunk.getNumRows();
return data_consumed > size_to_wait;
}
private:
size_t data_consumed;
size_t size_to_wait;
};
}

View File

@ -8,16 +8,16 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
void connect(OutputPort & output, InputPort & input)
void connect(OutputPort & output, InputPort & input, bool reconnect)
{
if (input.state)
if (!reconnect && input.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure());
if (output.state)
if (!reconnect && output.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure());
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();
auto out_name = output.processor ? output.getProcessor().getName() : "null";
auto in_name = input.processor ? input.getProcessor().getName() : "null";
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format("function connect between {} and {}", out_name, in_name));

View File

@ -25,7 +25,7 @@ namespace ErrorCodes
class Port
{
friend void connect(OutputPort &, InputPort &);
friend void connect(OutputPort &, InputPort &, bool);
friend class IProcessor;
public:
@ -267,7 +267,7 @@ protected:
/// * You can pull only if port hasData().
class InputPort : public Port
{
friend void connect(OutputPort &, InputPort &);
friend void connect(OutputPort &, InputPort &, bool);
private:
OutputPort * output_port = nullptr;
@ -390,7 +390,7 @@ public:
/// * You can push only if port doesn't hasData().
class OutputPort : public Port
{
friend void connect(OutputPort &, InputPort &);
friend void connect(OutputPort &, InputPort &, bool);
private:
InputPort * input_port = nullptr;
@ -483,6 +483,6 @@ using InputPorts = std::list<InputPort>;
using OutputPorts = std::list<OutputPort>;
void connect(OutputPort & output, InputPort & input);
void connect(OutputPort & output, InputPort & input, bool reconnect = false);
}

View File

@ -182,6 +182,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.min_free_disk_space,
transform_params->params.compile_aggregate_expressions,
transform_params->params.min_count_to_compile_aggregate_expression,
transform_params->params.max_block_size,
/* only_merge */ false,
transform_params->params.stats_collecting_params};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);
@ -376,16 +377,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
});
/// We add the explicit resize here, but not in case of aggregating in order, since AIO don't use two-level hash tables and thus returns only buckets with bucket_number = -1.
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : pipeline.getNumStreams(), true /* force */);
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
aggregating = collector.detachProcessors(0);
}
else
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params);
});
pipeline.addSimpleTransform([&](const Block & header) { return std::make_shared<AggregatingTransform>(header, transform_params); });
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, false /* force */);
aggregating = collector.detachProcessors(0);
}

View File

@ -0,0 +1,205 @@
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Processors/IProcessor.h>
#include <Processors/PingPongProcessor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static void connectAllInputs(OutputPortRawPtrs ports, InputPorts & inputs, size_t num_ports)
{
auto input_it = inputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
connect(*ports[i], *input_it);
input_it++;
}
}
static ColumnsWithTypeAndName getColumnSubset(const Block & block, const Names & column_names)
{
ColumnsWithTypeAndName result;
for (const auto & name : column_names)
result.emplace_back(block.getByName(name));
return result;
}
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
class CreateSetAndFilterOnTheFlyStep::CrosswiseConnection : public boost::noncopyable
{
public:
using PortPair = std::pair<InputPort *, OutputPort *>;
/// Remember ports passed on the first call and connect with ones from second call.
/// Thread-safe.
void connectPorts(PortPair rhs_ports, IProcessor * proc)
{
assert(!rhs_ports.first->isConnected() && !rhs_ports.second->isConnected());
std::lock_guard<std::mutex> lock(mux);
if (input_port || output_port)
{
assert(input_port && output_port);
assert(!input_port->isConnected());
connect(*rhs_ports.second, *input_port);
connect(*output_port, *rhs_ports.first, /* reconnect= */ true);
}
else
{
std::tie(input_port, output_port) = rhs_ports;
assert(input_port && output_port);
assert(!input_port->isConnected() && !output_port->isConnected());
dummy_input_port = std::make_unique<InputPort>(output_port->getHeader(), proc);
connect(*output_port, *dummy_input_port);
}
}
private:
std::mutex mux;
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
/// Output ports should always be connected, and we can't add a step to the pipeline without them.
/// So, connect the port from the first processor to this dummy port and then reconnect to the second processor.
std::unique_ptr<InputPort> dummy_input_port;
};
CreateSetAndFilterOnTheFlyStep::CrosswiseConnectionPtr CreateSetAndFilterOnTheFlyStep::createCrossConnection()
{
return std::make_shared<CreateSetAndFilterOnTheFlyStep::CrosswiseConnection>();
}
CreateSetAndFilterOnTheFlyStep::CreateSetAndFilterOnTheFlyStep(
const DataStream & input_stream_,
const Names & column_names_,
size_t max_rows_in_set_,
CrosswiseConnectionPtr crosswise_connection_,
JoinTableSide position_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, column_names(column_names_)
, max_rows_in_set(max_rows_in_set_)
, own_set(std::make_shared<SetWithState>(SizeLimits(max_rows_in_set, 0, OverflowMode::BREAK), false, true))
, filtering_set(nullptr)
, crosswise_connection(crosswise_connection_)
, position(position_)
{
if (crosswise_connection == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Crosswise connection is not initialized");
if (input_streams.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Step requires exactly one input stream, got {}", input_streams.size());
own_set->setHeader(getColumnSubset(input_streams[0].header, column_names));
}
void CreateSetAndFilterOnTheFlyStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
size_t num_streams = pipeline.getNumStreams();
pipeline.addSimpleTransform([this, num_streams](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
auto res = std::make_shared<CreatingSetsOnTheFlyTransform>(header, column_names, num_streams, own_set);
res->setDescription(this->getStepDescription());
return res;
});
Block input_header = pipeline.getHeader();
auto pipeline_transform = [&input_header, this](OutputPortRawPtrs ports)
{
Processors result_transforms;
size_t num_ports = ports.size();
/// Add balancing transform
auto idx = position == JoinTableSide::Left ? PingPongProcessor::First : PingPongProcessor::Second;
auto stream_balancer = std::make_shared<ReadHeadBalancedProcessor>(input_header, num_ports, max_rows_in_set, idx);
stream_balancer->setDescription(getStepDescription());
/// Regular inputs just bypass data for respective ports
connectAllInputs(ports, stream_balancer->getInputs(), num_ports);
/// Connect auxiliary ports
crosswise_connection->connectPorts(stream_balancer->getAuxPorts(), stream_balancer.get());
if (!filtering_set)
{
LOG_DEBUG(log, "Skip filtering {} stream", position);
result_transforms.emplace_back(std::move(stream_balancer));
return result_transforms;
}
/// Add filtering transform, ports just connected respectively
auto & outputs = stream_balancer->getOutputs();
auto output_it = outputs.begin();
for (size_t i = 0; i < outputs.size() - 1; ++i)
{
auto & port = *output_it++;
auto transform = std::make_shared<FilterBySetOnTheFlyTransform>(port.getHeader(), column_names, filtering_set);
transform->setDescription(this->getStepDescription());
connect(port, transform->getInputPort());
result_transforms.emplace_back(std::move(transform));
}
assert(output_it == std::prev(outputs.end()));
result_transforms.emplace_back(std::move(stream_balancer));
return result_transforms;
};
/// Auxiliary port stream_balancer can be connected later (by crosswise_connection).
/// So, use unsafe `transform` with `check_ports = false` to avoid assertions
pipeline.transform(std::move(pipeline_transform), /* check_ports= */ false);
}
void CreateSetAndFilterOnTheFlyStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add(getName(), true);
}
void CreateSetAndFilterOnTheFlyStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
settings.out << prefix << getName();
settings.out << '\n';
}
void CreateSetAndFilterOnTheFlyStep::updateOutputStream()
{
if (input_streams.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} requires exactly one input stream, got {}", getName(), input_streams.size());
own_set->setHeader(getColumnSubset(input_streams[0].header, column_names));
output_stream = input_streams[0];
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <Processors/DelayedPortsProcessor.h>
namespace DB
{
/*
* Used to optimize JOIN when joining a small table over a large table.
* Currently applied only for the full sorting join.
* It tries to build a set for each stream.
* Once one stream is finished, it starts to filter another stream with this set.
*/
class CreateSetAndFilterOnTheFlyStep : public ITransformingStep
{
public:
/// Two instances of step need some shared state to connect processors crosswise
class CrosswiseConnection;
using CrosswiseConnectionPtr = std::shared_ptr<CrosswiseConnection>;
static CrosswiseConnectionPtr createCrossConnection();
CreateSetAndFilterOnTheFlyStep(
const DataStream & input_stream_,
const Names & column_names_,
size_t max_rows_in_set_,
CrosswiseConnectionPtr crosswise_connection_,
JoinTableSide position_);
String getName() const override { return "CreateSetAndFilterOnTheFlyStep"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
SetWithStatePtr getSet() const { return own_set; }
/// Set for another stream.
void setFiltering(SetWithStatePtr filtering_set_) { filtering_set = filtering_set_; }
private:
void updateOutputStream() override;
Names column_names;
size_t max_rows_in_set;
SetWithStatePtr own_set;
SetWithStatePtr filtering_set;
CrosswiseConnectionPtr crosswise_connection;
JoinTableSide position;
Poco::Logger * log = &Poco::Logger::get("CreateSetAndFilterOnTheFlyStep");
};
}

View File

@ -34,8 +34,12 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");
if (join->pipelineType() == JoinPipelineType::YShaped)
return QueryPipelineBuilder::joinPipelinesYShaped(
{
auto joined_pipeline = QueryPipelineBuilder::joinPipelinesYShaped(
std::move(pipelines[0]), std::move(pipelines[1]), join, output_stream->header, max_block_size, &processors);
joined_pipeline->resize(max_streams);
return joined_pipeline;
}
return QueryPipelineBuilder::joinPipelinesRightLeft(
std::move(pipelines[0]),

View File

@ -8,6 +8,7 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
@ -22,6 +23,7 @@
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h>
#include <fmt/format.h>
namespace DB::ErrorCodes
{
@ -134,10 +136,24 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
static size_t
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs,
bool can_remove_filter = true)
bool can_remove_filter = true, size_t child_idx = 0)
{
if (auto split_filter = splitFilter(parent_node, allowed_inputs, 0))
return tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, 0);
if (auto split_filter = splitFilter(parent_node, allowed_inputs, child_idx))
return tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
return 0;
}
/// Push down filter through specified type of step
template <typename Step>
static size_t simplePushDownOverStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, QueryPlanStepPtr & child)
{
if (typeid_cast<Step *>(child.get()))
{
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
return 0;
}
@ -234,12 +250,8 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps;
}
if (auto * distinct = typeid_cast<DistinctStep *>(child.get()))
{
Names allowed_inputs = distinct->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
if (auto updated_steps = simplePushDownOverStep<DistinctStep>(parent_node, nodes, child))
return updated_steps;
}
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
@ -290,7 +302,7 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
const size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
if (updated_steps > 0)
{
LOG_DEBUG(&Poco::Logger::get("QueryPlanOptimizations"), "Pushed down filter to {} side of join", kind);
LOG_DEBUG(&Poco::Logger::get("QueryPlanOptimizations"), "Pushed down filter {} to the {} side of join", split_filter_column_name, kind);
}
return updated_steps;
};
@ -321,12 +333,11 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
// {
// }
if (typeid_cast<SortingStep *>(child.get()))
{
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
if (auto updated_steps = simplePushDownOverStep<SortingStep>(parent_node, nodes, child))
return updated_steps;
if (auto updated_steps = simplePushDownOverStep<CreateSetAndFilterOnTheFlyStep>(parent_node, nodes, child))
return updated_steps;
}
if (auto * union_step = typeid_cast<UnionStep *>(child.get()))
{

View File

@ -85,6 +85,13 @@ public:
{
}
StrictResizeProcessor(InputPorts inputs_, OutputPorts outputs_)
: IProcessor(inputs_, outputs_)
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "StrictResize"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;

View File

@ -38,7 +38,8 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
settings.min_count_to_compile_aggregate_expression,
settings.max_block_size);
aggregator = std::make_unique<Aggregator>(header, params);

View File

@ -182,7 +182,8 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (cur_block_size >= max_block_size || cur_block_bytes + current_memory_usage >= max_block_bytes)
{
if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false);
group_by_block
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
cur_block_bytes += current_memory_usage;
finalizeCurrentChunk(std::move(chunk), key_end);
return;
@ -293,7 +294,8 @@ void AggregatingInOrderTransform::generate()
if (cur_block_size && is_consume_finished)
{
if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false);
group_by_block
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
else
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
variants.invalidate();

View File

@ -203,7 +203,7 @@ public:
{
auto & output = outputs.front();
if (finished && !has_input)
if (finished && single_level_chunks.empty())
{
output.finish();
return Status::Finished;
@ -230,7 +230,7 @@ public:
if (!processors.empty())
return Status::ExpandPipeline;
if (has_input)
if (!single_level_chunks.empty())
return preparePushToOutput();
/// Single level case.
@ -244,11 +244,14 @@ public:
private:
IProcessor::Status preparePushToOutput()
{
auto & output = outputs.front();
output.push(std::move(current_chunk));
has_input = false;
if (single_level_chunks.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Some ready chunks expected");
if (finished)
auto & output = outputs.front();
output.push(std::move(single_level_chunks.back()));
single_level_chunks.pop_back();
if (finished && single_level_chunks.empty())
{
output.finish();
return Status::Finished;
@ -268,17 +271,17 @@ private:
{
auto chunk = input.pull();
auto bucket = getInfoFromChunk(chunk)->bucket_num;
chunks[bucket] = std::move(chunk);
two_level_chunks[bucket] = std::move(chunk);
}
}
if (!shared_data->is_bucket_processed[current_bucket_num])
return Status::NeedData;
if (!chunks[current_bucket_num])
if (!two_level_chunks[current_bucket_num])
return Status::NeedData;
output.push(std::move(chunks[current_bucket_num]));
output.push(std::move(two_level_chunks[current_bucket_num]));
++current_bucket_num;
if (current_bucket_num == NUM_BUCKETS)
@ -298,27 +301,16 @@ private:
size_t num_threads;
bool is_initialized = false;
bool has_input = false;
bool finished = false;
Chunk current_chunk;
Chunks single_level_chunks;
UInt32 current_bucket_num = 0;
static constexpr Int32 NUM_BUCKETS = 256;
std::array<Chunk, NUM_BUCKETS> chunks;
std::array<Chunk, NUM_BUCKETS> two_level_chunks;
Processors processors;
void setCurrentChunk(Chunk chunk)
{
if (has_input)
throw Exception("Current chunk was already set in "
"ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);
has_input = true;
current_chunk = std::move(chunk);
}
void initialize()
{
is_initialized = true;
@ -339,7 +331,7 @@ private:
auto block = params->aggregator.prepareBlockAndFillWithoutKey(
*first, params->final, first->type != AggregatedDataVariants::Type::without_key);
setCurrentChunk(convertToChunk(block));
single_level_chunks.emplace_back(convertToChunk(block));
}
}
@ -364,9 +356,10 @@ private:
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
auto block = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final);
auto blocks = params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ false>(*first, params->final);
for (auto & block : blocks)
single_level_chunks.emplace_back(convertToChunk(block));
setCurrentChunk(convertToChunk(block));
finished = true;
}

View File

@ -0,0 +1,195 @@
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <cstddef>
#include <mutex>
#include <Interpreters/Set.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Columns/IColumn.h>
#include <Core/ColumnWithTypeAndName.h>
#include <base/types.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
std::vector<size_t> getColumnIndices(const Block & block, const Names & column_names)
{
std::vector<size_t> indices;
for (const auto & name : column_names)
indices.push_back(block.getPositionByName(name));
return indices;
}
Columns getColumnsByIndices(const Chunk & chunk, const std::vector<size_t> & indices)
{
Columns columns;
const Columns & all_cols = chunk.getColumns();
for (const auto & index : indices)
columns.push_back(all_cols.at(index));
return columns;
}
ColumnsWithTypeAndName getColumnsByIndices(const Block & sample_block, const Chunk & chunk, const std::vector<size_t> & indices)
{
Block block = sample_block.cloneEmpty();
block.setColumns(getColumnsByIndices(chunk, indices));
return block.getColumnsWithTypeAndName();
}
}
CreatingSetsOnTheFlyTransform::CreatingSetsOnTheFlyTransform(
const Block & header_, const Names & column_names_, size_t num_streams_, SetWithStatePtr set_)
: ISimpleTransform(header_, header_, true)
, column_names(column_names_)
, key_column_indices(getColumnIndices(inputs.front().getHeader(), column_names))
, num_streams(num_streams_)
, set(set_)
{
}
IProcessor::Status CreatingSetsOnTheFlyTransform::prepare()
{
IProcessor::Status status = ISimpleTransform::prepare();
if (!set || status != Status::Finished)
/// Nothing to do with set
return status;
/// Finalize set
if (set->state == SetWithState::State::Creating)
{
if (input.isFinished())
{
set->finished_count++;
if (set->finished_count != num_streams)
/// Not all instances of processor are finished
return status;
set->finishInsert();
set->state = SetWithState::State::Finished;
LOG_DEBUG(log, "{}: finish building set for [{}] with {} rows, set size is {}",
getDescription(), fmt::join(column_names, ", "), set->getTotalRowCount(),
formatReadableSizeWithBinarySuffix(set->getTotalByteCount()));
set.reset();
}
else
{
/// Should not happen because processor inserted before join that reads all the data
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processor finished, but not all input was read");
}
}
return status;
}
void CreatingSetsOnTheFlyTransform::transform(Chunk & chunk)
{
if (!set || set->state != SetWithState::State::Creating)
{
/// If set building suspended by another processor, release pointer
if (set != nullptr)
set.reset();
return;
}
if (chunk.getNumRows())
{
Columns key_columns = getColumnsByIndices(chunk, key_column_indices);
bool limit_exceeded = !set->insertFromBlock(key_columns);
if (limit_exceeded)
{
auto prev_state = set->state.exchange(SetWithState::State::Suspended);
/// Print log only after first state switch
if (prev_state == SetWithState::State::Creating)
{
LOG_DEBUG(log, "{}: set limit exceeded, give up building set, after reading {} rows and using {}",
getDescription(), set->getTotalRowCount(), formatReadableSizeWithBinarySuffix(set->getTotalByteCount()));
}
/// Probaply we need to clear set here, because it's unneeded anymore
/// But now `Set` doesn't have such method, so reset pointer in all processors and then it should be freed
set.reset();
}
}
}
FilterBySetOnTheFlyTransform::FilterBySetOnTheFlyTransform(const Block & header_, const Names & column_names_, SetWithStatePtr set_)
: ISimpleTransform(header_, header_, true)
, column_names(column_names_)
, key_column_indices(getColumnIndices(inputs.front().getHeader(), column_names))
, set(set_)
{
const auto & header = inputs.front().getHeader();
for (size_t idx : key_column_indices)
key_sample_block.insert(header.getByPosition(idx));
}
IProcessor::Status FilterBySetOnTheFlyTransform::prepare()
{
auto status = ISimpleTransform::prepare();
if (set && set->state == SetWithState::State::Suspended)
set.reset();
if (status == Status::Finished)
{
bool has_filter = set && set->state == SetWithState::State::Finished;
if (has_filter)
{
LOG_DEBUG(log, "Finished {} by [{}]: consumed {} rows in total, {} rows bypassed, result {} rows, {:.2f}% filtered",
Poco::toLower(getDescription()), fmt::join(column_names, ", "),
stat.consumed_rows, stat.consumed_rows_before_set, stat.result_rows,
100 - 100.0 * stat.result_rows / stat.consumed_rows);
}
else
{
LOG_DEBUG(log, "Finished {}: bypass {} rows", Poco::toLower(getDescription()), stat.consumed_rows);
}
/// Release set to free memory
set = nullptr;
}
return status;
}
void FilterBySetOnTheFlyTransform::transform(Chunk & chunk)
{
stat.consumed_rows += chunk.getNumRows();
stat.result_rows += chunk.getNumRows();
bool can_filter = set && set->state == SetWithState::State::Finished;
if (!can_filter)
stat.consumed_rows_before_set += chunk.getNumRows();
if (can_filter && chunk.getNumRows())
{
auto key_columns = getColumnsByIndices(key_sample_block, chunk, key_column_indices);
ColumnPtr mask_col = set->execute(key_columns, false);
const auto & mask = assert_cast<const ColumnUInt8 *>(mask_col.get())->getData();
stat.result_rows -= chunk.getNumRows();
Columns columns = chunk.detachColumns();
size_t result_num_rows = 0;
for (auto & col : columns)
{
col = col->filter(mask, /* negative */ false);
result_num_rows = col->size();
}
stat.result_rows += result_num_rows;
chunk.setColumns(std::move(columns), result_num_rows);
}
}
}

View File

@ -0,0 +1,114 @@
#pragma once
#include <atomic>
#include <mutex>
#include <vector>
#include <Processors/ISimpleTransform.h>
#include <Poco/Logger.h>
#include <Interpreters/Set.h>
namespace DB
{
struct SetWithState : public Set
{
using Set::Set;
/// Flow: Creating -> Finished or Suspended
enum class State
{
/// Set is not yet created,
/// Creating processor continues to build set.
/// Filtering bypasses data.
Creating,
/// Set is finished.
/// Creating processor is finished.
/// Filtering filter stream with this set.
Finished,
/// Set building is canceled (due to limit exceeded).
/// Creating and filtering processors bypass data.
Suspended,
};
std::atomic<State> state = State::Creating;
/// Track number of processors that are currently working on this set.
/// Last one finalizes set.
std::atomic_size_t finished_count = 0;
};
using SetWithStatePtr = std::shared_ptr<SetWithState>;
/*
* Create a set on the fly for incoming stream.
* The set is created from the key columns of the input block.
* Data is not changed and returned as is.
* Can be executed in parallel, but blocks on operations with set.
*/
class CreatingSetsOnTheFlyTransform : public ISimpleTransform
{
public:
CreatingSetsOnTheFlyTransform(const Block & header_, const Names & column_names_, size_t num_streams_, SetWithStatePtr set_);
String getName() const override { return "CreatingSetsOnTheFlyTransform"; }
Status prepare() override;
void transform(Chunk & chunk) override;
private:
Names column_names;
std::vector<size_t> key_column_indices;
size_t num_streams;
/// Set to fill
SetWithStatePtr set;
Poco::Logger * log = &Poco::Logger::get("CreatingSetsOnTheFlyTransform");
};
/*
* Filter the input chunk by the set.
* When set building is not completed, just return the source data.
*/
class FilterBySetOnTheFlyTransform : public ISimpleTransform
{
public:
FilterBySetOnTheFlyTransform(const Block & header_, const Names & column_names_, SetWithStatePtr set_);
String getName() const override { return "FilterBySetOnTheFlyTransform"; }
Status prepare() override;
void transform(Chunk & chunk) override;
private:
/// Set::execute requires ColumnsWithTypesAndNames, so we need to convert Chunk to that format
Block key_sample_block;
Names column_names;
std::vector<size_t> key_column_indices;
/// Filter by this set when it's created
SetWithStatePtr set;
/// Statistics to log
struct Stat
{
/// Total number of rows
size_t consumed_rows = 0;
/// Number of bypassed rows (processed before set is created)
size_t consumed_rows_before_set = 0;
/// Number of rows that passed the filter
size_t result_rows = 0;
} stat;
Poco::Logger * log = &Poco::Logger::get("FilterBySetOnTheFlyTransform");
};
}

View File

@ -770,7 +770,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
header.clear();
}
void Pipe::transform(const Transformer & transformer)
void Pipe::transform(const Transformer & transformer, bool check_ports)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
@ -784,6 +784,9 @@ void Pipe::transform(const Transformer & transformer)
for (const auto & port : output_ports)
{
if (!check_ports)
break;
if (!port->isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -799,6 +802,9 @@ void Pipe::transform(const Transformer & transformer)
{
for (const auto & port : processor->getInputs())
{
if (!check_ports)
break;
if (!port.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -806,7 +812,7 @@ void Pipe::transform(const Transformer & transformer)
processor->getName());
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (!set.contains(connected_processor))
if (check_ports && !set.contains(connected_processor))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has input port which is connected with unknown processor {}",
@ -823,7 +829,7 @@ void Pipe::transform(const Transformer & transformer)
}
const auto * connected_processor = &port.getInputPort().getProcessor();
if (!set.contains(connected_processor))
if (check_ports && !set.contains(connected_processor))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has output port which is connected with unknown processor {}",

View File

@ -85,13 +85,13 @@ public:
/// Add chain to every output port.
void addChains(std::vector<Chain> chains);
/// Changes the number of output ports if needed. Adds ResizeTransform.
/// Changes the number of output ports if needed. Adds (Strict)ResizeProcessor.
void resize(size_t num_streams, bool force = false, bool strict = false);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way.
void transform(const Transformer & transformer);
void transform(const Transformer & transformer, bool check_ports = true);
/// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes);

View File

@ -159,10 +159,10 @@ void QueryPipelineBuilder::addChain(Chain chain)
pipe.addChains(std::move(chains));
}
void QueryPipelineBuilder::transform(const Transformer & transformer)
void QueryPipelineBuilder::transform(const Transformer & transformer, bool check_ports)
{
checkInitializedAndNotCompleted();
pipe.transform(transformer);
pipe.transform(transformer, check_ports);
}
void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
@ -348,8 +348,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
left->pipe.dropExtremes();
right->pipe.dropExtremes();
if (left->pipe.output_ports.size() != 1 || right->pipe.output_ports.size() != 1)
if (left->getNumStreams() != 1 || right->getNumStreams() != 1)
throw Exception("Join is supported only for pipelines with one output port", ErrorCodes::LOGICAL_ERROR);
if (left->hasTotals() || right->hasTotals())
@ -359,8 +358,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size);
auto result = mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
return result;
return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
}
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLeft(

View File

@ -69,7 +69,7 @@ public:
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
void transform(const Transformer & transformer);
void transform(const Transformer & transformer, bool check_ports = true);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform);

View File

@ -297,8 +297,10 @@ public:
assert(indexes_mapping.size() == data_types.size());
for (size_t i = 0; i < indexes_mapping.size(); ++i)
{
if (!candidate_set->areTypesEqual(indexes_mapping[i].tuple_index, data_types[i]))
return false;
}
return true;
};

View File

@ -5569,6 +5569,10 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
if (select_query->interpolate() && !select_query->interpolate()->children.empty())
return std::nullopt;
// Currently projections don't support GROUPING SET yet.
if (select_query->group_by_with_grouping_sets)
return std::nullopt;
auto query_options = SelectQueryOptions(
QueryProcessingStage::WithMergeableState,
/* depth */ 1,

View File

@ -313,6 +313,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
only_merge);
return std::make_pair(params, only_merge);

View File

@ -0,0 +1,45 @@
<test>
<substitutions>
<substitution>
<name>table_size</name>
<values>
<value>100000000</value>
</values>
</substitution>
</substitutions>
<settings>
<join_algorithm>full_sorting_merge</join_algorithm>
</settings>
<create_query>
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't1_x') % {table_size} AS x,
sipHash64(number, 't1_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<create_query>
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't2_x') % {table_size} AS x,
sipHash64(number, 't2_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<drop_query>DROP TABLE IF EXISTS t1</drop_query>
<drop_query>DROP TABLE IF EXISTS t2</drop_query>
</test>

View File

@ -1,4 +1,8 @@
<test>
<query>select sipHash64(number) from numbers(1e7) group by number format Null</query>
<query>select * from (select * from numbers(1e7) group by number) group by number format Null</query>
<query>select * from (select * from numbers(1e7) group by number) order by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) order by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null settings max_bytes_before_external_group_by = 1</query>

View File

@ -1,4 +1,5 @@
SET joined_subquery_requires_alias = 0;
SET max_threads = 1;
-- incremental streaming usecase
-- that has sense only if data filling order has guarantees of chronological order

View File

@ -28,7 +28,7 @@ WITH
ORDER BY event_time DESC
LIMIT 1
) AS id
SELECT uniqExact(thread_id)
SELECT uniqExact(thread_id) > 2
FROM system.query_thread_log
WHERE (event_date >= (today() - 1)) AND (query_id = id) AND (thread_id != master_thread_id);

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-replicated-database, no-parallel, no-fasttest, no-tsan, no-asan, no-random-settings, no-s3-storage
# Tags: no-replicated-database, no-parallel, no-fasttest, no-tsan, no-asan, no-random-settings, no-s3-storage, no-msan
# Tag no-fasttest: max_memory_usage_for_user can interfere another queries running concurrently
# Regression for MemoryTracker that had been incorrectly accounted

View File

@ -6,4 +6,4 @@
2020-01-01 00:00:00 2
1
499999
5
18

View File

@ -1,7 +1,7 @@
DROP TABLE IF EXISTS select_final;
SET do_not_merge_across_partitions_select_final = 1;
SET max_threads = 0;
SET max_threads = 16;
CREATE TABLE select_final (t DateTime, x Int32, string String) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY (x, t);

View File

@ -1,27 +1,12 @@
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
@ -29,32 +14,47 @@
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 51 0 1 51
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 50 1 0 49
1 50 50 1 0 49
1 50 50 1 0 49
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51
1 50 51 0 1 51

View File

@ -52,6 +52,7 @@ ALL LEFT JOIN
FROM group_bitmap_data_test
WHERE pickup_date = '2019-01-01'
GROUP BY city_id
) AS js2 USING (city_id);
) AS js2 USING (city_id)
ORDER BY today_users, before_users, ll_users, old_users, new_users, diff_users;
DROP TABLE IF EXISTS group_bitmap_data_test;

View File

@ -0,0 +1,28 @@
a 2
a x 1
a y 1
b 2
b x 1
b y 1
4
a 2
a x 1
a y 1
b 2
b x 1
b y 1
4
x 2
y 2
a 2
a x 1
a y 1
b 2
b x 1
b y 1
a x 1
a y 1
b x 1
b y 1
4

View File

@ -0,0 +1,15 @@
drop table if exists test;
create table test(dim1 String, dim2 String, projection p1 (select dim1, dim2, count() group by dim1, dim2)) engine MergeTree order by dim1;
insert into test values ('a', 'x') ('a', 'y') ('b', 'x') ('b', 'y');
select dim1, dim2, count() from test group by grouping sets ((dim1, dim2), dim1) order by dim1, dim2, count();
select dim1, dim2, count() from test group by dim1, dim2 with rollup order by dim1, dim2, count();
select dim1, dim2, count() from test group by dim1, dim2 with cube order by dim1, dim2, count();
select dim1, dim2, count() from test group by dim1, dim2 with totals order by dim1, dim2, count();
drop table test;

View File

@ -12,7 +12,7 @@ select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by n
-- and the query with GROUP BY on remote servers will first do GROUP BY and then send the block,
-- so the initiator will first receive all blocks from remotes and only after start merging,
-- and will hit the memory limit.
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi'; -- { serverError 241 }
select * from remote('127.{2..11}', view(select * from numbers(1e6))) group by number order by number limit 1e6 settings distributed_group_by_no_merge=2, max_memory_usage='100Mi', max_block_size=1e12; -- { serverError 241 }
-- with optimize_aggregation_in_order=1 remote servers will produce blocks more frequently,
-- since they don't need to wait until the aggregation will be finished,

View File

@ -1 +1,2 @@
select count(1) from (SELECT 1 AS a, count(1) FROM numbers(5))
select count(1) from (SELECT 1 AS a, count(1) FROM numbers(5));
select count(1) from (SELECT 1 AS a, count(1) + 1 FROM numbers(5));

View File

@ -1,5 +1,22 @@
-- { echoOn }
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 16 → 16
AggregatingTransform × 16
StrictResize 16 → 16
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 1 → 16
AggregatingTransform
(Expression)
ExpressionTransform
(ReadFromStorage)
Limit
Numbers 0 → 1
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
(Expression)
ExpressionTransform × 16

View File

@ -1,9 +1,12 @@
set max_threads = 16;
set prefer_localhost_replica = 1;
set optimize_aggregation_in_order = 0;
set max_block_size = 65505;
-- { echoOn }
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
explain pipeline select * from (select * from numbers_mt(1e8) group by number) order by number;

View File

@ -0,0 +1,9 @@
SET max_block_size = 4213;
SELECT DISTINCT (blockSize() <= 4213)
FROM
(
SELECT number
FROM numbers(100000)
GROUP BY number
);

View File

@ -7,6 +7,8 @@ USING (key);
SET join_algorithm = 'full_sorting_merge';
SET max_rows_in_set_to_optimize_join = 0;
EXPLAIN actions=0, description=0, header=1
SELECT * FROM ( SELECT 'key2' AS key ) AS s1
JOIN ( SELECT 'key1' AS key, '1' AS value UNION ALL SELECT 'key2' AS key, '1' AS value ) AS s2

View File

@ -0,0 +1,7 @@
106
46
42
51
42
24
10

View File

@ -0,0 +1,20 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't1_x') % 100 AS x, sipHash64(number, 't1_y') % 100 AS y FROM numbers(100);
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't2_x') % 100 AS x, sipHash64(number, 't2_y') % 100 AS y FROM numbers(100);
SET max_rows_in_set_to_optimize_join = 1000;
SET join_algorithm = 'full_sorting_merge';
-- different combinations of conditions on key/attribute columns for the left/right tables
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.x % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t2.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t2.x % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 2 == 0 AND t2.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.x % 2 == 0 AND t2.x % 2 == 0 AND t1.y % 2 == 0 AND t2.y % 2 == 0;

View File

@ -0,0 +1,10 @@
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok

View File

@ -0,0 +1,55 @@
#!/usr/bin/env bash
# Tags: no-asan,no-msan,no-tsan,no-ubsan
#
# Test doesn't run complex queries, just test the logic of setting, so no need to run with different builds.
# Also, we run similar queries in 02382_join_and_filtering_set.sql which is enabled for these builds.
#
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -mn -q """
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't1_x') % 100 AS x, sipHash64(number, 't1_y') % 100 AS y FROM numbers(100);
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't2_x') % 100 AS x, sipHash64(number, 't2_y') % 100 AS y FROM numbers(100);
"""
# Arguments:
# - value of max_rows_in_set_to_optimize_join
# - join kind
# - expected number of steps in plan
# - expected number of steps in pipeline
function test() {
PARAM_VALUE=$1
JOIN_KIND=${2:-}
EXPECTED_PLAN_STEPS=$3
RES=$(
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join=${PARAM_VALUE} --join_algorithm='full_sorting_merge' \
-q "EXPLAIN PLAN SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x" | grep -o 'CreateSetAndFilterOnTheFlyStep' | wc -l
)
[ "$RES" -eq "$EXPECTED_PLAN_STEPS" ] && echo "Ok" || echo "Fail: $RES != $EXPECTED_PLAN_STEPS"
EXPECTED_PIPELINE_STEPS=$4
RES=$(
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join=${PARAM_VALUE} --join_algorithm='full_sorting_merge' \
-q "EXPLAIN PIPELINE SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x" \
| grep -o -e ReadHeadBalancedProcessor -e FilterBySetOnTheFlyTransform -e CreatingSetsOnTheFlyTransform | wc -l
)
[ "$RES" -eq "$EXPECTED_PIPELINE_STEPS" ] && echo "Ok" || echo "Fail: $RES != $EXPECTED_PIPELINE_STEPS"
}
test 1000 '' 2 6
# no filtering for left/right side
test 1000 'LEFT' 2 5
test 1000 'RIGHT' 2 5
# when disabled no extra steps should be created
test 1000 'FULL' 0 0
test 0 '' 0 0

View File

@ -0,0 +1,3 @@
CREATE TABLE set_crash (key1 Int32, id1 Int64, c1 Int64) ENGINE = MergeTree PARTITION BY id1 ORDER BY key1;
INSERT INTO set_crash VALUES (-1, 1, 0);
SELECT 1 in (-1, 1) FROM set_crash WHERE (key1, id1) in (-1, 1);