Merge branch 'ClickHouse:master' into hotfix-72756

Andrey Zvonov 2024-12-06 13:43:26 +01:00 committed by GitHub
commit 9b0f33b897
545 changed files with 8806 additions and 3370 deletions

@@ -43,7 +43,6 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
* [Amsterdam Meetup](https://www.meetup.com/clickhouse-netherlands-user-group/events/303638814) - December 3
* [Stockholm Meetup](https://www.meetup.com/clickhouse-stockholm-user-group/events/304382411) - December 9
* [New York Meetup](https://www.meetup.com/clickhouse-new-york-user-group/events/304268174) - December 9
* [Kuala Lumpur Meetup](https://www.meetup.com/clickhouse-malaysia-meetup-group/events/304576472/) - December 11
@@ -52,6 +51,7 @@ Upcoming meetups
Recently completed meetups
* [Amsterdam Meetup](https://www.meetup.com/clickhouse-netherlands-user-group/events/303638814) - December 3
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12

contrib/NuRaft vendored

@@ -1 +1 @@
Subproject commit ce6de271811899d587fc28b500041ebcf720014f
Subproject commit c11f7fce68737cdc67a1d61678b2717d617ebb5a

contrib/aws vendored

@@ -1 +1 @@
Subproject commit d5450d76abda556ce145ddabe7e0cc6a7644ec59
Subproject commit 71169aeec91b41c1bd5cf78fad6158dacdcde9d5

@@ -0,0 +1,26 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.10.3.21-stable (e668b927efb) FIXME as compared to v24.10.2.80-stable (96b80057159)
#### Improvement
* Backported in [#72100](https://github.com/ClickHouse/ClickHouse/issues/72100): Fix the metadata_version record in ZooKeeper in restarting thread rather than in attach thread. [#70297](https://github.com/ClickHouse/ClickHouse/pull/70297) ([Miсhael Stetsyuk](https://github.com/mstetsyuk)).
* Backported in [#72169](https://github.com/ClickHouse/ClickHouse/issues/72169): Forbid Dynamic/Variant types in min/max functions to avoid confusion. [#71761](https://github.com/ClickHouse/ClickHouse/pull/71761) ([Pavel Kruglov](https://github.com/Avogar)).
* Backported in [#72064](https://github.com/ClickHouse/ClickHouse/issues/72064): When retrieving data directly from a dictionary using Dictionary storage, dictionary table function, or direct SELECT from the dictionary itself, it is now enough to have `SELECT` permission or `dictGet` permission for the dictionary. This aligns with previous attempts to prevent ACL bypasses: https://github.com/ClickHouse/ClickHouse/pull/57362 and https://github.com/ClickHouse/ClickHouse/pull/65359. It also makes the latter one backward compatible. [#72051](https://github.com/ClickHouse/ClickHouse/pull/72051) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#72144](https://github.com/ClickHouse/ClickHouse/issues/72144): Acquiring zero-copy shared lock before moving a part to zero-copy disk to prevent possible data loss if Keeper is unavailable. [#71845](https://github.com/ClickHouse/ClickHouse/pull/71845) ([Aleksei Filatov](https://github.com/aalexfvk)).
* Backported in [#72088](https://github.com/ClickHouse/ClickHouse/issues/72088): Fix rows_processed column in system.s3/azure_queue_log broken in 24.6. Closes [#69975](https://github.com/ClickHouse/ClickHouse/issues/69975). [#71946](https://github.com/ClickHouse/ClickHouse/pull/71946) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Backported in [#72036](https://github.com/ClickHouse/ClickHouse/issues/72036): Fix `Illegal type` error for `MergeTree` tables with binary monotonic function in `ORDER BY` when the first argument is constant. Fixes [#71941](https://github.com/ClickHouse/ClickHouse/issues/71941). [#71966](https://github.com/ClickHouse/ClickHouse/pull/71966) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#72207](https://github.com/ClickHouse/ClickHouse/issues/72207): Fixed incorrect settings order `max_parser_depth` and `max_parser_backtracks`. [#71498](https://github.com/ClickHouse/ClickHouse/pull/71498) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
* Backported in [#72071](https://github.com/ClickHouse/ClickHouse/issues/72071): Fix client syntax highlighting that was broken in https://github.com/ClickHouse/ClickHouse/pull/71949. [#72049](https://github.com/ClickHouse/ClickHouse/pull/72049) ([Nikolay Degterinsky](https://github.com/evillique)).
* Backported in [#72095](https://github.com/ClickHouse/ClickHouse/issues/72095): Minor improvement for system.query_metric_log stateless test. [#72076](https://github.com/ClickHouse/ClickHouse/pull/72076) ([Pablo Marcos](https://github.com/pamarcos)).
* Backported in [#72184](https://github.com/ClickHouse/ClickHouse/issues/72184): Add google-cloud-cpp submodule. [#72092](https://github.com/ClickHouse/ClickHouse/pull/72092) ([Pablo Marcos](https://github.com/pamarcos)).

@@ -6,6 +6,14 @@ sidebar_label: Iceberg
# Iceberg Table Engine
:::warning
We recommend using the [Iceberg Table Function](/docs/en/sql-reference/table-functions/iceberg.md) for working with Iceberg data in ClickHouse. The Iceberg Table Function currently provides sufficient functionality, offering a partial read-only interface for Iceberg tables.
The Iceberg Table Engine is available but may have limitations. ClickHouse wasn't originally designed to support tables with externally changing schemas, which can affect the functionality of the Iceberg Table Engine. As a result, some features that work with regular tables may be unavailable or may not function correctly, especially when using the old analyzer.
For optimal compatibility, we suggest using the Iceberg Table Function while we continue to improve support for the Iceberg Table Engine.
:::
This engine provides a read-only integration with existing Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3, Azure, HDFS and locally stored tables.
## Create Table
@@ -63,6 +71,16 @@ CREATE TABLE iceberg_table ENGINE=IcebergS3(iceberg_conf, filename = 'test_table
Table engine `Iceberg` is now an alias of `IcebergS3`.
**Schema Evolution**
At the moment, ClickHouse can read Iceberg tables whose schema has changed over time. We currently support reading tables where columns have been added or removed and where their order has changed. You can also change a column from one where a value is required to one where NULL is allowed. Additionally, we support permitted type casting for simple types, namely:
* int -> long
* float -> double
* decimal(P, S) -> decimal(P', S) where P' > P.
Currently, it is not possible to change nested structures or the types of elements within arrays and maps.
To read a table whose schema has changed after its creation with dynamic schema inference, set `allow_dynamic_metadata_for_data_lakes = true` when creating the table, as sketched below.
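A minimal sketch of what that could look like, reusing the `iceberg_conf` named collection from the `CREATE TABLE` example above; treating `allow_dynamic_metadata_for_data_lakes` as a table-level setting in the `SETTINGS` clause is an assumption, not something this page confirms:
```sql
-- Hedged sketch: assumes the named collection `iceberg_conf` (URL, credentials) from the
-- example above, and assumes the setting is passed via the CREATE TABLE SETTINGS clause.
CREATE TABLE iceberg_table
    ENGINE = IcebergS3(iceberg_conf, filename = 'test_table')
    SETTINGS allow_dynamic_metadata_for_data_lakes = 1;

-- Later SELECTs should then pick up columns added to the Iceberg table after creation.
SELECT * FROM iceberg_table;
```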
### Data cache {#data-cache}
The `Iceberg` table engine and table function support data caching, the same as the `S3`, `AzureBlobStorage`, and `HDFS` storages. See [here](../../../engines/table-engines/integrations/s3.md#data-cache).
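As an illustration, the per-query cache settings documented for `S3` should apply here in the same way; the cache name `cache_for_s3` is an assumption and must match a filesystem cache declared in the server configuration:
```sql
-- Assumes a filesystem cache named 'cache_for_s3' is defined in the server config,
-- as in the S3 data cache documentation linked above.
SELECT *
FROM iceberg_table
SETTINGS filesystem_cache_name = 'cache_for_s3', enable_filesystem_cache = 1;
```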

@@ -7,6 +7,7 @@ Contains information about setting changes in previous ClickHouse versions.
Columns:
- `type` ([Enum](../../sql-reference/data-types/enum.md)) - The settings type: `Core` (general / query settings), `MergeTree`.
- `version` ([String](../../sql-reference/data-types/string.md)) — The ClickHouse version in which settings were changed
- `changes` ([Array](../../sql-reference/data-types/array.md) of [Tuple](../../sql-reference/data-types/tuple.md)) — A description of the setting changes: (setting name, previous value, new value, reason for the change)
@@ -22,6 +23,7 @@ FORMAT Vertical
``` text
Row 1:
──────
type: Core
version: 23.5
changes: [('input_format_parquet_preserve_order','1','0','Allow Parquet reader to reorder rows for better parallelism.'),('parallelize_output_from_storages','0','1','Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows.'),('use_with_fill_by_sorting_prefix','0','1','Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently'),('output_format_parquet_compliant_nested_types','0','1','Change an internal field name in output Parquet file schema.')]
```

@@ -786,6 +786,24 @@ SELECT indexOf([1, 3, NULL, NULL], NULL)
Elements set to `NULL` are handled as normal values.
## indexOfAssumeSorted(arr, x)
Returns the index of the first element equal to `x` (numbering starts at 1) if it is in the array, or 0 if it is not.
The array must be sorted in non-descending order, since a binary search is used.
If the internal array type is Nullable, the `indexOf` function will be used instead.
Example:
``` sql
SELECT indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)
```
``` text
┌─indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)─┐
│                                             5 │
└───────────────────────────────────────────────┘
```
## arrayCount(\[func,\] arr1, ...)
Returns the number of elements for which `func(arr1[i], ..., arrN[i])` returns something other than 0. If `func` is not specified, it returns the number of non-zero elements in the array.

@@ -253,7 +253,7 @@ SELECT format('{} {}', 'Hello', 'World')
## translate
Replaces characters in the string `s` using a one-to-one character mapping defined by `from` and `to` strings. `from` and `to` must be constant ASCII strings of the same size. Non-ASCII characters in the original string are not modified.
Replaces characters in the string `s` using a one-to-one character mapping defined by the `from` and `to` strings. `from` and `to` must be constant ASCII strings. Non-ASCII characters in the original string are not modified. If `from` contains more characters than `to`, the characters of `from` that have no counterpart in `to` are deleted from the input string.
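A brief sketch of both behaviors; the expected outputs follow from the description above rather than from a verified run:
```sql
-- One-to-one mapping: d→D, e→E, l→L, o→O, r→R.
SELECT translate('Hello, World!', 'delor', 'DELOR');
-- Expected: 'HELLO, WORLD!'

-- `from` longer than `to`: 'l' maps to 'L', the unmatched 'o' is deleted.
SELECT translate('Hello, World!', 'lo', 'L');
-- Expected: 'HeLL, WrLd!'
```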
**Syntax**

@@ -7228,6 +7228,45 @@ Result:
└──────────────────────────────┘
```
## toUnixTimestamp64Second
Converts a `DateTime64` to an `Int64` value with fixed second precision. The input value is scaled up or down appropriately depending on its precision.
:::note
The output value is a timestamp in UTC, not in the timezone of `DateTime64`.
:::
**Syntax**
```sql
toUnixTimestamp64Second(value)
```
**Arguments**
- `value` — DateTime64 value with any precision. [DateTime64](../data-types/datetime64.md).
**Returned value**
- `value` converted to the `Int64` data type. [Int64](../data-types/int-uint.md).
**Example**
Query:
```sql
WITH toDateTime64('2009-02-13 23:31:31.011', 3, 'UTC') AS dt64
SELECT toUnixTimestamp64Second(dt64);
```
Result:
```response
┌─toUnixTimestamp64Second(dt64)─┐
│ 1234567891 │
└───────────────────────────────┘
```
## toUnixTimestamp64Micro
Converts a `DateTime64` to an `Int64` value with fixed microsecond precision. The input value is scaled up or down appropriately depending on its precision.

@@ -154,7 +154,7 @@ This feature is deprecated and will be removed in the future.
For your convenience, the old documentation is located [here](https://pastila.nl/?00f32652/fdf07272a7b54bda7e13b919264e449f.md)
## Refreshable Materialized View [Experimental] {#refreshable-materialized-view}
## Refreshable Materialized View {#refreshable-materialized-view}
```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name

@@ -65,6 +65,14 @@ SELECT * FROM icebergS3(iceberg_conf, filename = 'test_table')
DESCRIBE icebergS3(iceberg_conf, filename = 'test_table')
```
**Schema Evolution**
At the moment, ClickHouse can read Iceberg tables whose schema has changed over time. We currently support reading tables where columns have been added or removed and where their order has changed. You can also change a column from one where a value is required to one where NULL is allowed. Additionally, we support permitted type casting for simple types, namely:
* int -> long
* float -> double
* decimal(P, S) -> decimal(P', S) where P' > P.
Currently, it is not possible to change nested structures or the types of elements within arrays and maps.
**Aliases**
Table function `iceberg` is now an alias of `icebergS3`.

@@ -785,6 +785,24 @@ SELECT indexOf([1, 3, NULL, NULL], NULL)
Elements set to `NULL` are handled as normal values.
## indexOfAssumeSorted(arr, x)
Returns the index of the first element equal to `x` (numbering starts at 1) if it is in the array, or 0 if it is not.
The array must be sorted in non-descending order, since a binary search is used.
If the internal array type is Nullable, the `indexOf` function will be used.
Example:
``` sql
SELECT indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)
```
``` text
┌─indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)─┐
│                                             5 │
└───────────────────────────────────────────────┘
```
## arrayCount(\[func,\] arr1, ...)
Returns the number of elements for which `func(arr1[i], ..., arrN[i])` returns something other than 0. If `func` is not specified, it returns the number of non-zero elements in the array.

@@ -306,6 +306,24 @@ SELECT indexOf([1, 3, NULL, NULL], NULL)
└───────────────────────────────────┘
```
## indexOfAssumeSorted(arr, x)
Returns the index of the first element equal to `x` (numbering starts at 1) if it is in the array, or 0 if it is not.
The array must be sorted in non-descending order, since a binary search is used.
If the internal type is Nullable, the `indexOf` function will be used.
Example:
``` sql
SELECT indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)
```
``` text
┌─indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)─┐
│                                             5 │
└───────────────────────────────────────────────┘
```
Elements equal to `NULL` are handled as normal values.
## arrayCount(\[func,\] arr1, ...) {#array-count}

@@ -337,6 +337,24 @@ SELECT indexOf([1, 3, NULL, NULL], NULL)
Elements set to `NULL` are handled as normal values.
## indexOfAssumeSorted(arr, x)
Returns the index of the first element equal to `x` (numbering starts at 1), or 0 if `x` is not present in the array.
The array must be sorted in non-descending order, since a binary search is used.
If the internal array type is Nullable, the `indexOf` function will be used.
Example:
``` sql
SELECT indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)
```
``` text
┌─indexOfAssumeSorted([1, 3, 3, 3, 4, 4, 5], 4)─┐
│                                             5 │
└───────────────────────────────────────────────┘
```
## arrayCount(\[func,\] arr1, ...) {#array-count}
Returns the number of elements for which `func` returns a value other than 0. If `func` is not specified, returns the number of non-zero elements in the array.

@@ -35,7 +35,7 @@ public:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "cannot remove '{}': Is a directory", path);
}
disk.getDisk()->removeRecursive(path);
disk.getDisk()->removeRecursiveWithLimit(path);
}
else if (disk.getDisk()->existsFile(path))
{

@@ -277,10 +277,10 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
{
WriteBufferFromOwnString str_buf;
bool oneline_current_query = oneline || approx_query_length < max_line_length;
IAST::FormatSettings settings(str_buf, oneline_current_query, hilite);
IAST::FormatSettings settings(oneline_current_query, hilite);
settings.show_secrets = true;
settings.print_pretty_type_names = !oneline_current_query;
res->format(settings);
res->format(str_buf, settings);
if (insert_query_payload)
{
@@ -324,10 +324,10 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
{
WriteBufferFromOwnString str_buf;
bool oneline_current_query = oneline || approx_query_length < max_line_length;
IAST::FormatSettings settings(str_buf, oneline_current_query, hilite);
IAST::FormatSettings settings(oneline_current_query, hilite);
settings.show_secrets = true;
settings.print_pretty_type_names = !oneline_current_query;
res->format(settings);
res->format(str_buf, settings);
auto res_string = str_buf.str();
WriteBufferFromOStream res_cout(std::cout, 4096);

@@ -15,6 +15,7 @@
#include <Common/logger_useful.h>
#include <base/phdr_cache.h>
#include <Common/ErrorHandlers.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <base/getMemoryAmount.h>
#include <base/getAvailableMemoryAmount.h>
#include <base/errnoToString.h>
@@ -295,6 +296,7 @@ namespace CurrentMetrics
extern const Metric MergesMutationsMemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
extern const Metric StartupScriptsExecutionState;
}
namespace ProfileEvents
@@ -365,6 +367,14 @@ namespace ErrorCodes
}
enum StartupScriptsExecutionState : CurrentMetrics::Value
{
NotFinished = 0,
Success = 1,
Failure = 2,
};
static std::string getCanonicalPath(std::string && path)
{
Poco::trimInPlace(path);
@@ -781,9 +791,12 @@ void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, Contex
startup_context->makeQueryContext();
executeQuery(read_buffer, write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {});
}
CurrentMetrics::set(CurrentMetrics::StartupScriptsExecutionState, StartupScriptsExecutionState::Success);
}
catch (...)
{
CurrentMetrics::set(CurrentMetrics::StartupScriptsExecutionState, StartupScriptsExecutionState::Failure);
tryLogCurrentException(log, "Failed to parse startup scripts file");
}
}
@@ -924,6 +937,8 @@ try
registerRemoteFileMetadatas();
registerSchedulerNodes();
QueryPlanStepRegistry::registerPlanSteps();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());

@@ -8,7 +8,6 @@
#include <Interpreters/AggregationCommon.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HashTable/HashMap.h>
#include <Common/SipHash.h>
#include <IO/ReadHelpersArena.h>

@@ -14,6 +14,7 @@
#include <Common/Exception.h>
#include <Common/ThreadPool_fwd.h>
#include <IO/ReadBuffer.h>
#include "config.h"
#include <cstddef>
@@ -176,11 +177,15 @@ public:
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version = std::nullopt) const = 0; /// NOLINT
/// Devirtualize serialize call.
virtual void serializeBatch(const PaddedPODArray<AggregateDataPtr> & data, size_t start, size_t size, WriteBuffer & buf, std::optional<size_t> version = std::nullopt) const = 0; /// NOLINT
/// Deserializes state. This function is called only for empty (just created) states.
virtual void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version = std::nullopt, Arena * arena = nullptr) const = 0; /// NOLINT
/// Devirtualize create and deserialize calls. Used in deserialization of ColumnAggregateFunction.
virtual void createAndDeserializeBatch(PaddedPODArray<AggregateDataPtr> & data, AggregateDataPtr __restrict place, size_t total_size_of_state, size_t limit, ReadBuffer & buf, std::optional<size_t> version, Arena * arena) const = 0;
/// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
virtual bool allocatesMemoryInArena() const = 0;
@@ -479,6 +484,37 @@ public:
static_cast<const Derived *>(this)->serialize(data[i], buf, version);
}
void createAndDeserializeBatch(
PaddedPODArray<AggregateDataPtr> & data,
AggregateDataPtr __restrict place,
size_t total_size_of_state,
size_t limit,
ReadBuffer & buf,
std::optional<size_t> version,
Arena * arena) const override
{
for (size_t i = 0; i < limit; ++i)
{
if (buf.eof())
break;
static_cast<const Derived *>(this)->create(place);
try
{
static_cast<const Derived *>(this)->deserialize(place, buf, version, arena);
}
catch (...)
{
static_cast<const Derived *>(this)->destroy(place);
throw;
}
data.push_back(place);
place += total_size_of_state;
}
}
void addBatchSparse(
size_t row_begin,
size_t row_end,

@@ -8,6 +8,7 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Common/assert_cast.h>
#include <Common/SipHash.h>
namespace DB
{

@@ -9,6 +9,7 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Common/assert_cast.h>
#include <Common/SipHash.h>
namespace DB
{

@@ -3,12 +3,8 @@
#include <Analyzer/HashUtils.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Common/SipHash.h>
#include <Interpreters/Context_fwd.h>
#include <unordered_set>
namespace DB::Analyzer
{

@@ -1,6 +1,7 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Common/FieldVisitorToString.h>
#include <Common/quoteString.h>
#include <DataTypes/FieldToDataType.h>
#include <Parsers/ParserSelectWithUnionQuery.h>

@@ -393,7 +393,7 @@ QueryTreeNodePtr IdentifierResolver::wrapExpressionNodeInTupleElement(QueryTreeN
/// Resolve identifier functions implementation
/// Try resolve table identifier from database catalog
QueryTreeNodePtr IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context)
std::shared_ptr<TableNode> IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context)
{
size_t parts_size = table_identifier.getPartsSize();
if (parts_size < 1 || parts_size > 2)
@@ -443,6 +443,11 @@ QueryTreeNodePtr IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalo
if (!storage)
return {};
if (storage->hasExternalDynamicMetadata())
{
storage->updateExternalDynamicMetadata(context);
}
if (!storage_lock)
storage_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
auto storage_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context);

@@ -21,6 +21,7 @@ class QueryExpressionsAliasVisitor ;
class QueryNode;
class JoinNode;
class ColumnNode;
class TableNode;
using ProjectionName = String;
using ProjectionNames = std::vector<ProjectionName>;
@@ -86,7 +87,7 @@ public:
/// Resolve identifier functions
static QueryTreeNodePtr tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context);
static std::shared_ptr<TableNode> tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context);
QueryTreeNodePtr tryResolveIdentifierFromCompoundExpression(const Identifier & expression_identifier,
size_t identifier_bind_size,

@@ -3470,11 +3470,8 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
auto set = std::make_shared<Set>(size_limits_for_set, 0, settings[Setting::transform_null_in]);
set->setHeader(result_block.cloneEmpty().getColumnsWithTypeAndName());
set->insertFromBlock(result_block.getColumnsWithTypeAndName());
set->finishInsert();
auto future_set = std::make_shared<FutureSetFromStorage>(std::move(set));
auto hash = function_arguments[1]->getTreeHash();
auto future_set = std::make_shared<FutureSetFromTuple>(hash, std::move(result_block), settings[Setting::transform_null_in], size_limits_for_set);
/// Create constant set column for constant folding

@@ -62,7 +62,7 @@ size_t getCompoundTypeDepth(const IDataType & type)
}
template <typename Collection>
Block createBlockFromCollection(const Collection & collection, const DataTypes& value_types, const DataTypes & block_types, bool transform_null_in)
ColumnsWithTypeAndName createBlockFromCollection(const Collection & collection, const DataTypes& value_types, const DataTypes & block_types, bool transform_null_in)
{
assert(collection.size() == value_types.size());
size_t columns_size = block_types.size();
@@ -132,16 +132,19 @@ Block createBlockFromCollection(const Collection & collection, const DataTypes&
columns[i]->insert(tuple_values[i]);
}
Block res;
ColumnsWithTypeAndName res(columns_size);
for (size_t i = 0; i < columns_size; ++i)
res.insert(ColumnWithTypeAndName{std::move(columns[i]), block_types[i], "argument_" + toString(i)});
{
res[i].type = block_types[i];
res[i].column = std::move(columns[i]);
}
return res;
}
}
Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in)
ColumnsWithTypeAndName getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in)
{
DataTypes set_element_types = {expression_type};
const auto * lhs_tuple_type = typeid_cast<const DataTypeTuple *>(expression_type.get());
@@ -158,7 +161,7 @@ Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const
size_t lhs_type_depth = getCompoundTypeDepth(*expression_type);
size_t rhs_type_depth = getCompoundTypeDepth(*value_type);
Block result_block;
ColumnsWithTypeAndName result_block;
if (lhs_type_depth == rhs_type_depth)
{

@@ -19,6 +19,6 @@ using SetPtr = std::shared_ptr<Set>;
* Example: SELECT id FROM test_table WHERE id IN (1, 2, 3, 4);
* Example: SELECT id FROM test_table WHERE id IN ((1, 2), (3, 4));
*/
Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in);
ColumnsWithTypeAndName getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in);
}

@@ -1,5 +1,7 @@
#include <Analyzer/TableFunctionNode.h>
#include <Common/SipHash.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>

@@ -11,6 +11,7 @@
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#include <Common/SipHash.h>
namespace DB
{

@@ -6,6 +6,7 @@
#include <Backups/BackupUtils.h>
#include <Backups/DDLAdjustingForBackupVisitor.h>
#include <Backups/IBackupCoordination.h>
#include <Common/quoteString.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>

@@ -348,11 +348,11 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, const Setting
{
output_stream << std::endl;
WriteBufferFromOStream res_buf(output_stream, 4096);
IAST::FormatSettings format_settings(res_buf, /* one_line */ false);
IAST::FormatSettings format_settings(/* one_line */ false);
format_settings.hilite = true;
format_settings.show_secrets = true;
format_settings.print_pretty_type_names = true;
res->format(format_settings);
res->format(res_buf, format_settings);
res_buf.finalize();
output_stream << std::endl << std::endl;
}

@@ -198,6 +198,9 @@ public:
/// Create copy of this column, but with recursively_convert_result_to_full_column_if_low_cardinality = true
ColumnPtr recursivelyConvertResultToFullColumnIfLowCardinality() const;
const FunctionBasePtr & getFunction() const { return function; }
const ColumnsWithTypeAndName & getCapturedColumns() const { return captured_columns; }
private:
size_t elements_size;
FunctionBasePtr function;

@@ -5,6 +5,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
namespace DB
{

@@ -7,6 +7,7 @@
#include <Common/Arena.h>
#include <Common/HashTable/StringHashSet.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <Common/memcmpSmall.h>
@@ -695,4 +696,19 @@ void ColumnString::validate() const
last_offset, chars.size());
}
void ColumnString::updateHashWithValue(size_t n, SipHash & hash) const
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
hash.update(reinterpret_cast<const char *>(&string_size), sizeof(string_size));
hash.update(reinterpret_cast<const char *>(&chars[offset]), string_size);
}
void ColumnString::updateHashFast(SipHash & hash) const
{
hash.update(reinterpret_cast<const char *>(offsets.data()), offsets.size() * sizeof(offsets[0]));
hash.update(reinterpret_cast<const char *>(chars.data()), chars.size() * sizeof(chars[0]));
}
}

@@ -5,7 +5,6 @@
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Common/PODArray.h>
#include <Common/SipHash.h>
#include <Common/memcpySmall.h>
#include <Common/memcmpSmall.h>
#include <Common/assert_cast.h>
@@ -15,7 +14,7 @@
class Collator;
class SipHash;
namespace DB
{
@@ -207,22 +206,11 @@ public:
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
hash.update(reinterpret_cast<const char *>(&string_size), sizeof(string_size));
hash.update(reinterpret_cast<const char *>(&chars[offset]), string_size);
}
void updateHashWithValue(size_t n, SipHash & hash) const override;
WeakHash32 getWeakHash32() const override;
void updateHashFast(SipHash & hash) const override
{
hash.update(reinterpret_cast<const char *>(offsets.data()), offsets.size() * sizeof(offsets[0]));
hash.update(reinterpret_cast<const char *>(chars.data()), chars.size() * sizeof(chars[0]));
}
void updateHashFast(SipHash & hash) const override;
#if !defined(DEBUG_OR_SANITIZER_BUILD)
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

@@ -1,8 +1,41 @@
#include <Columns/ColumnUnique.h>
#include <Common/SipHash.h>
namespace DB
{
template <typename ColumnType>
void ColumnUnique<ColumnType>::updateHashWithValue(size_t n, SipHash & hash_func) const
{
return getNestedColumn()->updateHashWithValue(n, hash_func);
}
template <typename ColumnType>
UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & column)
{
size_t column_size = column.size();
UInt128 cur_hash;
if (column_size != num_added_rows.load())
{
SipHash sip_hash;
for (size_t i = 0; i < column_size; ++i)
column.updateHashWithValue(i, sip_hash);
std::lock_guard lock(mutex);
hash = sip_hash.get128();
cur_hash = hash;
num_added_rows.store(column_size);
}
else
{
std::lock_guard lock(mutex);
cur_hash = hash;
}
return cur_hash;
}
/// Explicit template instantiations.
template class ColumnUnique<ColumnInt8>;
template class ColumnUnique<ColumnUInt8>;
@@ -25,5 +58,15 @@ template class ColumnUnique<ColumnDateTime64>;
template class ColumnUnique<ColumnIPv4>;
template class ColumnUnique<ColumnIPv6>;
template class ColumnUnique<ColumnUUID>;
template class ColumnUnique<ColumnDecimal<Decimal32>>;
template class ColumnUnique<ColumnDecimal<Decimal64>>;
template class ColumnUnique<ColumnDecimal<Decimal128>>;
template class ColumnUnique<ColumnDecimal<Decimal256>>;
// template class IColumnHelper<ColumnDecimal<Decimal32>, ColumnFixedSizeHelper>;
// template class IColumnHelper<ColumnDecimal<Decimal64>, ColumnFixedSizeHelper>;
// template class IColumnHelper<ColumnDecimal<Decimal128>, ColumnFixedSizeHelper>;
// template class IColumnHelper<ColumnDecimal<Decimal256>, ColumnFixedSizeHelper>;
// template class IColumnHelper<ColumnDecimal<DateTime64>, ColumnFixedSizeHelper>;
}

@@ -87,10 +87,7 @@ public:
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
char * serializeValueIntoMemory(size_t n, char * memory) const override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash_func) const override
{
return getNestedColumn()->updateHashWithValue(n, hash_func);
}
void updateHashWithValue(size_t n, SipHash & hash_func) const override;
#if !defined(DEBUG_OR_SANITIZER_BUILD)
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
@@ -721,33 +718,6 @@ IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWi
return indexes_with_overflow;
}
template <typename ColumnType>
UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & column)
{
size_t column_size = column.size();
UInt128 cur_hash;
if (column_size != num_added_rows.load())
{
SipHash sip_hash;
for (size_t i = 0; i < column_size; ++i)
column.updateHashWithValue(i, sip_hash);
std::lock_guard lock(mutex);
hash = sip_hash.get128();
cur_hash = hash;
num_added_rows.store(column_size);
}
else
{
std::lock_guard lock(mutex);
cur_hash = hash;
}
return cur_hash;
}
extern template class ColumnUnique<ColumnInt8>;
extern template class ColumnUnique<ColumnUInt8>;
extern template class ColumnUnique<ColumnInt16>;
@@ -766,5 +736,12 @@ extern template class ColumnUnique<ColumnFloat64>;
extern template class ColumnUnique<ColumnString>;
extern template class ColumnUnique<ColumnFixedString>;
extern template class ColumnUnique<ColumnDateTime64>;
extern template class ColumnUnique<ColumnIPv4>;
extern template class ColumnUnique<ColumnIPv6>;
extern template class ColumnUnique<ColumnUUID>;
extern template class ColumnUnique<ColumnDecimal<Decimal32>>;
extern template class ColumnUnique<ColumnDecimal<Decimal64>>;
extern template class ColumnUnique<ColumnDecimal<Decimal128>>;
extern template class ColumnUnique<ColumnDecimal<Decimal256>>;
}

@@ -13,6 +13,8 @@
#include "config.h"
class SipHash;
namespace DB
{
@@ -21,7 +23,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
/** A template for columns that use a simple array to store.
*/
template <typename T>

@@ -5,6 +5,7 @@
#include <Common/ColumnsHashingImpl.h>
#include <Common/Arena.h>
#include <Common/CacheBase.h>
#include <Common/SipHash.h>
#include <Common/assert_cast.h>
#include <base/unaligned.h>
@@ -26,6 +27,19 @@ namespace ErrorCodes
namespace ColumnsHashing
{
/// Hash a set of keys into a UInt128 value.
static inline UInt128 ALWAYS_INLINE hash128( /// NOLINT
size_t i,
size_t keys_size,
const ColumnRawPtrs & key_columns)
{
SipHash hash;
for (size_t j = 0; j < keys_size; ++j)
key_columns[j]->updateHashWithValue(i, hash);
return hash.get128();
}
/// For the case when there is one numeric key.
/// UInt8/16/32/64 for any type with corresponding bit width.
template <typename Value, typename Mapped, typename FieldType, bool use_cache = true, bool need_offset = false, bool nullable = false>

@@ -818,6 +818,7 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
bool processed_successfully = false;
try
{
zk_node_cache.sync();
config_xml = processConfig(&has_zk_includes, &zk_node_cache, zk_changed_event, is_config_changed);
processed_successfully = true;
}

@@ -356,6 +356,8 @@
M(SharedDatabaseCatalogTablesInLocalDropDetachQueue, "Number of tables in the queue for local drop or detach in Shared Catalog.") \
\
M(MergeTreeIndexGranularityInternalArraysTotalSize, "The total size of all internal arrays in Merge Tree index granularity objects in bytes.") \
\
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

@@ -229,6 +229,7 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks", ValueType::Microseconds) \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks", ValueType::Number) \
M(LoadingMarksTasksCanceled, "Number of times background tasks for loading marks were canceled", ValueType::Number) \
M(LoadedMarksFiles, "Number of mark files loaded.", ValueType::Number) \
M(LoadedMarksCount, "Number of marks loaded (total across columns).", ValueType::Number) \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.", ValueType::Bytes) \

@@ -45,6 +45,7 @@
#include <pcg_random.hpp>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Common/SipHash.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>

@@ -105,4 +105,9 @@ ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coor
return result;
}
void ZooKeeperNodeCache::sync()
{
get_zookeeper()->sync("/");
}
}

@@ -46,6 +46,8 @@ public:
ZNode get(const std::string & path, EventPtr watch_event);
ZNode get(const std::string & path, Coordination::WatchCallback watch_callback);
void sync();
private:
GetZooKeeper get_zookeeper;

@@ -1,6 +1,8 @@
#include <Common/computeMaxTableNameLength.h>
#include <Common/escapeForFileName.h>
#include <filesystem>
namespace DB
{

@@ -2,7 +2,6 @@
#include <base/types.h>
#include <base/StringRef.h>
#include <concepts>
namespace DB

@@ -1,5 +1,6 @@
#pragma clang diagnostic ignored "-Wreserved-identifier"
#include <Common/SipHash.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>

@@ -1,5 +1,6 @@
#pragma clang diagnostic ignored "-Wreserved-identifier"
#include <Common/SipHash.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>

@@ -1,5 +1,6 @@
#include <cstring>
#include <Common/SipHash.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>

@@ -6,6 +6,7 @@
#include <base/unaligned.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/SipHash.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTIdentifier.h>
#include <Compression/CompressionCodecMultiple.h>

@@ -5,8 +5,8 @@
#include <Compression/CompressionInfo.h>
#include <base/types.h>
#include <Parsers/IAST_fwd.h>
#include <Common/SipHash.h>
class SipHash;
namespace DB
{

@@ -78,11 +78,10 @@ namespace CoordinationSetting
namespace ErrorCodes
{
extern const int RAFT_ERROR;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
extern const int BAD_ARGUMENTS;
extern const int OPENSSL_ERROR;
}
using namespace std::chrono_literals;
@@ -92,47 +91,38 @@ namespace
#if USE_SSL
int callSetCertificate(SSL * ssl, void * arg)
auto getSslContextProvider(const Poco::Util::AbstractConfiguration & config, std::string_view key)
{
if (!arg)
return -1;
const CertificateReloader::Data * data = reinterpret_cast<CertificateReloader::Data *>(arg);
return setCertificateCallback(ssl, data, getLogger("SSLContext"));
}
void setSSLParams(nuraft::asio_service::options & asio_opts)
{
const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
String certificate_file_property = "openSSL.server.certificateFile";
String private_key_file_property = "openSSL.server.privateKeyFile";
String root_ca_file_property = "openSSL.server.caConfig";
if (!config.has(certificate_file_property))
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Server certificate file is not set.");
if (!config.has(private_key_file_property))
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Server private key file is not set.");
String load_default_ca_file_property = fmt::format("openSSL.{}.loadDefaultCAFile", key);
String verification_mode_property = fmt::format("openSSL.{}.verificationMode", key);
String root_ca_file_property = fmt::format("openSSL.{}.caConfig", key);
String private_key_passphrase_property = fmt::format("openSSL.{}.privateKeyPassphraseHandler.options.password", key);
Poco::Net::Context::Params params;
String certificate_file_property = fmt::format("openSSL.{}.certificateFile", key);
String private_key_file_property = fmt::format("openSSL.{}.privateKeyFile", key);
if (config.has(certificate_file_property))
params.certificateFile = config.getString(certificate_file_property);
if (params.certificateFile.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Server certificate file in config '{}' is empty", certificate_file_property);
if (config.has(private_key_file_property))
params.privateKeyFile = config.getString(private_key_file_property);
if (params.privateKeyFile.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Server key file in config '{}' is empty", private_key_file_property);
auto pass_phrase = config.getString("openSSL.server.privateKeyPassphraseHandler.options.password", "");
auto certificate_data = std::make_shared<CertificateReloader::Data>(params.certificateFile, params.privateKeyFile, pass_phrase);
std::shared_ptr<CertificateReloader::Data> certificate_data;
if (config.has(private_key_passphrase_property))
{
certificate_data = std::make_shared<CertificateReloader::Data>(
params.certificateFile, params.privateKeyFile, config.getString(private_key_passphrase_property));
params.certificateFile.clear();
params.privateKeyFile.clear();
}
if (config.has(root_ca_file_property))
params.caLocation = config.getString(root_ca_file_property);
params.loadDefaultCAs = config.getBool("openSSL.server.loadDefaultCAFile", false);
params.verificationMode = Poco::Net::Utility::convertVerificationMode(config.getString("openSSL.server.verificationMode", "none"));
params.loadDefaultCAs = config.getBool(load_default_ca_file_property, false);
params.verificationMode = Poco::Net::Utility::convertVerificationMode(config.getString(verification_mode_property, "none"));
std::string disabled_protocols_list = config.getString("openSSL.server.disableProtocols", "");
std::string disabled_protocols_list = config.getString(fmt::format("openSSL.{}.disableProtocols", key), "");
Poco::StringTokenizer dp_tok(disabled_protocols_list, ";,", Poco::StringTokenizer::TOK_TRIM | Poco::StringTokenizer::TOK_IGNORE_EMPTY);
int disabled_protocols = 0;
for (const auto & token : dp_tok)
@@ -149,21 +139,54 @@ void setSSLParams(nuraft::asio_service::options & asio_opts)
disabled_protocols |= Poco::Net::Context::PROTO_TLSV1_2;
}
asio_opts.ssl_context_provider_server_ = [params, certificate_data, disabled_protocols]
auto prefer_server_cypher = config.getBool(fmt::format("openSSL.{}.preferServerCiphers", key), false);
auto cache_sessions = config.getBool(fmt::format("openSSL.{}.cache_sessions", key), false);
return [params, disabled_protocols, prefer_server_cypher, cache_sessions, is_server = key == "server", certificate_data]
{
Poco::Net::Context context(Poco::Net::Context::Usage::TLSV1_2_SERVER_USE, params);
Poco::Net::Context context(is_server ? Poco::Net::Context::Usage::SERVER_USE : Poco::Net::Context::Usage::CLIENT_USE, params);
context.disableProtocols(disabled_protocols);
SSL_CTX * ssl_ctx = context.takeSslContext();
SSL_CTX_set_cert_cb(ssl_ctx, callSetCertificate, reinterpret_cast<void *>(certificate_data.get()));
return ssl_ctx;
};
asio_opts.ssl_context_provider_client_ = [ctx_params = std::move(params)]
if (prefer_server_cypher)
context.preferServerCiphers();
if (cache_sessions)
context.enableSessionCache();
auto * ssl_ctx = context.sslContext();
if (certificate_data)
{
Poco::Net::Context context(Poco::Net::Context::Usage::TLSV1_2_CLIENT_USE, ctx_params);
if (auto err = SSL_CTX_clear_chain_certs(ssl_ctx); err != 1)
throw Exception(ErrorCodes::OPENSSL_ERROR, "Clear certificates {}", Poco::Net::Utility::getLastError());
if (auto err = SSL_CTX_use_certificate(ssl_ctx, const_cast<X509 *>(certificate_data->certs_chain[0].certificate())); err != 1)
throw Exception(ErrorCodes::OPENSSL_ERROR, "Use certificate {}", Poco::Net::Utility::getLastError());
for (auto cert = certificate_data->certs_chain.begin() + 1; cert != certificate_data->certs_chain.end(); cert++)
{
if (auto err = SSL_CTX_add1_chain_cert(ssl_ctx, const_cast<X509 *>(cert->certificate())); err != 1)
throw Exception(ErrorCodes::OPENSSL_ERROR, "Add certificate to chain {}", Poco::Net::Utility::getLastError());
}
if (auto err = SSL_CTX_use_PrivateKey(ssl_ctx, const_cast<EVP_PKEY *>(static_cast<const EVP_PKEY *>(certificate_data->key))); err != 1)
throw Exception(ErrorCodes::OPENSSL_ERROR, "Use private key {}", Poco::Net::Utility::getLastError());
if (auto err = SSL_CTX_check_private_key(ssl_ctx); err != 1)
throw Exception(ErrorCodes::OPENSSL_ERROR, "Unusable key-pair {}", Poco::Net::Utility::getLastError());
}
return context.takeSslContext();
};
}
void setSSLParams(nuraft::asio_service::options & asio_opts)
{
asio_opts.enable_ssl_ = true;
const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
asio_opts.ssl_context_provider_server_ = getSslContextProvider(config, "server");
asio_opts.ssl_context_provider_client_ = getSslContextProvider(config, "client");
}
#endif
std::string checkAndGetSuperdigest(const String & user_and_digest)
@@ -483,7 +506,6 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for NuRaft is disabled because ClickHouse was built without SSL support.");
#endif
}
if (is_recovering)
enterRecoveryMode(params);

@@ -4,6 +4,7 @@
#include <Coordination/Defines.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <Common/isLocalAddress.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>

@@ -3622,6 +3622,14 @@ bool KeeperStorageBase::checkDigest(const Digest & first, const Digest & second)
return first.value == second.value;
}
UInt64 KeeperStorageBase::WatchInfoHash::operator()(WatchInfo info) const
{
SipHash hash;
hash.update(info.path);
hash.update(info.is_list_watch);
return hash.get64();
}
template<typename Container>
String KeeperStorage<Container>::generateDigest(const String & userdata)
{

@@ -326,13 +326,7 @@ public:
struct WatchInfoHash
{
auto operator()(WatchInfo info) const
{
SipHash hash;
hash.update(info.path);
hash.update(info.is_list_watch);
return hash.get64();
}
UInt64 operator()(WatchInfo info) const;
};
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<WatchInfo, WatchInfoHash>>;

@@ -1,7 +1,6 @@
#pragma once
#include <base/StringRef.h>
#include <Coordination/KeeperContext.h>
#include <Common/SipHash.h>
#include <Disks/DiskLocal.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>

@@ -145,6 +145,9 @@ public:
void write(WriteBuffer & out, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT) const;
void read(ReadBuffer & in, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT);
void writeChangedBinary(WriteBuffer & out) const;
void readBinary(ReadBuffer & in);
// A debugging aid.
std::string toString() const;
@@ -485,6 +488,46 @@ void BaseSettings<TTraits>::write(WriteBuffer & out, SettingsWriteFormat format)
BaseSettingsHelpers::writeString(std::string_view{}, out);
}
template <typename TTraits>
void BaseSettings<TTraits>::writeChangedBinary(WriteBuffer & out) const
{
const auto & accessor = Traits::Accessor::instance();
size_t num_settings = 0;
for (auto it = this->begin(); it != this->end(); ++it)
++num_settings;
writeVarUInt(num_settings, out);
for (const auto & field : *this)
{
BaseSettingsHelpers::writeString(field.getName(), out);
using Flags = BaseSettingsHelpers::Flags;
Flags flags{0};
BaseSettingsHelpers::writeFlags(flags, out);
accessor.writeBinary(*this, field.index, out);
}
}
template <typename TTraits>
void BaseSettings<TTraits>::readBinary(ReadBuffer & in)
{
const auto & accessor = Traits::Accessor::instance();
size_t num_settings = 0;
readVarUInt(num_settings, in);
for (size_t i = 0; i < num_settings; ++i)
{
String read_name = BaseSettingsHelpers::readString(in);
std::string_view name = TTraits::resolveName(read_name);
size_t index = accessor.find(name);
std::ignore = BaseSettingsHelpers::readFlags(in);
accessor.readBinary(*this, index, in);
}
}
template <typename TTraits>
void BaseSettings<TTraits>::read(ReadBuffer & in, SettingsWriteFormat format)
{

@@ -127,4 +127,6 @@ static constexpr auto QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS = 1000000000;
static constexpr auto QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS = 0;
#endif
static constexpr auto DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT = 1000uz;
}

@@ -1012,6 +1012,10 @@ void readQuoted(DecimalField<T> & x, ReadBuffer & buf);
void writeFieldText(const Field & x, WriteBuffer & buf);
void writeFieldBinary(const Field & x, WriteBuffer & buf);
Field readFieldBinary(ReadBuffer & buf);
String toString(const Field & x);
}

@@ -0,0 +1,30 @@
#include <Core/QualifiedTableName.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
UInt64 QualifiedTableName::hash() const
{
SipHash hash_state;
hash_state.update(database.data(), database.size());
hash_state.update(table.data(), table.size());
return hash_state.get64();
}
QualifiedTableName QualifiedTableName::parseFromString(const String & maybe_qualified_name)
{
auto name = tryParseFromString(maybe_qualified_name);
if (!name)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Invalid qualified name: {}", maybe_qualified_name);
return *name;
}
}

@@ -1,21 +1,14 @@
#pragma once
#include <base/types.h>
#include <string>
#include <tuple>
#include <optional>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <fmt/format.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
//TODO replace with StorageID
struct QualifiedTableName
{
@@ -32,13 +25,7 @@ struct QualifiedTableName
return std::forward_as_tuple(database, table) < std::forward_as_tuple(other.database, other.table);
}
UInt64 hash() const
{
SipHash hash_state;
hash_state.update(database.data(), database.size());
hash_state.update(table.data(), table.size());
return hash_state.get64();
}
UInt64 hash() const;
std::vector<std::string> getParts() const
{
@@ -86,13 +73,7 @@ struct QualifiedTableName
return name;
}
static QualifiedTableName parseFromString(const String & maybe_qualified_name)
{
auto name = tryParseFromString(maybe_qualified_name);
if (!name)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Invalid qualified name: {}", maybe_qualified_name);
return *name;
}
static QualifiedTableName parseFromString(const String & maybe_qualified_name);
};
}
@@ -111,22 +92,3 @@ template <> struct hash<DB::QualifiedTableName>
}
};
}
namespace fmt
{
template <>
struct formatter<DB::QualifiedTableName>
{
static constexpr auto parse(format_parse_context & ctx)
{
return ctx.begin();
}
template <typename FormatContext>
auto format(const DB::QualifiedTableName & name, FormatContext & ctx) const
{
return fmt::format_to(ctx.out(), "{}.{}", DB::backQuoteIfNeed(name.database), DB::backQuoteIfNeed(name.table));
}
};
}

@@ -4951,6 +4951,7 @@ Prefer prefetched threadpool if all parts are on remote filesystem
Prefer prefetched threadpool if all parts are on local filesystem
)", 0) \
\
DECLARE(UInt64, object_storage_remove_recursive_file_limit, DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT, "Max number of files to store in memory during remove. Zero value means unlimited. Used to reduce memory usage.", 0) \
DECLARE(UInt64, prefetch_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, R"(
The maximum size of the prefetch buffer to read from the filesystem.
)", 0) \
@@ -5559,13 +5560,6 @@ Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries u
)", 0) \
DECLARE(Int64, prefer_warmed_unmerged_parts_seconds, 0, R"(
Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.
)", 0) \
DECLARE(Bool, iceberg_engine_ignore_schema_evolution, false, R"(
Allow to ignore schema evolution in Iceberg table engine and read all data using schema specified by the user on table creation or latest schema parsed from metadata on table creation.
:::note
Enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema.
:::
)", 0) \
DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"(
Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)
@@ -5999,6 +5993,7 @@ Experimental data deduplication for SELECT queries based on part UUIDs
MAKE_OBSOLETE(M, Bool, optimize_monotonous_functions_in_order_by, false) \
MAKE_OBSOLETE(M, UInt64, http_max_chunk_size, 100_GiB) \
MAKE_OBSOLETE(M, Bool, enable_deflate_qpl_codec, false) \
MAKE_OBSOLETE(M, Bool, iceberg_engine_ignore_schema_evolution, false) \
/** The section above is for obsolete settings. Do not add anything there. */
#endif /// __CLION_IDE__

@@ -63,12 +63,13 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"query_plan_join_swap_table", "false", "auto", "New setting. Right table was always chosen before."},
{"max_size_to_preallocate_for_aggregation", 100'000'000, 1'000'000'000'000, "Enable optimisation for bigger tables."},
{"max_size_to_preallocate_for_joins", 100'000'000, 1'000'000'000'000, "Enable optimisation for bigger tables."},
{"parallel_replicas_index_analysis_only_on_coordinator", false, true, "Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan"},
{"max_bytes_ratio_before_external_group_by", 0., 0., "New setting."},
{"object_storage_remove_recursive_file_limit", 0, 1000, "Added new setting to limit number of files stored in memory while removing from object storage. Zero value means unlimited."},
{"max_bytes_ratio_before_external_sort", 0., 0., "New setting."},
{"use_async_executor_for_materialized_views", false, false, "New setting."},
{"composed_data_type_output_format_mode", "default", "default", "New setting"},
{"http_response_headers", "", "", "New setting."},
{"parallel_replicas_index_analysis_only_on_coordinator", true, true, "Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan"}, // enabling it was moved to 24.10
}
},
{"24.11",
@@ -149,6 +150,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"allow_reorder_prewhere_conditions", true, true, "New setting"},
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
{"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."},
{"parallel_replicas_index_analysis_only_on_coordinator", false, true, "Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan"},
}
},
{"24.9",
@@ -617,7 +619,10 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{
{"24.12",
{
{"enforce_index_structure_match_on_partition_manipulation", true, false, "Add new setting to allow attach when source table's projections and secondary indices is a subset of those in the target table."}
{"enforce_index_structure_match_on_partition_manipulation", true, false, "Add new setting to allow attach when source table's projections and secondary indices is a subset of those in the target table."},
{"use_primary_key_cache", false, false, "New setting"},
{"prewarm_primary_key_cache", false, false, "New setting"},
{"min_bytes_to_prewarm_caches", 0, 0, "New setting"},
}
},
{"24.11",
@@ -642,7 +647,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
static void initSettingsChangesHistory(
std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> & settings_changes_history,
std::once_flag & initialized_flag,
std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> & initializer
std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> const & initializer
)
{
std::call_once(initialized_flag, [&]()

@@ -15,6 +15,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
{
bool first = true;
@@ -209,4 +214,58 @@ JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description)
return json_array;
}
void serializeSortDescription(const SortDescription & sort_description, WriteBuffer & out)
{
writeVarUInt(sort_description.size(), out);
for (const auto & desc : sort_description)
{
writeStringBinary(desc.column_name, out);
UInt8 flags = 0;
if (desc.direction > 0)
flags |= 1;
if (desc.nulls_direction > 0)
flags |= 2;
if (desc.collator)
flags |= 4;
if (desc.with_fill)
flags |= 8;
writeIntBinary(flags, out);
if (desc.collator)
writeStringBinary(desc.collator->getLocale(), out);
if (desc.with_fill)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH FILL is not supported in serialized sort description");
}
}
void deserializeSortDescription(SortDescription & sort_description, ReadBuffer & in)
{
size_t size = 0;
readVarUInt(size, in);
sort_description.resize(size);
for (auto & desc : sort_description)
{
readStringBinary(desc.column_name, in);
UInt8 flags = 0;
readIntBinary(flags, in);
desc.direction = (flags & 1) ? 1 : -1;
desc.nulls_direction = (flags & 2) ? 1 : -1;
if (flags & 4)
{
String collator_locale;
readStringBinary(collator_locale, in);
if (!collator_locale.empty())
desc.collator = std::make_shared<Collator>(collator_locale);
}
if (flags & 8)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH FILL is not supported in deserialized sort description");
}
}
}
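The serializer above packs the four per-column properties into one flag byte: bit 0 for direction, bit 1 for nulls_direction, bit 2 for the presence of a collator, bit 3 for WITH FILL (which both the write and read sides reject). A self-contained round-trip sketch of that encoding, with illustrative names rather than the real API:

#include <cassert>
#include <cstdint>

struct DescFlags
{
    int direction;        // 1 = ASC, -1 = DESC
    int nulls_direction;  // relative position of NULLs
    bool has_collator;
    bool with_fill;
};

static uint8_t encode(const DescFlags & d)
{
    uint8_t flags = 0;
    if (d.direction > 0) flags |= 1;
    if (d.nulls_direction > 0) flags |= 2;
    if (d.has_collator) flags |= 4;
    if (d.with_fill) flags |= 8;
    return flags;
}

static DescFlags decode(uint8_t flags)
{
    return DescFlags{(flags & 1) ? 1 : -1, (flags & 2) ? 1 : -1, (flags & 4) != 0, (flags & 8) != 0};
}

int main()
{
    DescFlags d{1, -1, true, false};
    DescFlags r = decode(encode(d));
    assert(r.direction == 1 && r.nulls_direction == -1 && r.has_collator && !r.with_fill);
}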

View File

@ -143,4 +143,11 @@ void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
std::string dumpSortDescription(const SortDescription & description);
JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description);
class WriteBuffer;
class ReadBuffer;
void serializeSortDescription(const SortDescription & sort_description, WriteBuffer & out);
void deserializeSortDescription(SortDescription & sort_description, ReadBuffer & in);
}

View File

@ -10,6 +10,7 @@
#include <DataTypes/Serializations/SerializationSubObject.h>
#include <Columns/ColumnObject.h>
#include <Common/CurrentThread.h>
#include <Common/quoteString.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>

View File

@ -43,6 +43,9 @@ DataTypeVariant::DataTypeVariant(const DataTypes & variants_)
for (const auto & [_, type] : name_to_type)
variants.push_back(type);
if (variants.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Variant type should have at least one nested type");
if (variants.size() > ColumnVariant::MAX_NESTED_COLUMNS)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Variant type with more than {} nested types is not allowed", ColumnVariant::MAX_NESTED_COLUMNS);
}

View File

@ -521,7 +521,7 @@ DataTypePtr createConcreteEmptyDynamicColumn(const DataTypePtr & type_in_storage
throw Exception(ErrorCodes::EXPERIMENTAL_FEATURE_ERROR, "Type {} unexpectedly has dynamic columns", type_in_storage->getName());
}
bool hasDynamicSubcolumns(const ColumnsDescription & columns)
bool hasDynamicSubcolumnsDeprecated(const ColumnsDescription & columns)
{
return std::any_of(columns.begin(), columns.end(),
[](const auto & column)

View File

@ -63,7 +63,7 @@ DataTypePtr createConcreteEmptyDynamicColumn(const DataTypePtr & type_in_storage
void extendObjectColumns(NamesAndTypesList & columns_list, const ColumnsDescription & object_columns, bool with_subcolumns);
/// Checks whether @columns contain any column with dynamic subcolumns.
bool hasDynamicSubcolumns(const ColumnsDescription & columns);
bool hasDynamicSubcolumnsDeprecated(const ColumnsDescription & columns);
/// Updates types of objects in @object_columns inplace
/// according to types in new_columns.

View File

@ -79,27 +79,11 @@ void SerializationAggregateFunction::deserializeBinaryBulk(IColumn & column, Rea
size_t size_of_state = function->sizeOfData();
size_t align_of_state = function->alignOfData();
for (size_t i = 0; i < limit; ++i)
{
if (istr.eof())
break;
/// Adjust the size of state to make all states aligned in the vector.
size_t total_size_of_state = (size_of_state + align_of_state - 1) / align_of_state * align_of_state;
char * place = arena.alignedAlloc(total_size_of_state * limit, align_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state);
function->create(place);
try
{
function->deserialize(place, istr, version, &arena);
}
catch (...)
{
function->destroy(place);
throw;
}
vec.push_back(place);
}
function->createAndDeserializeBatch(vec, place, total_size_of_state, limit, istr, version, &arena);
}
static String serializeToString(const AggregateFunctionPtr & function, const IColumn & column, size_t row_num, size_t version)

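The total_size_of_state expression in the hunk above is the standard align-up idiom: it rounds the per-state size up to a multiple of the alignment so every state inside the single batch allocation stays aligned. A quick worked example of that expression:

#include <cassert>
#include <cstddef>

// Round `size` up to a multiple of `alignment` (same arithmetic as total_size_of_state above).
static size_t alignUp(size_t size, size_t alignment)
{
    return (size + alignment - 1) / alignment * alignment;
}

int main()
{
    assert(alignUp(24, 16) == 32); // a 24-byte state is padded to 32, so state i starts at i * 32
    assert(alignUp(32, 16) == 32); // already-aligned sizes are unchanged
    assert(alignUp(1, 8) == 8);
}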
View File

@ -10,6 +10,7 @@
#include <Common/HashTable/HashMap.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/SipHash.h>
#include <Core/Field.h>
namespace DB

View File

@ -16,6 +16,7 @@
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/KnownObjectNames.h>
#include <Common/quoteString.h>
#include <Core/Settings.h>
#include <Poco/String.h>
@ -473,7 +474,7 @@ namespace
try
{
ParserSelectWithUnionQuery parser;
String description = fmt::format("Query for ClickHouse dictionary {}", data.table_name);
String description = fmt::format("Query for ClickHouse dictionary {}.{}", backQuoteIfNeed(data.table_name.database), backQuoteIfNeed(data.table_name.table));
String fixed_query = removeWhereConditionPlaceholder(query);
const Settings & settings = data.global_context->getSettingsRef();
ASTPtr select = parseQuery(

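The reworked description relies on backQuoteIfNeed to print the database and table identifiers safely. A rough, hypothetical stand-in showing the intended behavior (escaping simplified; not the real helper):

#include <cctype>
#include <iostream>
#include <string>

// Hypothetical sketch: back-quote an identifier unless it is a plain word identifier.
static std::string backQuoteIfNeedSketch(const std::string & name)
{
    bool plain = !name.empty() && (std::isalpha(static_cast<unsigned char>(name[0])) || name[0] == '_');
    for (char c : name)
        if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_')
            plain = false;
    if (plain)
        return name;
    std::string quoted = "`";
    for (char c : name)
    {
        if (c == '`')
            quoted += '\\'; // escaping is simplified here; backslash-escaping of backquotes is an assumption
        quoted += c;
    }
    quoted += '`';
    return quoted;
}

int main()
{
    // Prints: Query for ClickHouse dictionary `my db`.dict
    std::cout << "Query for ClickHouse dictionary "
              << backQuoteIfNeedSketch("my db") << "." << backQuoteIfNeedSketch("dict") << "\n";
}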
View File

@ -5,6 +5,7 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Common/isLocalAddress.h>
#include <Common/quoteString.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>

View File

@ -1,6 +1,7 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseFilesystem.h>
#include <Common/quoteString.h>
#include <Core/Settings.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>

View File

@ -14,6 +14,7 @@
#include <Storages/ObjectStorage/HDFS/HDFSCommon.h>
#include <Storages/IStorage.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/quoteString.h>
#include <Common/re2.h>
#include <Common/RemoteHostFilter.h>
#include <Core/Settings.h>

View File

@ -1,5 +1,6 @@
#include <base/scope_guard.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabasesCommon.h>

View File

@ -43,6 +43,7 @@
#include <Common/Macros.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/PoolId.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
@ -1226,6 +1227,13 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
query_context->setSetting("database_replicated_allow_explicit_uuid", 3);
query_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3);
/// We apply the flatten_nested setting after writing the CREATE query to the DDL log,
/// but before writing metadata to ZooKeeper. So we have to apply the setting on secondary replicas, but not in recovery mode.
/// Set it to false, so it will do nothing on recovery. The metadata in ZooKeeper should be used as is.
/// Same for data_type_default_nullable.
query_context->setSetting("flatten_nested", false);
query_context->setSetting("data_type_default_nullable", false);
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(current_zookeeper, zookeeper_path, false, "");
query_context->initZooKeeperMetadataTransaction(txn);
return query_context;

View File

@ -23,6 +23,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
namespace DB

View File

@ -1,5 +1,6 @@
#include <Databases/DatabasesOverlay.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>

View File

@ -600,6 +600,7 @@ void mergeBlockWithPipe(
while (executor.pull(block))
{
convertToFullIfSparse(block);
block.checkNumberOfRows();
Columns block_key_columns;
block_key_columns.reserve(key_columns_size);

View File

@ -1,6 +1,7 @@
#include <Disks/DiskFomAST.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <Common/SipHash.h>
#include <Disks/getDiskConfigurationFromAST.h>
#include <Disks/DiskSelector.h>
#include <Parsers/formatAST.h>

View File

@ -249,6 +249,11 @@ public:
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exist.
virtual void removeRecursive(const String & path) = 0;
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exist.
/// Differs from removeRecursive for S3/HDFS disks:
/// removes files in batches of limited size to prevent high memory consumption.
virtual void removeRecursiveWithLimit(const String & path) { removeRecursive(path); }
/// Remove file. Throws exception if file doesn't exist or if directory is not empty.
/// Differs from removeFile for S3/HDFS disks:
/// the second bool param is a flag to remove (true) or keep (false) shared data on S3.

View File

@ -11,6 +11,7 @@
#include <azure/storage/blobs/blob_options.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
#include <filesystem>
namespace ProfileEvents
{
@ -108,7 +109,7 @@ ListBlobsPagedResponse ContainerClientWrapper::ListBlobs(const ListBlobsOptions
new_options.Prefix = blob_prefix / options.Prefix.ValueOr("");
auto response = client.ListBlobs(new_options);
auto blob_prefix_str = blob_prefix.empty() ? "" : blob_prefix.string() + "/";
String blob_prefix_str = blob_prefix / "";
for (auto & blob : response.Blobs)
{

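The new blob_prefix / "" form leans on std::filesystem::path::operator/: appending an empty component adds a trailing directory separator to a non-empty path and leaves an empty path empty, which reproduces the old ternary in one expression. A small check of that behavior (generic format is used so the separator is '/' on every platform):

#include <cassert>
#include <filesystem>

int main()
{
    namespace fs = std::filesystem;
    // Non-empty path: operator/ with an empty component appends the separator.
    assert((fs::path("prefix") / "").generic_string() == "prefix/");
    // Empty path: no separator is added, matching the old `blob_prefix.empty() ? "" : ...` logic.
    assert((fs::path("") / "").generic_string().empty());
}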
View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromEmptyFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Common/checkStackSize.h>
#include <Common/formatReadable.h>
#include <Common/CurrentThread.h>
#include <Common/quoteString.h>
@ -75,6 +76,7 @@ DiskObjectStorage::DiskObjectStorage(
, read_resource_name_from_config(config.getString(config_prefix + ".read_resource", ""))
, write_resource_name_from_config(config.getString(config_prefix + ".write_resource", ""))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}, WriteSettings{}))
, remove_shared_recursive_file_limit(config.getUInt64(config_prefix + ".remove_shared_recursive_file_limit", DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT))
{
data_source_description = DataSourceDescription{
.type = DataSourceType::ObjectStorage,
@ -472,6 +474,65 @@ void DiskObjectStorage::removeSharedRecursive(
transaction->commit();
}
void DiskObjectStorage::removeSharedRecursiveWithLimit(
const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
if (remove_shared_recursive_file_limit == 0)
{
removeSharedRecursive(path, keep_all_batch_data, file_names_remove_metadata_only);
return;
}
RemoveBatchRequest local_paths;
std::vector<std::string> directories;
auto check_limit_reached = [&]()
{
const auto path_count = local_paths.size() + directories.size();
chassert(path_count <= this->remove_shared_recursive_file_limit);
return path_count == this->remove_shared_recursive_file_limit;
};
auto remove = [&]()
{
auto transaction = createObjectStorageTransaction();
if (!local_paths.empty())
transaction->removeSharedFiles(local_paths, keep_all_batch_data, file_names_remove_metadata_only);
for (auto & directory : directories)
transaction->removeDirectory(directory);
transaction->commit();
local_paths.clear();
directories.clear();
};
std::function<void(const std::string &)> traverse_metadata_recursive = [&](const std::string & path_to_remove)
{
checkStackSize();
if (check_limit_reached())
remove();
if (metadata_storage->existsFile(path_to_remove))
{
chassert(path_to_remove.starts_with(path));
local_paths.emplace_back(path_to_remove);
}
else
{
for (auto it = metadata_storage->iterateDirectory(path_to_remove); it->isValid(); it->next())
{
traverse_metadata_recursive(it->path());
if (check_limit_reached())
remove();
}
directories.push_back(path_to_remove);
}
};
traverse_metadata_recursive(path);
remove();
}
bool DiskObjectStorage::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
@ -739,6 +800,8 @@ void DiskObjectStorage::applyNewSettings(
write_resource_name_from_config = new_write_resource_name;
}
remove_shared_recursive_file_limit = config.getUInt64(config_prefix + ".remove_shared_recursive_file_limit", DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT);
IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
}

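removeSharedRecursiveWithLimit above accumulates paths during the metadata traversal and flushes a removal transaction whenever the batch reaches remove_shared_recursive_file_limit, which bounds memory use on huge directory trees. A minimal standalone sketch of the same accumulate-and-flush traversal over a hypothetical in-memory tree (simplified: files and directories share one batch here, and a print stands in for the transaction commit):

#include <cstddef>
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical "metadata" tree: path -> children (an empty child list means a file).
using Tree = std::map<std::string, std::vector<std::string>>;

static void removeWithLimit(const Tree & tree, const std::string & root, size_t limit)
{
    std::vector<std::string> batch;
    auto flush = [&]()
    {
        if (batch.empty())
            return;
        std::cout << "transaction removing " << batch.size() << " path(s)\n"; // stand-in for commit()
        batch.clear();
    };
    std::function<void(const std::string &)> traverse = [&](const std::string & path)
    {
        if (batch.size() == limit) // mirrors check_limit_reached() before descending
            flush();
        auto it = tree.find(path);
        if (it == tree.end() || it->second.empty())
        {
            batch.push_back(path); // a file: schedule it for removal
            return;
        }
        for (const auto & child : it->second)
        {
            traverse(child);
            if (batch.size() == limit)
                flush();
        }
        batch.push_back(path); // the directory itself is removed after its children
    };
    traverse(root);
    flush(); // final partial batch
}

int main()
{
    Tree tree = {{"/d", {"/d/a", "/d/b", "/d/c"}}, {"/d/a", {}}, {"/d/b", {}}, {"/d/c", {}}};
    removeWithLimit(tree, "/d", 2); // prints two batches of 2 path(s) each
}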
View File

@ -80,12 +80,16 @@ public:
void removeRecursive(const String & path) override { removeSharedRecursive(path, false, {}); }
void removeRecursiveWithLimit(const String & path) override { removeSharedRecursiveWithLimit(path, false, {}); }
void removeSharedFile(const String & path, bool delete_metadata_only) override;
void removeSharedFileIfExists(const String & path, bool delete_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void removeSharedRecursiveWithLimit(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only);
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void truncateFile(const String & path, size_t size) override;
@ -259,6 +263,8 @@ private:
scope_guard resource_changes_subscription;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
UInt64 remove_shared_recursive_file_limit;
};
using DiskObjectStoragePtr = std::shared_ptr<DiskObjectStorage>;

View File

@ -911,8 +911,11 @@ void DiskObjectStorageTransaction::chmod(const String & path, mode_t mode)
void DiskObjectStorageTransaction::createFile(const std::string & path)
{
operations_to_execute.emplace_back(
std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path, this](MetadataTransactionPtr tx)
{
if (object_storage.isPlain() && !object_storage.isWriteOnce())
tx->createEmptyFile(path);
tx->createEmptyMetadataFile(path);
}));
}

View File

@ -135,6 +135,8 @@ public:
/// Create empty file in metadata storage
virtual void createEmptyMetadataFile(const std::string & path) = 0;
virtual void createEmptyFile(const std::string & /* path */) {}
/// Create a metadata file at the given path with content (blob_name, size_in_bytes)
virtual void createMetadataFile(const std::string & path, ObjectStorageKey key, uint64_t size_in_bytes) = 0;

View File

@ -98,9 +98,9 @@ protected:
ObjectMetadataEntryPtr getObjectMetadataEntryWithCache(const std::string & path) const;
};
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction, private MetadataOperationsHolder
class MetadataStorageFromPlainObjectStorageTransaction : public IMetadataTransaction, private MetadataOperationsHolder
{
private:
protected:
MetadataStorageFromPlainObjectStorage & metadata_storage;
ObjectStoragePtr object_storage;

View File

@ -4,11 +4,31 @@
#include <memory>
#include <unordered_set>
#include <IO/WriteHelpers.h>
namespace DB
{
class MetadataStorageFromPlainRewritableObjectStorageTransaction final : public MetadataStorageFromPlainObjectStorageTransaction
{
public:
explicit MetadataStorageFromPlainRewritableObjectStorageTransaction(
MetadataStorageFromPlainObjectStorage & metadata_storage_, ObjectStoragePtr object_storage_)
: DB::MetadataStorageFromPlainObjectStorageTransaction(metadata_storage_, object_storage_)
{
}
void createEmptyFile(const std::string & path) override
{
const auto key = object_storage->generateObjectKeyForPath(path, std::nullopt);
StoredObject object(key.serialize(), "", /* file_size */0);
auto buf = object_storage->writeObject(object, WriteMode::Rewrite);
buf->finalize();
}
};
class MetadataStorageFromPlainRewritableObjectStorage final : public MetadataStorageFromPlainObjectStorage
{
private:
@ -22,6 +42,11 @@ public:
MetadataStorageType getType() const override { return MetadataStorageType::PlainRewritable; }
MetadataTransactionPtr createTransaction() override
{
return std::make_shared<MetadataStorageFromPlainRewritableObjectStorageTransaction>(*this, object_storage);
}
bool existsFile(const std::string & path) const override;
bool existsDirectory(const std::string & path) const override;

View File

@ -16,6 +16,7 @@
#include <IO/S3/copyS3File.h>
#include <IO/S3/deleteFileFromS3.h>
#include <Interpreters/Context.h>
#include <Common/quoteString.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Core/Settings.h>
#include <IO/S3/BlobStorageLogWriter.h>

View File

@ -221,7 +221,7 @@ struct FormatSettings
bool escape_forward_slashes = true;
bool read_named_tuples_as_objects = false;
bool use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects = false;
bool write_named_tuples_as_objects = false;
bool write_named_tuples_as_objects = true;
bool skip_null_value_in_named_tuples = false;
bool defaults_for_missing_elements_in_named_tuple = false;
bool ignore_unknown_keys_in_named_tuple = false;

View File

@ -4354,6 +4354,7 @@ private:
auto & write_buffer = write_helper.getWriteBuffer();
FormatSettings format_settings = context ? getFormatSettings(context) : FormatSettings{};
auto serialization = arguments[0].type->getDefaultSerialization();
format_settings.json.quote_64bit_integers = false;
for (size_t i = 0; i < input_rows_count; ++i)
{
serialization->serializeTextJSON(*arguments[0].column, i, write_buffer, format_settings);
@ -4880,6 +4881,14 @@ private:
if (const auto * variant_type = typeid_cast<const DataTypeVariant *>(from_type.get()))
return createVariantToDynamicWrapper(*variant_type, dynamic_type);
if (from_type->onlyNull())
return [](ColumnsWithTypeAndName &, const DataTypePtr & result_type, const ColumnNullable *, size_t input_rows_count) -> ColumnPtr
{
auto result = result_type->createColumn();
result->insertManyDefaults(input_rows_count);
return result;
};
if (context && context->getSettingsRef()[Setting::cast_string_to_dynamic_use_inference] && isStringOrFixedString(removeNullable(removeLowCardinality(from_type))))
return createStringToDynamicThroughParsingWrapper();

View File

@ -1,13 +1,13 @@
#pragma once
#include <Functions/IFunctionAdaptors.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypeFunction.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Columns/ColumnFunction.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFunction.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunctionAdaptors.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
@ -18,6 +18,18 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
struct LambdaCapture
{
Names captured_names;
DataTypes captured_types;
NamesAndTypesList lambda_arguments;
String return_name;
DataTypePtr return_type;
bool allow_constant_folding;
};
using LambdaCapturePtr = std::shared_ptr<LambdaCapture>;
class ExecutableFunctionExpression : public IExecutableFunction
{
public:
@ -30,14 +42,20 @@ public:
using SignaturePtr = std::shared_ptr<Signature>;
ExecutableFunctionExpression(ExpressionActionsPtr expression_actions_, SignaturePtr signature_)
: expression_actions(std::move(expression_actions_))
, signature(std::move(signature_))
{}
: expression_actions(std::move(expression_actions_)), signature(std::move(signature_))
{
}
String getName() const override { return "FunctionExpression"; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
if (input_rows_count == 0)
return result_type->createColumn();
if (!expression_actions)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No actions were passed to FunctionExpression");
DB::Block expr_columns;
for (size_t i = 0; i < arguments.size(); ++i)
{
@ -71,13 +89,27 @@ public:
using Signature = ExecutableFunctionExpression::Signature;
using SignaturePtr = ExecutableFunctionExpression::SignaturePtr;
FunctionExpression(ExpressionActionsPtr expression_actions_,
DataTypes argument_types_, const Names & argument_names_,
DataTypePtr return_type_, const std::string & return_name_)
FunctionExpression(LambdaCapturePtr capture_, ExpressionActionsPtr expression_actions_)
: expression_actions(std::move(expression_actions_))
, signature(std::make_shared<Signature>(Signature{argument_names_, return_name_}))
, argument_types(std::move(argument_types_)), return_type(std::move(return_type_))
, capture(std::move(capture_))
{
Names names;
DataTypes types;
names.reserve(capture->captured_names.size() + capture->lambda_arguments.size());
names.insert(names.end(), capture->captured_names.begin(), capture->captured_names.end());
types.reserve(capture->captured_types.size() + capture->lambda_arguments.size());
types.insert(types.end(), capture->captured_types.begin(), capture->captured_types.end());
for (const auto & lambda_argument : capture->lambda_arguments)
{
names.push_back(lambda_argument.name);
types.push_back(lambda_argument.type);
}
argument_types = std::move(types);
signature = std::make_shared<Signature>(Signature{names, capture->return_name});
}
String getName() const override { return "FunctionExpression"; }
@ -85,7 +117,10 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
const DataTypes & getArgumentTypes() const override { return argument_types; }
const DataTypePtr & getResultType() const override { return return_type; }
const DataTypePtr & getResultType() const override { return capture->return_type; }
const LambdaCapture & getCapture() const { return *capture; }
const ActionsDAG & getActionsDAG() const { return expression_actions->getActionsDAG(); }
ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
{
@ -94,9 +129,11 @@ public:
private:
ExpressionActionsPtr expression_actions;
LambdaCapturePtr capture;
/// This is redundant and is built from capture.
SignaturePtr signature;
DataTypes argument_types;
DataTypePtr return_type;
};
/// Captures columns which are used by lambda function but not in argument list.
@ -106,20 +143,10 @@ private:
class ExecutableFunctionCapture : public IExecutableFunction
{
public:
struct Capture
ExecutableFunctionCapture(ExpressionActionsPtr expression_actions_, LambdaCapturePtr capture_)
: expression_actions(std::move(expression_actions_)), capture(std::move(capture_))
{
Names captured_names;
DataTypes captured_types;
NamesAndTypesList lambda_arguments;
String return_name;
DataTypePtr return_type;
bool allow_constant_folding;
};
using CapturePtr = std::shared_ptr<Capture>;
ExecutableFunctionCapture(ExpressionActionsPtr expression_actions_, CapturePtr capture_)
: expression_actions(std::move(expression_actions_)), capture(std::move(capture_)) {}
}
String getName() const override { return "FunctionCapture"; }
@ -148,8 +175,7 @@ public:
types.push_back(lambda_argument.type);
}
auto function = std::make_unique<FunctionExpression>(expression_actions, types, names,
capture->return_type, capture->return_name);
auto function = std::make_unique<FunctionExpression>(capture, expression_actions);
/// If all the captured arguments are constant, let's also return ColumnConst (with ColumnFunction inside it).
/// Consequently, it allows to treat higher order functions with constant arrays and constant captured columns
@ -175,17 +201,15 @@ public:
private:
ExpressionActionsPtr expression_actions;
CapturePtr capture;
LambdaCapturePtr capture;
};
class FunctionCapture : public IFunctionBase
{
public:
using CapturePtr = ExecutableFunctionCapture::CapturePtr;
FunctionCapture(
ExpressionActionsPtr expression_actions_,
CapturePtr capture_,
LambdaCapturePtr capture_,
DataTypePtr return_type_,
String name_)
: expression_actions(std::move(expression_actions_))
@ -207,9 +231,12 @@ public:
return std::make_unique<ExecutableFunctionCapture>(expression_actions, capture);
}
const LambdaCapture & getCapture() const { return *capture; }
const ActionsDAG & getActionsDAG() const { return expression_actions->getActionsDAG(); }
private:
ExpressionActionsPtr expression_actions;
CapturePtr capture;
LambdaCapturePtr capture;
DataTypePtr return_type;
String name;
};
@ -217,28 +244,23 @@ private:
class FunctionCaptureOverloadResolver : public IFunctionOverloadResolver
{
public:
using Capture = ExecutableFunctionCapture::Capture;
using CapturePtr = ExecutableFunctionCapture::CapturePtr;
FunctionCaptureOverloadResolver(
ExpressionActionsPtr expression_actions_,
ActionsDAG actions_dag,
const ExpressionActionsSettings & actions_settings,
const Names & captured_names,
const NamesAndTypesList & lambda_arguments,
const DataTypePtr & function_return_type,
const String & expression_return_name,
bool allow_constant_folding)
: expression_actions(std::move(expression_actions_))
{
/// Check that expression does not contain unusual actions that will break columns structure.
for (const auto & action : expression_actions->getActions())
if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
if (actions_dag.hasArrayJoin())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression with arrayJoin or other unusual action cannot be captured");
std::unordered_map<std::string, DataTypePtr> arguments_map;
const auto & all_arguments = expression_actions->getRequiredColumnsWithTypes();
for (const auto & arg : all_arguments)
arguments_map[arg.name] = arg.type;
for (const auto * input : actions_dag.getInputs())
arguments_map[input->result_name] = input->result_type;
DataTypes captured_types;
captured_types.reserve(captured_names.size());
@ -263,7 +285,7 @@ public:
name = "Capture[" + toString(captured_types) + "](" + toString(argument_types) + ") -> "
+ function_return_type->getName();
capture = std::make_shared<Capture>(Capture{
capture = std::make_shared<LambdaCapture>(LambdaCapture{
.captured_names = captured_names,
.captured_types = std::move(captured_types),
.lambda_arguments = lambda_arguments,
@ -271,6 +293,8 @@ public:
.return_type = function_return_type,
.allow_constant_folding = allow_constant_folding,
});
expression_actions = std::make_shared<ExpressionActions>(std::move(actions_dag), actions_settings);
}
String getName() const override { return name; }
@ -288,7 +312,7 @@ public:
private:
ExpressionActionsPtr expression_actions;
CapturePtr capture;
LambdaCapturePtr capture;
DataTypePtr return_type;
String name;

View File

@ -10,6 +10,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
@ -140,7 +141,7 @@ public:
const auto & haystack_type = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[0] : arguments[1];
const auto & needle_type = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[1] : arguments[0];
if (!isStringOrFixedString(haystack_type))
if (!(isStringOrFixedString(haystack_type) || isEnum(haystack_type)))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}",
@ -173,11 +174,56 @@ public:
return std::make_shared<DataTypeNumber<typename Impl::ResultType>>();
}
template <typename EnumType>
static ColumnPtr genStringColumnFromEnumColumn(const ColumnWithTypeAndName & argument)
{
const auto * col = argument.column.get();
const auto * type = argument.type.get();
auto res = ColumnString::create();
res->reserve(col->size());
if constexpr (std::is_same_v<DataTypeEnum8, EnumType>)
{
const auto * enum_col = typeid_cast<const ColumnInt8 *>(col);
const auto * enum_type = typeid_cast<const DataTypeEnum8 *>(type);
const auto size = enum_col->size();
for (size_t i = 0; i < size; ++i)
{
StringRef value = enum_type->getNameForValue(enum_col->getData()[i]);
res->insertData(value.data, value.size);
}
}
else if constexpr (std::is_same_v<DataTypeEnum16, EnumType>)
{
const auto * enum_col = typeid_cast<const ColumnInt16 *>(col);
const auto size = enum_col->size();
const auto * enum_type = typeid_cast<const DataTypeEnum16 *>(type);
for (size_t i = 0; i < size; ++i)
{
StringRef value = enum_type->getNameForValue(enum_col->getData()[i]);
res->insertData(value.data, value.size);
}
}
return res;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const ColumnPtr & column_haystack = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[0].column : arguments[1].column;
auto & haystack_argument = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[0] : arguments[1];
ColumnPtr column_haystack = haystack_argument.column;
const ColumnPtr & column_needle = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[1].column : arguments[0].column;
bool is_enum8 = isEnum8(haystack_argument.type);
bool is_enum16 = isEnum16(haystack_argument.type);
if (is_enum8)
    column_haystack = genStringColumnFromEnumColumn<DataTypeEnum8>(haystack_argument);
else if (is_enum16)
    column_haystack = genStringColumnFromEnumColumn<DataTypeEnum16>(haystack_argument);
ColumnPtr column_start_pos = nullptr;
if (arguments.size() >= 3)
column_start_pos = arguments[2].column;

View File

@ -7,7 +7,6 @@
#include <Columns/ColumnVector.h>
#include <Common/typeid_cast.h>
#include <Common/NaNUtils.h>
#include <Common/SipHash.h>
#include <base/range.h>
/// Warning in boost::geometry during template strategy substitution.
@ -612,28 +611,4 @@ NO_INLINE ColumnPtr pointInPolygon(const IColumn & x, const IColumn & y, PointIn
using Impl = TypeListChangeRoot<CallPointInPolygon, TypeListNativeNumber>;
return Impl::call(x, y, impl);
}
template <typename Polygon>
UInt128 sipHash128(Polygon && polygon)
{
SipHash hash;
auto hash_ring = [&hash](const auto & ring)
{
UInt32 size = static_cast<UInt32>(ring.size());
hash.update(size);
hash.update(reinterpret_cast<const char *>(ring.data()), size * sizeof(ring[0]));
};
hash_ring(polygon.outer());
const auto & inners = polygon.inners();
hash.update(inners.size());
for (auto & inner : inners)
hash_ring(inner);
return hash.get128();
}
}

View File

@ -4,6 +4,7 @@
#include <Common/filesystemHelpers.h>
#include <Common/FieldVisitorToString.h>
#include <Common/quoteString.h>
#include <DataTypes/FieldToDataType.h>
#include <Processors/Sources/ShellCommandSource.h>

View File

@ -1,5 +1,6 @@
#include <Columns/ColumnConst.h>
#include <Functions/array/arrayEnumerateRanked.h>
#include <Common/SipHash.h>
#include <Common/assert_cast.h>
#include <algorithm>
@ -12,6 +13,18 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
UInt128 hash128depths(const std::vector<size_t> & indices, const ColumnRawPtrs & key_columns)
{
SipHash hash;
for (size_t j = 0, keys_size = key_columns.size(); j < keys_size; ++j)
{
// Debug: const auto & field = (*key_columns[j])[indices[j]]; DUMP(j, indices[j], field);
key_columns[j]->updateHashWithValue(indices[j], hash);
}
return hash.get128();
}
ArraysDepths getArraysDepths(const ColumnsWithTypeAndName & arguments, const char * function_name)
{
const size_t num_arguments = arguments.size();

View File

@ -10,7 +10,6 @@
#include <Functions/IFunction.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/Context_fwd.h>
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/ClearableHashMap.h>
@ -132,18 +131,7 @@ private:
/// Hash a set of keys into a UInt128 value.
static UInt128 hash128depths(const std::vector<size_t> & indices, const ColumnRawPtrs & key_columns)
{
SipHash hash;
for (size_t j = 0, keys_size = key_columns.size(); j < keys_size; ++j)
{
// Debug: const auto & field = (*key_columns[j])[indices[j]]; DUMP(j, indices[j], field);
key_columns[j]->updateHashWithValue(indices[j], hash);
}
return hash.get128();
}
UInt128 hash128depths(const std::vector<size_t> & indices, const ColumnRawPtrs & key_columns);
template <typename Derived>
ColumnPtr FunctionArrayEnumerateRankedExtended<Derived>::executeImpl(

Some files were not shown because too many files have changed in this diff.