diff --git a/docs/en/engines/table-engines/integrations/s3queue.md b/docs/en/engines/table-engines/integrations/s3queue.md
index 11fc357d222..89a70420069 100644
--- a/docs/en/engines/table-engines/integrations/s3queue.md
+++ b/docs/en/engines/table-engines/integrations/s3queue.md
@@ -122,7 +122,7 @@ Default value: `0`.
### s3queue_polling_min_timeout_ms {#polling_min_timeout_ms}
-Minimal timeout before next polling (in milliseconds).
+Specifies the minimum time, in milliseconds, that ClickHouse waits before making the next polling attempt.
Possible values:
@@ -132,7 +132,7 @@ Default value: `1000`.
### s3queue_polling_max_timeout_ms {#polling_max_timeout_ms}
-Maximum timeout before next polling (in milliseconds).
+Defines the maximum time, in milliseconds, that ClickHouse waits before initiating the next polling attempt.
Possible values:
@@ -142,7 +142,7 @@ Default value: `10000`.
### s3queue_polling_backoff_ms {#polling_backoff_ms}
-Polling backoff (in milliseconds).
+Determines the additional wait time added to the previous polling interval when no new files are found. The next poll occurs after the sum of the previous interval and this backoff value, or the maximum interval, whichever is lower.
Possible values:
diff --git a/docs/en/engines/table-engines/mergetree-family/aggregatingmergetree.md b/docs/en/engines/table-engines/mergetree-family/aggregatingmergetree.md
index 819038ee32c..dc0f8683f11 100644
--- a/docs/en/engines/table-engines/mergetree-family/aggregatingmergetree.md
+++ b/docs/en/engines/table-engines/mergetree-family/aggregatingmergetree.md
@@ -10,6 +10,11 @@ The engine inherits from [MergeTree](../../../engines/table-engines/mergetree-fa
You can use `AggregatingMergeTree` tables for incremental data aggregation, including for aggregated materialized views.
+You can see an example of how to use the AggregatingMergeTree and Aggregate functions in the below video:
+
+
+
+
The engine processes all columns with the following types:
## [AggregateFunction](../../../sql-reference/data-types/aggregatefunction.md)
diff --git a/docs/en/operations/system-tables/asynchronous_metrics.md b/docs/en/operations/system-tables/asynchronous_metrics.md
index 762d187917c..d506daba95c 100644
--- a/docs/en/operations/system-tables/asynchronous_metrics.md
+++ b/docs/en/operations/system-tables/asynchronous_metrics.md
@@ -211,7 +211,7 @@ Number of threads in the server of the replicas communication protocol (without
The difference in time the thread for calculation of the asynchronous metrics was scheduled to wake up and the time it was in fact, woken up. A proxy-indicator of overall system latency and responsiveness.
-### LoadAverage_*N*
+### LoadAverage*N*
The whole system load, averaged with exponential smoothing over 1 minute. The load represents the number of threads across all the processes (the scheduling entities of the OS kernel), that are currently running by CPU or waiting for IO, or ready to run but not being scheduled at this point of time. This number includes all the processes, not only clickhouse-server. The number can be greater than the number of CPU cores, if the system is overloaded, and many processes are ready to run but waiting for CPU or IO.
diff --git a/docs/en/sql-reference/aggregate-functions/index.md b/docs/en/sql-reference/aggregate-functions/index.md
index 5056ef2c7aa..c297214a49c 100644
--- a/docs/en/sql-reference/aggregate-functions/index.md
+++ b/docs/en/sql-reference/aggregate-functions/index.md
@@ -75,7 +75,7 @@ FROM t_null_big
└────────────────────┴─────────────────────┘
```
-Also you can use [Tuple](/docs/en/sql-reference/data-types/tuple.md) to work around NULL skipping behavior. The a `Tuple` that contains only a `NULL` value is not `NULL`, so the aggregate functions won't skip that row because of that `NULL` value.
+Also you can use [Tuple](/docs/en/sql-reference/data-types/tuple.md) to work around NULL skipping behavior. A `Tuple` that contains only a `NULL` value is not `NULL`, so the aggregate functions won't skip that row because of that `NULL` value.
```sql
SELECT
@@ -110,7 +110,7 @@ GROUP BY v
└──────┴─────────┴──────────┘
```
-And here is an example of of first_value with `RESPECT NULLS` where we can see that NULL inputs are respected and it will return the first value read, whether it's NULL or not:
+And here is an example of first_value with `RESPECT NULLS` where we can see that NULL inputs are respected and it will return the first value read, whether it's NULL or not:
```sql
SELECT
diff --git a/docs/en/sql-reference/aggregate-functions/reference/any.md b/docs/en/sql-reference/aggregate-functions/reference/any.md
index 972263585f2..e7bebd4d460 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/any.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/any.md
@@ -5,7 +5,15 @@ sidebar_position: 102
# any
-Selects the first encountered value of a column, ignoring any `NULL` values.
+Selects the first encountered value of a column.
+
+:::warning
+As a query can be executed in arbitrary order, the result of this function is non-deterministic.
+If you need an arbitrary but deterministic result, use functions [`min`](../reference/min.md) or [`max`](../reference/max.md).
+:::
+
+By default, the function never returns NULL, i.e. ignores NULL values in the input column.
+However, if the function is used with the `RESPECT NULLS` modifier, it returns the first value reads no matter if NULL or not.
**Syntax**
@@ -13,46 +21,51 @@ Selects the first encountered value of a column, ignoring any `NULL` values.
any(column) [RESPECT NULLS]
```
-Aliases: `any_value`, [`first_value`](../reference/first_value.md).
+Aliases `any(column)` (without `RESPECT NULLS`)
+- `any_value`
+- [`first_value`](../reference/first_value.md).
+
+Alias for `any(column) RESPECT NULLS`
+- `anyRespectNulls`, `any_respect_nulls`
+- `firstValueRespectNulls`, `first_value_respect_nulls`
+- `anyValueRespectNulls`, `any_value_respect_nulls`
**Parameters**
-- `column`: The column name.
+- `column`: The column name.
**Returned value**
-:::note
-Supports the `RESPECT NULLS` modifier after the function name. Using this modifier will ensure the function selects the first value passed, regardless of whether it is `NULL` or not.
-:::
+The first value encountered.
:::note
-The return type of the function is the same as the input, except for LowCardinality which is discarded. This means that given no rows as input it will return the default value of that type (0 for integers, or Null for a Nullable() column). You might use the `-OrNull` [combinator](../../../sql-reference/aggregate-functions/combinators.md) ) to modify this behaviour.
-:::
-
-:::warning
-The query can be executed in any order and even in a different order each time, so the result of this function is indeterminate.
-To get a determinate result, you can use the [`min`](../reference/min.md) or [`max`](../reference/max.md) function instead of `any`.
+The return type of the function is the same as the input, except for LowCardinality which is discarded.
+This means that given no rows as input it will return the default value of that type (0 for integers, or Null for a Nullable() column).
+You might use the `-OrNull` [combinator](../../../sql-reference/aggregate-functions/combinators.md) ) to modify this behaviour.
:::
**Implementation details**
-In some cases, you can rely on the order of execution. This applies to cases when `SELECT` comes from a subquery that uses `ORDER BY`.
+In some cases, you can rely on the order of execution.
+This applies to cases when `SELECT` comes from a subquery that uses `ORDER BY`.
-When a `SELECT` query has the `GROUP BY` clause or at least one aggregate function, ClickHouse (in contrast to MySQL) requires that all expressions in the `SELECT`, `HAVING`, and `ORDER BY` clauses be calculated from keys or from aggregate functions. In other words, each column selected from the table must be used either in keys or inside aggregate functions. To get behavior like in MySQL, you can put the other columns in the `any` aggregate function.
+When a `SELECT` query has the `GROUP BY` clause or at least one aggregate function, ClickHouse (in contrast to MySQL) requires that all expressions in the `SELECT`, `HAVING`, and `ORDER BY` clauses be calculated from keys or from aggregate functions.
+In other words, each column selected from the table must be used either in keys or inside aggregate functions.
+To get behavior like in MySQL, you can put the other columns in the `any` aggregate function.
**Example**
Query:
```sql
-CREATE TABLE any_nulls (city Nullable(String)) ENGINE=Log;
+CREATE TABLE tab (city Nullable(String)) ENGINE=Memory;
-INSERT INTO any_nulls (city) VALUES (NULL), ('Amsterdam'), ('New York'), ('Tokyo'), ('Valencia'), (NULL);
+INSERT INTO tab (city) VALUES (NULL), ('Amsterdam'), ('New York'), ('Tokyo'), ('Valencia'), (NULL);
-SELECT any(city) FROM any_nulls;
+SELECT any(city), anyRespectNulls(city) FROM tab;
```
```response
-┌─any(city)─┐
-│ Amsterdam │
-└───────────┘
+┌─any(city)─┬─anyRespectNulls(city)─┐
+│ Amsterdam │ ᴺᵁᴸᴸ │
+└───────────┴───────────────────────┘
```
diff --git a/docs/en/sql-reference/aggregate-functions/reference/anylast.md b/docs/en/sql-reference/aggregate-functions/reference/anylast.md
index 4fe21531c76..3d80533e146 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/anylast.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/anylast.md
@@ -5,7 +5,15 @@ sidebar_position: 105
# anyLast
-Selects the last value encountered, ignoring any `NULL` values by default. The result is just as indeterminate as for the [any](../../../sql-reference/aggregate-functions/reference/any.md) function.
+Selects the last encountered value of a column.
+
+:::warning
+As a query can be executed in arbitrary order, the result of this function is non-deterministic.
+If you need an arbitrary but deterministic result, use functions [`min`](../reference/min.md) or [`max`](../reference/max.md).
+:::
+
+By default, the function never returns NULL, i.e. ignores NULL values in the input column.
+However, if the function is used with the `RESPECT NULLS` modifier, it returns the first value reads no matter if NULL or not.
**Syntax**
@@ -13,12 +21,15 @@ Selects the last value encountered, ignoring any `NULL` values by default. The r
anyLast(column) [RESPECT NULLS]
```
-**Parameters**
-- `column`: The column name.
+Alias `anyLast(column)` (without `RESPECT NULLS`)
+- [`last_value`](../reference/last_value.md).
-:::note
-Supports the `RESPECT NULLS` modifier after the function name. Using this modifier will ensure the function selects the last value passed, regardless of whether it is `NULL` or not.
-:::
+Aliases for `anyLast(column) RESPECT NULLS`
+- `anyLastRespectNulls`, `anyLast_respect_nulls`
+- `lastValueRespectNulls`, `last_value_respect_nulls`
+
+**Parameters**
+- `column`: The column name.
**Returned value**
@@ -29,15 +40,15 @@ Supports the `RESPECT NULLS` modifier after the function name. Using this modifi
Query:
```sql
-CREATE TABLE any_last_nulls (city Nullable(String)) ENGINE=Log;
+CREATE TABLE tab (city Nullable(String)) ENGINE=Memory;
-INSERT INTO any_last_nulls (city) VALUES ('Amsterdam'),(NULL),('New York'),('Tokyo'),('Valencia'),(NULL);
+INSERT INTO tab (city) VALUES ('Amsterdam'),(NULL),('New York'),('Tokyo'),('Valencia'),(NULL);
-SELECT anyLast(city) FROM any_last_nulls;
+SELECT anyLast(city), anyLastRespectNulls(city) FROM tab;
```
```response
-┌─anyLast(city)─┐
-│ Valencia │
-└───────────────┘
+┌─anyLast(city)─┬─anyLastRespectNulls(city)─┐
+│ Valencia │ ᴺᵁᴸᴸ │
+└───────────────┴───────────────────────────┘
```
diff --git a/docs/en/sql-reference/functions/type-conversion-functions.md b/docs/en/sql-reference/functions/type-conversion-functions.md
index 1c92a459e13..5f4c59f5218 100644
--- a/docs/en/sql-reference/functions/type-conversion-functions.md
+++ b/docs/en/sql-reference/functions/type-conversion-functions.md
@@ -6791,7 +6791,7 @@ parseDateTime(str[, format[, timezone]])
**Returned value(s)**
-Returns DateTime values parsed from input string according to a MySQL style format string.
+Return a [DateTime](../data-types/datetime.md) value parsed from the input string according to a MySQL-style format string.
**Supported format specifiers**
@@ -6840,7 +6840,7 @@ parseDateTimeInJodaSyntax(str[, format[, timezone]])
**Returned value(s)**
-Returns DateTime values parsed from input string according to a Joda style format.
+Return a [DateTime](../data-types/datetime.md) value parsed from the input string according to a Joda-style format string.
**Supported format specifiers**
@@ -6885,7 +6885,8 @@ parseDateTime64(str[, format[, timezone]])
**Returned value(s)**
-Returns [DateTime64](../data-types/datetime64.md) type values parsed from input string according to a MySQL style format string.
+Return a [DateTime64](../data-types/datetime64.md) value parsed from the input string according to a MySQL-style format string.
+The precision of the returned value is 6.
## parseDateTime64OrZero
@@ -6913,7 +6914,8 @@ parseDateTime64InJodaSyntax(str[, format[, timezone]])
**Returned value(s)**
-Returns [DateTime64](../data-types/datetime64.md) type values parsed from input string according to a joda style format string.
+Return a [DateTime64](../data-types/datetime64.md) value parsed from the input string according to a Joda-style format string.
+The precision of the returned value equal to the number of `S` placeholders in the format string (but at most 6).
## parseDateTime64InJodaSyntaxOrZero
diff --git a/docs/en/sql-reference/window-functions/first_value.md b/docs/en/sql-reference/window-functions/first_value.md
index 30c3b1f99dc..c6e978bfc92 100644
--- a/docs/en/sql-reference/window-functions/first_value.md
+++ b/docs/en/sql-reference/window-functions/first_value.md
@@ -15,7 +15,7 @@ first_value (column_name) [[RESPECT NULLS] | [IGNORE NULLS]]
OVER ([[PARTITION BY grouping_column] [ORDER BY sorting_column]
[ROWS or RANGE expression_to_bound_rows_withing_the_group]] | [window_name])
FROM table_name
-WINDOW window_name as ([[PARTITION BY grouping_column] [ORDER BY sorting_column])
+WINDOW window_name as ([PARTITION BY grouping_column] [ORDER BY sorting_column])
```
Alias: `any`.
@@ -23,6 +23,8 @@ Alias: `any`.
:::note
Using the optional modifier `RESPECT NULLS` after `first_value(column_name)` will ensure that `NULL` arguments are not skipped.
See [NULL processing](../aggregate-functions/index.md/#null-processing) for more information.
+
+Alias: `firstValueRespectNulls`
:::
For more detail on window function syntax see: [Window Functions - Syntax](./index.md/#syntax).
@@ -48,7 +50,7 @@ CREATE TABLE salaries
)
Engine = Memory;
-INSERT INTO salaries FORMAT Values
+INSERT INTO salaries FORMAT VALUES
('Port Elizabeth Barbarians', 'Gary Chen', 196000, 'F'),
('New Coreystad Archdukes', 'Charles Juarez', 190000, 'F'),
('Port Elizabeth Barbarians', 'Michael Stanley', 100000, 'D'),
diff --git a/docs/en/sql-reference/window-functions/last_value.md b/docs/en/sql-reference/window-functions/last_value.md
index dd7f5fa078a..9f3ef8ba4f6 100644
--- a/docs/en/sql-reference/window-functions/last_value.md
+++ b/docs/en/sql-reference/window-functions/last_value.md
@@ -23,6 +23,8 @@ Alias: `anyLast`.
:::note
Using the optional modifier `RESPECT NULLS` after `first_value(column_name)` will ensure that `NULL` arguments are not skipped.
See [NULL processing](../aggregate-functions/index.md/#null-processing) for more information.
+
+Alias: `lastValueRespectNulls`
:::
For more detail on window function syntax see: [Window Functions - Syntax](./index.md/#syntax).
@@ -33,7 +35,7 @@ For more detail on window function syntax see: [Window Functions - Syntax](./ind
**Example**
-In this example the `last_value` function is used to find the highest paid footballer from a fictional dataset of salaries of Premier League football players.
+In this example the `last_value` function is used to find the lowest paid footballer from a fictional dataset of salaries of Premier League football players.
Query:
@@ -48,7 +50,7 @@ CREATE TABLE salaries
)
Engine = Memory;
-INSERT INTO salaries FORMAT Values
+INSERT INTO salaries FORMAT VALUES
('Port Elizabeth Barbarians', 'Gary Chen', 196000, 'F'),
('New Coreystad Archdukes', 'Charles Juarez', 190000, 'F'),
('Port Elizabeth Barbarians', 'Michael Stanley', 100000, 'D'),
diff --git a/src/AggregateFunctions/AggregateFunctionAnyRespectNulls.cpp b/src/AggregateFunctions/AggregateFunctionAnyRespectNulls.cpp
index cce4f26d813..83fc98ada11 100644
--- a/src/AggregateFunctions/AggregateFunctionAnyRespectNulls.cpp
+++ b/src/AggregateFunctions/AggregateFunctionAnyRespectNulls.cpp
@@ -221,11 +221,16 @@ void registerAggregateFunctionsAnyRespectNulls(AggregateFunctionFactory & factor
= {.returns_default_when_only_null = false, .is_order_dependent = true, .is_window_function = true};
factory.registerFunction("any_respect_nulls", {createAggregateFunctionAnyRespectNulls, default_properties_for_respect_nulls});
- factory.registerAlias("any_value_respect_nulls", "any_respect_nulls", AggregateFunctionFactory::Case::Insensitive);
+ factory.registerAlias("anyRespectNulls", "any_respect_nulls", AggregateFunctionFactory::Case::Sensitive);
factory.registerAlias("first_value_respect_nulls", "any_respect_nulls", AggregateFunctionFactory::Case::Insensitive);
+ factory.registerAlias("firstValueRespectNulls", "any_respect_nulls", AggregateFunctionFactory::Case::Sensitive);
+ factory.registerAlias("any_value_respect_nulls", "any_respect_nulls", AggregateFunctionFactory::Case::Insensitive);
+ factory.registerAlias("anyValueRespectNulls", "any_respect_nulls", AggregateFunctionFactory::Case::Sensitive);
factory.registerFunction("anyLast_respect_nulls", {createAggregateFunctionAnyLastRespectNulls, default_properties_for_respect_nulls});
+ factory.registerAlias("anyLastRespectNulls", "anyLast_respect_nulls", AggregateFunctionFactory::Case::Sensitive);
factory.registerAlias("last_value_respect_nulls", "anyLast_respect_nulls", AggregateFunctionFactory::Case::Insensitive);
+ factory.registerAlias("lastValueRespectNulls", "anyLast_respect_nulls", AggregateFunctionFactory::Case::Sensitive);
/// Must happen after registering any and anyLast
factory.registerNullsActionTransformation("any", "any_respect_nulls");
diff --git a/src/Backups/BackupCoordinationStageSync.cpp b/src/Backups/BackupCoordinationStageSync.cpp
index df5f08091ba..1d7f93398cc 100644
--- a/src/Backups/BackupCoordinationStageSync.cpp
+++ b/src/Backups/BackupCoordinationStageSync.cpp
@@ -685,13 +685,13 @@ void BackupCoordinationStageSync::cancelQueryIfError()
{
std::lock_guard lock{mutex};
- if (!state.host_with_error)
- return;
-
- exception = state.hosts.at(*state.host_with_error).exception;
+ if (state.host_with_error)
+ exception = state.hosts.at(*state.host_with_error).exception;
}
- chassert(exception);
+ if (!exception)
+ return;
+
process_list_element->cancelQuery(false, exception);
state_changed.notify_all();
}
@@ -741,6 +741,11 @@ void BackupCoordinationStageSync::cancelQueryIfDisconnectedTooLong()
if (!exception)
return;
+ /// In this function we only pass the new `exception` (about that the connection was lost) to `process_list_element`.
+ /// We don't try to create the 'error' node here (because this function is called from watchingThread() and
+ /// we don't want the watching thread to try waiting here for retries or a reconnection).
+ /// Also we don't set the `state.host_with_error` field here because `state.host_with_error` can only be set
+ /// AFTER creating the 'error' node (see the comment for `State`).
process_list_element->cancelQuery(false, exception);
state_changed.notify_all();
}
@@ -870,6 +875,9 @@ bool BackupCoordinationStageSync::checkIfHostsReachStage(const Strings & hosts,
continue;
}
+ if (state.host_with_error)
+ std::rethrow_exception(state.hosts.at(*state.host_with_error).exception);
+
if (host_info.finished)
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"{} finished without coming to stage {}", getHostDesc(host), stage_to_wait);
@@ -1150,6 +1158,9 @@ bool BackupCoordinationStageSync::checkIfOtherHostsFinish(
if ((host == current_host) || host_info.finished)
continue;
+ if (throw_if_error && state.host_with_error)
+ std::rethrow_exception(state.hosts.at(*state.host_with_error).exception);
+
String reason_text = reason.empty() ? "" : (" " + reason);
String host_status;
diff --git a/src/Backups/BackupCoordinationStageSync.h b/src/Backups/BackupCoordinationStageSync.h
index 879b2422b84..acfd31c05af 100644
--- a/src/Backups/BackupCoordinationStageSync.h
+++ b/src/Backups/BackupCoordinationStageSync.h
@@ -197,6 +197,9 @@ private:
};
/// Information about all the host participating in the current BACKUP or RESTORE operation.
+ /// This information is read from ZooKeeper.
+ /// To simplify the programming logic `state` can only be updated AFTER changing corresponding nodes in ZooKeeper
+ /// (for example, first we create the 'error' node, and only after that we set or read from ZK the `state.host_with_error` field).
struct State
{
std::map hosts; /// std::map because we need to compare states
diff --git a/src/Columns/ColumnVector.h b/src/Columns/ColumnVector.h
index 47840a90a0e..e8bb6ad6798 100644
--- a/src/Columns/ColumnVector.h
+++ b/src/Columns/ColumnVector.h
@@ -52,6 +52,7 @@ private:
explicit ColumnVector(const size_t n) : data(n) {}
ColumnVector(const size_t n, const ValueType x) : data(n, x) {}
ColumnVector(const ColumnVector & src) : data(src.data.begin(), src.data.end()) {}
+ ColumnVector(Container::const_iterator begin, Container::const_iterator end) : data(begin, end) { }
/// Sugar constructor.
ColumnVector(std::initializer_list il) : data{il} {}
diff --git a/src/Common/PipeFDs.cpp b/src/Common/PipeFDs.cpp
index 50eeda1bbe2..c7ca33bc405 100644
--- a/src/Common/PipeFDs.cpp
+++ b/src/Common/PipeFDs.cpp
@@ -10,6 +10,7 @@
#include
#include
+
namespace DB
{
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 0e2e1f8a3f0..907edf8cc18 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -150,6 +150,9 @@ Squash blocks passed to the external table to a specified size in bytes, if bloc
)", 0) \
DECLARE(UInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, R"(
Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.
+)", 0) \
+ DECLARE(UInt64, min_joined_block_size_bytes, 524288, R"(
+Minimum block size for JOIN result (if join algorithm supports it). 0 means unlimited.
)", 0) \
DECLARE(UInt64, max_insert_threads, 0, R"(
The maximum number of threads to execute the `INSERT SELECT` query.
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index dc95054f722..64913f03754 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -80,6 +80,7 @@ static std::initializer_list(&type))
+ {
+
+ const auto & left_date_time = left.safeGet();
+ TransformDateTime64 transformer_left(left_date_time.getScale());
+
+ const auto & right_date_time = right.safeGet();
+ TransformDateTime64 transformer_right(right_date_time.getScale());
+
+ return transformer_left.execute(left_date_time.getValue(), date_lut)
+ == transformer_right.execute(right_date_time.getValue(), date_lut)
+ ? is_monotonic
+ : is_not_monotonic;
+ }
+
return Transform::FactorTransform::execute(UInt32(left.safeGet()), date_lut)
== Transform::FactorTransform::execute(UInt32(right.safeGet()), date_lut)
? is_monotonic
diff --git a/src/Functions/parseDateTime.cpp b/src/Functions/parseDateTime.cpp
index 9a9a8fd93b4..362a1c30288 100644
--- a/src/Functions/parseDateTime.cpp
+++ b/src/Functions/parseDateTime.cpp
@@ -457,16 +457,16 @@ namespace
return {};
}
- [[nodiscard]]
- VoidOrError setScale(UInt8 scale_, ParseSyntax parse_syntax_)
+ void setScale(UInt32 scale_, ParseSyntax parse_syntax_)
{
+ /// Because the scale argument for parseDateTime*() is constant, always throw an exception (don't allow continuing to the
+ /// next row like in other set* functions)
if (parse_syntax_ == ParseSyntax::MySQL && scale_ != 6)
- RETURN_ERROR(ErrorCodes::CANNOT_PARSE_DATETIME, "Value {} for scale must be 6 for MySQL parse syntax", std::to_string(scale_))
+ throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Precision {} is invalid (must be 6)", scale);
else if (parse_syntax_ == ParseSyntax::Joda && scale_ > 6)
- RETURN_ERROR(ErrorCodes::CANNOT_PARSE_DATETIME, "Value {} for scale must be in the range [0, 6] for Joda syntax", std::to_string(scale_))
+ throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Precision {} is invalid (must be [0, 6])", scale);
scale = scale_;
- return {};
}
/// For debug
@@ -611,7 +611,6 @@ namespace
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
-
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
@@ -637,13 +636,13 @@ namespace
data_type = std::make_shared(6, time_zone_name);
else
{
+ /// The precision of the return type is the number of 'S' placeholders.
String format = getFormat(arguments);
std::vector instructions = parseFormat(format);
- /// How many 'S' characters does the format string contain?
- UInt32 s_count = 0;
+ size_t s_count = 0;
for (const auto & instruction : instructions)
{
- const String fragment = instruction.getFragment();
+ const String & fragment = instruction.getFragment();
for (char c : fragment)
{
if (c == 'S')
@@ -654,7 +653,6 @@ namespace
if (s_count > 0)
break;
}
- /// Use s_count as DateTime64's scale.
data_type = std::make_shared(s_count, time_zone_name);
}
}
@@ -715,25 +713,18 @@ namespace
const String format = getFormat(arguments);
const std::vector instructions = parseFormat(format);
const auto & time_zone = getTimeZone(arguments);
- /// Make datetime fit in a cache line.
- alignas(64) DateTime datetime;
+ alignas(64) DateTime datetime; /// Make datetime fit in a cache line.
for (size_t i = 0; i < input_rows_count; ++i)
{
datetime.reset();
+ if constexpr (return_type == ReturnType::DateTime64)
+ datetime.setScale(scale, parse_syntax);
+
StringRef str_ref = col_str->getDataAt(i);
Pos cur = str_ref.data;
Pos end = str_ref.data + str_ref.size;
bool error = false;
- if constexpr (return_type == ReturnType::DateTime64)
- {
- if (auto result = datetime.setScale(static_cast(scale), parse_syntax); !result.has_value())
- {
- const ErrorCodeAndMessage & err = result.error();
- throw Exception(err.error_code, "Invalid scale value: {}, {}", std::to_string(scale), err.error_message);
- }
- }
-
for (const auto & instruction : instructions)
{
if (auto result = instruction.perform(cur, end, datetime); result.has_value())
diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index 92aa831f233..2f797d4810b 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -3387,6 +3387,8 @@ UInt64 calculateCacheKey(const DB::ASTPtr & select_query)
SipHash hash;
hash.update(select.tables()->getTreeHash(/*ignore_aliases=*/true));
+ if (const auto prewhere = select.prewhere())
+ hash.update(prewhere->getTreeHash(/*ignore_aliases=*/true));
if (const auto where = select.where())
hash.update(where->getTreeHash(/*ignore_aliases=*/true));
if (const auto group_by = select.groupBy())
diff --git a/src/Interpreters/ConcurrentHashJoin.cpp b/src/Interpreters/ConcurrentHashJoin.cpp
index d906540d6df..ec7e8d4aef8 100644
--- a/src/Interpreters/ConcurrentHashJoin.cpp
+++ b/src/Interpreters/ConcurrentHashJoin.cpp
@@ -5,16 +5,21 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
+#include
#include
#include
#include
+#include
#include
#include
#include
#include
+#include
#include
#include
#include
@@ -24,6 +29,12 @@
#include
#include
+#include
+#include
+#include
+
+using namespace DB;
+
namespace ProfileEvents
{
extern const Event HashJoinPreallocatedElementsInHashTables;
@@ -116,9 +127,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(
auto inner_hash_join = std::make_shared();
inner_hash_join->data = std::make_unique(
table_join_, right_sample_block, any_take_last_row_, reserve_size, fmt::format("concurrent{}", idx));
- /// Non zero `max_joined_block_rows` allows to process block partially and return not processed part.
- /// TODO: It's not handled properly in ConcurrentHashJoin case, so we set it to 0 to disable this feature.
- inner_hash_join->data->setMaxJoinedBlockRows(0);
+ inner_hash_join->data->setMaxJoinedBlockRows(table_join->maxJoinedBlockRows());
hash_joins[idx] = std::move(inner_hash_join);
});
}
@@ -165,10 +174,13 @@ ConcurrentHashJoin::~ConcurrentHashJoin()
}
}
-bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block, bool check_limits)
+bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block_, bool check_limits)
{
- Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, right_block);
+ /// We materialize columns here to avoid materializing them multiple times on different threads
+ /// (inside different `hash_join`-s) because the block will be shared.
+ Block right_block = hash_joins[0]->data->materializeColumnsFromRightBlock(right_block_);
+ auto dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, std::move(right_block));
size_t blocks_left = 0;
for (const auto & block : dispatched_blocks)
{
@@ -211,19 +223,52 @@ bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block, bool check_li
void ConcurrentHashJoin::joinBlock(Block & block, std::shared_ptr & /*not_processed*/)
{
- Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_left, block);
+ Blocks res;
+ ExtraScatteredBlocks extra_blocks;
+ joinBlock(block, extra_blocks, res);
+ chassert(!extra_blocks.rows());
+ block = concatenateBlocks(res);
+}
+
+void ConcurrentHashJoin::joinBlock(Block & block, ExtraScatteredBlocks & extra_blocks, std::vector & res)
+{
+ ScatteredBlocks dispatched_blocks;
+ auto & remaining_blocks = extra_blocks.remaining_blocks;
+ if (extra_blocks.rows())
+ {
+ dispatched_blocks.swap(remaining_blocks);
+ }
+ else
+ {
+ hash_joins[0]->data->materializeColumnsFromLeftBlock(block);
+ dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_left, std::move(block));
+ }
+
block = {};
+
+ /// Just in case, should be no-op always
+ remaining_blocks.resize(slots);
+
+ chassert(res.empty());
+ res.clear();
+ res.reserve(dispatched_blocks.size());
+
for (size_t i = 0; i < dispatched_blocks.size(); ++i)
{
std::shared_ptr none_extra_block;
auto & hash_join = hash_joins[i];
auto & dispatched_block = dispatched_blocks[i];
- hash_join->data->joinBlock(dispatched_block, none_extra_block);
+ if (dispatched_block && (i == 0 || dispatched_block.rows()))
+ hash_join->data->joinBlock(dispatched_block, remaining_blocks[i]);
if (none_extra_block && !none_extra_block->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "not_processed should be empty");
}
-
- block = concatenateBlocks(dispatched_blocks);
+ for (size_t i = 0; i < dispatched_blocks.size(); ++i)
+ {
+ auto & dispatched_block = dispatched_blocks[i];
+ if (dispatched_block && (i == 0 || dispatched_block.rows()))
+ res.emplace_back(std::move(dispatched_block).getSourceBlock());
+ }
}
void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const
@@ -302,10 +347,9 @@ static ALWAYS_INLINE IColumn::Selector hashToSelector(const WeakHash32 & hash, s
return selector;
}
-IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const Strings & key_columns_names, const Block & from_block)
+IColumn::Selector selectDispatchBlock(size_t num_shards, const Strings & key_columns_names, const Block & from_block)
{
size_t num_rows = from_block.rows();
- size_t num_shards = hash_joins.size();
WeakHash32 hash(num_rows);
for (const auto & key_name : key_columns_names)
@@ -317,40 +361,101 @@ IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const Strings & key_co
return hashToSelector(hash, num_shards);
}
-Blocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, const Block & from_block)
+ScatteredBlocks scatterBlocksByCopying(size_t num_shards, const IColumn::Selector & selector, const Block & from_block)
{
- /// TODO: use JoinCommon::scatterBlockByHash
- size_t num_shards = hash_joins.size();
- size_t num_cols = from_block.columns();
-
- IColumn::Selector selector = selectDispatchBlock(key_columns_names, from_block);
-
- Blocks result(num_shards);
+ Blocks blocks(num_shards);
for (size_t i = 0; i < num_shards; ++i)
- result[i] = from_block.cloneEmpty();
+ blocks[i] = from_block.cloneEmpty();
- for (size_t i = 0; i < num_cols; ++i)
+ for (size_t i = 0; i < from_block.columns(); ++i)
{
auto dispatched_columns = from_block.getByPosition(i).column->scatter(num_shards, selector);
- assert(result.size() == dispatched_columns.size());
+ chassert(blocks.size() == dispatched_columns.size());
for (size_t block_index = 0; block_index < num_shards; ++block_index)
{
- result[block_index].getByPosition(i).column = std::move(dispatched_columns[block_index]);
+ blocks[block_index].getByPosition(i).column = std::move(dispatched_columns[block_index]);
}
}
+
+ ScatteredBlocks result;
+ result.reserve(num_shards);
+ for (size_t i = 0; i < num_shards; ++i)
+ result.emplace_back(std::move(blocks[i]));
return result;
}
-UInt64 calculateCacheKey(std::shared_ptr & table_join, const QueryTreeNodePtr & right_table_expression)
+ScatteredBlocks scatterBlocksWithSelector(size_t num_shards, const IColumn::Selector & selector, const Block & from_block)
{
+ std::vector selectors(num_shards);
+ for (size_t i = 0; i < num_shards; ++i)
+ {
+ selectors[i] = ScatteredBlock::Indexes::create();
+ selectors[i]->reserve(selector.size() / num_shards + 1);
+ }
+ for (size_t i = 0; i < selector.size(); ++i)
+ {
+ const size_t shard = selector[i];
+ selectors[shard]->getData().push_back(i);
+ }
+ ScatteredBlocks result;
+ result.reserve(num_shards);
+ for (size_t i = 0; i < num_shards; ++i)
+ result.emplace_back(from_block, std::move(selectors[i]));
+ return result;
+}
+
+ScatteredBlocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, Block && from_block)
+{
+ size_t num_shards = hash_joins.size();
+ if (num_shards == 1)
+ {
+ ScatteredBlocks res;
+ res.emplace_back(std::move(from_block));
+ return res;
+ }
+
+ IColumn::Selector selector = selectDispatchBlock(num_shards, key_columns_names, from_block);
+
+ /// With zero-copy approach we won't copy the source columns, but will create a new one with indices.
+ /// This is not beneficial when the whole set of columns is e.g. a single small column.
+ constexpr auto threshold = sizeof(IColumn::Selector::value_type);
+ const auto & data_types = from_block.getDataTypes();
+ const bool use_zero_copy_approach
+ = std::accumulate(
+ data_types.begin(),
+ data_types.end(),
+ 0u,
+ [](size_t sum, const DataTypePtr & type)
+ { return sum + (type->haveMaximumSizeOfValue() ? type->getMaximumSizeOfValueInMemory() : threshold + 1); })
+ > threshold;
+
+ return use_zero_copy_approach ? scatterBlocksWithSelector(num_shards, selector, from_block)
+ : scatterBlocksByCopying(num_shards, selector, from_block);
+}
+
+UInt64 calculateCacheKey(
+ std::shared_ptr & table_join, const QueryTreeNodePtr & right_table_expression, const SelectQueryInfo & select_query_info)
+{
+ const auto * select = select_query_info.query->as();
+ if (!select)
+ return 0;
+
IQueryTreeNode::HashState hash;
+
+ if (const auto prewhere = select->prewhere())
+ hash.update(prewhere->getTreeHash(/*ignore_aliases=*/true));
+ if (const auto where = select->where())
+ hash.update(where->getTreeHash(/*ignore_aliases=*/true));
+
chassert(right_table_expression);
hash.update(right_table_expression->getTreeHash());
+
chassert(table_join && table_join->oneDisjunct());
const auto keys
= NameOrderedSet{table_join->getClauses().at(0).key_names_right.begin(), table_join->getClauses().at(0).key_names_right.end()};
for (const auto & name : keys)
hash.update(name);
+
return hash.get64();
}
}
diff --git a/src/Interpreters/ConcurrentHashJoin.h b/src/Interpreters/ConcurrentHashJoin.h
index a911edaccc3..c1a421f713b 100644
--- a/src/Interpreters/ConcurrentHashJoin.h
+++ b/src/Interpreters/ConcurrentHashJoin.h
@@ -1,13 +1,11 @@
#pragma once
-#include
#include
-#include
#include
#include
#include
-#include
#include
+#include
#include
#include
#include
@@ -17,6 +15,8 @@
namespace DB
{
+struct SelectQueryInfo;
+
/**
* Can run addBlockToJoin() parallelly to speedup the join process. On test, it almose linear speedup by
* the degree of parallelism.
@@ -47,7 +47,7 @@ public:
std::string getName() const override { return "ConcurrentHashJoin"; }
const TableJoin & getTableJoin() const override { return *table_join; }
- bool addBlockToJoin(const Block & block, bool check_limits) override;
+ bool addBlockToJoin(const Block & right_block_, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block & block, std::shared_ptr & not_processed) override;
void setTotals(const Block & block) override;
@@ -57,6 +57,9 @@ public:
bool alwaysReturnsEmptySet() const override;
bool supportParallelJoin() const override { return true; }
+ bool isScatteredJoin() const override { return true; }
+ void joinBlock(Block & block, ExtraScatteredBlocks & extra_blocks, std::vector & res) override;
+
IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
@@ -78,9 +81,9 @@ private:
std::mutex totals_mutex;
Block totals;
- IColumn::Selector selectDispatchBlock(const Strings & key_columns_names, const Block & from_block);
- Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
+ ScatteredBlocks dispatchBlock(const Strings & key_columns_names, Block && from_block);
};
-UInt64 calculateCacheKey(std::shared_ptr & table_join, const QueryTreeNodePtr & right_table_expression);
+UInt64 calculateCacheKey(
+ std::shared_ptr & table_join, const QueryTreeNodePtr & right_table_expression, const SelectQueryInfo & select_query_info);
}
diff --git a/src/Interpreters/HashJoin/AddedColumns.cpp b/src/Interpreters/HashJoin/AddedColumns.cpp
index 2e1ecb0da72..c8f64204266 100644
--- a/src/Interpreters/HashJoin/AddedColumns.cpp
+++ b/src/Interpreters/HashJoin/AddedColumns.cpp
@@ -3,14 +3,16 @@
namespace DB
{
-JoinOnKeyColumns::JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
- : key_names(key_names_)
- , materialized_keys_holder(JoinCommon::materializeColumns(
- block, key_names)) /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
+JoinOnKeyColumns::JoinOnKeyColumns(
+ const ScatteredBlock & block_, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
+ : block(block_)
+ , key_names(key_names_)
+ /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
+ , materialized_keys_holder(JoinCommon::materializeColumns(block.getSourceBlock(), key_names))
, key_columns(JoinCommon::getRawPointers(materialized_keys_holder))
, null_map(nullptr)
, null_map_holder(extractNestedColumnsAndNullMap(key_columns, null_map))
- , join_mask_column(JoinCommon::getColumnAsMask(block, cond_column_name))
+ , join_mask_column(JoinCommon::getColumnAsMask(block.getSourceBlock(), cond_column_name))
, key_sizes(key_sizes_)
{
}
diff --git a/src/Interpreters/HashJoin/AddedColumns.h b/src/Interpreters/HashJoin/AddedColumns.h
index 4603d493329..885c1baca8c 100644
--- a/src/Interpreters/HashJoin/AddedColumns.h
+++ b/src/Interpreters/HashJoin/AddedColumns.h
@@ -1,4 +1,6 @@
#pragma once
+
+#include
#include
#include
@@ -14,6 +16,8 @@ using ExpressionActionsPtr = std::shared_ptr;
struct JoinOnKeyColumns
{
+ const ScatteredBlock & block;
+
Names key_names;
Columns materialized_keys_holder;
@@ -27,9 +31,13 @@ struct JoinOnKeyColumns
Sizes key_sizes;
- explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
+ JoinOnKeyColumns(
+ const ScatteredBlock & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
- bool isRowFiltered(size_t i) const { return join_mask_column.isRowFiltered(i); }
+ bool isRowFiltered(size_t i) const
+ {
+ return join_mask_column.isRowFiltered(i);
+ }
};
template
@@ -54,7 +62,7 @@ public:
};
AddedColumns(
- const Block & left_block_,
+ const ScatteredBlock & left_block_,
const Block & block_with_columns_to_add,
const Block & saved_block_sample,
const HashJoin & join,
@@ -62,10 +70,11 @@ public:
ExpressionActionsPtr additional_filter_expression_,
bool is_asof_join,
bool is_join_get_)
- : left_block(left_block_)
+ : src_block(left_block_)
+ , left_block(left_block_.getSourceBlock())
, join_on_keys(join_on_keys_)
, additional_filter_expression(additional_filter_expression_)
- , rows_to_add(left_block.rows())
+ , rows_to_add(left_block_.rows())
, join_data_avg_perkey_rows(join.getJoinedData()->avgPerKeyRows())
, output_by_row_list_threshold(join.getTableJoin().outputByRowListPerkeyRowsThreshold())
, join_data_sorted(join.getJoinedData()->sorted)
@@ -139,6 +148,7 @@ public:
static constexpr bool isLazy() { return lazy; }
+ const ScatteredBlock & src_block;
Block left_block;
std::vector join_on_keys;
ExpressionActionsPtr additional_filter_expression;
@@ -159,7 +169,7 @@ public:
return;
/// Do not allow big allocations when user set max_joined_block_rows to huge value
- size_t reserve_size = std::min(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2);
+ size_t reserve_size = std::min(max_joined_block_rows, rows_to_add * 2);
if (need_replicate)
/// Reserve 10% more space for columns, because some rows can be repeated
@@ -218,7 +228,7 @@ private:
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
{
columns.push_back(src_column.column->cloneEmpty());
- columns.back()->reserve(src_column.column->size());
+ columns.back()->reserve(rows_to_add);
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
}
diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp
index a2c9f94a6ae..1d0c6f75b8e 100644
--- a/src/Interpreters/HashJoin/HashJoin.cpp
+++ b/src/Interpreters/HashJoin/HashJoin.cpp
@@ -13,24 +13,23 @@
#include
+#include
#include
#include
-#include
#include
#include
#include
-#include
-#include
#include
#include
+#include
+#include
+#include
#include
-#include
#include
#include
-#include
-
+#include
#include
#include
@@ -40,16 +39,16 @@ namespace DB
namespace ErrorCodes
{
- extern const int NOT_IMPLEMENTED;
- extern const int NO_SUCH_COLUMN_IN_TABLE;
- extern const int INCOMPATIBLE_TYPE_OF_JOIN;
- extern const int UNSUPPORTED_JOIN_KEYS;
- extern const int LOGICAL_ERROR;
- extern const int SYNTAX_ERROR;
- extern const int SET_SIZE_LIMIT_EXCEEDED;
- extern const int TYPE_MISMATCH;
- extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
- extern const int INVALID_JOIN_ON_EXPRESSION;
+extern const int NOT_IMPLEMENTED;
+extern const int NO_SUCH_COLUMN_IN_TABLE;
+extern const int INCOMPATIBLE_TYPE_OF_JOIN;
+extern const int UNSUPPORTED_JOIN_KEYS;
+extern const int LOGICAL_ERROR;
+extern const int SYNTAX_ERROR;
+extern const int SET_SIZE_LIMIT_EXCEEDED;
+extern const int TYPE_MISMATCH;
+extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+extern const int INVALID_JOIN_ON_EXPRESSION;
}
namespace
@@ -72,6 +71,40 @@ Int64 getCurrentQueryMemoryUsage()
return 0;
}
+Block filterColumnsPresentInSampleBlock(const Block & block, const Block & sample_block)
+{
+ Block filtered_block;
+ for (const auto & sample_column : sample_block.getColumnsWithTypeAndName())
+ filtered_block.insert(block.getByName(sample_column.name));
+ return filtered_block;
+}
+
+ScatteredBlock filterColumnsPresentInSampleBlock(const ScatteredBlock & block, const Block & sample_block)
+{
+ return ScatteredBlock{filterColumnsPresentInSampleBlock(block.getSourceBlock(), sample_block)};
+}
+
+Block materializeColumnsFromRightBlock(Block block, const Block & sample_block, const Names &)
+{
+ for (const auto & sample_column : sample_block.getColumnsWithTypeAndName())
+ {
+ auto & column = block.getByName(sample_column.name);
+
+ /// There's no optimization for right side const columns. Remove constness if any.
+ column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
+
+ if (column.column->lowCardinality() && !sample_column.column->lowCardinality())
+ {
+ column.column = column.column->convertToFullColumnIfLowCardinality();
+ column.type = removeLowCardinality(column.type);
+ }
+
+ if (sample_column.column->isNullable())
+ JoinCommon::convertColumnToNullable(column);
+ }
+
+ return block;
+}
}
static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nullable)
@@ -91,8 +124,12 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
}
}
-HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_sample_block_,
- bool any_take_last_row_, size_t reserve_num_, const String & instance_id_)
+HashJoin::HashJoin(
+ std::shared_ptr table_join_,
+ const Block & right_sample_block_,
+ bool any_take_last_row_,
+ size_t reserve_num_,
+ const String & instance_id_)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
@@ -107,8 +144,15 @@ HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_s
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
, log(getLogger("HashJoin"))
{
- LOG_TRACE(log, "{}Keys: {}, datatype: {}, kind: {}, strictness: {}, right header: {}",
- instance_log_id, TableJoin::formatClauses(table_join->getClauses(), true), data->type, kind, strictness, right_sample_block.dumpStructure());
+ LOG_TRACE(
+ log,
+ "{}Keys: {}, datatype: {}, kind: {}, strictness: {}, right header: {}",
+ instance_log_id,
+ TableJoin::formatClauses(table_join->getClauses(), true),
+ data->type,
+ kind,
+ strictness,
+ right_sample_block.dumpStructure());
validateAdditionalFilterExpression(table_join->getMixedJoinExpression());
@@ -252,8 +296,8 @@ HashJoin::Type HashJoin::chooseMethod(JoinKind kind, const ColumnRawPtrs & key_c
};
const auto * key_column = key_columns[0];
- if (is_string_column(key_column) ||
- (isColumnConst(*key_column) && is_string_column(assert_cast(key_column)->getDataColumnPtr().get())))
+ if (is_string_column(key_column)
+ || (isColumnConst(*key_column) && is_string_column(assert_cast(key_column)->getDataColumnPtr().get())))
return Type::key_string;
}
@@ -323,7 +367,8 @@ size_t HashJoin::getTotalRowCount() const
auto prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
for (const auto & map : data->maps)
{
- joinDispatch(kind, strictness, map, prefer_use_maps_all, [&](auto, auto, auto & map_) { res += map_.getTotalRowCount(data->type); });
+ joinDispatch(
+ kind, strictness, map, prefer_use_maps_all, [&](auto, auto, auto & map_) { res += map_.getTotalRowCount(data->type); });
}
}
@@ -338,16 +383,22 @@ void HashJoin::doDebugAsserts() const
debug_blocks_allocated_size += block.allocatedBytes();
if (data->blocks_allocated_size != debug_blocks_allocated_size)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})",
- data->blocks_allocated_size, debug_blocks_allocated_size);
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})",
+ data->blocks_allocated_size,
+ debug_blocks_allocated_size);
size_t debug_blocks_nullmaps_allocated_size = 0;
for (const auto & nullmap : data->blocks_nullmaps)
- debug_blocks_nullmaps_allocated_size += nullmap.second->allocatedBytes();
+ debug_blocks_nullmaps_allocated_size += nullmap.allocatedBytes();
if (data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
- data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
+ data->blocks_nullmaps_allocated_size,
+ debug_blocks_nullmaps_allocated_size);
#endif
}
@@ -369,7 +420,12 @@ size_t HashJoin::getTotalByteCount() const
auto prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
for (const auto & map : data->maps)
{
- joinDispatch(kind, strictness, map, prefer_use_maps_all, [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); });
+ joinDispatch(
+ kind,
+ strictness,
+ map,
+ prefer_use_maps_all,
+ [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); });
}
}
return res;
@@ -386,11 +442,8 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
bool multiple_disjuncts = !table_join->oneDisjunct();
/// We could remove key columns for LEFT | INNER HashJoin but we should keep them for JoinSwitcher (if any).
- bool save_key_columns = table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO) ||
- table_join->isEnabledAlgorithm(JoinAlgorithm::GRACE_HASH) ||
- isRightOrFull(kind) ||
- multiple_disjuncts ||
- table_join->getMixedJoinExpression();
+ bool save_key_columns = table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO) || table_join->isEnabledAlgorithm(JoinAlgorithm::GRACE_HASH)
+ || isRightOrFull(kind) || multiple_disjuncts || table_join->getMixedJoinExpression();
if (save_key_columns)
{
saved_block_sample = right_table_keys.cloneEmpty();
@@ -411,29 +464,27 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
}
}
+void HashJoin::materializeColumnsFromLeftBlock(Block & block) const
+{
+ /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
+ * Because if they are constants, then in the "not joined" rows, they may have different values
+ * - default values, which can differ from the values of these constants.
+ */
+ if (kind == JoinKind::Right || kind == JoinKind::Full)
+ {
+ materializeBlockInplace(block);
+ }
+}
+
+Block HashJoin::materializeColumnsFromRightBlock(Block block) const
+{
+ return DB::materializeColumnsFromRightBlock(std::move(block), savedBlockSample(), table_join->getAllNames(JoinTableSide::Right));
+}
+
Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_)
{
- Block structured_block;
- for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName())
- {
- ColumnWithTypeAndName column = block.getByName(sample_column.name);
-
- /// There's no optimization for right side const columns. Remove constness if any.
- column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
-
- if (column.column->lowCardinality() && !sample_column.column->lowCardinality())
- {
- column.column = column.column->convertToFullColumnIfLowCardinality();
- column.type = removeLowCardinality(column.type);
- }
-
- if (sample_column.column->isNullable())
- JoinCommon::convertColumnToNullable(column);
-
- structured_block.insert(std::move(column));
- }
-
- return structured_block;
+ Block prepared_block = DB::materializeColumnsFromRightBlock(block, saved_block_sample_, {});
+ return filterColumnsPresentInSampleBlock(prepared_block, saved_block_sample_);
}
Block HashJoin::prepareRightBlock(const Block & block) const
@@ -441,15 +492,22 @@ Block HashJoin::prepareRightBlock(const Block & block) const
return prepareRightBlock(block, savedBlockSample());
}
-bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
+bool HashJoin::addBlockToJoin(const Block & source_block, bool check_limits)
+{
+ auto materialized = materializeColumnsFromRightBlock(source_block);
+ auto scattered_block = ScatteredBlock{materialized};
+ return addBlockToJoin(scattered_block, check_limits);
+}
+
+bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
{
if (!data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Join data was released");
/// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency.
/// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code.
- if (unlikely(source_block_.rows() > std::numeric_limits::max()))
- throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block_.rows());
+ if (unlikely(source_block.rows() > std::numeric_limits::max()))
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block.rows());
/** We do not allocate memory for stored blocks inside HashJoin, only for hash table.
* In case when we have all the blocks allocated before the first `addBlockToJoin` call, will already be quite high.
@@ -458,7 +516,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (!memory_usage_before_adding_blocks)
memory_usage_before_adding_blocks = getCurrentQueryMemoryUsage();
- Block source_block = source_block_;
if (strictness == JoinStrictness::Asof)
{
chassert(kind == JoinKind::Left || kind == JoinKind::Inner);
@@ -467,7 +524,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
/// We support only INNER/LEFT ASOF join, so rows with NULLs never return from the right joined table.
/// So filter them out here not to handle in implementation.
const auto & asof_key_name = table_join->getOnlyClause().key_names_right.back();
- auto & asof_column = source_block.getByName(asof_key_name);
+ const auto & asof_column = source_block.getByName(asof_key_name);
if (asof_column.type->isNullable())
{
@@ -485,13 +542,12 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
for (size_t i = 0; i < asof_column_nullable.size(); ++i)
negative_null_map[i] = !asof_column_nullable[i];
- for (auto & column : source_block)
- column.column = column.column->filter(negative_null_map, -1);
+ source_block.filter(negative_null_map);
}
}
}
- size_t rows = source_block.rows();
+ const size_t rows = source_block.rows();
data->rows_to_join += rows;
const auto & right_key_names = table_join->getAllNames(JoinTableSide::Right);
ColumnPtrMap all_key_columns(right_key_names.size());
@@ -501,7 +557,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
all_key_columns[column_name] = recursiveRemoveSparse(column->convertToFullColumnIfConst())->convertToFullColumnIfLowCardinality();
}
- Block block_to_save = prepareRightBlock(source_block);
+ ScatteredBlock block_to_save = filterColumnsPresentInSampleBlock(source_block, savedBlockSample());
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
@@ -515,7 +571,8 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (!tmp_stream)
tmp_stream.emplace(right_sample_block, tmp_data.get());
- tmp_stream.value()->write(block_to_save);
+ chassert(!source_block.wasScattered()); /// We don't run parallel_hash for cross join
+ tmp_stream.value()->write(block_to_save.getSourceBlock());
return true;
}
@@ -527,7 +584,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (storage_join_lock)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addBlockToJoin called when HashJoin locked to prevent updates");
- assertBlocksHaveEqualStructure(data->sample_block, block_to_save, "joined block");
+ assertBlocksHaveEqualStructure(data->sample_block, block_to_save.getSourceBlock(), "joined block");
size_t min_bytes_to_compress = table_join->crossJoinMinBytesToCompress();
size_t min_rows_to_compress = table_join->crossJoinMinRowsToCompress();
@@ -536,6 +593,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
&& ((min_bytes_to_compress && getTotalByteCount() >= min_bytes_to_compress)
|| (min_rows_to_compress && getTotalRowCount() >= min_rows_to_compress)))
{
+ chassert(!source_block.wasScattered()); /// We don't run parallel_hash for cross join
block_to_save = block_to_save.compress();
have_compressed = true;
}
@@ -543,7 +601,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
doDebugAsserts();
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));
- Block * stored_block = &data->blocks.back();
+ const auto * stored_block = &data->blocks.back();
doDebugAsserts();
if (rows)
@@ -570,7 +628,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
save_nullmap |= (*null_map)[i];
}
- auto join_mask_col = JoinCommon::getColumnAsMask(source_block, onexprs[onexpr_idx].condColumnNames().second);
+ auto join_mask_col = JoinCommon::getColumnAsMask(source_block.getSourceBlock(), onexprs[onexpr_idx].condColumnNames().second);
/// Save blocks that do not hold conditions in ON section
ColumnUInt8::MutablePtr not_joined_map = nullptr;
if (!flag_per_row && isRightOrFull(kind) && join_mask_col.hasData())
@@ -595,39 +653,45 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
bool is_inserted = false;
if (kind != JoinKind::Cross)
{
- joinDispatch(kind, strictness, data->maps[onexpr_idx], prefer_use_maps_all, [&](auto kind_, auto strictness_, auto & map)
- {
- size_t size = HashJoinMethods>::insertFromBlockImpl(
+ joinDispatch(
+ kind,
+ strictness,
+ data->maps[onexpr_idx],
+ prefer_use_maps_all,
+ [&](auto kind_, auto strictness_, auto & map)
+ {
+ size_t size = HashJoinMethods>::insertFromBlockImpl(
*this,
data->type,
map,
- rows,
key_columns,
key_sizes[onexpr_idx],
- stored_block,
+ &stored_block->getSourceBlock(),
+ source_block.getSelector(),
null_map,
join_mask_col.getData(),
data->pool,
is_inserted);
- if (flag_per_row)
- used_flags->reinit, MapsAll>>(stored_block);
- else if (is_inserted)
- /// Number of buckets + 1 value from zero storage
- used_flags->reinit, MapsAll>>(size + 1);
- });
+ if (flag_per_row)
+ used_flags->reinit, MapsAll>>(
+ &stored_block->getSourceBlock());
+ else if (is_inserted)
+ /// Number of buckets + 1 value from zero storage
+ used_flags->reinit, MapsAll>>(size + 1);
+ });
}
if (!flag_per_row && save_nullmap && is_inserted)
{
- data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, null_map_holder);
+ data->blocks_nullmaps_allocated_size += data->blocks_nullmaps.back().allocatedBytes();
}
if (!flag_per_row && not_joined_map && is_inserted)
{
- data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map));
+ data->blocks_nullmaps_allocated_size += data->blocks_nullmaps.back().allocatedBytes();
}
if (!flag_per_row && !is_inserted)
@@ -654,7 +718,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize)
{
-
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks;
Int64 max_total_bytes_for_query = memory_usage_before_adding_blocks ? table_join->getMaxMemoryUsage() : 0;
@@ -671,15 +734,19 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
- shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
- (max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
+ shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2)
+ || (max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
}
- LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker",
- ReadableSize(total_bytes_in_join), max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "",
- ReadableSize(query_memory_usage_delta), max_total_bytes_for_query ? fmt::format("/ {}", ReadableSize(max_total_bytes_for_query)) : "");
+ LOG_DEBUG(
+ log,
+ "Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker",
+ ReadableSize(total_bytes_in_join),
+ max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "",
+ ReadableSize(query_memory_usage_delta),
+ max_total_bytes_for_query ? fmt::format("/ {}", ReadableSize(max_total_bytes_for_query)) : "");
for (auto & stored_block : data->blocks)
{
@@ -692,10 +759,13 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
if (old_size >= new_size)
{
if (data->blocks_allocated_size < old_size - new_size)
- throw Exception(ErrorCodes::LOGICAL_ERROR,
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
"Blocks allocated size value is broken: "
"blocks_allocated_size = {}, old_size = {}, new_size = {}",
- data->blocks_allocated_size, old_size, new_size);
+ data->blocks_allocated_size,
+ old_size,
+ new_size);
data->blocks_allocated_size -= old_size - new_size;
}
@@ -710,9 +780,13 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
Int64 new_current_memory_usage = getCurrentQueryMemoryUsage();
- LOG_DEBUG(log, "Shrunk stored blocks {} freed ({} by memory tracker), new memory consumption is {} ({} by memory tracker)",
- ReadableSize(total_bytes_in_join - new_total_bytes_in_join), ReadableSize(current_memory_usage - new_current_memory_usage),
- ReadableSize(new_total_bytes_in_join), ReadableSize(new_current_memory_usage));
+ LOG_DEBUG(
+ log,
+ "Shrunk stored blocks {} freed ({} by memory tracker), new memory consumption is {} ({} by memory tracker)",
+ ReadableSize(total_bytes_in_join - new_total_bytes_in_join),
+ ReadableSize(current_memory_usage - new_current_memory_usage),
+ ReadableSize(new_total_bytes_in_join),
+ ReadableSize(new_current_memory_usage));
total_bytes_in_join = new_total_bytes_in_join;
}
@@ -776,7 +850,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
}
};
- for (const Block & block_right : data->blocks)
+ for (const auto & block_right : data->blocks)
{
++block_number;
if (block_number < start_right_block)
@@ -784,9 +858,12 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
/// The following statement cannot be substituted with `process_right_block(!have_compressed ? block_right : block_right.decompress())`
/// because it will lead to copying of `block_right` even if its branch is taken (because common type of `block_right` and `block_right.decompress()` is `Block`).
if (!have_compressed)
- process_right_block(block_right);
+ process_right_block(block_right.getSourceBlock());
else
- process_right_block(block_right.decompress());
+ {
+ chassert(!block_right.wasScattered()); /// Compression only happens for cross join and scattering only for concurrent hash
+ process_right_block(block_right.getSourceBlock().decompress());
+ }
if (rows_added > max_joined_block_rows)
{
@@ -837,9 +914,11 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
{
size_t num_keys = data_types.size();
if (right_table_keys.columns() != num_keys)
- throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+ throw Exception(
+ ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function joinGet{} doesn't match: passed, should be equal to {}",
- toString(or_null ? "OrNull" : ""), toString(num_keys));
+ toString(or_null ? "OrNull" : ""),
+ toString(num_keys));
for (size_t i = 0; i < num_keys; ++i)
{
@@ -848,8 +927,13 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
auto left_type = removeNullable(recursiveRemoveLowCardinality(left_type_origin));
auto right_type = removeNullable(recursiveRemoveLowCardinality(right_type_origin));
if (!left_type->equals(*right_type))
- throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch in joinGet key {}: "
- "found type {}, while the needed type is {}", i, left_type->getName(), right_type->getName());
+ throw Exception(
+ ErrorCodes::TYPE_MISMATCH,
+ "Type mismatch in joinGet key {}: "
+ "found type {}, while the needed type is {}",
+ i,
+ left_type->getName(),
+ right_type->getName());
}
if (!sample_block_with_columns_to_add.has(column_name))
@@ -865,8 +949,7 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
/// TODO: return array of values when strictness == JoinStrictness::All
ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
{
- bool is_valid = (strictness == JoinStrictness::Any || strictness == JoinStrictness::RightAny)
- && kind == JoinKind::Left;
+ bool is_valid = (strictness == JoinStrictness::Any || strictness == JoinStrictness::RightAny) && kind == JoinKind::Left;
if (!is_valid)
throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN, "joinGet only supports StorageJoin of type Left Any");
const auto & key_names_right = table_join->getOnlyClause().key_names_right;
@@ -880,12 +963,14 @@ ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block
keys.insert(std::move(key));
}
- static_assert(!MapGetter::flagged,
- "joinGet are not protected from hash table changes between block processing");
+ static_assert(
+ !MapGetter::flagged,
+ "joinGet are not protected from hash table changes between block processing");
std::vector maps_vector;
maps_vector.push_back(&std::get(data->maps[0]));
- HashJoinMethods::joinBlockImpl(*this, keys, block_with_columns_to_add, maps_vector, /* is_join_get = */ true);
+ HashJoinMethods::joinBlockImpl(
+ *this, keys, block_with_columns_to_add, maps_vector, /* is_join_get = */ true);
return keys.getByPosition(keys.columns() - 1);
}
@@ -906,8 +991,7 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
auto cond_column_name = onexpr.condColumnNames();
JoinCommon::checkTypesOfKeys(
- block, onexpr.key_names_left, cond_column_name.first,
- right_sample_block, onexpr.key_names_right, cond_column_name.second);
+ block, onexpr.key_names_left, cond_column_name.first, right_sample_block, onexpr.key_names_right, cond_column_name.second);
}
if (kind == JoinKind::Cross)
@@ -916,20 +1000,85 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
return;
}
- if (kind == JoinKind::Right || kind == JoinKind::Full)
- {
- materializeBlockInplace(block);
- }
+ materializeColumnsFromLeftBlock(block);
bool prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
{
- std::vectormaps[0])> * > maps_vector;
+ std::vectormaps[0])> *> maps_vector;
for (size_t i = 0; i < table_join->getClauses().size(); ++i)
maps_vector.push_back(&data->maps[i]);
- if (joinDispatch(kind, strictness, maps_vector, prefer_use_maps_all, [&](auto kind_, auto strictness_, auto & maps_vector_)
+ if (joinDispatch(
+ kind,
+ strictness,
+ maps_vector,
+ prefer_use_maps_all,
+ [&](auto kind_, auto strictness_, auto & maps_vector_)
+ {
+ Block remaining_block;
+ if constexpr (std::is_same_v, std::vector>)
+ {
+ remaining_block = HashJoinMethods::joinBlockImpl(
+ *this, block, sample_block_with_columns_to_add, maps_vector_);
+ }
+ else if constexpr (std::is_same_v, std::vector>)
+ {
+ remaining_block = HashJoinMethods::joinBlockImpl(
+ *this, block, sample_block_with_columns_to_add, maps_vector_);
+ }
+ else if constexpr (std::is_same_v, std::vector>)
+ {
+ remaining_block = HashJoinMethods::joinBlockImpl(
+ *this, block, sample_block_with_columns_to_add, maps_vector_);
+ }
+ else
+ {
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown maps type");
+ }
+ if (remaining_block.rows())
+ not_processed = std::make_shared(ExtraBlock{std::move(remaining_block)});
+ else
+ not_processed.reset();
+ }))
+ {
+ /// Joined
+ }
+ else
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong JOIN combination: {} {}", strictness, kind);
+ }
+}
+
+void HashJoin::joinBlock(ScatteredBlock & block, ScatteredBlock & remaining_block)
+{
+ if (!data)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
+
+ chassert(kind == JoinKind::Left || kind == JoinKind::Inner);
+
+ for (const auto & onexpr : table_join->getClauses())
+ {
+ auto cond_column_name = onexpr.condColumnNames();
+ JoinCommon::checkTypesOfKeys(
+ block.getSourceBlock(),
+ onexpr.key_names_left,
+ cond_column_name.first,
+ right_sample_block,
+ onexpr.key_names_right,
+ cond_column_name.second);
+ }
+
+ std::vectormaps[0])> *> maps_vector;
+ for (size_t i = 0; i < table_join->getClauses().size(); ++i)
+ maps_vector.push_back(&data->maps[i]);
+
+ bool prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
+ const bool joined = joinDispatch(
+ kind,
+ strictness,
+ maps_vector,
+ prefer_use_maps_all,
+ [&](auto kind_, auto strictness_, auto & maps_vector_)
{
- Block remaining_block;
if constexpr (std::is_same_v, std::vector>)
{
remaining_block = HashJoinMethods::joinBlockImpl(
@@ -949,17 +1098,9 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown maps type");
}
- if (remaining_block.rows())
- not_processed = std::make_shared(ExtraBlock{std::move(remaining_block)});
- else
- not_processed.reset();
- }))
- {
- /// Joined
- }
- else
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong JOIN combination: {} {}", strictness, kind);
- }
+ });
+
+ chassert(joined);
}
HashJoin::~HashJoin()
@@ -1023,10 +1164,7 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
{
public:
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_, bool flag_per_row_)
- : parent(parent_)
- , max_block_size(max_block_size_)
- , flag_per_row(flag_per_row_)
- , current_block_start(0)
+ : parent(parent_), max_block_size(max_block_size_), flag_per_row(flag_per_row_), current_block_start(0)
{
if (parent.data == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
@@ -1043,14 +1181,12 @@ public:
}
else
{
- auto fill_callback = [&](auto, auto, auto & map)
- {
- rows_added = fillColumnsFromMap(map, columns_right);
- };
+ auto fill_callback = [&](auto, auto, auto & map) { rows_added = fillColumnsFromMap(map, columns_right); };
bool prefer_use_maps_all = parent.table_join->getMixedJoinExpression() != nullptr;
if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps.front(), prefer_use_maps_all, fill_callback))
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown JOIN strictness '{}' (must be on of: ANY, ALL, ASOF)", parent.strictness);
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR, "Unknown JOIN strictness '{}' (must be on of: ANY, ALL, ASOF)", parent.strictness);
}
if (!flag_per_row)
@@ -1070,14 +1206,14 @@ private:
std::any position;
std::optional nulls_position;
- std::optional used_position;
+ std::optional used_position;
- size_t fillColumnsFromData(const BlocksList & blocks, MutableColumns & columns_right)
+ size_t fillColumnsFromData(const HashJoin::ScatteredBlocksList & blocks, MutableColumns & columns_right)
{
if (!position.has_value())
- position = std::make_any(blocks.begin());
+ position = std::make_any(blocks.begin());
- auto & block_it = std::any_cast(position);
+ auto & block_it = std::any_cast(position);
auto end = blocks.end();
size_t rows_added = 0;
@@ -1113,11 +1249,11 @@ private:
{
switch (parent.data->type)
{
- #define M(TYPE) \
- case HashJoin::Type::TYPE: \
- return fillColumns(*maps.TYPE, columns_keys_and_right);
+#define M(TYPE) \
+ case HashJoin::Type::TYPE: \
+ return fillColumns(*maps.TYPE, columns_keys_and_right);
APPLY_FOR_JOIN_VARIANTS(M)
- #undef M
+#undef M
default:
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys (type: {})", parent.data->type);
}
@@ -1137,11 +1273,11 @@ private:
for (auto & it = *used_position; it != end && rows_added < max_block_size; ++it)
{
- const Block & mapped_block = *it;
+ const auto & mapped_block = *it;
for (size_t row = 0; row < mapped_block.rows(); ++row)
{
- if (!parent.isUsed(&mapped_block, row))
+ if (!parent.isUsed(&mapped_block.getSourceBlock(), row))
{
for (size_t colnum = 0; colnum < columns_keys_and_right.size(); ++colnum)
{
@@ -1194,10 +1330,10 @@ private:
for (auto & it = *nulls_position; it != end && rows_added < max_block_size; ++it)
{
- const auto * block = it->first;
+ const auto * block = it->block;
ConstNullMapPtr nullmap = nullptr;
- if (it->second)
- nullmap = &assert_cast(*it->second).getData();
+ if (it->column)
+ nullmap = &assert_cast(*it->column).getData();
for (size_t row = 0; row < block->rows(); ++row)
{
@@ -1212,9 +1348,8 @@ private:
}
};
-IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
- const Block & result_sample_block,
- UInt64 max_block_size) const
+IBlocksStreamPtr
+HashJoin::getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const
{
if (!JoinCommon::hasNonJoinedBlocks(*table_join))
return {};
@@ -1227,9 +1362,14 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
size_t expected_columns_count = left_columns_count + required_right_keys.columns() + sample_block_with_columns_to_add.columns();
if (expected_columns_count != result_sample_block.columns())
{
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of columns in result sample block: {} instead of {} ({} + {} + {})",
- result_sample_block.columns(), expected_columns_count,
- left_columns_count, required_right_keys.columns(), sample_block_with_columns_to_add.columns());
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "Unexpected number of columns in result sample block: {} instead of {} ({} + {} + {})",
+ result_sample_block.columns(),
+ expected_columns_count,
+ left_columns_count,
+ required_right_keys.columns(),
+ sample_block_with_columns_to_add.columns());
}
}
@@ -1250,22 +1390,37 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
bool prefer_use_maps_all = join.table_join->getMixedJoinExpression() != nullptr;
for (auto & map : data->maps)
{
- joinDispatch(kind, strictness, map, prefer_use_maps_all, [this](auto kind_, auto strictness_, auto & map_)
- {
- used_flags->reinit, MapsAll>>(map_.getBufferSizeInCells(data->type) + 1);
- });
+ joinDispatch(
+ kind,
+ strictness,
+ map,
+ prefer_use_maps_all,
+ [this](auto kind_, auto strictness_, auto & map_)
+ {
+ used_flags->reinit, MapsAll>>(
+ map_.getBufferSizeInCells(data->type) + 1);
+ });
}
}
-BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
+BlocksList HashJoin::releaseJoinedBlocks(bool restructure [[maybe_unused]])
{
- LOG_TRACE(log, "{}Join data is being released, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
+ LOG_TRACE(
+ log, "{}Join data is being released, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
- BlocksList right_blocks = std::move(data->blocks);
+ auto extract_source_blocks = [](ScatteredBlocksList && blocks)
+ {
+ BlocksList result;
+ for (auto & block : blocks)
+ result.emplace_back(std::move(block).getSourceBlock());
+ return result;
+ };
+
+ ScatteredBlocksList right_blocks = std::move(data->blocks);
if (!restructure)
{
data.reset();
- return right_blocks;
+ return extract_source_blocks(std::move(right_blocks));
}
data->maps.clear();
@@ -1279,7 +1434,7 @@ BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
if (!right_blocks.empty())
{
positions.reserve(right_sample_block.columns());
- const Block & tmp_block = *right_blocks.begin();
+ const Block & tmp_block = right_blocks.begin()->getSourceBlock();
for (const auto & sample_column : right_sample_block)
{
positions.emplace_back(tmp_block.getPositionByName(sample_column.name));
@@ -1287,12 +1442,12 @@ BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
}
}
- for (Block & saved_block : right_blocks)
+ for (ScatteredBlock & saved_block : right_blocks)
{
Block restored_block;
for (size_t i = 0; i < positions.size(); ++i)
{
- auto & column = saved_block.getByPosition(positions[i]);
+ auto & column = saved_block.getSourceBlock().getByPosition(positions[i]);
correctNullabilityInplace(column, is_nullable[i]);
restored_block.insert(column);
}
@@ -1318,7 +1473,8 @@ void HashJoin::validateAdditionalFilterExpression(ExpressionActionsPtr additiona
if (expression_sample_block.columns() != 1)
{
- throw Exception(ErrorCodes::LOGICAL_ERROR,
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
"Unexpected expression in JOIN ON section. Expected single column, got '{}'",
expression_sample_block.dumpStructure());
}
@@ -1326,7 +1482,8 @@ void HashJoin::validateAdditionalFilterExpression(ExpressionActionsPtr additiona
auto type = removeNullable(expression_sample_block.getByPosition(0).type);
if (!type->equals(*std::make_shared()))
{
- throw Exception(ErrorCodes::LOGICAL_ERROR,
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
"Unexpected expression in JOIN ON section. Expected boolean (UInt8), got '{}'. expression:\n{}",
expression_sample_block.getByPosition(0).type->getName(),
additional_filter_expression->dumpActions());
@@ -1334,10 +1491,12 @@ void HashJoin::validateAdditionalFilterExpression(ExpressionActionsPtr additiona
bool is_supported = ((strictness == JoinStrictness::All) && (isInnerOrLeft(kind) || isRightOrFull(kind)))
|| ((strictness == JoinStrictness::Semi || strictness == JoinStrictness::Any || strictness == JoinStrictness::Anti)
- && (isLeft(kind) || isRight(kind))) || (strictness == JoinStrictness::Any && (isInner(kind)));
+ && (isLeft(kind) || isRight(kind)))
+ || (strictness == JoinStrictness::Any && (isInner(kind)));
if (!is_supported)
{
- throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
+ throw Exception(
+ ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
"Non equi condition '{}' from JOIN ON section is supported only for ALL INNER/LEFT/FULL/RIGHT JOINs",
expression_sample_block.getByPosition(0).name);
}
@@ -1353,7 +1512,6 @@ bool HashJoin::isUsed(const Block * block_ptr, size_t row_idx) const
return used_flags->getUsedSafe(block_ptr, row_idx);
}
-
bool HashJoin::needUsedFlagsForPerRightTableRow(std::shared_ptr table_join_) const
{
if (!table_join_->oneDisjunct())
@@ -1372,7 +1530,7 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only left or inner join table can be reranged.");
else
{
- auto merge_rows_into_one_block = [&](BlocksList & blocks, RowRefList & rows_ref)
+ auto merge_rows_into_one_block = [&](ScatteredBlocksList & blocks, RowRefList & rows_ref)
{
auto it = rows_ref.begin();
if (it.ok())
@@ -1384,7 +1542,7 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
{
return;
}
- auto & block = blocks.back();
+ auto & block = blocks.back().getSourceBlock();
size_t start_row = block.rows();
for (; it.ok(); ++it)
{
@@ -1401,23 +1559,22 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
}
};
- auto visit_rows_map = [&](BlocksList & blocks, MapsAll & rows_map)
+ auto visit_rows_map = [&](ScatteredBlocksList & blocks, MapsAll & rows_map)
{
switch (data->type)
{
- #define M(TYPE) \
- case Type::TYPE: \
- {\
- rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
- break; \
- }
+#define M(TYPE) \
+ case Type::TYPE: { \
+ rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
+ break; \
+ }
APPLY_FOR_JOIN_VARIANTS(M)
- #undef M
+#undef M
default:
break;
}
};
- BlocksList sorted_blocks;
+ ScatteredBlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
doDebugAsserts();
data->blocks.swap(sorted_blocks);
diff --git a/src/Interpreters/HashJoin/HashJoin.h b/src/Interpreters/HashJoin/HashJoin.h
index 8572c5df096..e478bc66b3c 100644
--- a/src/Interpreters/HashJoin/HashJoin.h
+++ b/src/Interpreters/HashJoin/HashJoin.h
@@ -1,9 +1,11 @@
#pragma once
-#include
-#include
-#include
+#include
#include
+#include
+#include
+#include
+#include
#include
#include
@@ -12,22 +14,19 @@
#include
#include
-#include
-#include
-#include
-#include
-#include
-
-#include
#include
-
-#include
-
+#include
#include
-
-#include
+#include
#include
#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
namespace DB
{
@@ -142,13 +141,21 @@ public:
*/
bool addBlockToJoin(const Block & source_block_, bool check_limits) override;
+ /// Called directly from ConcurrentJoin::addBlockToJoin
+ bool addBlockToJoin(ScatteredBlock & source_block_, bool check_limits);
+
void checkTypesOfKeys(const Block & block) const override;
+ using IJoin::joinBlock;
+
/** Join data from the map (that was previously built by calls to addBlockToJoin) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
void joinBlock(Block & block, ExtraBlockPtr & not_processed) override;
+ /// Called directly from ConcurrentJoin::joinBlock
+ void joinBlock(ScatteredBlock & block, ScatteredBlock & remaining_block);
+
/// Check joinGet arguments and infer the return type.
DataTypePtr joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const;
@@ -327,8 +334,17 @@ public:
using MapsVariant = std::variant;
- using RawBlockPtr = const Block *;
- using BlockNullmapList = std::deque>;
+ using RawBlockPtr = const ScatteredBlock *;
+ struct NullMapHolder
+ {
+ size_t allocatedBytes() const { return !column->empty() ? column->allocatedBytes() * block->rows() / column->size() : 0; }
+
+ RawBlockPtr block;
+ ColumnPtr column;
+ };
+ using BlockNullmapList = std::deque;
+
+ using ScatteredBlocksList = std::list;
struct RightTableData
{
@@ -337,7 +353,7 @@ public:
std::vector maps;
Block sample_block; /// Block as it would appear in the BlockList
- BlocksList blocks; /// Blocks of "right" table.
+ ScatteredBlocksList blocks; /// Blocks of "right" table.
BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed)
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
@@ -389,6 +405,9 @@ public:
void setMaxJoinedBlockRows(size_t value) { max_joined_block_rows = value; }
+ void materializeColumnsFromLeftBlock(Block & block) const;
+ Block materializeColumnsFromRightBlock(Block block) const;
+
private:
friend class NotJoinedHash;
@@ -473,5 +492,4 @@ private:
void tryRerangeRightTableDataImpl(Map & map);
void doDebugAsserts() const;
};
-
}
diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h
index c5b54a62f36..10fb50a6b83 100644
--- a/src/Interpreters/HashJoin/HashJoinMethods.h
+++ b/src/Interpreters/HashJoin/HashJoinMethods.h
@@ -19,7 +19,7 @@ template
struct Inserter
{
static ALWAYS_INLINE bool
- insertOne(const HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
+ insertOne(const HashJoin & join, HashMap & map, KeyGetter & key_getter, const Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
@@ -31,7 +31,8 @@ struct Inserter
return false;
}
- static ALWAYS_INLINE void insertAll(const HashJoin &, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
+ static ALWAYS_INLINE void
+ insertAll(const HashJoin &, HashMap & map, KeyGetter & key_getter, const Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
@@ -45,7 +46,13 @@ struct Inserter
}
static ALWAYS_INLINE void insertAsof(
- HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn & asof_column)
+ HashJoin & join,
+ HashMap & map,
+ KeyGetter & key_getter,
+ const Block * stored_block,
+ size_t i,
+ Arena & pool,
+ const IColumn & asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename HashMap::mapped_type * time_series_map = &emplace_result.getMapped();
@@ -66,10 +73,10 @@ public:
HashJoin & join,
HashJoin::Type type,
MapsTemplate & maps,
- size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
- Block * stored_block,
+ const Block * stored_block,
+ const ScatteredBlock::Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
@@ -83,14 +90,30 @@ public:
const Block & block_with_columns_to_add,
const MapsTemplateVector & maps_,
bool is_join_get = false);
+
+ static ScatteredBlock joinBlockImpl(
+ const HashJoin & join,
+ ScatteredBlock & block,
+ const Block & block_with_columns_to_add,
+ const MapsTemplateVector & maps_,
+ bool is_join_get = false);
+
private:
template
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes);
- template
+ template
static size_t insertFromBlockImplTypeCase(
- HashJoin & join, HashMap & map, size_t rows, const ColumnRawPtrs & key_columns,
- const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted);
+ HashJoin & join,
+ HashMap & map,
+ const ColumnRawPtrs & key_columns,
+ const Sizes & key_sizes,
+ const Block * stored_block,
+ const Selector & selector,
+ ConstNullMapPtr null_map,
+ UInt8ColumnDataPtr join_mask,
+ Arena & pool,
+ bool & is_inserted);
template
static size_t switchJoinRightColumns(
@@ -115,12 +138,13 @@ private:
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
- template
+ template
static size_t joinRightColumns(
std::vector && key_getter_vector,
const std::vector & mapv,
AddedColumns & added_columns,
- JoinStuff::JoinUsedFlags & used_flags);
+ JoinStuff::JoinUsedFlags & used_flags,
+ const Selector & selector);
template
static void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]);
diff --git a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h
index 45a766e2df6..5cfb5d469e1 100644
--- a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h
+++ b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h
@@ -1,5 +1,8 @@
#pragma once
+#include
#include
+#include "Columns/IColumn.h"
+#include "Interpreters/HashJoin/ScatteredBlock.h"
namespace DB
{
@@ -13,10 +16,10 @@ size_t HashJoinMethods::insertFromBlockImpl(
HashJoin & join,
HashJoin::Type type,
MapsTemplate & maps,
- size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
- Block * stored_block,
+ const Block * stored_block,
+ const ScatteredBlock::Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
@@ -33,9 +36,14 @@ size_t HashJoinMethods::insertFromBlockImpl(
#define M(TYPE) \
case HashJoin::Type::TYPE: \
- return insertFromBlockImplTypeCase< \
- typename KeyGetterForType>::Type>( \
- join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted); \
+ if (selector.isContinuousRange()) \
+ return insertFromBlockImplTypeCase< \
+ typename KeyGetterForType>::Type>( \
+ join, *maps.TYPE, key_columns, key_sizes, stored_block, selector.getRange(), null_map, join_mask, pool, is_inserted); \
+ else \
+ return insertFromBlockImplTypeCase< \
+ typename KeyGetterForType>::Type>( \
+ join, *maps.TYPE, key_columns, key_sizes, stored_block, selector.getIndexes(), null_map, join_mask, pool, is_inserted); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
@@ -46,6 +54,22 @@ size_t HashJoinMethods::insertFromBlockImpl(
template
Block HashJoinMethods::joinBlockImpl(
const HashJoin & join, Block & block, const Block & block_with_columns_to_add, const MapsTemplateVector & maps_, bool is_join_get)
+{
+ ScatteredBlock scattered_block{block};
+ auto ret = joinBlockImpl(join, scattered_block, block_with_columns_to_add, maps_, is_join_get);
+ ret.filterBySelector();
+ scattered_block.filterBySelector();
+ block = std::move(scattered_block.getSourceBlock());
+ return ret.getSourceBlock();
+}
+
+template
+ScatteredBlock HashJoinMethods::joinBlockImpl(
+ const HashJoin & join,
+ ScatteredBlock & block,
+ const Block & block_with_columns_to_add,
+ const MapsTemplateVector & maps_,
+ bool is_join_get)
{
constexpr JoinFeatures join_features;
@@ -56,16 +80,8 @@ Block HashJoinMethods::joinBlockImpl(
const auto & key_names = !is_join_get ? onexprs[i].key_names_left : onexprs[i].key_names_right;
join_on_keys.emplace_back(block, key_names, onexprs[i].condColumnNames().first, join.key_sizes[i]);
}
- size_t existing_columns = block.columns();
-
- /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
- * Because if they are constants, then in the "not joined" rows, they may have different values
- * - default values, which can differ from the values of these constants.
- */
- if constexpr (join_features.right || join_features.full)
- {
- materializeBlockInplace(block);
- }
+ auto & source_block = block.getSourceBlock();
+ size_t existing_columns = source_block.columns();
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
* For FULL/RIGHT JOIN, the saved blocks contain keys;
@@ -90,26 +106,28 @@ Block HashJoinMethods::joinBlockImpl(
else
added_columns.reserve(join_features.need_replication);
- size_t num_joined = switchJoinRightColumns(maps_, added_columns, join.data->type, *join.used_flags);
+ const size_t num_joined = switchJoinRightColumns(maps_, added_columns, join.data->type, *join.used_flags);
/// Do not hold memory for join_on_keys anymore
added_columns.join_on_keys.clear();
- Block remaining_block = sliceBlock(block, num_joined);
+ auto remaining_block = block.cut(num_joined);
if (is_join_get)
added_columns.buildJoinGetOutput();
else
added_columns.buildOutput();
+
+ if constexpr (join_features.need_filter)
+ block.filter(added_columns.filter);
+
+ block.filterBySelector();
+
for (size_t i = 0; i < added_columns.size(); ++i)
- block.insert(added_columns.moveColumn(i));
+ source_block.insert(added_columns.moveColumn(i));
std::vector right_keys_to_replicate [[maybe_unused]];
if constexpr (join_features.need_filter)
{
- /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
- for (size_t i = 0; i < existing_columns; ++i)
- block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(added_columns.filter, -1);
-
/// Add join key columns from right block if needed using value from left table because of equality
for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
{
@@ -121,7 +139,7 @@ Block HashJoinMethods::joinBlockImpl(
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
const auto & right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column);
- block.insert(std::move(right_col));
+ source_block.insert(std::move(right_col));
}
}
else if (has_required_right_keys)
@@ -137,28 +155,28 @@ Block HashJoinMethods::joinBlockImpl(
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
- block.insert(std::move(right_col));
+ source_block.insert(std::move(right_col));
if constexpr (join_features.need_replication)
- right_keys_to_replicate.push_back(block.getPositionByName(right_col_name));
+ right_keys_to_replicate.push_back(source_block.getPositionByName(right_col_name));
}
}
if constexpr (join_features.need_replication)
{
- std::unique_ptr & offsets_to_replicate = added_columns.offsets_to_replicate;
+ IColumn::Offsets & offsets = *added_columns.offsets_to_replicate;
- /// If ALL ... JOIN - we replicate all the columns except the new ones.
+ chassert(block);
+ chassert(offsets.size() == block.rows());
+
+ auto && columns = block.getSourceBlock().getColumns();
for (size_t i = 0; i < existing_columns; ++i)
- {
- block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
- }
-
- /// Replicate additional right keys
+ columns[i] = columns[i]->replicate(offsets);
for (size_t pos : right_keys_to_replicate)
- {
- block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate);
- }
+ columns[pos] = columns[pos]->replicate(offsets);
+
+ block.getSourceBlock().setColumns(columns);
+ block = ScatteredBlock(std::move(block).getSourceBlock());
}
return remaining_block;
}
@@ -180,14 +198,14 @@ KeyGetter HashJoinMethods::createKeyGetter(const
}
template
-template
+template
size_t HashJoinMethods::insertFromBlockImplTypeCase(
HashJoin & join,
HashMap & map,
- size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
- Block * stored_block,
+ const Block * stored_block,
+ const Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
@@ -205,9 +223,22 @@ size_t HashJoinMethods::insertFromBlockImplTypeC
/// For ALL and ASOF join always insert values
is_inserted = !mapped_one || is_asof_join;
+ size_t rows = 0;
+ if constexpr (std::is_same_v, ScatteredBlock::Indexes>)
+ rows = selector.getData().size();
+ else
+ rows = selector.second - selector.first;
+
for (size_t i = 0; i < rows; ++i)
{
- if (null_map && (*null_map)[i])
+ size_t ind = 0;
+ if constexpr (std::is_same_v, ScatteredBlock::Indexes>)
+ ind = selector.getData()[i];
+ else
+ ind = selector.first + i;
+
+ chassert(!null_map || ind < null_map->size());
+ if (null_map && (*null_map)[ind])
{
/// nulls are not inserted into hash table,
/// keep them for RIGHT and FULL joins
@@ -216,15 +247,16 @@ size_t HashJoinMethods::insertFromBlockImplTypeC
}
/// Check condition for right table from ON section
- if (join_mask && !(*join_mask)[i])
+ chassert(!join_mask || ind < join_mask->size());
+ if (join_mask && !(*join_mask)[ind])
continue;
if constexpr (is_asof_join)
- Inserter::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column);
+ Inserter::insertAsof(join, map, key_getter, stored_block, ind, pool, *asof_column);
else if constexpr (mapped_one)
- is_inserted |= Inserter::insertOne(join, map, key_getter, stored_block, i, pool);
+ is_inserted |= Inserter::insertOne(join, map, key_getter, stored_block, ind, pool);
else
- Inserter::insertAll(join, map, key_getter, stored_block, i, pool);
+ Inserter::insertAll(join, map, key_getter, stored_block, ind, pool);
}
return map.getBufferSizeInCells();
}
@@ -318,26 +350,43 @@ size_t HashJoinMethods::joinRightColumnsSwitchMu
if (added_columns.additional_filter_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Additional filter expression is not supported for this JOIN");
- return mapv.size() > 1 ? joinRightColumns(
- std::forward>(key_getter_vector), mapv, added_columns, used_flags)
- : joinRightColumns(
- std::forward>(key_getter_vector), mapv, added_columns, used_flags);
+ auto & block = added_columns.src_block;
+ if (block.getSelector().isContinuousRange())
+ {
+ if (mapv.size() > 1)
+ return joinRightColumns(
+ std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getRange());
+ else
+ return joinRightColumns(
+ std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getRange());
+ }
+ else
+ {
+ if (mapv.size() > 1)
+ return joinRightColumns(
+ std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getIndexes());
+ else
+ return joinRightColumns(
+ std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getIndexes());
+ }
}
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template
-template
+template
size_t HashJoinMethods::joinRightColumns(
std::vector && key_getter_vector,
const std::vector & mapv,
AddedColumns & added_columns,
- JoinStuff::JoinUsedFlags & used_flags)
+ JoinStuff::JoinUsedFlags & used_flags,
+ const Selector & selector)
{
constexpr JoinFeatures join_features;
- size_t rows = added_columns.rows_to_add;
+ auto & block = added_columns.src_block;
+ size_t rows = block.rows();
if constexpr (need_filter)
added_columns.filter = IColumn::Filter(rows, 0);
if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right)))
@@ -353,6 +402,12 @@ size_t HashJoinMethods::joinRightColumns(
size_t i = 0;
for (; i < rows; ++i)
{
+ size_t ind = 0;
+ if constexpr (std::is_same_v, ScatteredBlock::Indexes>)
+ ind = selector.getData()[i];
+ else
+ ind = selector.first + i;
+
if constexpr (join_features.need_replication)
{
if (unlikely(current_offset >= max_joined_block_rows))
@@ -368,12 +423,12 @@ size_t HashJoinMethods::joinRightColumns(
for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx)
{
const auto & join_keys = added_columns.join_on_keys[onexpr_idx];
- if (join_keys.null_map && (*join_keys.null_map)[i])
+ if (join_keys.null_map && (*join_keys.null_map)[ind])
continue;
- bool row_acceptable = !join_keys.isRowFiltered(i);
+ bool row_acceptable = !join_keys.isRowFiltered(ind);
using FindResult = typename KeyGetter::FindResult;
- auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), i, pool) : FindResult();
+ auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), ind, pool) : FindResult();
if (find_result.isFound())
{
@@ -383,7 +438,7 @@ size_t HashJoinMethods::joinRightColumns(
{
const IColumn & left_asof_key = added_columns.leftAsofKey();
- auto row_ref = mapped->findAsof(left_asof_key, i);
+ auto row_ref = mapped->findAsof(left_asof_key, ind);
if (row_ref && row_ref->block)
{
setUsed(added_columns.filter, i);
@@ -848,23 +903,6 @@ size_t HashJoinMethods::joinRightColumnsWithAddt
return left_row_iter;
}
-template
-Block HashJoinMethods::sliceBlock(Block & block, size_t num_rows)
-{
- size_t total_rows = block.rows();
- if (num_rows >= total_rows)
- return {};
- size_t remaining_rows = total_rows - num_rows;
- Block remaining_block = block.cloneEmpty();
- for (size_t i = 0; i < block.columns(); ++i)
- {
- auto & col = block.getByPosition(i);
- remaining_block.getByPosition(i).column = col.column->cut(num_rows, remaining_rows);
- col.column = col.column->cut(0, num_rows);
- }
- return remaining_block;
-}
-
template
ColumnWithTypeAndName HashJoinMethods