diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 950e672272a..3e0131a388a 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -85,4 +85,4 @@ At a minimum, the following information should be added (but add more as needed)
- [ ] 3
- [ ] 4
-
+
diff --git a/.github/workflows/backport_branches.yml b/.github/workflows/backport_branches.yml
index 2a98722414b..b0380b939bb 100644
--- a/.github/workflows/backport_branches.yml
+++ b/.github/workflows/backport_branches.yml
@@ -9,6 +9,12 @@ on: # yamllint disable-line rule:truthy
push:
branches:
- 'backport/**'
+
+# Cancel the previous workflow run in PRs.
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]
diff --git a/.github/workflows/cancel.yml b/.github/workflows/cancel.yml
deleted file mode 100644
index 3c2be767ad2..00000000000
--- a/.github/workflows/cancel.yml
+++ /dev/null
@@ -1,19 +0,0 @@
-name: Cancel
-
-env:
- # Force the stdout and stderr streams to be unbuffered
- PYTHONUNBUFFERED: 1
-
-on: # yamllint disable-line rule:truthy
- workflow_run:
- workflows: ["PullRequestCI", "ReleaseBranchCI", "DocsCheck", "BackportPR"]
- types:
- - requested
-jobs:
- cancel:
- runs-on: [self-hosted, style-checker]
- steps:
- - uses: styfle/cancel-workflow-action@0.9.1
- with:
- all_but_latest: true
- workflow_id: ${{ github.event.workflow.id }}
diff --git a/.github/workflows/debug.yml b/.github/workflows/debug.yml
deleted file mode 100644
index 5abed268ecd..00000000000
--- a/.github/workflows/debug.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-# The CI for each commit, prints envs and content of GITHUB_EVENT_PATH
-name: Debug
-
-'on':
- [push, pull_request, pull_request_review, release, workflow_dispatch, workflow_call]
-
-jobs:
- DebugInfo:
- runs-on: ubuntu-latest
- steps:
- - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 515236bb826..3e1c5576e7d 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -10,14 +10,13 @@ env:
workflow_dispatch:
jobs:
- Debug:
- # The task for having a preserved ENV and event.json for later investigation
- uses: ./.github/workflows/debug.yml
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
+ - name: DebugInfo
+ uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index a6e369ae0e6..9f16e32707e 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -14,6 +14,11 @@ on: # yamllint disable-line rule:truthy
branches:
- master
+# Cancel the previous workflow run in PRs.
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]
diff --git a/.github/workflows/pull_request_approved.yml b/.github/workflows/pull_request_approved.yml
deleted file mode 100644
index 3de4978ad68..00000000000
--- a/.github/workflows/pull_request_approved.yml
+++ /dev/null
@@ -1,23 +0,0 @@
-name: PullRequestApprovedCI
-
-env:
- # Force the stdout and stderr streams to be unbuffered
- PYTHONUNBUFFERED: 1
-
-on: # yamllint disable-line rule:truthy
- pull_request_review:
- types:
- - submitted
-
-jobs:
- MergeOnApproval:
- runs-on: [self-hosted, style-checker]
- steps:
- - name: Check out repository code
- uses: ClickHouse/checkout@v1
- with:
- clear-repository: true
- - name: Merge approved PR
- run: |
- cd "$GITHUB_WORKSPACE/tests/ci"
- python3 merge_pr.py --check-approved
diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md
index d25e18bd397..a137eb2bdf2 100644
--- a/docs/en/interfaces/formats.md
+++ b/docs/en/interfaces/formats.md
@@ -75,7 +75,7 @@ The supported formats are:
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✔ |
| [One](#data-format-one) | ✔ | ✗ |
-| [Npy](#data-format-npy) | ✔ | ✗ |
+| [Npy](#data-format-npy) | ✔ | ✔ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNames](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
@@ -2466,23 +2466,22 @@ Result:
## Npy {#data-format-npy}
-This function is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats top level dimension as an array of rows with single column. Supported Npy data types and their corresponding type in ClickHouse:
-| Npy type | ClickHouse type |
-|:--------:|:---------------:|
-| b1 | UInt8 |
-| i1 | Int8 |
-| i2 | Int16 |
-| i4 | Int32 |
-| i8 | Int64 |
-| u1 | UInt8 |
-| u2 | UInt16 |
-| u4 | UInt32 |
-| u8 | UInt64 |
-| f2 | Float32 |
-| f4 | Float32 |
-| f8 | Float64 |
-| S | String |
-| U | String |
+The Npy format is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats the top-level dimension as an array of rows with a single column. Supported Npy data types and their corresponding types in ClickHouse:
+
+| Npy data type (`INSERT`) | ClickHouse data type | Npy data type (`SELECT`) |
+|--------------------------|-----------------------------------------------------------------|--------------------------|
+| `i1` | [Int8](/docs/en/sql-reference/data-types/int-uint.md) | `i1` |
+| `i2` | [Int16](/docs/en/sql-reference/data-types/int-uint.md) | `i2` |
+| `i4` | [Int32](/docs/en/sql-reference/data-types/int-uint.md) | `i4` |
+| `i8` | [Int64](/docs/en/sql-reference/data-types/int-uint.md) | `i8` |
+| `u1`, `b1` | [UInt8](/docs/en/sql-reference/data-types/int-uint.md) | `u1` |
+| `u2` | [UInt16](/docs/en/sql-reference/data-types/int-uint.md) | `u2` |
+| `u4` | [UInt32](/docs/en/sql-reference/data-types/int-uint.md) | `u4` |
+| `u8` | [UInt64](/docs/en/sql-reference/data-types/int-uint.md) | `u8` |
+| `f2`, `f4` | [Float32](/docs/en/sql-reference/data-types/float.md) | `f4` |
+| `f8` | [Float64](/docs/en/sql-reference/data-types/float.md) | `f8` |
+| `S`, `U` | [String](/docs/en/sql-reference/data-types/string.md) | `S` |
+| | [FixedString](/docs/en/sql-reference/data-types/fixedstring.md) | `S` |
**Example of saving an array in .npy format using Python**
@@ -2509,6 +2508,14 @@ Result:
└───────────────┘
```
+**Selecting Data**
+
+You can select data from a ClickHouse table and save it to a file in the Npy format using the following command:
+
+```bash
+$ clickhouse-client --query="SELECT {column} FROM {some_table} FORMAT Npy" > {filename.npy}
+```
+
## LineAsString {#lineasstring}
In this format, every line of input data is interpreted as a single string value. This format can only be parsed for table with a single field of type [String](/docs/en/sql-reference/data-types/string.md). The remaining columns must be set to [DEFAULT](/docs/en/sql-reference/statements/create/table.md/#default) or [MATERIALIZED](/docs/en/sql-reference/statements/create/table.md/#materialized), or omitted.
diff --git a/docs/en/sql-reference/data-types/map.md b/docs/en/sql-reference/data-types/map.md
index 9d495126d28..18c7816f811 100644
--- a/docs/en/sql-reference/data-types/map.md
+++ b/docs/en/sql-reference/data-types/map.md
@@ -7,6 +7,7 @@ sidebar_label: Map(K, V)
# Map(K, V)
`Map(K, V)` data type stores `key:value` pairs.
+The Map datatype is implemented as `Array(Tuple(key K, value V))`, which means that the order of keys in each map does not change, i.e., this data type maintains insertion order.
**Parameters**
diff --git a/docs/en/sql-reference/functions/conditional-functions.md b/docs/en/sql-reference/functions/conditional-functions.md
index eb4e98961f1..564186fd8db 100644
--- a/docs/en/sql-reference/functions/conditional-functions.md
+++ b/docs/en/sql-reference/functions/conditional-functions.md
@@ -234,3 +234,34 @@ SELECT least(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
:::note
The type returned is a DateTime64 as the DataTime32 must be promoted to 64 bit for the comparison.
:::
+
+## clamp
+
+Constrains the return value to the range between `min` and `max`.
+
+**Syntax**
+
+``` sql
+clamp(value, min, max)
+```
+
+**Arguments**
+
+- `value` – Input value.
+- `min` – Limit the lower bound.
+- `max` – Limit the upper bound.
+
+**Returned values**
+
+If the value is less than the minimum value, return the minimum value; if it is greater than the maximum value, return the maximum value; otherwise, return the current value.
+
+Examples:
+
+```sql
+SELECT clamp(1, 2, 3) result, toTypeName(result) type;
+```
+```response
+┌─result─┬─type────┐
+│ 2 │ Float64 │
+└────────┴─────────┘
+```
\ No newline at end of file
diff --git a/docs/en/sql-reference/statements/select/join.md b/docs/en/sql-reference/statements/select/join.md
index 4ef407a4d13..34c6016235a 100644
--- a/docs/en/sql-reference/statements/select/join.md
+++ b/docs/en/sql-reference/statements/select/join.md
@@ -151,6 +151,14 @@ Result:
Query with `INNER` type of a join and conditions with `OR` and `AND`:
+:::note
+
+By default, non-equal conditions are supported as long as they use columns from the same table.
+For example, `t1.a = t2.key AND t1.b > 0 AND t2.b > t2.c` is supported, because `t1.b > 0` uses columns only from `t1` and `t2.b > t2.c` uses columns only from `t2`.
+However, you can try experimental support for conditions like `t1.a = t2.key AND t1.b > t2.key`; see the section below for more details.
+
+:::
+
``` sql
SELECT a, b, val FROM t1 INNER JOIN t2 ON t1.a = t2.key OR t1.b = t2.key AND t2.val > 3;
```
@@ -165,7 +173,7 @@ Result:
└───┴────┴─────┘
```
-## [experimental] Join with inequality conditions
+## [experimental] Join with inequality conditions for columns from different tables
:::note
This feature is experimental. To use it, set `allow_experimental_join_condition` to 1 in your configuration files or by using the `SET` command:
diff --git a/src/Analyzer/FunctionNode.h b/src/Analyzer/FunctionNode.h
index 8d14b7eeb0d..8abffcfc8ee 100644
--- a/src/Analyzer/FunctionNode.h
+++ b/src/Analyzer/FunctionNode.h
@@ -201,8 +201,11 @@ public:
void convertToNullable() override
{
- chassert(kind == FunctionKind::ORDINARY);
- wrap_with_nullable = true;
+ /// Ignore other function kinds.
+ /// We might try to convert aggregate/window function for invalid query
+ /// before the validation happened.
+ if (kind == FunctionKind::ORDINARY)
+ wrap_with_nullable = true;
}
void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;
diff --git a/src/Analyzer/Passes/QueryAnalysisPass.cpp b/src/Analyzer/Passes/QueryAnalysisPass.cpp
index 0d2cd5c5537..d6e1ceb243a 100644
--- a/src/Analyzer/Passes/QueryAnalysisPass.cpp
+++ b/src/Analyzer/Passes/QueryAnalysisPass.cpp
@@ -475,7 +475,7 @@ struct TableExpressionData
class ExpressionsStack
{
public:
- void pushNode(const QueryTreeNodePtr & node)
+ void push(const QueryTreeNodePtr & node)
{
if (node->hasAlias())
{
@@ -492,7 +492,7 @@ public:
expressions.emplace_back(node);
}
- void popNode()
+ void pop()
{
const auto & top_expression = expressions.back();
const auto & top_expression_alias = top_expression->getAlias();
@@ -730,6 +730,8 @@ struct IdentifierResolveScope
join_use_nulls = context->getSettingsRef().join_use_nulls;
else if (parent_scope)
join_use_nulls = parent_scope->join_use_nulls;
+
+ alias_name_to_expression_node = &alias_name_to_expression_node_before_group_by;
}
QueryTreeNodePtr scope_node;
@@ -745,7 +747,10 @@ struct IdentifierResolveScope
std::unordered_map expression_argument_name_to_node;
/// Alias name to query expression node
- std::unordered_map alias_name_to_expression_node;
+ std::unordered_map alias_name_to_expression_node_before_group_by;
+ std::unordered_map alias_name_to_expression_node_after_group_by;
+
+ std::unordered_map * alias_name_to_expression_node = nullptr;
/// Alias name to lambda node
std::unordered_map alias_name_to_lambda_node;
@@ -878,6 +883,22 @@ struct IdentifierResolveScope
return it->second;
}
+ void pushExpressionNode(const QueryTreeNodePtr & node)
+ {
+ bool had_aggregate_function = expressions_in_resolve_process_stack.hasAggregateFunction();
+ expressions_in_resolve_process_stack.push(node);
+ if (group_by_use_nulls && had_aggregate_function != expressions_in_resolve_process_stack.hasAggregateFunction())
+ alias_name_to_expression_node = &alias_name_to_expression_node_before_group_by;
+ }
+
+ void popExpressionNode()
+ {
+ bool had_aggregate_function = expressions_in_resolve_process_stack.hasAggregateFunction();
+ expressions_in_resolve_process_stack.pop();
+ if (group_by_use_nulls && had_aggregate_function != expressions_in_resolve_process_stack.hasAggregateFunction())
+ alias_name_to_expression_node = &alias_name_to_expression_node_after_group_by;
+ }
+
/// Dump identifier resolve scope
[[maybe_unused]] void dump(WriteBuffer & buffer) const
{
@@ -894,8 +915,8 @@ struct IdentifierResolveScope
for (const auto & [alias_name, node] : expression_argument_name_to_node)
buffer << "Alias name " << alias_name << " node " << node->formatASTForErrorMessage() << '\n';
- buffer << "Alias name to expression node table size " << alias_name_to_expression_node.size() << '\n';
- for (const auto & [alias_name, node] : alias_name_to_expression_node)
+ buffer << "Alias name to expression node table size " << alias_name_to_expression_node->size() << '\n';
+ for (const auto & [alias_name, node] : *alias_name_to_expression_node)
buffer << "Alias name " << alias_name << " expression node " << node->dumpTree() << '\n';
buffer << "Alias name to function node table size " << alias_name_to_lambda_node.size() << '\n';
@@ -1023,7 +1044,7 @@ private:
if (is_lambda_node)
{
- if (scope.alias_name_to_expression_node.contains(alias))
+ if (scope.alias_name_to_expression_node->contains(alias))
scope.nodes_with_duplicated_aliases.insert(node->clone());
auto [_, inserted] = scope.alias_name_to_lambda_node.insert(std::make_pair(alias, node));
@@ -1036,7 +1057,7 @@ private:
if (scope.alias_name_to_lambda_node.contains(alias))
scope.nodes_with_duplicated_aliases.insert(node->clone());
- auto [_, inserted] = scope.alias_name_to_expression_node.insert(std::make_pair(alias, node));
+ auto [_, inserted] = scope.alias_name_to_expression_node->insert(std::make_pair(alias, node));
if (!inserted)
scope.nodes_with_duplicated_aliases.insert(node->clone());
@@ -1838,7 +1859,7 @@ void QueryAnalyzer::collectScopeValidIdentifiersForTypoCorrection(
if (allow_expression_identifiers)
{
- for (const auto & [name, expression] : scope.alias_name_to_expression_node)
+ for (const auto & [name, expression] : *scope.alias_name_to_expression_node)
{
assert(expression);
auto expression_identifier = Identifier(name);
@@ -1868,7 +1889,7 @@ void QueryAnalyzer::collectScopeValidIdentifiersForTypoCorrection(
{
if (allow_function_identifiers)
{
- for (const auto & [name, _] : scope.alias_name_to_expression_node)
+ for (const auto & [name, _] : *scope.alias_name_to_expression_node)
valid_identifiers_result.insert(Identifier(name));
}
@@ -2768,7 +2789,7 @@ bool QueryAnalyzer::tryBindIdentifierToAliases(const IdentifierLookup & identifi
auto get_alias_name_to_node_map = [&]() -> const std::unordered_map &
{
if (identifier_lookup.isExpressionLookup())
- return scope.alias_name_to_expression_node;
+ return *scope.alias_name_to_expression_node;
else if (identifier_lookup.isFunctionLookup())
return scope.alias_name_to_lambda_node;
@@ -2830,7 +2851,7 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromAliases(const Identifier
auto get_alias_name_to_node_map = [&]() -> std::unordered_map &
{
if (identifier_lookup.isExpressionLookup())
- return scope.alias_name_to_expression_node;
+ return *scope.alias_name_to_expression_node;
else if (identifier_lookup.isFunctionLookup())
return scope.alias_name_to_lambda_node;
@@ -2868,7 +2889,7 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromAliases(const Identifier
/// Resolve expression if necessary
if (node_type == QueryTreeNodeType::IDENTIFIER)
{
- scope.expressions_in_resolve_process_stack.pushNode(it->second);
+ scope.pushExpressionNode(it->second);
auto & alias_identifier_node = it->second->as();
auto identifier = alias_identifier_node.getIdentifier();
@@ -2899,9 +2920,9 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromAliases(const Identifier
if (identifier_lookup.isExpressionLookup())
scope.alias_name_to_lambda_node.erase(identifier_bind_part);
else if (identifier_lookup.isFunctionLookup())
- scope.alias_name_to_expression_node.erase(identifier_bind_part);
+ scope.alias_name_to_expression_node->erase(identifier_bind_part);
- scope.expressions_in_resolve_process_stack.popNode();
+ scope.popExpressionNode();
}
else if (node_type == QueryTreeNodeType::FUNCTION)
{
@@ -4098,8 +4119,8 @@ IdentifierResolveResult QueryAnalyzer::tryResolveIdentifier(const IdentifierLook
* SELECT id FROM ( SELECT ... ) AS subquery ARRAY JOIN [0] AS id INNER JOIN second_table USING (id)
* In the example, identifier `id` should be resolved into one from USING (id) column.
*/
- auto alias_it = scope.alias_name_to_expression_node.find(identifier_lookup.identifier.getFullName());
- if (alias_it != scope.alias_name_to_expression_node.end() && alias_it->second->getNodeType() == QueryTreeNodeType::COLUMN)
+ auto alias_it = scope.alias_name_to_expression_node->find(identifier_lookup.identifier.getFullName());
+ if (alias_it != scope.alias_name_to_expression_node->end() && alias_it->second->getNodeType() == QueryTreeNodeType::COLUMN)
{
const auto & column_node = alias_it->second->as();
if (column_node.getColumnSource()->getNodeType() == QueryTreeNodeType::ARRAY_JOIN)
@@ -4814,6 +4835,19 @@ ProjectionNames QueryAnalyzer::resolveMatcher(QueryTreeNodePtr & matcher_node, I
}
}
+ if (!scope.expressions_in_resolve_process_stack.hasAggregateFunction())
+ {
+ for (auto & [node, _] : matched_expression_nodes_with_names)
+ {
+ auto it = scope.nullable_group_by_keys.find(node);
+ if (it != scope.nullable_group_by_keys.end())
+ {
+ node = it->node->clone();
+ node->convertToNullable();
+ }
+ }
+ }
+
std::unordered_map> strict_transformer_to_used_column_names;
for (const auto & transformer : matcher_node_typed.getColumnTransformers().getNodes())
{
@@ -5007,7 +5041,10 @@ ProjectionNames QueryAnalyzer::resolveMatcher(QueryTreeNodePtr & matcher_node, I
scope.scope_node->formatASTForErrorMessage());
}
+ auto original_ast = matcher_node->getOriginalAST();
matcher_node = std::move(list);
+ if (original_ast)
+ matcher_node->setOriginalAST(original_ast);
return result_projection_names;
}
@@ -5203,10 +5240,14 @@ ProjectionNames QueryAnalyzer::resolveLambda(const QueryTreeNodePtr & lambda_nod
for (size_t i = 0; i < lambda_arguments_nodes_size; ++i)
{
auto & lambda_argument_node = lambda_arguments_nodes[i];
- auto & lambda_argument_node_typed = lambda_argument_node->as();
- const auto & lambda_argument_name = lambda_argument_node_typed.getIdentifier().getFullName();
+ const auto * lambda_argument_identifier = lambda_argument_node->as();
+ const auto * lambda_argument_column = lambda_argument_node->as();
+ if (!lambda_argument_identifier && !lambda_argument_column)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected IDENTIFIER or COLUMN as lambda argument, got {}", lambda_node->dumpTree());
+ const auto & lambda_argument_name = lambda_argument_identifier ? lambda_argument_identifier->getIdentifier().getFullName()
+ : lambda_argument_column->getColumnName();
- bool has_expression_node = scope.alias_name_to_expression_node.contains(lambda_argument_name);
+ bool has_expression_node = scope.alias_name_to_expression_node->contains(lambda_argument_name);
bool has_alias_node = scope.alias_name_to_lambda_node.contains(lambda_argument_name);
if (has_expression_node || has_alias_node)
@@ -5214,7 +5255,7 @@ ProjectionNames QueryAnalyzer::resolveLambda(const QueryTreeNodePtr & lambda_nod
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Alias name '{}' inside lambda {} cannot have same name as lambda argument. In scope {}",
lambda_argument_name,
- lambda_argument_node_typed.formatASTForErrorMessage(),
+ lambda_argument_node->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
}
@@ -6233,8 +6274,8 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
*
* To resolve b we need to resolve a.
*/
- auto it = scope.alias_name_to_expression_node.find(node_alias);
- if (it != scope.alias_name_to_expression_node.end())
+ auto it = scope.alias_name_to_expression_node->find(node_alias);
+ if (it != scope.alias_name_to_expression_node->end())
node = it->second;
if (allow_lambda_expression)
@@ -6245,7 +6286,7 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
}
}
- scope.expressions_in_resolve_process_stack.pushNode(node);
+ scope.pushExpressionNode(node);
auto node_type = node->getNodeType();
@@ -6274,7 +6315,7 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
resolved_identifier_node = tryResolveIdentifier({unresolved_identifier, IdentifierLookupContext::FUNCTION}, scope).resolved_identifier;
if (resolved_identifier_node && !node_alias.empty())
- scope.alias_name_to_expression_node.erase(node_alias);
+ scope.alias_name_to_expression_node->erase(node_alias);
}
if (!resolved_identifier_node && allow_table_expression)
@@ -6490,13 +6531,23 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
validateTreeSize(node, scope.context->getSettingsRef().max_expanded_ast_elements, node_to_tree_size);
- if (!scope.expressions_in_resolve_process_stack.hasAggregateFunction())
+ /// Lambda can be inside the aggregate function, so we should check parent scopes.
+ /// Most likely only the root scope can have an aggregate function, but let's check all just in case.
+ bool in_aggregate_function_scope = false;
+ for (const auto * scope_ptr = &scope; scope_ptr; scope_ptr = scope_ptr->parent_scope)
+ in_aggregate_function_scope = in_aggregate_function_scope || scope_ptr->expressions_in_resolve_process_stack.hasAggregateFunction();
+
+ if (!in_aggregate_function_scope)
{
- auto it = scope.nullable_group_by_keys.find(node);
- if (it != scope.nullable_group_by_keys.end())
+ for (const auto * scope_ptr = &scope; scope_ptr; scope_ptr = scope_ptr->parent_scope)
{
- node = it->node->clone();
- node->convertToNullable();
+ auto it = scope_ptr->nullable_group_by_keys.find(node);
+ if (it != scope_ptr->nullable_group_by_keys.end())
+ {
+ node = it->node->clone();
+ node->convertToNullable();
+ break;
+ }
}
}
@@ -6505,8 +6556,8 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
*/
if (!node_alias.empty() && use_alias_table && !scope.group_by_use_nulls)
{
- auto it = scope.alias_name_to_expression_node.find(node_alias);
- if (it != scope.alias_name_to_expression_node.end())
+ auto it = scope.alias_name_to_expression_node->find(node_alias);
+ if (it != scope.alias_name_to_expression_node->end())
it->second = node;
if (allow_lambda_expression)
@@ -6519,7 +6570,7 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
resolved_expressions.emplace(node, result_projection_names);
- scope.expressions_in_resolve_process_stack.popNode();
+ scope.popExpressionNode();
bool expression_was_root = scope.expressions_in_resolve_process_stack.empty();
if (expression_was_root)
scope.non_cached_identifier_lookups_during_expression_resolve.clear();
@@ -6863,11 +6914,11 @@ void QueryAnalyzer::initializeQueryJoinTreeNode(QueryTreeNodePtr & join_tree_nod
*/
resolve_settings.allow_to_resolve_subquery_during_identifier_resolution = false;
- scope.expressions_in_resolve_process_stack.pushNode(current_join_tree_node);
+ scope.pushExpressionNode(current_join_tree_node);
auto table_identifier_resolve_result = tryResolveIdentifier(table_identifier_lookup, scope, resolve_settings);
- scope.expressions_in_resolve_process_stack.popNode();
+ scope.popExpressionNode();
bool expression_was_root = scope.expressions_in_resolve_process_stack.empty();
if (expression_was_root)
scope.non_cached_identifier_lookups_during_expression_resolve.clear();
@@ -7453,7 +7504,7 @@ void QueryAnalyzer::resolveArrayJoin(QueryTreeNodePtr & array_join_node, Identif
for (auto & array_join_expression : array_join_nodes)
{
auto array_join_expression_alias = array_join_expression->getAlias();
- if (!array_join_expression_alias.empty() && scope.alias_name_to_expression_node.contains(array_join_expression_alias))
+ if (!array_join_expression_alias.empty() && scope.alias_name_to_expression_node->contains(array_join_expression_alias))
throw Exception(ErrorCodes::MULTIPLE_EXPRESSIONS_FOR_ALIAS,
"ARRAY JOIN expression {} with duplicate alias {}. In scope {}",
array_join_expression->formatASTForErrorMessage(),
@@ -7547,8 +7598,8 @@ void QueryAnalyzer::resolveArrayJoin(QueryTreeNodePtr & array_join_node, Identif
array_join_nodes = std::move(array_join_column_expressions);
for (auto & array_join_column_expression : array_join_nodes)
{
- auto it = scope.alias_name_to_expression_node.find(array_join_column_expression->getAlias());
- if (it != scope.alias_name_to_expression_node.end())
+ auto it = scope.alias_name_to_expression_node->find(array_join_column_expression->getAlias());
+ if (it != scope.alias_name_to_expression_node->end())
{
auto & array_join_column_expression_typed = array_join_column_expression->as();
auto array_join_column = std::make_shared(array_join_column_expression_typed.getColumn(),
@@ -8008,7 +8059,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
window_node_typed.setParentWindowName({});
}
- scope.window_name_to_window_node.emplace(window_node_typed.getAlias(), window_node);
+ auto [_, inserted] = scope.window_name_to_window_node.emplace(window_node_typed.getAlias(), window_node);
+ if (!inserted)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS,
+ "Window '{}' is already defined. In scope {}",
+ window_node_typed.getAlias(),
+ scope.scope_node->formatASTForErrorMessage());
}
/** Disable identifier cache during JOIN TREE resolve.
@@ -8077,8 +8133,10 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
/// Clone is needed cause aliases share subtrees.
/// If not clone, the same (shared) subtree could be resolved again with different (Nullable) type
/// See 03023_group_by_use_nulls_analyzer_crashes
- for (auto & [_, node] : scope.alias_name_to_expression_node)
- node = node->clone();
+ for (auto & [key, node] : scope.alias_name_to_expression_node_before_group_by)
+ scope.alias_name_to_expression_node_after_group_by[key] = node->clone();
+
+ scope.alias_name_to_expression_node = &scope.alias_name_to_expression_node_after_group_by;
}
if (query_node_typed.hasHaving())
@@ -8162,8 +8220,8 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
bool has_node_in_alias_table = false;
- auto it = scope.alias_name_to_expression_node.find(node_alias);
- if (it != scope.alias_name_to_expression_node.end())
+ auto it = scope.alias_name_to_expression_node->find(node_alias);
+ if (it != scope.alias_name_to_expression_node->end())
{
has_node_in_alias_table = true;
@@ -8222,7 +8280,7 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
/// Remove aliases from expression and lambda nodes
- for (auto & [_, node] : scope.alias_name_to_expression_node)
+ for (auto & [_, node] : *scope.alias_name_to_expression_node)
node->removeAlias();
for (auto & [_, node] : scope.alias_name_to_lambda_node)
diff --git a/src/Analyzer/ValidationUtils.cpp b/src/Analyzer/ValidationUtils.cpp
index e17639367eb..9e977964755 100644
--- a/src/Analyzer/ValidationUtils.cpp
+++ b/src/Analyzer/ValidationUtils.cpp
@@ -26,6 +26,10 @@ namespace
void validateFilter(const QueryTreeNodePtr & filter_node, std::string_view exception_place_message, const QueryTreeNodePtr & query_node)
{
+ if (filter_node->getNodeType() == QueryTreeNodeType::LIST)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS,
+ "Unsupported expression '{}' in filter", filter_node->formatASTForErrorMessage());
+
auto filter_node_result_type = filter_node->getResultType();
if (!filter_node_result_type->canBeUsedInBooleanContext())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
diff --git a/src/Columns/ColumnString.h b/src/Columns/ColumnString.h
index 2d1d69ced73..cbda5466303 100644
--- a/src/Columns/ColumnString.h
+++ b/src/Columns/ColumnString.h
@@ -1,7 +1,6 @@
#pragma once
#include
-#include
#include
#include
@@ -12,6 +11,8 @@
#include
#include
+#include
+
class Collator;
@@ -42,7 +43,11 @@ private:
size_t ALWAYS_INLINE offsetAt(ssize_t i) const { return offsets[i - 1]; }
/// Size of i-th element, including terminating zero.
- size_t ALWAYS_INLINE sizeAt(ssize_t i) const { return offsets[i] - offsets[i - 1]; }
+ size_t ALWAYS_INLINE sizeAt(ssize_t i) const
+ {
+ chassert(offsets[i] > offsets[i - 1]);
+ return offsets[i] - offsets[i - 1];
+ }
struct ComparatorBase;
@@ -79,7 +84,7 @@ public:
size_t byteSizeAt(size_t n) const override
{
- assert(n < size());
+ chassert(n < size());
return sizeAt(n) + sizeof(offsets[0]);
}
@@ -94,25 +99,25 @@ public:
Field operator[](size_t n) const override
{
- assert(n < size());
+ chassert(n < size());
return Field(&chars[offsetAt(n)], sizeAt(n) - 1);
}
void get(size_t n, Field & res) const override
{
- assert(n < size());
+ chassert(n < size());
res = std::string_view{reinterpret_cast(&chars[offsetAt(n)]), sizeAt(n) - 1};
}
StringRef getDataAt(size_t n) const override
{
- assert(n < size());
+ chassert(n < size());
return StringRef(&chars[offsetAt(n)], sizeAt(n) - 1);
}
bool isDefaultAt(size_t n) const override
{
- assert(n < size());
+ chassert(n < size());
return sizeAt(n) == 1;
}
diff --git a/src/Columns/ColumnUnique.cpp b/src/Columns/ColumnUnique.cpp
index edfee69a752..54f45204c00 100644
--- a/src/Columns/ColumnUnique.cpp
+++ b/src/Columns/ColumnUnique.cpp
@@ -21,5 +21,8 @@ template class ColumnUnique;
template class ColumnUnique;
template class ColumnUnique;
template class ColumnUnique;
+template class ColumnUnique;
+template class ColumnUnique;
+template class ColumnUnique;
}
diff --git a/src/Common/CaresPTRResolver.cpp b/src/Common/CaresPTRResolver.cpp
index 0261f4a130f..df456c9cfbd 100644
--- a/src/Common/CaresPTRResolver.cpp
+++ b/src/Common/CaresPTRResolver.cpp
@@ -173,11 +173,6 @@ namespace DB
return true;
}
- void CaresPTRResolver::cancel_requests(ares_channel channel)
- {
- ares_cancel(channel);
- }
-
std::span CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd, ares_channel channel)
{
int sockets_bitmask = ares_getsock(channel, sockets, ARES_GETSOCK_MAXNUM);
diff --git a/src/Common/CaresPTRResolver.h b/src/Common/CaresPTRResolver.h
index 24a5e422ca8..95194e0d5ce 100644
--- a/src/Common/CaresPTRResolver.h
+++ b/src/Common/CaresPTRResolver.h
@@ -44,8 +44,6 @@ namespace DB
private:
bool wait_and_process(ares_channel channel);
- void cancel_requests(ares_channel channel);
-
void resolve(const std::string & ip, std::unordered_set & response, ares_channel channel);
void resolve_v6(const std::string & ip, std::unordered_set & response, ares_channel channel);
diff --git a/src/Common/PageCache.cpp b/src/Common/PageCache.cpp
index d4598d4683b..56bd8c1a339 100644
--- a/src/Common/PageCache.cpp
+++ b/src/Common/PageCache.cpp
@@ -198,12 +198,18 @@ size_t PageCache::getPinnedSize() const
PageCache::MemoryStats PageCache::getResidentSetSize() const
{
MemoryStats stats;
+
#ifdef OS_LINUX
if (use_madv_free)
{
std::unordered_set cache_mmap_addrs;
{
std::lock_guard lock(global_mutex);
+
+ /// Don't spend time on reading smaps if page cache is not used.
+ if (mmaps.empty())
+ return stats;
+
for (const auto & m : mmaps)
cache_mmap_addrs.insert(reinterpret_cast(m.ptr));
}
@@ -258,7 +264,7 @@ PageCache::MemoryStats PageCache::getResidentSetSize() const
UInt64 addr = unhexUInt(s.c_str());
current_range_is_cache = cache_mmap_addrs.contains(addr);
}
- else if (s == "Rss:" || s == "LazyFree")
+ else if (s == "Rss:" || s == "LazyFree:")
{
skip_whitespace();
size_t val;
diff --git a/src/Common/Scheduler/Nodes/FifoQueue.h b/src/Common/Scheduler/Nodes/FifoQueue.h
index 45ed32343ff..9ec997c06d2 100644
--- a/src/Common/Scheduler/Nodes/FifoQueue.h
+++ b/src/Common/Scheduler/Nodes/FifoQueue.h
@@ -6,7 +6,8 @@
#include
-#include
+#include
+
#include
@@ -15,6 +16,7 @@ namespace DB
namespace ErrorCodes
{
+ extern const int LOGICAL_ERROR;
extern const int INVALID_SCHEDULER_NODE;
}
@@ -42,7 +44,7 @@ public:
std::lock_guard lock(mutex);
queue_cost += request->cost;
bool was_empty = requests.empty();
- requests.push_back(request);
+ requests.push_back(*request);
if (was_empty)
scheduleActivation();
}
@@ -52,7 +54,7 @@ public:
std::lock_guard lock(mutex);
if (requests.empty())
return {nullptr, false};
- ResourceRequest * result = requests.front();
+ ResourceRequest * result = &requests.front();
requests.pop_front();
if (requests.empty())
busy_periods++;
@@ -65,19 +67,24 @@ public:
bool cancelRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
- // TODO(serxa): reimplement queue as intrusive list of ResourceRequest to make this O(1) instead of O(N)
- for (auto i = requests.begin(), e = requests.end(); i != e; ++i)
+ if (request->is_linked())
{
- if (*i == request)
- {
- requests.erase(i);
- if (requests.empty())
- busy_periods++;
- queue_cost -= request->cost;
- canceled_requests++;
- canceled_cost += request->cost;
- return true;
- }
+ // `is_linked()` only tells that `request` is inserted into some intrusive list; it cannot tell whether that list is this queue.
+ // It is up to the caller to make sure this is the case. Otherwise, list sizes will be corrupted.
+ // Not tracking list sizes is not an option, because another problem appears: removing from the list without locking.
+ // Another possible solution: keep track of whether the request `is_cancelable`, guarded by `mutex`.
+ // Simple check for list size corruption: a linked request cannot belong to this queue if this queue is empty.
+ if (requests.empty())
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "trying to cancel request (linked into another queue) from empty queue: {}", getPath());
+
+ requests.erase(requests.iterator_to(*request));
+
+ if (requests.empty())
+ busy_periods++;
+ queue_cost -= request->cost;
+ canceled_requests++;
+ canceled_cost += request->cost;
+ return true;
}
return false;
}
@@ -124,7 +131,7 @@ public:
private:
std::mutex mutex;
Int64 queue_cost = 0;
- std::deque requests; // TODO(serxa): reimplement it using intrusive list to avoid allocations/deallocations and O(N) during cancel
+ boost::intrusive::list requests;
};
}
diff --git a/src/Common/Scheduler/ResourceRequest.h b/src/Common/Scheduler/ResourceRequest.h
index f3153ad382c..d64f624cec5 100644
--- a/src/Common/Scheduler/ResourceRequest.h
+++ b/src/Common/Scheduler/ResourceRequest.h
@@ -1,5 +1,6 @@
#pragma once
+#include
#include
#include
@@ -41,7 +42,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits::max();
* Returning true means successful cancel and therefore steps (4) and (5) are not going to happen
* and step (6) MUST be omitted.
*/
-class ResourceRequest
+class ResourceRequest : public boost::intrusive::list_base_hook<>
{
public:
/// Cost of request execution; should be filled before request enqueueing.
@@ -62,6 +63,7 @@ public:
{
cost = cost_;
constraint = nullptr;
+ // Note that list_base_hook should be reset independently (by intrusive list)
}
virtual ~ResourceRequest() = default;
diff --git a/src/Common/UTF8Helpers.cpp b/src/Common/UTF8Helpers.cpp
index be1f222dc96..b8f5c000e75 100644
--- a/src/Common/UTF8Helpers.cpp
+++ b/src/Common/UTF8Helpers.cpp
@@ -1,9 +1,13 @@
-#include
#include
+#include
+#include
#include
#include
+#if USE_MULTITARGET_CODE
+#include
+#endif
namespace DB
{
@@ -215,5 +219,71 @@ size_t computeBytesBeforeWidth(const UInt8 * data, size_t size, size_t prefix, s
return computeWidthImpl(data, size, prefix, limit);
}
+
+DECLARE_DEFAULT_CODE(
+bool isAllASCII(const UInt8 * data, size_t size) /// Scalar variant: returns true iff no byte of [data, data + size) has bit 7 set.
+{
+ UInt8 mask = 0; /// Bitwise OR of all bytes; stays 0 for empty input, so empty input yields true.
+ for (size_t i = 0; i < size; ++i)
+ mask |= data[i];
+
+ return !(mask & 0x80); /// Bit 7 survives the OR iff at least one byte was >= 0x80, i.e. non-ASCII.
+})
+
+DECLARE_SSE42_SPECIFIC_CODE(
+/// Returns true iff no byte of [data, data + size) has bit 7 set. Copied from https://github.com/lemire/fastvalidate-utf-8/blob/master/include/simdasciicheck.h
+bool isAllASCII(const UInt8 * data, size_t size)
+{
+ __m128i masks = _mm_setzero_si128(); /// Running OR of all 16-byte chunks processed so far.
+
+ size_t i = 0;
+ for (; i + 16 <= size; i += 16) /// Full 16-byte chunks, read with unaligned loads.
+ {
+ __m128i bytes = _mm_loadu_si128(reinterpret_cast(data + i));
+ masks = _mm_or_si128(masks, bytes);
+ }
+ int mask = _mm_movemask_epi8(masks); /// Gathers the most significant bit of each of the 16 accumulated bytes.
+
+ UInt8 tail_mask = 0;
+ for (; i < size; i++) /// Remaining (fewer than 16) bytes are OR-ed one at a time.
+ tail_mask |= data[i];
+
+ mask |= (tail_mask & 0x80); /// Fold the high bit of the tail into the same verdict.
+ return !mask;
+})
+
+DECLARE_AVX2_SPECIFIC_CODE(
+bool isAllASCII(const UInt8 * data, size_t size) /// Same check as the SSE4.2 variant, processing 32 bytes per iteration.
+{
+ __m256i masks = _mm256_setzero_si256(); /// Running OR of all 32-byte chunks processed so far.
+
+ size_t i = 0;
+ for (; i + 32 <= size; i += 32) /// Full 32-byte chunks, read with unaligned loads.
+ {
+ __m256i bytes = _mm256_loadu_si256(reinterpret_cast(data + i));
+ masks = _mm256_or_si256(masks, bytes);
+ }
+ int mask = _mm256_movemask_epi8(masks); /// Gathers the most significant bit of each of the 32 accumulated bytes.
+
+ UInt8 tail_mask = 0;
+ for (; i < size; i++) /// Remaining (fewer than 32) bytes are OR-ed one at a time.
+ tail_mask |= data[i];
+
+ mask |= (tail_mask & 0x80); /// Fold the high bit of the tail into the same verdict.
+ return !mask;
+})
+
+bool isAllASCII(const UInt8* data, size_t size) /// Entry point: picks the widest variant the running CPU supports.
+{
+#if USE_MULTITARGET_CODE
+ if (isArchSupported(TargetArch::AVX2)) /// Checked first, so AVX2 wins over SSE4.2 when both are supported.
+ return TargetSpecific::AVX2::isAllASCII(data, size);
+ if (isArchSupported(TargetArch::SSE42))
+ return TargetSpecific::SSE42::isAllASCII(data, size);
+#endif
+ return TargetSpecific::Default::isAllASCII(data, size); /// Fallback: also the only path when multitarget code is compiled out.
+}
+
+
}
}
diff --git a/src/Common/UTF8Helpers.h b/src/Common/UTF8Helpers.h
index a4dd88921b7..933b62c7b63 100644
--- a/src/Common/UTF8Helpers.h
+++ b/src/Common/UTF8Helpers.h
@@ -136,7 +136,10 @@ size_t computeWidth(const UInt8 * data, size_t size, size_t prefix = 0) noexcept
*/
size_t computeBytesBeforeWidth(const UInt8 * data, size_t size, size_t prefix, size_t limit) noexcept;
-}
+/// If all the characters in the string are ASCII, return true.
+bool isAllASCII(const UInt8* data, size_t size);
+
+}
}
diff --git a/src/Formats/NumpyDataTypes.h b/src/Formats/NumpyDataTypes.h
index cb40c67cd19..062f743c0ea 100644
--- a/src/Formats/NumpyDataTypes.h
+++ b/src/Formats/NumpyDataTypes.h
@@ -1,10 +1,12 @@
#pragma once
#include
#include
+#include
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
+ extern const int NOT_IMPLEMENTED;
}
enum class NumpyDataTypeIndex : uint8_t
@@ -29,9 +31,9 @@ class NumpyDataType
public:
enum Endianness
{
- LITTLE,
- BIG,
- NONE,
+ LITTLE = '<',
+ BIG = '>',
+ NONE = '|',
};
NumpyDataTypeIndex type_index;
@@ -41,15 +43,18 @@ public:
Endianness getEndianness() const { return endianness; }
virtual NumpyDataTypeIndex getTypeIndex() const = 0;
+ virtual size_t getSize() const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function getSize() is not implemented"); }
+ virtual void setSize(size_t) { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function setSize() is not implemented"); }
+ virtual String str() const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function str() is not implemented"); }
-private:
+protected:
Endianness endianness;
};
class NumpyDataTypeInt : public NumpyDataType
{
public:
- NumpyDataTypeInt(Endianness endianness, size_t size_, bool is_signed_) : NumpyDataType(endianness), size(size_), is_signed(is_signed_)
+ NumpyDataTypeInt(Endianness endianness_, size_t size_, bool is_signed_) : NumpyDataType(endianness_), size(size_), is_signed(is_signed_)
{
switch (size)
{
@@ -67,6 +72,14 @@ public:
return type_index;
}
bool isSigned() const { return is_signed; }
+ String str() const override /// Renders this type as text: endianness character, then 'i' or 'u', then the size (e.g. "<i8").
+ {
+ DB::WriteBufferFromOwnString buf;
+ writeChar(static_cast(endianness), buf); /// The Endianness enumerators are defined as the characters '<', '>' and '|'.
+ writeChar(is_signed ? 'i' : 'u', buf); /// 'i' for signed, 'u' for unsigned.
+ writeIntText(size, buf);
+ return buf.str();
+ }
private:
size_t size;
@@ -76,7 +89,7 @@ private:
class NumpyDataTypeFloat : public NumpyDataType
{
public:
- NumpyDataTypeFloat(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
+ NumpyDataTypeFloat(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
switch (size)
{
@@ -92,6 +105,14 @@ public:
{
return type_index;
}
+ String str() const override /// Renders this type as text: endianness character, then 'f', then the size (e.g. "<f8").
+ {
+ DB::WriteBufferFromOwnString buf;
+ writeChar(static_cast(endianness), buf); /// The Endianness enumerators are defined as the characters '<', '>' and '|'.
+ writeChar('f', buf);
+ writeIntText(size, buf);
+ return buf.str();
+ }
private:
size_t size;
};
@@ -99,13 +120,22 @@ private:
class NumpyDataTypeString : public NumpyDataType
{
public:
- NumpyDataTypeString(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
+ NumpyDataTypeString(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
type_index = NumpyDataTypeIndex::String;
}
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
- size_t getSize() const { return size; }
+ size_t getSize() const override { return size; }
+ void setSize(size_t size_) override { size = size_; }
+ String str() const override /// Renders this type as text: endianness character, then 'S', then the size (e.g. "|S16").
+ {
+ DB::WriteBufferFromOwnString buf;
+ writeChar(static_cast(endianness), buf); /// The Endianness enumerators are defined as the characters '<', '>' and '|'.
+ writeChar('S', buf);
+ writeIntText(size, buf); /// Uses the current `size`, which setSize() may have changed after construction.
+ return buf.str();
+ }
private:
size_t size;
};
@@ -113,13 +143,13 @@ private:
class NumpyDataTypeUnicode : public NumpyDataType
{
public:
- NumpyDataTypeUnicode(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
+ NumpyDataTypeUnicode(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
type_index = NumpyDataTypeIndex::Unicode;
}
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
- size_t getSize() const { return size * 4; }
+ size_t getSize() const override { return size * 4; }
private:
size_t size;
};
diff --git a/src/Formats/registerFormats.cpp b/src/Formats/registerFormats.cpp
index 1f851da850a..57ca1bb49c8 100644
--- a/src/Formats/registerFormats.cpp
+++ b/src/Formats/registerFormats.cpp
@@ -76,6 +76,8 @@ void registerInputFormatCustomSeparated(FormatFactory & factory);
void registerOutputFormatCustomSeparated(FormatFactory & factory);
void registerInputFormatCapnProto(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
+void registerInputFormatNpy(FormatFactory & factory);
+void registerOutputFormatNpy(FormatFactory & factory);
void registerInputFormatForm(FormatFactory & factory);
/// Output only (presentational) formats.
@@ -104,7 +106,6 @@ void registerInputFormatMySQLDump(FormatFactory & factory);
void registerInputFormatParquetMetadata(FormatFactory & factory);
void registerInputFormatDWARF(FormatFactory & factory);
void registerInputFormatOne(FormatFactory & factory);
-void registerInputFormatNpy(FormatFactory & factory);
#if USE_HIVE
void registerInputFormatHiveText(FormatFactory & factory);
@@ -224,6 +225,8 @@ void registerFormats()
registerOutputFormatAvro(factory);
registerInputFormatArrow(factory);
registerOutputFormatArrow(factory);
+ registerInputFormatNpy(factory);
+ registerOutputFormatNpy(factory);
registerOutputFormatPretty(factory);
registerOutputFormatPrettyCompact(factory);
@@ -254,7 +257,6 @@ void registerFormats()
registerInputFormatParquetMetadata(factory);
registerInputFormatDWARF(factory);
registerInputFormatOne(factory);
- registerInputFormatNpy(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);
diff --git a/src/Functions/FunctionHelpers.cpp b/src/Functions/FunctionHelpers.cpp
index 048a601de81..d85bb0e7060 100644
--- a/src/Functions/FunctionHelpers.cpp
+++ b/src/Functions/FunctionHelpers.cpp
@@ -80,7 +80,7 @@ ColumnWithTypeAndName columnGetNested(const ColumnWithTypeAndName & col)
return ColumnWithTypeAndName{ nullable_res, nested_type, col.name };
}
else
- throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column for DataTypeNullable");
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} for DataTypeNullable", col.dumpStructure());
}
return col;
}
diff --git a/src/Functions/GatherUtils/Sinks.h b/src/Functions/GatherUtils/Sinks.h
index a8054da1159..2aa7c147136 100644
--- a/src/Functions/GatherUtils/Sinks.h
+++ b/src/Functions/GatherUtils/Sinks.h
@@ -48,7 +48,7 @@ struct NumericArraySink : public ArraySinkImpl>
NumericArraySink(IColumn & elements_, ColumnArray::Offsets & offsets_, size_t column_size)
: elements(assert_cast(elements_).getData()), offsets(offsets_)
{
- offsets.resize(column_size);
+ offsets.resize_exact(column_size);
}
void next()
@@ -69,7 +69,7 @@ struct NumericArraySink : public ArraySinkImpl>
void reserve(size_t num_elements)
{
- elements.reserve(num_elements);
+ elements.reserve_exact(num_elements);
}
};
@@ -85,7 +85,7 @@ struct StringSink
StringSink(ColumnString & col, size_t column_size)
: elements(col.getChars()), offsets(col.getOffsets())
{
- offsets.resize(column_size);
+ offsets.resize_exact(column_size);
}
void ALWAYS_INLINE next()
@@ -108,7 +108,7 @@ struct StringSink
void reserve(size_t num_elements)
{
- elements.reserve(num_elements);
+ elements.reserve_exact(num_elements);
}
};
@@ -125,7 +125,7 @@ struct FixedStringSink
FixedStringSink(ColumnFixedString & col, size_t column_size)
: elements(col.getChars()), string_size(col.getN()), total_rows(column_size)
{
- elements.resize(column_size * string_size);
+ elements.resize_exact(column_size * string_size);
}
void next()
@@ -146,7 +146,7 @@ struct FixedStringSink
void reserve(size_t num_elements)
{
- elements.reserve(num_elements);
+ elements.reserve_exact(num_elements);
}
};
@@ -165,7 +165,7 @@ struct GenericArraySink : public ArraySinkImpl
GenericArraySink(IColumn & elements_, ColumnArray::Offsets & offsets_, size_t column_size)
: elements(elements_), offsets(offsets_)
{
- offsets.resize(column_size);
+ offsets.resize_exact(column_size);
}
void next()
@@ -210,7 +210,7 @@ struct NullableArraySink : public ArraySink
void reserve(size_t num_elements)
{
ArraySink::reserve(num_elements);
- null_map.reserve(num_elements);
+ null_map.reserve_exact(num_elements);
}
};
diff --git a/src/Functions/GatherUtils/Sources.h b/src/Functions/GatherUtils/Sources.h
index 4e3009a695d..e5e3451fe4c 100644
--- a/src/Functions/GatherUtils/Sources.h
+++ b/src/Functions/GatherUtils/Sources.h
@@ -323,6 +323,8 @@ struct StringSource
return {&elements[prev_offset], length + elem_size > offset ? std::min(elem_size, length + elem_size - offset) : 0};
return {&elements[prev_offset + elem_size - offset], std::min(length, offset)};
}
+
+ const ColumnString::Chars & getElements() const { return elements; }
};
/// Treats Enum values as Strings, modeled after StringSource
@@ -517,11 +519,12 @@ struct FixedStringSource
const UInt8 * pos;
const UInt8 * end;
size_t string_size;
+ const typename ColumnString::Chars & elements;
+
size_t row_num = 0;
size_t column_size = 0;
- explicit FixedStringSource(const ColumnFixedString & col)
- : string_size(col.getN())
+ explicit FixedStringSource(const ColumnFixedString & col) : string_size(col.getN()), elements(col.getChars())
{
const auto & chars = col.getChars();
pos = chars.data();
@@ -592,6 +595,8 @@ struct FixedStringSource
return {pos, length + string_size > offset ? std::min(string_size, length + string_size - offset) : 0};
return {pos + string_size - offset, std::min(length, offset)};
}
+
+ const ColumnString::Chars & getElements() const { return elements; }
};
diff --git a/src/Functions/LowerUpperImpl.h b/src/Functions/LowerUpperImpl.h
index f093e00f7ab..72b3ce1ca34 100644
--- a/src/Functions/LowerUpperImpl.h
+++ b/src/Functions/LowerUpperImpl.h
@@ -13,14 +13,14 @@ struct LowerUpperImpl
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
- res_data.resize(data.size());
+ res_data.resize_exact(data.size());
res_offsets.assign(offsets);
array(data.data(), data.data() + data.size(), res_data.data());
}
static void vectorFixed(const ColumnString::Chars & data, size_t /*n*/, ColumnString::Chars & res_data)
{
- res_data.resize(data.size());
+ res_data.resize_exact(data.size());
array(data.data(), data.data() + data.size(), res_data.data());
}
diff --git a/src/Functions/LowerUpperUTF8Impl.h b/src/Functions/LowerUpperUTF8Impl.h
index 7ca98166576..bb794a0f8ed 100644
--- a/src/Functions/LowerUpperUTF8Impl.h
+++ b/src/Functions/LowerUpperUTF8Impl.h
@@ -1,8 +1,9 @@
#pragma once
#include
+#include
+#include
#include
#include
-#include
#ifdef __SSE2__
#include
@@ -92,7 +93,15 @@ struct LowerUpperUTF8Impl
{
if (data.empty())
return;
- res_data.resize(data.size());
+
+ bool all_ascii = UTF8::isAllASCII(data.data(), data.size());
+ if (all_ascii)
+ {
+ LowerUpperImpl::vector(data, offsets, res_data, res_offsets);
+ return;
+ }
+
+ res_data.resize_exact(data.size());
res_offsets.assign(offsets);
array(data.data(), data.data() + data.size(), offsets, res_data.data());
}
diff --git a/src/Functions/clamp.cpp b/src/Functions/clamp.cpp
new file mode 100644
index 00000000000..bb347a575e4
--- /dev/null
+++ b/src/Functions/clamp.cpp
@@ -0,0 +1,69 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+ extern const int BAD_ARGUMENTS;
+}
+
+
+class FunctionClamp : public IFunction /// clamp(value, min, max): per row, limits `value` to the range [min, max].
+{
+
+public:
+ static constexpr auto name = "clamp";
+
+ String getName() const override { return name; }
+ size_t getNumberOfArguments() const override { return 3; }
+ bool useDefaultImplementationForConstants() const override { return true; }
+ bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
+ static FunctionPtr create(ContextPtr) { return std::make_shared(); }
+
+ DataTypePtr getReturnTypeImpl(const DataTypes & types) const override
+ {
+ if (types.size() != 3)
+ throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires 3 arguments", getName());
+
+ return getLeastSupertype(types); /// The result type is the least common supertype of all three arguments.
+ }
+
+ ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
+ {
+ size_t arg_size = arguments.size();
+ Columns converted_columns(arg_size); /// Index 0 = value, 1 = min, 2 = max, all cast to the result type.
+ for (size_t arg = 0; arg < arg_size; ++arg)
+ converted_columns[arg] = castColumn(arguments[arg], result_type)->convertToFullColumnIfConst();
+
+ auto result_column = result_type->createColumn();
+ for (size_t row_num = 0; row_num < input_rows_count; ++row_num)
+ {
+ if (converted_columns[1]->compareAt(row_num, row_num, *converted_columns[2], 1) > 0) /// min > max in this row.
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "The minimum value cannot be greater than the maximum value for function {}", getName());
+
+ size_t best_arg = 0; /// Which argument to emit for this row; the value itself unless it is out of range.
+ if (converted_columns[1]->compareAt(row_num, row_num, *converted_columns[best_arg], 1) > 0) /// min > value.
+ best_arg = 1;
+ else if (converted_columns[2]->compareAt(row_num, row_num, *converted_columns[best_arg], 1) < 0) /// max < value.
+ best_arg = 2;
+
+ result_column->insertFrom(*converted_columns[best_arg], row_num);
+ }
+
+ return result_column;
+ }
+
+};
+
+REGISTER_FUNCTION(Clamp) /// Registration hook for the function defined in this file.
+{
+ factory.registerFunction();
+}
+}
diff --git a/src/Functions/padString.cpp b/src/Functions/padString.cpp
index ccef87d83e7..0922e0ddb8a 100644
--- a/src/Functions/padString.cpp
+++ b/src/Functions/padString.cpp
@@ -210,19 +210,18 @@ namespace
pad_string = column_pad_const->getValue();
}
- PaddingChars padding_chars{pad_string};
auto col_res = ColumnString::create();
StringSink res_sink{*col_res, input_rows_count};
if (const ColumnString * col = checkAndGetColumn(column_string.get()))
- executeForSource(StringSource{*col}, column_length, padding_chars, res_sink);
+ executeForSource(StringSource{*col}, column_length, pad_string, res_sink);
else if (const ColumnFixedString * col_fixed = checkAndGetColumn(column_string.get()))
- executeForSource(FixedStringSource{*col_fixed}, column_length, padding_chars, res_sink);
+ executeForSource(FixedStringSource{*col_fixed}, column_length, pad_string, res_sink);
else if (const ColumnConst * col_const = checkAndGetColumnConst(column_string.get()))
- executeForSource(ConstSource{*col_const}, column_length, padding_chars, res_sink);
+ executeForSource(ConstSource{*col_const}, column_length, pad_string, res_sink);
else if (const ColumnConst * col_const_fixed = checkAndGetColumnConst(column_string.get()))
- executeForSource(ConstSource{*col_const_fixed}, column_length, padding_chars, res_sink);
+ executeForSource(ConstSource{*col_const_fixed}, column_length, pad_string, res_sink);
else
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
@@ -235,23 +234,40 @@ namespace
private:
template
- void executeForSource(
- SourceStrings && strings,
- const ColumnPtr & column_length,
- const PaddingChars & padding_chars,
- StringSink & res_sink) const
+ void executeForSource(SourceStrings && strings, const ColumnPtr & column_length, const String & pad_string, StringSink & res_sink) const
{
- if (const auto * col_const = checkAndGetColumn(column_length.get()))
- executeForSourceAndLength(std::forward(strings), ConstSource{*col_const}, padding_chars, res_sink);
+ const auto & chars = strings.getElements(); /// Raw bytes of the whole source column, scanned once up front.
+ bool all_ascii = UTF8::isAllASCII(reinterpret_cast(pad_string.data()), pad_string.size())
+ && UTF8::isAllASCII(chars.data(), chars.size());
+ bool is_actually_utf8 = is_utf8 && !all_ascii; /// False whenever both the pad string and the source bytes are pure ASCII.
+
+ if (!is_actually_utf8) /// NOTE(review): both branches read identically in this view; presumably they differ only in template arguments not visible here - confirm.
+ {
+ PaddingChars padding_chars{pad_string};
+ if (const auto * col_const = checkAndGetColumn(column_length.get()))
+ executeForSourceAndLength(
+ std::forward(strings), ConstSource{*col_const}, padding_chars, res_sink);
+ else
+ executeForSourceAndLength(
+ std::forward(strings), GenericValueSource{*column_length}, padding_chars, res_sink);
+ }
else
- executeForSourceAndLength(std::forward(strings), GenericValueSource{*column_length}, padding_chars, res_sink);
+ {
+ PaddingChars padding_chars{pad_string};
+ if (const auto * col_const = checkAndGetColumn(column_length.get()))
+ executeForSourceAndLength(
+ std::forward(strings), ConstSource{*col_const}, padding_chars, res_sink);
+ else
+ executeForSourceAndLength(
+ std::forward(strings), GenericValueSource{*column_length}, padding_chars, res_sink);
+ }
}
- template
+ template
void executeForSourceAndLength(
SourceStrings && strings,
SourceLengths && lengths,
- const PaddingChars & padding_chars,
+ const PaddingChars & padding_chars,
StringSink & res_sink) const
{
bool is_const_new_length = lengths.isConst();
@@ -263,7 +279,7 @@ namespace
for (; !res_sink.isEnd(); res_sink.next(), strings.next(), lengths.next())
{
auto str = strings.getWhole();
- ssize_t current_length = getLengthOfSlice(str);
+ ssize_t current_length = getLengthOfSlice(str);
if (!res_sink.rowNum() || !is_const_new_length)
{
@@ -293,7 +309,7 @@ namespace
}
else if (new_length < current_length)
{
- str = removeSuffixFromSlice(str, current_length - new_length);
+ str = removeSuffixFromSlice(str, current_length - new_length);
writeSlice(str, res_sink);
}
else if (new_length > current_length)
diff --git a/src/Functions/reverse.cpp b/src/Functions/reverse.cpp
index 32b998523c7..39608b77997 100644
--- a/src/Functions/reverse.cpp
+++ b/src/Functions/reverse.cpp
@@ -1,10 +1,10 @@
#include
-#include
#include
#include
#include
#include
#include
+#include "reverse.h"
namespace DB
@@ -17,42 +17,6 @@ namespace ErrorCodes
namespace
{
-
-/** Reverse the string as a sequence of bytes.
- */
-struct ReverseImpl
-{
- static void vector(const ColumnString::Chars & data,
- const ColumnString::Offsets & offsets,
- ColumnString::Chars & res_data,
- ColumnString::Offsets & res_offsets)
- {
- res_data.resize(data.size());
- res_offsets.assign(offsets);
- size_t size = offsets.size();
-
- ColumnString::Offset prev_offset = 0;
- for (size_t i = 0; i < size; ++i)
- {
- for (size_t j = prev_offset; j < offsets[i] - 1; ++j)
- res_data[j] = data[offsets[i] + prev_offset - 2 - j];
- res_data[offsets[i] - 1] = 0;
- prev_offset = offsets[i];
- }
- }
-
- static void vectorFixed(const ColumnString::Chars & data, size_t n, ColumnString::Chars & res_data)
- {
- res_data.resize(data.size());
- size_t size = data.size() / n;
-
- for (size_t i = 0; i < size; ++i)
- for (size_t j = i * n; j < (i + 1) * n; ++j)
- res_data[j] = data[(i * 2 + 1) * n - j - 1];
- }
-};
-
-
class FunctionReverse : public IFunction
{
public:
diff --git a/src/Functions/reverse.h b/src/Functions/reverse.h
new file mode 100644
index 00000000000..5f999af4297
--- /dev/null
+++ b/src/Functions/reverse.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+
+/** Reverse the string as a sequence of bytes.
+ */
+struct ReverseImpl
+{
+ static void vector(const ColumnString::Chars & data,
+ const ColumnString::Offsets & offsets,
+ ColumnString::Chars & res_data,
+ ColumnString::Offsets & res_offsets)
+ {
+ res_data.resize_exact(data.size()); /// The output has exactly as many bytes as the input.
+ res_offsets.assign(offsets); /// Row boundaries do not change when bytes are reversed within a row.
+ size_t size = offsets.size();
+
+ ColumnString::Offset prev_offset = 0;
+ for (size_t i = 0; i < size; ++i)
+ {
+ for (size_t j = prev_offset; j < offsets[i] - 1; ++j) /// Row i spans [prev_offset, offsets[i]); its last byte is not part of the reversal.
+ res_data[j] = data[offsets[i] + prev_offset - 2 - j]; /// Mirrored position within the reversed span.
+ res_data[offsets[i] - 1] = 0; /// The last byte of every row is written as zero (presumably the string terminator).
+ prev_offset = offsets[i];
+ }
+ }
+
+ static void vectorFixed(const ColumnString::Chars & data, size_t n, ColumnString::Chars & res_data)
+ {
+ res_data.resize_exact(data.size());
+ size_t size = data.size() / n; /// Number of rows, each exactly n bytes long.
+
+ for (size_t i = 0; i < size; ++i)
+ for (size_t j = i * n; j < (i + 1) * n; ++j)
+ res_data[j] = data[(i * 2 + 1) * n - j - 1]; /// Same as i * n + (n - 1 - (j - i * n)): the mirrored byte inside row i.
+ }
+};
+
+}
diff --git a/src/Functions/reverseUTF8.cpp b/src/Functions/reverseUTF8.cpp
index 8a76af05d86..4ea861919a1 100644
--- a/src/Functions/reverseUTF8.cpp
+++ b/src/Functions/reverseUTF8.cpp
@@ -1,7 +1,9 @@
-#include
#include
+#include
#include
#include
+#include
+#include "reverse.h"
namespace DB
@@ -25,10 +27,18 @@ struct ReverseUTF8Impl
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
+ bool all_ascii = UTF8::isAllASCII(data.data(), data.size());
+ if (all_ascii)
+ {
+ ReverseImpl::vector(data, offsets, res_data, res_offsets);
+ return;
+ }
+
res_data.resize(data.size());
res_offsets.assign(offsets);
size_t size = offsets.size();
+
ColumnString::Offset prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
diff --git a/src/Functions/substring.cpp b/src/Functions/substring.cpp
index e809914f5f0..122f83d758b 100644
--- a/src/Functions/substring.cpp
+++ b/src/Functions/substring.cpp
@@ -148,9 +148,23 @@ public:
if constexpr (is_utf8)
{
if (const ColumnString * col = checkAndGetColumn(column_string.get()))
- return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, UTF8StringSource(*col), input_rows_count);
+ {
+ bool all_ascii = UTF8::isAllASCII(col->getChars().data(), col->getChars().size());
+ if (all_ascii)
+ return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, StringSource(*col), input_rows_count);
+ else
+ return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, UTF8StringSource(*col), input_rows_count);
+ }
+
if (const ColumnConst * col_const = checkAndGetColumnConst(column_string.get()))
- return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, ConstSource(*col_const), input_rows_count);
+ {
+ StringRef str_ref = col_const->getDataAt(0);
+ bool all_ascii = UTF8::isAllASCII(reinterpret_cast(str_ref.data), str_ref.size);
+ if (all_ascii)
+ return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, ConstSource(*col_const), input_rows_count);
+ else
+ return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, ConstSource(*col_const), input_rows_count);
+ }
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}", arguments[0].column->getName(), getName());
}
else
diff --git a/src/Functions/substringIndex.cpp b/src/Functions/substringIndex.cpp
index 5f3f054b624..74474cb4b23 100644
--- a/src/Functions/substringIndex.cpp
+++ b/src/Functions/substringIndex.cpp
@@ -129,8 +129,10 @@ namespace
res_data.reserve(str_column->getChars().size() / 2);
res_offsets.reserve(rows);
+ bool all_ascii = UTF8::isAllASCII(str_column->getChars().data(), str_column->getChars().size())
+ && UTF8::isAllASCII(reinterpret_cast(delim.data()), delim.size());
std::unique_ptr searcher
- = !is_utf8 ? nullptr : std::make_unique(delim.data(), delim.size());
+ = !is_utf8 || all_ascii ? nullptr : std::make_unique(delim.data(), delim.size());
for (size_t i = 0; i < rows; ++i)
{
@@ -140,10 +142,12 @@ namespace
StringRef res_ref;
if constexpr (!is_utf8)
res_ref = substringIndex(str_ref, delim[0], count);
+ else if (all_ascii)
+ res_ref = substringIndex(str_ref, delim[0], count);
else
res_ref = substringIndexUTF8(searcher.get(), str_ref, delim, count);
- appendToResultColumn(res_ref, res_data, res_offsets);
+ appendToResultColumn(res_ref, res_data, res_offsets);
}
}
@@ -158,8 +162,10 @@ namespace
res_data.reserve(str_column->getChars().size() / 2);
res_offsets.reserve(rows);
+ bool all_ascii = UTF8::isAllASCII(str_column->getChars().data(), str_column->getChars().size())
+ && UTF8::isAllASCII(reinterpret_cast(delim.data()), delim.size());
std::unique_ptr searcher
- = !is_utf8 ? nullptr : std::make_unique(delim.data(), delim.size());
+ = !is_utf8 || all_ascii ? nullptr : std::make_unique(delim.data(), delim.size());
for (size_t i = 0; i < rows; ++i)
{
@@ -168,10 +174,12 @@ namespace
StringRef res_ref;
if constexpr (!is_utf8)
res_ref = substringIndex(str_ref, delim[0], count);
+ else if (all_ascii)
+ res_ref = substringIndex(str_ref, delim[0], count);
else
res_ref = substringIndexUTF8(searcher.get(), str_ref, delim, count);
- appendToResultColumn(res_ref, res_data, res_offsets);
+ appendToResultColumn(res_ref, res_data, res_offsets);
}
}
@@ -186,8 +194,10 @@ namespace
res_data.reserve(str.size() * rows / 2);
res_offsets.reserve(rows);
+ bool all_ascii = UTF8::isAllASCII(reinterpret_cast(str.data()), str.size())
+ && UTF8::isAllASCII(reinterpret_cast(delim.data()), delim.size());
std::unique_ptr searcher
- = !is_utf8 ? nullptr : std::make_unique(delim.data(), delim.size());
+ = !is_utf8 || all_ascii ? nullptr : std::make_unique(delim.data(), delim.size());
StringRef str_ref{str.data(), str.size()};
for (size_t i = 0; i < rows; ++i)
@@ -197,18 +207,26 @@ namespace
StringRef res_ref;
if constexpr (!is_utf8)
res_ref = substringIndex(str_ref, delim[0], count);
+ else if (all_ascii)
+ res_ref = substringIndex(str_ref, delim[0], count);
else
res_ref = substringIndexUTF8(searcher.get(), str_ref, delim, count);
- appendToResultColumn(res_ref, res_data, res_offsets);
+ appendToResultColumn(res_ref, res_data, res_offsets);
}
}
+ template
static void appendToResultColumn(const StringRef & res_ref, ColumnString::Chars & res_data, ColumnString::Offsets & res_offsets)
{
size_t res_offset = res_data.size();
res_data.resize(res_offset + res_ref.size + 1);
- memcpy(&res_data[res_offset], res_ref.data, res_ref.size);
+
+ if constexpr (padded)
+ memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], res_ref.data, res_ref.size);
+ else
+ memcpy(&res_data[res_offset], res_ref.data, res_ref.size);
+
res_offset += res_ref.size;
res_data[res_offset] = 0;
++res_offset;
diff --git a/src/Functions/trim.cpp b/src/Functions/trim.cpp
index dd51c606ff7..1f0011b8e99 100644
--- a/src/Functions/trim.cpp
+++ b/src/Functions/trim.cpp
@@ -46,8 +46,8 @@ public:
ColumnString::Offsets & res_offsets)
{
size_t size = offsets.size();
- res_offsets.resize(size);
- res_data.reserve(data.size());
+ res_offsets.resize_exact(size);
+ res_data.reserve_exact(data.size());
size_t prev_offset = 0;
size_t res_offset = 0;
diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp
index 813546aa052..8823af55936 100644
--- a/src/IO/ReadBufferFromS3.cpp
+++ b/src/IO/ReadBufferFromS3.cpp
@@ -191,10 +191,14 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
result = sendRequest(attempt, range_begin, range_begin + n - 1);
std::istream & istr = result->GetBody();
- copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied);
+ bool cancelled = false;
+ copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied, &cancelled);
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, bytes_copied);
+ if (cancelled)
+ return initial_n - n + bytes_copied;
+
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(bytes_copied, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp
index 0db998c14fc..5f438a7e5f9 100644
--- a/src/Interpreters/HashJoin.cpp
+++ b/src/Interpreters/HashJoin.cpp
@@ -35,10 +35,17 @@
#include
#include
#include
+#include "Core/Joins.h"
+#include "Interpreters/TemporaryDataOnDisk.h"
#include
#include
+namespace CurrentMetrics
+{
+ extern const Metric TemporaryFilesForJoin;
+}
+
namespace DB
{
@@ -63,6 +70,7 @@ struct NotProcessedCrossJoin : public ExtraBlock
{
size_t left_position;
size_t right_block;
+ std::unique_ptr reader;
};
@@ -249,6 +257,10 @@ HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_s
, instance_id(instance_id_)
, asof_inequality(table_join->getAsofInequality())
, data(std::make_shared())
+ , tmp_data(
+ table_join_->getTempDataOnDisk()
+ ? std::make_unique(table_join_->getTempDataOnDisk(), CurrentMetrics::TemporaryFilesForJoin)
+ : nullptr)
, right_sample_block(right_sample_block_)
, max_joined_block_rows(table_join->maxJoinedBlockRows())
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
@@ -827,6 +839,21 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
+ size_t max_bytes_in_join = table_join->sizeLimits().max_bytes;
+ size_t max_rows_in_join = table_join->sizeLimits().max_rows;
+
+ if (kind == JoinKind::Cross && tmp_data
+ && (tmp_stream || (max_bytes_in_join && getTotalByteCount() + block_to_save.allocatedBytes() >= max_bytes_in_join)
+ || (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
+ {
+ if (tmp_stream == nullptr)
+ {
+ tmp_stream = &tmp_data->createStream(right_sample_block);
+ }
+ tmp_stream->write(block_to_save);
+ return true;
+ }
+
size_t total_rows = 0;
size_t total_bytes = 0;
{
@@ -944,7 +971,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
shrinkStoredBlocksToFit(total_bytes);
-
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
@@ -2238,11 +2264,13 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
{
size_t start_left_row = 0;
size_t start_right_block = 0;
+ std::unique_ptr reader = nullptr;
if (not_processed)
{
auto & continuation = static_cast(*not_processed);
start_left_row = continuation.left_position;
start_right_block = continuation.right_block;
+ reader = std::move(continuation.reader);
not_processed.reset();
}
@@ -2271,18 +2299,12 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
size_t rows_left = block.rows();
size_t rows_added = 0;
-
for (size_t left_row = start_left_row; left_row < rows_left; ++left_row)
{
size_t block_number = 0;
- for (const Block & compressed_block_right : data->blocks)
+
+ auto process_right_block = [&](const Block & block_right)
{
- ++block_number;
- if (block_number < start_right_block)
- continue;
-
- auto block_right = compressed_block_right.decompress();
-
size_t rows_right = block_right.rows();
rows_added += rows_right;
@@ -2294,6 +2316,44 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
const IColumn & column_right = *block_right.getByPosition(col_num).column;
dst_columns[num_existing_columns + col_num]->insertRangeFrom(column_right, 0, rows_right);
}
+ };
+
+ for (const Block & compressed_block_right : data->blocks)
+ {
+ ++block_number;
+ if (block_number < start_right_block)
+ continue;
+
+ auto block_right = compressed_block_right.decompress();
+ process_right_block(block_right);
+ if (rows_added > max_joined_block_rows)
+ {
+ break;
+ }
+ }
+
+ if (tmp_stream && rows_added <= max_joined_block_rows)
+ {
+ if (reader == nullptr)
+ {
+ tmp_stream->finishWritingAsyncSafe();
+ reader = tmp_stream->getReadStream();
+ }
+ while (auto block_right = reader->read())
+ {
+ ++block_number;
+ process_right_block(block_right);
+ if (rows_added > max_joined_block_rows)
+ {
+ break;
+ }
+ }
+
+ /// The row limit was not exceeded, so the loop above ended because reader->read() returned an empty block.
+ if (rows_added <= max_joined_block_rows)
+ {
+ reader.reset();
+ }
}
start_right_block = 0;
@@ -2301,7 +2361,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
if (rows_added > max_joined_block_rows)
{
not_processed = std::make_shared(
- NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1});
+ NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1, std::move(reader)});
not_processed->block.swap(block);
break;
}
diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h
index 454f38ce08b..86db8943926 100644
--- a/src/Interpreters/HashJoin.h
+++ b/src/Interpreters/HashJoin.h
@@ -26,6 +26,7 @@
#include
#include
+#include
namespace DB
{
@@ -442,6 +443,10 @@ private:
RightTableDataPtr data;
std::vector key_sizes;
+ /// Needed to do external cross join
+ TemporaryDataOnDiskPtr tmp_data;
+ TemporaryFileStream* tmp_stream{nullptr};
+
/// Block with columns from the right-side table.
Block right_sample_block;
/// Block with columns from the right-side table except key columns.
diff --git a/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp b/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp
index 539d7a59f6f..d4af111eec0 100644
--- a/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp
+++ b/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp
@@ -5,6 +5,8 @@
#include
#include
#include
+#include
+#include
#include
@@ -38,22 +40,47 @@ namespace ErrorCodes
namespace
{
-ASTPtr normalizeAndValidateQuery(const ASTPtr & query)
+ASTPtr normalizeAndValidateQuery(const ASTPtr & query, const Names & column_names)
{
+ ASTPtr result_query;
+
if (query->as() || query->as())
- {
- return query;
- }
+ result_query = query;
else if (auto * subquery = query->as())
- {
- return subquery->children[0];
- }
+ result_query = subquery->children[0];
else
- {
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Expected ASTSelectWithUnionQuery or ASTSelectQuery. Actual {}",
query->formatForErrorMessage());
- }
+
+ if (column_names.empty())
+ return result_query;
+
+ /// The initial query that the VIEW refers to is wrapped here in another SELECT query so that only the necessary columns are read.
+ auto select_query = std::make_shared();
+
+ auto result_table_expression_ast = std::make_shared();
+ result_table_expression_ast->children.push_back(std::make_shared(std::move(result_query)));
+ result_table_expression_ast->subquery = result_table_expression_ast->children.back();
+
+ auto tables_in_select_query_element_ast = std::make_shared();
+ tables_in_select_query_element_ast->children.push_back(std::move(result_table_expression_ast));
+ tables_in_select_query_element_ast->table_expression = tables_in_select_query_element_ast->children.back();
+
+ ASTPtr tables_in_select_query_ast = std::make_shared();
+ tables_in_select_query_ast->children.push_back(std::move(tables_in_select_query_element_ast));
+
+ select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables_in_select_query_ast));
+
+ auto projection_expression_list_ast = std::make_shared();
+ projection_expression_list_ast->children.reserve(column_names.size());
+
+ for (const auto & column_name : column_names)
+ projection_expression_list_ast->children.push_back(std::make_shared(column_name));
+
+ select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(projection_expression_list_ast));
+
+ return select_query;
}
ContextMutablePtr buildContext(const ContextPtr & context, const SelectQueryOptions & select_query_options)
@@ -125,8 +152,9 @@ QueryTreeNodePtr buildQueryTreeAndRunPasses(const ASTPtr & query,
InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
const ASTPtr & query_,
const ContextPtr & context_,
- const SelectQueryOptions & select_query_options_)
- : query(normalizeAndValidateQuery(query_))
+ const SelectQueryOptions & select_query_options_,
+ const Names & column_names)
+ : query(normalizeAndValidateQuery(query_, column_names))
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, nullptr /*storage*/))
@@ -138,8 +166,9 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
const ASTPtr & query_,
const ContextPtr & context_,
const StoragePtr & storage_,
- const SelectQueryOptions & select_query_options_)
- : query(normalizeAndValidateQuery(query_))
+ const SelectQueryOptions & select_query_options_,
+ const Names & column_names)
+ : query(normalizeAndValidateQuery(query_, column_names))
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, storage_))
diff --git a/src/Interpreters/InterpreterSelectQueryAnalyzer.h b/src/Interpreters/InterpreterSelectQueryAnalyzer.h
index 2ad7e6a50f3..73c524cbe28 100644
--- a/src/Interpreters/InterpreterSelectQueryAnalyzer.h
+++ b/src/Interpreters/InterpreterSelectQueryAnalyzer.h
@@ -16,7 +16,8 @@ public:
/// Initialize interpreter with query AST
InterpreterSelectQueryAnalyzer(const ASTPtr & query_,
const ContextPtr & context_,
- const SelectQueryOptions & select_query_options_);
+ const SelectQueryOptions & select_query_options_,
+ const Names & column_names = {});
/** Initialize interpreter with query AST and storage.
* After query tree is built left most table expression is replaced with table node that
@@ -25,7 +26,8 @@ public:
InterpreterSelectQueryAnalyzer(const ASTPtr & query_,
const ContextPtr & context_,
const StoragePtr & storage_,
- const SelectQueryOptions & select_query_options_);
+ const SelectQueryOptions & select_query_options_,
+ const Names & column_names = {});
/** Initialize interpreter with query tree.
* No query tree passes are applied.
diff --git a/src/Interpreters/JoinedTables.cpp b/src/Interpreters/JoinedTables.cpp
index 5b549a19083..457ed3ef4a6 100644
--- a/src/Interpreters/JoinedTables.cpp
+++ b/src/Interpreters/JoinedTables.cpp
@@ -310,7 +310,7 @@ std::shared_ptr JoinedTables::makeTableJoin(const ASTSelectQuery & se
auto settings = context->getSettingsRef();
MultiEnum join_algorithm = settings.join_algorithm;
bool try_use_direct_join = join_algorithm.isSet(JoinAlgorithm::DIRECT) || join_algorithm.isSet(JoinAlgorithm::DEFAULT);
- auto table_join = std::make_shared(settings, context->getGlobalTemporaryVolume());
+ auto table_join = std::make_shared(settings, context->getGlobalTemporaryVolume(), context->getTempDataOnDisk());
const ASTTablesInSelectQueryElement * ast_join = select_query_.join();
const auto & table_to_join = ast_join->table_expression->as();
diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp
index 48d59dd3b24..1ee8ca14b2f 100644
--- a/src/Interpreters/TableJoin.cpp
+++ b/src/Interpreters/TableJoin.cpp
@@ -103,7 +103,7 @@ bool forAllKeys(OnExpr & expressions, Func callback)
}
-TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
+TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)
@@ -117,6 +117,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
, temporary_files_codec(settings.temporary_files_codec)
, max_memory_usage(settings.max_memory_usage)
, tmp_volume(tmp_volume_)
+ , tmp_data(tmp_data_)
{
}
diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h
index 88905edd3e8..8e83233e54c 100644
--- a/src/Interpreters/TableJoin.h
+++ b/src/Interpreters/TableJoin.h
@@ -9,6 +9,7 @@
#include
#include
#include
+#include
#include
#include
@@ -188,6 +189,8 @@ private:
VolumePtr tmp_volume;
+ TemporaryDataOnDiskScopePtr tmp_data;
+
std::shared_ptr right_storage_join;
std::shared_ptr right_kv_storage;
@@ -233,7 +236,7 @@ private:
public:
TableJoin() = default;
- TableJoin(const Settings & settings, VolumePtr tmp_volume_);
+ TableJoin(const Settings & settings, VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_);
/// for StorageJoin
TableJoin(SizeLimits limits, bool use_nulls, JoinKind kind, JoinStrictness strictness,
@@ -259,6 +262,8 @@ public:
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }
+ TemporaryDataOnDiskScopePtr getTempDataOnDisk() { return tmp_data; }
+
ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;
const std::vector & getEnabledJoinAlgorithms() const { return join_algorithm; }
diff --git a/src/Interpreters/TemporaryDataOnDisk.cpp b/src/Interpreters/TemporaryDataOnDisk.cpp
index 9a237738b3e..a74b5bba2b9 100644
--- a/src/Interpreters/TemporaryDataOnDisk.cpp
+++ b/src/Interpreters/TemporaryDataOnDisk.cpp
@@ -1,12 +1,11 @@
+#include
+#include
#include
#include
-#include
#include
-#include
#include
#include
-#include
#include
#include
#include
@@ -14,6 +13,7 @@
#include
#include
+#include "Common/Exception.h"
namespace ProfileEvents
{
@@ -224,33 +224,26 @@ struct TemporaryFileStream::OutputWriter
bool finalized = false;
};
-struct TemporaryFileStream::InputReader
+TemporaryFileStream::Reader::Reader(const String & path, const Block & header_, size_t size)
+ : in_file_buf(path, size ? std::min(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
+ , in_compressed_buf(in_file_buf)
+ , in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
{
- InputReader(const String & path, const Block & header_, size_t size = 0)
- : in_file_buf(path, size ? std::min(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
- , in_compressed_buf(in_file_buf)
- , in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
- {
- LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
- }
+ LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
+}
- explicit InputReader(const String & path, size_t size = 0)
- : in_file_buf(path, size ? std::min(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
- , in_compressed_buf(in_file_buf)
- , in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
- {
- LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
- }
+TemporaryFileStream::Reader::Reader(const String & path, size_t size)
+ : in_file_buf(path, size ? std::min(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
+ , in_compressed_buf(in_file_buf)
+ , in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
+{
+ LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
+}
- Block read()
- {
- return in_reader.read();
- }
-
- ReadBufferFromFile in_file_buf;
- CompressedReadBuffer in_compressed_buf;
- NativeReader in_reader;
-};
+Block TemporaryFileStream::Reader::read()
+{
+ return in_reader.read();
+}
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
@@ -310,6 +303,12 @@ TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
return stat;
}
+TemporaryFileStream::Stat TemporaryFileStream::finishWritingAsyncSafe()
+{
+ std::call_once(finish_writing, [this]{ finishWriting(); });
+ return stat;
+}
+
bool TemporaryFileStream::isWriteFinished() const
{
assert(in_reader == nullptr || out_writer == nullptr);
@@ -326,7 +325,7 @@ Block TemporaryFileStream::read()
if (!in_reader)
{
- in_reader = std::make_unique(getPath(), header, getSize());
+ in_reader = std::make_unique(getPath(), header, getSize());
}
Block block = in_reader->read();
@@ -338,6 +337,17 @@ Block TemporaryFileStream::read()
return block;
}
+std::unique_ptr TemporaryFileStream::getReadStream()
+{
+ if (!isWriteFinished())
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
+
+ if (isEof())
+ return nullptr;
+
+ return std::make_unique(getPath(), header, getSize());
+}
+
void TemporaryFileStream::updateAllocAndCheck()
{
assert(out_writer);
diff --git a/src/Interpreters/TemporaryDataOnDisk.h b/src/Interpreters/TemporaryDataOnDisk.h
index 40100a62b44..488eed70da9 100644
--- a/src/Interpreters/TemporaryDataOnDisk.h
+++ b/src/Interpreters/TemporaryDataOnDisk.h
@@ -1,7 +1,12 @@
#pragma once
+#include
+#include
#include
+#include
+#include
+#include
#include
#include
#include
@@ -132,12 +137,25 @@ private:
/*
* Data can be written into this stream and then read.
- * After finish writing, call `finishWriting` and then `read` to read the data.
+ * After writing is finished, call `finishWriting` and then either `read` or `getReadStream` (only one of the two) to read the data.
* Account amount of data written to disk in parent scope.
*/
class TemporaryFileStream : boost::noncopyable
{
public:
+ struct Reader
+ {
+ Reader(const String & path, const Block & header_, size_t size = 0);
+
+ explicit Reader(const String & path, size_t size = 0);
+
+ Block read();
+
+ ReadBufferFromFile in_file_buf;
+ CompressedReadBuffer in_compressed_buf;
+ NativeReader in_reader;
+ };
+
struct Stat
{
/// Statistics for file
@@ -154,8 +172,11 @@ public:
void flush();
Stat finishWriting();
+ Stat finishWritingAsyncSafe();
bool isWriteFinished() const;
+ std::unique_ptr getReadStream();
+
Block read();
String getPath() const;
@@ -184,11 +205,12 @@ private:
Stat stat;
+ std::once_flag finish_writing;
+
struct OutputWriter;
std::unique_ptr out_writer;
- struct InputReader;
- std::unique_ptr in_reader;
+ std::unique_ptr in_reader;
};
}
diff --git a/src/Planner/CollectSets.cpp b/src/Planner/CollectSets.cpp
index d62ad83c6b2..52a0d748d63 100644
--- a/src/Planner/CollectSets.cpp
+++ b/src/Planner/CollectSets.cpp
@@ -36,6 +36,12 @@ public:
void visitImpl(const QueryTreeNodePtr & node)
{
+ if (const auto * constant_node = node->as())
+ /// Collect sets from source expression as well.
+ /// Most likely we will not build them, but those sets could be requested during analysis.
+ if (constant_node->hasSourceExpression())
+ collectSets(constant_node->getSourceExpression(), planner_context);
+
auto * function_node = node->as();
if (!function_node || !isNameOfInFunction(function_node->getFunctionName()))
return;
diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp
index c9c57233fb0..1b2a55a50b0 100644
--- a/src/Planner/PlannerJoinTree.cpp
+++ b/src/Planner/PlannerJoinTree.cpp
@@ -1207,7 +1207,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
}
}
- auto table_join = std::make_shared(settings, query_context->getGlobalTemporaryVolume());
+ auto table_join = std::make_shared(settings, query_context->getGlobalTemporaryVolume(), query_context->getTempDataOnDisk());
table_join->getTableJoin() = join_node.toASTTableJoin()->as();
if (join_constant)
diff --git a/src/Planner/PlannerJoins.cpp b/src/Planner/PlannerJoins.cpp
index bd8940b96d8..c410b04f209 100644
--- a/src/Planner/PlannerJoins.cpp
+++ b/src/Planner/PlannerJoins.cpp
@@ -328,7 +328,7 @@ void buildJoinClause(
{
throw Exception(
ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
- "JOIN {} join expression contains column from left and right table",
+ "JOIN {} join expression contains column from left and right table, you may try experimental support of this feature by `SET allow_experimental_join_condition = 1`",
join_node.formatASTForErrorMessage());
}
}
@@ -363,7 +363,7 @@ void buildJoinClause(
{
throw Exception(
ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
- "JOIN {} join expression contains column from left and right table",
+ "JOIN {} join expression contains column from left and right table, you may try experimental support of this feature by `SET allow_experimental_join_condition = 1`",
join_node.formatASTForErrorMessage());
}
}
diff --git a/src/Processors/Formats/Impl/NpyOutputFormat.cpp b/src/Processors/Formats/Impl/NpyOutputFormat.cpp
new file mode 100644
index 00000000000..e02787b4f70
--- /dev/null
+++ b/src/Processors/Formats/Impl/NpyOutputFormat.cpp
@@ -0,0 +1,269 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int TOO_MANY_COLUMNS;
+ extern const int BAD_ARGUMENTS;
+ extern const int ILLEGAL_COLUMN;
+}
+
+namespace
+{
+
+template
+void writeNumpyNumbers(const ColumnPtr & column, WriteBuffer & buf)
+{
+ const auto * number_column = assert_cast