Merge remote-tracking branch 'origin/master' into unite-storages3-and-disks3-settings

This commit is contained in:
kssenii 2024-05-27 12:58:12 +02:00
commit a47f7d56e1
66 changed files with 1282 additions and 421 deletions

View File

@ -136,7 +136,7 @@ jobs:
MarkReleaseReady:
if: ${{ !failure() && !cancelled() }}
needs: [RunConfig, Builds_1]
needs: [RunConfig, Builds_1, Builds_2]
runs-on: [self-hosted, style-checker-aarch64]
steps:
- name: Debug

View File

@ -33,6 +33,10 @@ name: Build ClickHouse
additional_envs:
description: additional ENV variables to setup the job
type: string
secrets:
secret_envs:
description: if given, it's passed to the environments
required: false
jobs:
Build:
@ -54,6 +58,7 @@ jobs:
run: |
cat >> "$GITHUB_ENV" << 'EOF'
${{inputs.additional_envs}}
${{secrets.secret_envs}}
DOCKER_TAG<<DOCKER_JSON
${{ toJson(fromJson(inputs.data).docker_data.images) }}
DOCKER_JSON

View File

@ -13,6 +13,10 @@ name: BuildStageWF
description: ci data
type: string
required: true
secrets:
secret_envs:
description: if given, it's passed to the environments
required: false
jobs:
s:
@ -30,3 +34,5 @@ jobs:
# for now let's do I deep checkout for builds
checkout_depth: 0
data: ${{ inputs.data }}
secrets:
secret_envs: ${{ secrets.secret_envs }}

View File

@ -10,6 +10,10 @@ name: StageWF
description: ci data
type: string
required: true
secrets:
secret_envs:
description: if given, it's passed to the environments
required: false
jobs:
s:
@ -23,3 +27,5 @@ jobs:
test_name: ${{ matrix.job_name_and_runner_type.job_name }}
runner_type: ${{ matrix.job_name_and_runner_type.runner_type }}
data: ${{ inputs.data }}
secrets:
secret_envs: ${{ secrets.secret_envs }}

View File

@ -1975,143 +1975,3 @@ Result:
│ 2,"good" │
└───────────────────────────────────────────┘
```
## snowflakeToDateTime
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime](../data-types/datetime.md) format.
**Syntax**
``` sql
snowflakeToDateTime(value[, time_zone])
```
**Arguments**
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
**Returned value**
- The timestamp component of `value` as a [DateTime](../data-types/datetime.md) value.
**Example**
Query:
``` sql
SELECT snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC');
```
Result:
```response
┌─snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC')─┐
│ 2021-08-15 10:57:56 │
└──────────────────────────────────────────────────────────────────┘
```
## snowflakeToDateTime64
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime64](../data-types/datetime64.md) format.
**Syntax**
``` sql
snowflakeToDateTime64(value[, time_zone])
```
**Arguments**
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
**Returned value**
- The timestamp component of `value` as a [DateTime64](../data-types/datetime64.md) with scale = 3, i.e. millisecond precision.
**Example**
Query:
``` sql
SELECT snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC');
```
Result:
```response
┌─snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC')─┐
│ 2021-08-15 10:58:19.841 │
└────────────────────────────────────────────────────────────────────┘
```
## dateTimeToSnowflake
Converts a [DateTime](../data-types/datetime.md) value to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
**Syntax**
``` sql
dateTimeToSnowflake(value)
```
**Arguments**
- `value` — Date with time. [DateTime](../data-types/datetime.md).
**Returned value**
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
**Example**
Query:
``` sql
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt SELECT dateTimeToSnowflake(dt);
```
Result:
```response
┌─dateTimeToSnowflake(dt)─┐
│ 1426860702823350272 │
└─────────────────────────┘
```
## dateTime64ToSnowflake
Convert a [DateTime64](../data-types/datetime64.md) to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
**Syntax**
``` sql
dateTime64ToSnowflake(value)
```
**Arguments**
- `value` — Date with time. [DateTime64](../data-types/datetime64.md).
**Returned value**
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
**Example**
Query:
``` sql
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64 SELECT dateTime64ToSnowflake(dt64);
```
Result:
```response
┌─dateTime64ToSnowflake(dt64)─┐
│ 1426860704886947840 │
└─────────────────────────────┘
```

View File

@ -668,7 +668,7 @@ Result:
└──────────────────────────────────────────────────────────────────────────────────────┘
```
## serverUUID()
## serverUUID
Returns the random UUID generated during the first start of the ClickHouse server. The UUID is stored in file `uuid` in the ClickHouse server directory (e.g. `/var/lib/clickhouse/`) and retained between server restarts.
@ -682,6 +682,275 @@ serverUUID()
- The UUID of the server. [UUID](../data-types/uuid.md).
## generateSnowflakeID
Generates a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID).
The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond.
For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes.
In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0.
Function `generateSnowflakeID` guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.
```
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
├─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|0| timestamp |
├─┼ ┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
| | machine_id | machine_seq_num |
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
```
**Syntax**
``` sql
generateSnowflakeID([expr])
```
**Arguments**
- `expr` — An arbitrary [expression](../../sql-reference/syntax.md#syntax-expressions) used to bypass [common subexpression elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) if the function is called multiple times in a query. The value of the expression has no effect on the returned Snowflake ID. Optional.
**Returned value**
A value of type UInt64.
**Example**
First, create a table with a column of type UInt64, then insert a generated Snowflake ID into the table.
``` sql
CREATE TABLE tab (id UInt64) ENGINE = Memory;
INSERT INTO tab SELECT generateSnowflakeID();
SELECT * FROM tab;
```
Result:
```response
┌──────────────────id─┐
│ 7199081390080409600 │
└─────────────────────┘
```
**Example with multiple Snowflake IDs generated per row**
```sql
SELECT generateSnowflakeID(1), generateSnowflakeID(2);
┌─generateSnowflakeID(1)─┬─generateSnowflakeID(2)─┐
│ 7199081609652224000 │ 7199081609652224001 │
└────────────────────────┴────────────────────────┘
```
## generateSnowflakeIDThreadMonotonic
Generates a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID).
The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond.
For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes.
In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0.
This function behaves like `generateSnowflakeID` but gives no guarantee on counter monotony across different simultaneous requests.
Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate Snowflake IDs.
```
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
├─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|0| timestamp |
├─┼ ┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
| | machine_id | machine_seq_num |
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
```
**Syntax**
``` sql
generateSnowflakeIDThreadMonotonic([expr])
```
**Arguments**
- `expr` — An arbitrary [expression](../../sql-reference/syntax.md#syntax-expressions) used to bypass [common subexpression elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) if the function is called multiple times in a query. The value of the expression has no effect on the returned Snowflake ID. Optional.
**Returned value**
A value of type UInt64.
**Example**
First, create a table with a column of type UInt64, then insert a generated Snowflake ID into the table.
``` sql
CREATE TABLE tab (id UInt64) ENGINE = Memory;
INSERT INTO tab SELECT generateSnowflakeIDThreadMonotonic();
SELECT * FROM tab;
```
Result:
```response
┌──────────────────id─┐
│ 7199082832006627328 │
└─────────────────────┘
```
**Example with multiple Snowflake IDs generated per row**
```sql
SELECT generateSnowflakeIDThreadMonotonic(1), generateSnowflakeIDThreadMonotonic(2);
┌─generateSnowflakeIDThreadMonotonic(1)─┬─generateSnowflakeIDThreadMonotonic(2)─┐
│ 7199082940311945216 │ 7199082940316139520 │
└───────────────────────────────────────┴───────────────────────────────────────┘
```
## snowflakeToDateTime
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime](../data-types/datetime.md) format.
**Syntax**
``` sql
snowflakeToDateTime(value[, time_zone])
```
**Arguments**
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
**Returned value**
- The timestamp component of `value` as a [DateTime](../data-types/datetime.md) value.
**Example**
Query:
``` sql
SELECT snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC');
```
Result:
```response
┌─snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC')─┐
│ 2021-08-15 10:57:56 │
└──────────────────────────────────────────────────────────────────┘
```
## snowflakeToDateTime64
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime64](../data-types/datetime64.md) format.
**Syntax**
``` sql
snowflakeToDateTime64(value[, time_zone])
```
**Arguments**
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
**Returned value**
- The timestamp component of `value` as a [DateTime64](../data-types/datetime64.md) with scale = 3, i.e. millisecond precision.
**Example**
Query:
``` sql
SELECT snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC');
```
Result:
```response
┌─snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC')─┐
│ 2021-08-15 10:58:19.841 │
└────────────────────────────────────────────────────────────────────┘
```
## dateTimeToSnowflake
Converts a [DateTime](../data-types/datetime.md) value to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
**Syntax**
``` sql
dateTimeToSnowflake(value)
```
**Arguments**
- `value` — Date with time. [DateTime](../data-types/datetime.md).
**Returned value**
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
**Example**
Query:
``` sql
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt SELECT dateTimeToSnowflake(dt);
```
Result:
```response
┌─dateTimeToSnowflake(dt)─┐
│ 1426860702823350272 │
└─────────────────────────┘
```
## dateTime64ToSnowflake
Convert a [DateTime64](../data-types/datetime64.md) to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
**Syntax**
``` sql
dateTime64ToSnowflake(value)
```
**Arguments**
- `value` — Date with time. [DateTime64](../data-types/datetime64.md).
**Returned value**
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
**Example**
Query:
``` sql
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64 SELECT dateTime64ToSnowflake(dt64);
```
Result:
```response
┌─dateTime64ToSnowflake(dt64)─┐
│ 1426860704886947840 │
└─────────────────────────────┘
```
## See also
- [dictGetUUID](../functions/ext-dict-functions.md#ext_dict_functions-other)

View File

@ -753,10 +753,11 @@ size_t getMaxArraySize()
return 0xFFFFFF;
}
bool hasLimitArraySize()
bool discardOnLimitReached()
{
if (auto context = Context::getGlobalContextInstance())
return context->getServerSettings().aggregate_function_group_array_has_limit_size;
return context->getServerSettings().aggregate_function_group_array_action_when_limit_is_reached
== GroupArrayActionWhenLimitReached::DISCARD;
return false;
}
@ -767,7 +768,7 @@ AggregateFunctionPtr createAggregateFunctionGroupArray(
{
assertUnary(name, argument_types);
bool limit_size = hasLimitArraySize();
bool has_limit = discardOnLimitReached();
UInt64 max_elems = getMaxArraySize();
if (parameters.empty())
@ -784,14 +785,14 @@ AggregateFunctionPtr createAggregateFunctionGroupArray(
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should be positive number", name);
limit_size = true;
has_limit = true;
max_elems = parameters[0].get<UInt64>();
}
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of parameters for aggregate function {}, should be 0 or 1", name);
if (!limit_size)
if (!has_limit)
{
if (Tlast)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "groupArrayLast make sense only with max_elems (groupArrayLast(max_elems)())");

View File

@ -3,6 +3,7 @@
#include <Core/BaseSettings.h>
#include <Core/Defines.h>
#include <Core/SettingsEnums.h>
namespace Poco::Util
@ -51,7 +52,7 @@ namespace DB
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
M(Bool, aggregate_function_group_array_has_limit_size, false, "When the max array element size is exceeded, a `Too large array size` exception will be thrown by default. When set to true, no exception will be thrown, and the excess elements will be discarded.", 0) \
M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \
M(UInt64, max_server_memory_usage, 0, "Maximum total memory usage of the server in bytes. Zero means unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to RAM ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Maximum total memory usage for merges and mutations in bytes. Zero means unlimited.", 0) \

View File

@ -85,6 +85,14 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"24.6", {{"hdfs_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in HDFS engine instead of empty query result"},
{"azure_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in AzureBlobStorage engine instead of empty query result"},
{"s3_validate_request_settings", true, true, "Allow to disable S3 request settings validation"},
{"azure_skip_empty_files", false, false, "Allow to skip empty files in azure table engine"},
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
}},
{"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
{"input_format_tsv_crlf_end_of_line", false, false, "Enables reading of CRLF line endings with TSV formats"},
@ -93,13 +101,6 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "Minimal size of block to compress in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached."},
{"http_max_chunk_size", 0, 0, "Internal limitation"},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"hdfs_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in HDFS engine instead of empty query result"},
{"azure_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in AzureBlobStorage engine instead of empty query result"},
{"s3_validate_request_settings", true, true, "Allow to disable S3 request settings validation"},
{"azure_skip_empty_files", false, false, "Allow to skip empty files in azure table engine"},
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"cast_string_to_dynamic_use_inference", false, false, "Add setting to allow converting String to Dynamic through parsing"},

View File

@ -229,4 +229,9 @@ IMPLEMENT_SETTING_ENUM(SQLSecurityType, ErrorCodes::BAD_ARGUMENTS,
{{"DEFINER", SQLSecurityType::DEFINER},
{"INVOKER", SQLSecurityType::INVOKER},
{"NONE", SQLSecurityType::NONE}})
IMPLEMENT_SETTING_ENUM(
GroupArrayActionWhenLimitReached,
ErrorCodes::BAD_ARGUMENTS,
{{"throw", GroupArrayActionWhenLimitReached::THROW}, {"discard", GroupArrayActionWhenLimitReached::DISCARD}})
}

View File

@ -370,4 +370,12 @@ DECLARE_SETTING_ENUM(SchemaInferenceMode)
DECLARE_SETTING_ENUM_WITH_RENAME(DateTimeOverflowBehavior, FormatSettings::DateTimeOverflowBehavior)
DECLARE_SETTING_ENUM(SQLSecurityType)
enum class GroupArrayActionWhenLimitReached : uint8_t
{
THROW,
DISCARD
};
DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)
}

View File

@ -574,24 +574,21 @@ void S3ObjectStorage::applyNewSettings(
ContextPtr context,
const ApplyNewSettingsOptions & options)
{
auto new_s3_settings = getSettings(config, config_prefix, context, for_disk_s3, context->getSettingsRef().s3_validate_request_settings);
if (!static_headers.empty())
{
new_s3_settings->auth_settings.headers.insert(
new_s3_settings->auth_settings.headers.end(),
static_headers.begin(), static_headers.end());
}
auto settings_from_config = getSettings(config, config_prefix, context, for_disk_s3, context->getSettingsRef().s3_validate_request_settings);
auto modified_settings = std::make_unique<S3ObjectStorageSettings>(*s3_settings.get());
modified_settings->auth_settings.updateFrom(settings_from_config->auth_settings);
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(uri.uri.toString(), context->getUserName()))
new_s3_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
modified_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
auto current_s3_settings = s3_settings.get();
if (options.allow_client_change && (current_s3_settings->auth_settings.hasUpdates(new_s3_settings->auth_settings) || for_disk_s3))
auto current_settings = s3_settings.get();
if (options.allow_client_change
&& (current_settings->auth_settings.hasUpdates(modified_settings->auth_settings) || for_disk_s3))
{
auto new_client = getClient(uri, *new_s3_settings, context, for_disk_s3);
client.set(std::move(new_client));
}
s3_settings.set(std::move(new_s3_settings));
s3_settings.set(std::move(modified_settings));
}
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(

View File

@ -54,8 +54,7 @@ private:
const S3Capabilities & s3_capabilities_,
ObjectStorageKeysGeneratorPtr key_generator_,
const String & disk_name_,
bool for_disk_s3_ = true,
const HTTPHeaderEntries & static_headers_ = {})
bool for_disk_s3_ = true)
: uri(uri_)
, disk_name(disk_name_)
, client(std::move(client_))
@ -64,7 +63,6 @@ private:
, key_generator(std::move(key_generator_))
, log(getLogger(logger_name))
, for_disk_s3(for_disk_s3_)
, static_headers(static_headers_)
{
}
@ -189,7 +187,6 @@ private:
LoggerPtr log;
const bool for_disk_s3;
const HTTPHeaderEntries static_headers;
};
}

View File

@ -21,8 +21,6 @@ namespace ErrorCodes
const ColumnConst * checkAndGetColumnConstStringOrFixedString(const IColumn * column)
{
if (!column)
return {};
if (!isColumnConst(*column))
return {};

View File

@ -0,0 +1,255 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsRandom.h>
#include <Functions/FunctionHelpers.h>
#include <Core/ServerUUID.h>
#include <Common/Logger.h>
#include <Common/logger_useful.h>
#include "base/types.h"
namespace DB
{
namespace
{
/* Snowflake ID
https://en.wikipedia.org/wiki/Snowflake_ID
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|0| timestamp |
| | machine_id | machine_seq_num |
- The first 41 (+ 1 top zero bit) bits is the timestamp (millisecond since Unix epoch 1 Jan 1970)
- The middle 10 bits are the machine ID
- The last 12 bits are a counter to disambiguate multiple snowflakeIDs generated within the same millisecond by different processes
*/
/// bit counts
constexpr auto timestamp_bits_count = 41;
constexpr auto machine_id_bits_count = 10;
constexpr auto machine_seq_num_bits_count = 12;
/// bits masks for Snowflake ID components
constexpr uint64_t machine_id_mask = ((1ull << machine_id_bits_count) - 1) << machine_seq_num_bits_count;
constexpr uint64_t machine_seq_num_mask = (1ull << machine_seq_num_bits_count) - 1;
/// max values
constexpr uint64_t max_machine_seq_num = machine_seq_num_mask;
uint64_t getTimestamp()
{
auto now = std::chrono::system_clock::now();
auto ticks_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
return static_cast<uint64_t>(ticks_since_epoch) & ((1ull << timestamp_bits_count) - 1);
}
uint64_t getMachineIdImpl()
{
UUID server_uuid = ServerUUID::get();
/// hash into 64 bits
uint64_t hi = UUIDHelpers::getHighBytes(server_uuid);
uint64_t lo = UUIDHelpers::getLowBytes(server_uuid);
/// return only 10 bits
return (((hi * 11) ^ (lo * 17)) & machine_id_mask) >> machine_seq_num_bits_count;
}
uint64_t getMachineId()
{
static uint64_t machine_id = getMachineIdImpl();
return machine_id;
}
struct SnowflakeId
{
uint64_t timestamp;
uint64_t machine_id;
uint64_t machine_seq_num;
};
SnowflakeId toSnowflakeId(uint64_t snowflake)
{
return {.timestamp = (snowflake >> (machine_id_bits_count + machine_seq_num_bits_count)),
.machine_id = ((snowflake & machine_id_mask) >> machine_seq_num_bits_count),
.machine_seq_num = (snowflake & machine_seq_num_mask)};
}
uint64_t fromSnowflakeId(SnowflakeId components)
{
return (components.timestamp << (machine_id_bits_count + machine_seq_num_bits_count) |
components.machine_id << (machine_seq_num_bits_count) |
components.machine_seq_num);
}
struct SnowflakeIdRange
{
SnowflakeId begin; /// inclusive
SnowflakeId end; /// exclusive
};
/// To get the range of `input_rows_count` Snowflake IDs from `max(available, now)`:
/// 1. calculate Snowflake ID by current timestamp (`now`)
/// 2. `begin = max(available, now)`
/// 3. Calculate `end = begin + input_rows_count` handling `machine_seq_num` overflow
SnowflakeIdRange getRangeOfAvailableIds(const SnowflakeId & available, size_t input_rows_count)
{
/// 1. `now`
SnowflakeId begin = {.timestamp = getTimestamp(), .machine_id = getMachineId(), .machine_seq_num = 0};
/// 2. `begin`
if (begin.timestamp <= available.timestamp)
{
begin.timestamp = available.timestamp;
begin.machine_seq_num = available.machine_seq_num;
}
/// 3. `end = begin + input_rows_count`
SnowflakeId end;
const uint64_t seq_nums_in_current_timestamp_left = (max_machine_seq_num - begin.machine_seq_num + 1);
if (input_rows_count >= seq_nums_in_current_timestamp_left)
/// if sequence numbers in current timestamp is not enough for rows --> depending on how many elements input_rows_count overflows, forward timestamp by at least 1 tick
end.timestamp = begin.timestamp + 1 + (input_rows_count - seq_nums_in_current_timestamp_left) / (max_machine_seq_num + 1);
else
end.timestamp = begin.timestamp;
end.machine_id = begin.machine_id;
end.machine_seq_num = (begin.machine_seq_num + input_rows_count) & machine_seq_num_mask;
return {begin, end};
}
struct GlobalCounterPolicy
{
static constexpr auto name = "generateSnowflakeID";
static constexpr auto description = R"(Generates a Snowflake ID. The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond. For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0. Function generateSnowflakeID guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.)";
/// Guarantee counter monotonicity within one timestamp across all threads generating Snowflake IDs simultaneously.
struct Data
{
static inline std::atomic<uint64_t> lowest_available_snowflake_id = 0;
SnowflakeId reserveRange(size_t input_rows_count)
{
uint64_t available_snowflake_id = lowest_available_snowflake_id.load();
SnowflakeIdRange range;
do
{
range = getRangeOfAvailableIds(toSnowflakeId(available_snowflake_id), input_rows_count);
}
while (!lowest_available_snowflake_id.compare_exchange_weak(available_snowflake_id, fromSnowflakeId(range.end)));
/// if CAS failed --> another thread updated `lowest_available_snowflake_id` and we re-try
/// else --> our thread reserved ID range [begin, end) and return the beginning of the range
return range.begin;
}
};
};
struct ThreadLocalCounterPolicy
{
static constexpr auto name = "generateSnowflakeIDThreadMonotonic";
static constexpr auto description = R"(Generates a Snowflake ID. The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond. For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0. This function behaves like generateSnowflakeID but gives no guarantee on counter monotony across different simultaneous requests. Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate Snowflake IDs.)";
/// Guarantee counter monotonicity within one timestamp within the same thread. Faster than GlobalCounterPolicy if a query uses multiple threads.
struct Data
{
static inline thread_local uint64_t lowest_available_snowflake_id = 0;
SnowflakeId reserveRange(size_t input_rows_count)
{
SnowflakeIdRange range = getRangeOfAvailableIds(toSnowflakeId(lowest_available_snowflake_id), input_rows_count);
lowest_available_snowflake_id = fromSnowflakeId(range.end);
return range.begin;
}
};
};
}
template <typename FillPolicy>
class FunctionGenerateSnowflakeID : public IFunction, public FillPolicy
{
public:
static FunctionPtr create(ContextPtr /*context*/) { return std::make_shared<FunctionGenerateSnowflakeID>(); }
String getName() const override { return FillPolicy::name; }
size_t getNumberOfArguments() const override { return 0; }
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return false; }
bool useDefaultImplementationForNulls() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
bool isVariadic() const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
FunctionArgumentDescriptors mandatory_args;
FunctionArgumentDescriptors optional_args{
{"expr", nullptr, nullptr, "Arbitrary expression"}
};
validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
return std::make_shared<DataTypeUInt64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & /*arguments*/, const DataTypePtr &, size_t input_rows_count) const override
{
auto col_res = ColumnVector<UInt64>::create();
typename ColumnVector<UInt64>::Container & vec_to = col_res->getData();
if (input_rows_count != 0)
{
vec_to.resize(input_rows_count);
typename FillPolicy::Data data;
SnowflakeId snowflake_id = data.reserveRange(input_rows_count); /// returns begin of available snowflake ids range
for (UInt64 & to_row : vec_to)
{
to_row = fromSnowflakeId(snowflake_id);
if (snowflake_id.machine_seq_num == max_machine_seq_num)
{
/// handle overflow
snowflake_id.machine_seq_num = 0;
++snowflake_id.timestamp;
}
else
{
++snowflake_id.machine_seq_num;
}
}
}
return col_res;
}
};
template<typename FillPolicy>
void registerSnowflakeIDGenerator(auto & factory)
{
static constexpr auto doc_syntax_format = "{}([expression])";
static constexpr auto example_format = "SELECT {}()";
static constexpr auto multiple_example_format = "SELECT {f}(1), {f}(2)";
FunctionDocumentation::Description description = FillPolicy::description;
FunctionDocumentation::Syntax syntax = fmt::format(doc_syntax_format, FillPolicy::name);
FunctionDocumentation::Arguments arguments = {{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."}};
FunctionDocumentation::ReturnedValue returned_value = "A value of type UInt64";
FunctionDocumentation::Examples examples = {{"single", fmt::format(example_format, FillPolicy::name), ""}, {"multiple", fmt::format(multiple_example_format, fmt::arg("f", FillPolicy::name)), ""}};
FunctionDocumentation::Categories categories = {"Snowflake ID"};
factory.template registerFunction<FunctionGenerateSnowflakeID<FillPolicy>>({description, syntax, arguments, returned_value, examples, categories}, FunctionFactory::CaseInsensitive);
}
REGISTER_FUNCTION(GenerateSnowflakeID)
{
registerSnowflakeIDGenerator<GlobalCounterPolicy>(factory);
registerSnowflakeIDGenerator<ThreadLocalCounterPolicy>(factory);
}
}

View File

@ -76,7 +76,7 @@ void setVariant(UUID & uuid)
struct FillAllRandomPolicy
{
static constexpr auto name = "generateUUIDv7NonMonotonic";
static constexpr auto doc_description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), and a random field (74 bit, including a 2-bit variant field "2") to distinguish UUIDs within a millisecond. This function is the fastest generateUUIDv7* function but it gives no monotonicity guarantees within a timestamp.)";
static constexpr auto description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), and a random field (74 bit, including a 2-bit variant field "2") to distinguish UUIDs within a millisecond. This function is the fastest generateUUIDv7* function but it gives no monotonicity guarantees within a timestamp.)";
struct Data
{
void generate(UUID & uuid, uint64_t ts)
@ -136,7 +136,7 @@ struct CounterFields
struct GlobalCounterPolicy
{
static constexpr auto name = "generateUUIDv7";
static constexpr auto doc_description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. Function generateUUIDv7 guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.)";
static constexpr auto description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. Function generateUUIDv7 guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.)";
/// Guarantee counter monotonicity within one timestamp across all threads generating UUIDv7 simultaneously.
struct Data
@ -159,7 +159,7 @@ struct GlobalCounterPolicy
struct ThreadLocalCounterPolicy
{
static constexpr auto name = "generateUUIDv7ThreadMonotonic";
static constexpr auto doc_description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. This function behaves like generateUUIDv7 but gives no guarantee on counter monotony across different simultaneous requests. Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate UUIDs.)";
static constexpr auto description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. This function behaves like generateUUIDv7 but gives no guarantee on counter monotony across different simultaneous requests. Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate UUIDs.)";
/// Guarantee counter monotonicity within one timestamp within the same thread. Faster than GlobalCounterPolicy if a query uses multiple threads.
struct Data
@ -186,7 +186,6 @@ class FunctionGenerateUUIDv7Base : public IFunction, public FillPolicy
{
public:
String getName() const final { return FillPolicy::name; }
size_t getNumberOfArguments() const final { return 0; }
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const final { return false; }
@ -198,7 +197,7 @@ public:
{
FunctionArgumentDescriptors mandatory_args;
FunctionArgumentDescriptors optional_args{
{"expr", nullptr, nullptr, "Arbitrary Expression"}
{"expr", nullptr, nullptr, "Arbitrary expression"}
};
validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
@ -270,14 +269,14 @@ void registerUUIDv7Generator(auto& factory)
static constexpr auto example_format = "SELECT {}()";
static constexpr auto multiple_example_format = "SELECT {f}(1), {f}(2)";
FunctionDocumentation::Description doc_description = FillPolicy::doc_description;
FunctionDocumentation::Syntax doc_syntax = fmt::format(doc_syntax_format, FillPolicy::name);
FunctionDocumentation::Arguments doc_arguments = {{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."}};
FunctionDocumentation::ReturnedValue doc_returned_value = "A value of type UUID version 7.";
FunctionDocumentation::Examples doc_examples = {{"uuid", fmt::format(example_format, FillPolicy::name), ""}, {"multiple", fmt::format(multiple_example_format, fmt::arg("f", FillPolicy::name)), ""}};
FunctionDocumentation::Categories doc_categories = {"UUID"};
FunctionDocumentation::Description description = FillPolicy::description;
FunctionDocumentation::Syntax syntax = fmt::format(doc_syntax_format, FillPolicy::name);
FunctionDocumentation::Arguments arguments = {{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."}};
FunctionDocumentation::ReturnedValue returned_value = "A value of type UUID version 7.";
FunctionDocumentation::Examples examples = {{"single", fmt::format(example_format, FillPolicy::name), ""}, {"multiple", fmt::format(multiple_example_format, fmt::arg("f", FillPolicy::name)), ""}};
FunctionDocumentation::Categories categories = {"UUID"};
factory.template registerFunction<FunctionGenerateUUIDv7Base<FillPolicy>>({doc_description, doc_syntax, doc_arguments, doc_returned_value, doc_examples, doc_categories}, FunctionFactory::CaseInsensitive);
factory.template registerFunction<FunctionGenerateUUIDv7Base<FillPolicy>>({description, syntax, arguments, returned_value, examples, categories}, FunctionFactory::CaseInsensitive);
}
REGISTER_FUNCTION(GenerateUUIDv7)

View File

@ -126,6 +126,11 @@ bool astContainsSystemTables(ASTPtr ast, ContextPtr context)
namespace
{
bool isQueryCacheRelatedSetting(const String & setting_name)
{
return setting_name.starts_with("query_cache_") || setting_name.ends_with("_query_cache");
}
class RemoveQueryCacheSettingsMatcher
{
public:
@ -141,7 +146,7 @@ public:
auto is_query_cache_related_setting = [](const auto & change)
{
return change.name.starts_with("query_cache_") || change.name.ends_with("_query_cache");
return isQueryCacheRelatedSetting(change.name);
};
std::erase_if(set_clause->changes, is_query_cache_related_setting);
@ -177,11 +182,11 @@ ASTPtr removeQueryCacheSettings(ASTPtr ast)
return transformed_ast;
}
IAST::Hash calculateAstHash(ASTPtr ast, const String & current_database)
IAST::Hash calculateAstHash(ASTPtr ast, const String & current_database, const Settings & settings)
{
ast = removeQueryCacheSettings(ast);
/// Hash the AST, it must consider aliases (issue #56258)
/// Hash the AST, we must consider aliases (issue #56258)
SipHash hash;
ast->updateTreeHash(hash, /*ignore_aliases=*/ false);
@ -189,6 +194,25 @@ IAST::Hash calculateAstHash(ASTPtr ast, const String & current_database)
/// tables (issue #64136)
hash.update(current_database);
/// Finally, hash the (changed) settings as they might affect the query result (e.g. think of settings `additional_table_filters` and `limit`).
/// Note: allChanged() returns the settings in random order. Also, update()-s of the composite hash must be done in deterministic order.
/// Therefore, collect and sort the settings first, then hash them.
Settings::Range changed_settings = settings.allChanged();
std::vector<std::pair<String, String>> changed_settings_sorted; /// (name, value)
for (const auto & setting : changed_settings)
{
const String & name = setting.getName();
const String & value = setting.getValueString();
if (!isQueryCacheRelatedSetting(name)) /// see removeQueryCacheSettings() why this is a good idea
changed_settings_sorted.push_back({name, value});
}
std::sort(changed_settings_sorted.begin(), changed_settings_sorted.end(), [](auto & lhs, auto & rhs) { return lhs.first < rhs.first; });
for (const auto & setting : changed_settings_sorted)
{
hash.update(setting.first);
hash.update(setting.second);
}
return getSipHash128AsPair(hash);
}
@ -204,12 +228,13 @@ String queryStringFromAST(ASTPtr ast)
QueryCache::Key::Key(
ASTPtr ast_,
const String & current_database,
const Settings & settings,
Block header_,
std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_,
bool is_shared_,
std::chrono::time_point<std::chrono::system_clock> expires_at_,
bool is_compressed_)
: ast_hash(calculateAstHash(ast_, current_database))
: ast_hash(calculateAstHash(ast_, current_database, settings))
, header(header_)
, user_id(user_id_)
, current_user_roles(current_user_roles_)
@ -220,8 +245,8 @@ QueryCache::Key::Key(
{
}
QueryCache::Key::Key(ASTPtr ast_, const String & current_database, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_)
: QueryCache::Key(ast_, current_database, {}, user_id_, current_user_roles_, false, std::chrono::system_clock::from_time_t(1), false) /// dummy values for everything != AST, current database, user name/roles
QueryCache::Key::Key(ASTPtr ast_, const String & current_database, const Settings & settings, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_)
: QueryCache::Key(ast_, current_database, settings, {}, user_id_, current_user_roles_, false, std::chrono::system_clock::from_time_t(1), false) /// dummy values for everything != AST, current database, user name/roles
{
}

View File

@ -14,6 +14,8 @@
namespace DB
{
struct Settings;
/// Does AST contain non-deterministic functions like rand() and now()?
bool astContainsNonDeterministicFunctions(ASTPtr ast, ContextPtr context);
@ -89,6 +91,7 @@ public:
/// Ctor to construct a Key for writing into query cache.
Key(ASTPtr ast_,
const String & current_database,
const Settings & settings,
Block header_,
std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_,
bool is_shared_,
@ -96,7 +99,7 @@ public:
bool is_compressed);
/// Ctor to construct a Key for reading from query cache (this operation only needs the AST + user name).
Key(ASTPtr ast_, const String & current_database, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_);
Key(ASTPtr ast_, const String & current_database, const Settings & settings, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_);
bool operator==(const Key & other) const;
};

View File

@ -1093,6 +1093,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
&& (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>());
QueryCache::Usage query_cache_usage = QueryCache::Usage::None;
/// If the query runs with "use_query_cache = 1", we first probe if the query cache already contains the query result (if yes:
/// return result from cache). If doesn't, we execute the query normally and write the result into the query cache. Both steps use a
/// hash of the AST, the current database and the settings as cache key. Unfortunately, the settings are in some places internally
/// modified between steps 1 and 2 (= during query execution) - this is silly but hard to forbid. As a result, the hashes no longer
/// match and the cache is rendered ineffective. Therefore make a copy of the settings and use it for steps 1 and 2.
std::optional<Settings> settings_copy;
if (can_use_query_cache)
settings_copy = settings;
if (!async_insert)
{
/// If it is a non-internal SELECT, and passive (read) use of the query cache is enabled, and the cache knows the query, then set
@ -1101,7 +1110,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
if (can_use_query_cache && settings.enable_reads_from_query_cache)
{
QueryCache::Key key(ast, context->getCurrentDatabase(), context->getUserID(), context->getCurrentRoles());
QueryCache::Key key(ast, context->getCurrentDatabase(), *settings_copy, context->getUserID(), context->getCurrentRoles());
QueryCache::Reader reader = query_cache->createReader(key);
if (reader.hasCacheEntryForKey())
{
@ -1224,7 +1233,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
&& (!ast_contains_system_tables || system_table_handling == QueryCacheSystemTableHandling::Save))
{
QueryCache::Key key(
ast, context->getCurrentDatabase(), res.pipeline.getHeader(),
ast, context->getCurrentDatabase(), *settings_copy, res.pipeline.getHeader(),
context->getUserID(), context->getCurrentRoles(),
settings.query_cache_share_between_users,
std::chrono::system_clock::now() + std::chrono::seconds(settings.query_cache_ttl),

View File

@ -43,7 +43,6 @@ class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
class IMergeTreeReader;
class IMergeTreeDataPartWriter;
class MarkCache;
class UncompressedCache;
class MergeTreeTransaction;
@ -74,7 +73,6 @@ public:
using VirtualFields = std::unordered_map<String, Field>;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
using MergeTreeWriterPtr = std::unique_ptr<IMergeTreeDataPartWriter>;
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
using NameToNumber = std::unordered_map<std::string, size_t>;
@ -106,15 +104,6 @@ public:
const ValueSizeMap & avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_) const = 0;
virtual MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity) = 0;
virtual bool isStoredOnDisk() const = 0;
virtual bool isStoredOnRemoteDisk() const = 0;
@ -172,6 +161,8 @@ public:
const SerializationInfoByName & getSerializationInfos() const { return serialization_infos; }
const SerializationByName & getSerializations() const { return serializations; }
SerializationPtr getSerialization(const String & column_name) const;
SerializationPtr tryGetSerialization(const String & column_name) const;
@ -201,6 +192,7 @@ public:
/// take place, you must take original name of column for this part from
/// storage and pass it to this method.
std::optional<size_t> getColumnPosition(const String & column_name) const;
const NameToNumber & getColumnPositions() const { return column_name_to_position; }
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.

View File

@ -3,6 +3,13 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation)
{
Block result;
@ -38,18 +45,27 @@ Block permuteBlockIfNeeded(const Block & block, const IColumn::Permutation * per
}
IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
const MergeTreeMutableDataPartPtr & data_part_,
const String & data_part_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: data_part(data_part_)
, storage(data_part_->storage)
: data_part_name(data_part_name_)
, serializations(serializations_)
, index_granularity_info(index_granularity_info_)
, storage_settings(storage_settings_)
, metadata_snapshot(metadata_snapshot_)
, virtual_columns(virtual_columns_)
, columns_list(columns_list_)
, settings(settings_)
, index_granularity(index_granularity_)
, with_final_mark(settings.can_use_adaptive_granularity)
, data_part_storage(data_part_storage_)
, index_granularity(index_granularity_)
{
}
@ -60,6 +76,102 @@ Columns IMergeTreeDataPartWriter::releaseIndexColumns()
std::make_move_iterator(index_columns.end()));
}
SerializationPtr IMergeTreeDataPartWriter::getSerialization(const String & column_name) const
{
auto it = serializations.find(column_name);
if (it == serializations.end())
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE,
"There is no column or subcolumn {} in part {}", column_name, data_part_name);
return it->second;
}
ASTPtr IMergeTreeDataPartWriter::getCodecDescOrDefault(const String & column_name, CompressionCodecPtr default_codec) const
{
auto get_codec_or_default = [&](const auto & column_desc)
{
return column_desc.codec ? column_desc.codec : default_codec->getFullCodecDesc();
};
const auto & columns = metadata_snapshot->getColumns();
if (const auto * column_desc = columns.tryGet(column_name))
return get_codec_or_default(*column_desc);
if (const auto * virtual_desc = virtual_columns->tryGetDescription(column_name))
return get_codec_or_default(*virtual_desc);
return default_codec->getFullCodecDesc();
}
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const ColumnPositions & column_positions,
const StorageMetadataPtr & metadata_snapshot,
const VirtualsDescriptionPtr & virtual_columns,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity);
MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const VirtualsDescriptionPtr & virtual_columns,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity);
MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
MergeTreeDataPartType part_type,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const ColumnPositions & column_positions,
const StorageMetadataPtr & metadata_snapshot,
const VirtualsDescriptionPtr & virtual_columns,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity)
{
if (part_type == MergeTreeDataPartType::Compact)
return createMergeTreeDataPartCompactWriter(data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, columns_list, column_positions, metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_,
marks_file_extension_, default_codec_, writer_settings, computed_index_granularity);
else if (part_type == MergeTreeDataPartType::Wide)
return createMergeTreeDataPartWideWriter(data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, columns_list, metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_,
marks_file_extension_, default_codec_, writer_settings, computed_index_granularity);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown part type: {}", part_type.toString());
}
}

View File

@ -1,12 +1,13 @@
#pragma once
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h>
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/Statistics/Statistics.h>
#include <Storages/VirtualColumnsDescription.h>
namespace DB
@ -22,9 +23,14 @@ class IMergeTreeDataPartWriter : private boost::noncopyable
{
public:
IMergeTreeDataPartWriter(
const MergeTreeMutableDataPartPtr & data_part_,
const String & data_part_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_ = {});
@ -32,7 +38,7 @@ public:
virtual void write(const Block & block, const IColumn::Permutation * permutation) = 0;
virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) = 0;
virtual void fillChecksums(MergeTreeDataPartChecksums & checksums, NameSet & checksums_to_remove) = 0;
virtual void finish(bool sync) = 0;
@ -40,16 +46,48 @@ public:
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
protected:
SerializationPtr getSerialization(const String & column_name) const;
const MergeTreeMutableDataPartPtr data_part;
const MergeTreeData & storage;
ASTPtr getCodecDescOrDefault(const String & column_name, CompressionCodecPtr default_codec) const;
IDataPartStorage & getDataPartStorage() { return *data_part_storage; }
const String data_part_name;
/// Serializations for every columns and subcolumns by their names.
const SerializationByName serializations;
const MergeTreeIndexGranularityInfo index_granularity_info;
const MergeTreeSettingsPtr storage_settings;
const StorageMetadataPtr metadata_snapshot;
const VirtualsDescriptionPtr virtual_columns;
const NamesAndTypesList columns_list;
const MergeTreeWriterSettings settings;
MergeTreeIndexGranularity index_granularity;
const bool with_final_mark;
MutableDataPartStoragePtr data_part_storage;
MutableColumns index_columns;
MergeTreeIndexGranularity index_granularity;
};
using MergeTreeDataPartWriterPtr = std::unique_ptr<IMergeTreeDataPartWriter>;
using ColumnPositions = std::unordered_map<std::string, size_t>;
MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
MergeTreeDataPartType part_type,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const ColumnPositions & column_positions,
const StorageMetadataPtr & metadata_snapshot,
const VirtualsDescriptionPtr & virtual_columns_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity);
}

View File

@ -7,20 +7,21 @@ namespace DB
{
IMergedBlockOutputStream::IMergedBlockOutputStream(
const MergeTreeMutableDataPartPtr & data_part,
const MergeTreeSettingsPtr & storage_settings_,
MutableDataPartStoragePtr data_part_storage_,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list,
bool reset_columns_)
: storage(data_part->storage)
: storage_settings(storage_settings_)
, metadata_snapshot(metadata_snapshot_)
, data_part_storage(data_part->getDataPartStoragePtr())
, data_part_storage(data_part_storage_)
, reset_columns(reset_columns_)
{
if (reset_columns)
{
SerializationInfo::Settings info_settings =
{
.ratio_of_defaults_for_sparse = storage.getSettings()->ratio_of_defaults_for_sparse_serialization,
.ratio_of_defaults_for_sparse = storage_settings->ratio_of_defaults_for_sparse_serialization,
.choose_kind = false,
};
@ -42,7 +43,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
return {};
for (const auto & column : empty_columns)
LOG_TRACE(storage.log, "Skipping expired/empty column {} for part {}", column, data_part->name);
LOG_TRACE(data_part->storage.log, "Skipping expired/empty column {} for part {}", column, data_part->name);
/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
std::map<String, size_t> stream_counts;
@ -91,7 +92,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
}
else /// If we have no file in checksums it doesn't exist on disk
{
LOG_TRACE(storage.log, "Files {} doesn't exist in checksums so it doesn't exist on disk, will not try to remove it", *itr);
LOG_TRACE(data_part->storage.log, "Files {} doesn't exist in checksums so it doesn't exist on disk, will not try to remove it", *itr);
itr = remove_files.erase(itr);
}
}

View File

@ -1,10 +1,12 @@
#pragma once
#include "Storages/MergeTree/IDataPartStorage.h"
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Common/Logger.h>
namespace DB
{
@ -13,7 +15,8 @@ class IMergedBlockOutputStream
{
public:
IMergedBlockOutputStream(
const MergeTreeMutableDataPartPtr & data_part,
const MergeTreeSettingsPtr & storage_settings_,
MutableDataPartStoragePtr data_part_storage_,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list,
bool reset_columns_);
@ -39,11 +42,13 @@ protected:
SerializationInfoByName & serialization_infos,
MergeTreeData::DataPart::Checksums & checksums);
const MergeTreeData & storage;
MergeTreeSettingsPtr storage_settings;
LoggerPtr log;
StorageMetadataPtr metadata_snapshot;
MutableDataPartStoragePtr data_part_storage;
IMergeTreeDataPart::MergeTreeWriterPtr writer;
MergeTreeDataPartWriterPtr writer;
bool reset_columns = false;
SerializationInfoByName new_serialization_infos;

View File

@ -9,7 +9,7 @@
#include <Common/ActionBlocker.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Compression/CompressedWriteBuffer.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <IO/IReadableWriteBuffer.h>
@ -34,6 +34,7 @@
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
@ -378,7 +379,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
MergeTreeIndexFactory::instance().getMany(global_ctx->metadata_snapshot->getSecondaryIndices()),
MergeTreeStatisticsFactory::instance().getMany(global_ctx->metadata_snapshot->getColumns()),
ctx->compression_codec,
global_ctx->txn,
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
ctx->blocks_are_granules_size,
global_ctx->context->getWriteSettings());

View File

@ -8492,7 +8492,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()),
Statistics{},
compression_codec, txn);
compression_codec, txn ? txn->tid : Tx::PrehistoricTID);
bool sync_on_insert = settings->fsync_after_insert;

View File

@ -47,26 +47,36 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
avg_value_size_hints, profile_callback, CLOCK_MONOTONIC_COARSE);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const ColumnPositions & column_positions,
const StorageMetadataPtr & metadata_snapshot,
const VirtualsDescriptionPtr & virtual_columns,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity)
{
NamesAndTypesList ordered_columns_list;
std::copy_if(columns_list.begin(), columns_list.end(), std::back_inserter(ordered_columns_list),
[this](const auto & column) { return getColumnPosition(column.name) != std::nullopt; });
[&column_positions](const auto & column) { return column_positions.contains(column.name); });
/// Order of writing is important in compact format
ordered_columns_list.sort([this](const auto & lhs, const auto & rhs)
{ return *getColumnPosition(lhs.name) < *getColumnPosition(rhs.name); });
ordered_columns_list.sort([&column_positions](const auto & lhs, const auto & rhs)
{ return column_positions.at(lhs.name) < column_positions.at(rhs.name); });
return std::make_unique<MergeTreeDataPartWriterCompact>(
shared_from_this(), ordered_columns_list, metadata_snapshot,
indices_to_recalc, stats_to_recalc_, getMarksFileExtension(),
data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, ordered_columns_list, metadata_snapshot, virtual_columns,
indices_to_recalc, stats_to_recalc_, marks_file_extension_,
default_codec_, writer_settings, computed_index_granularity);
}

View File

@ -40,15 +40,6 @@ public:
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const override;
MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity) override;
bool isStoredOnDisk() const override { return true; }
bool isStoredOnRemoteDisk() const override;

View File

@ -53,19 +53,28 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
profile_callback);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const VirtualsDescriptionPtr & virtual_columns,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity)
{
return std::make_unique<MergeTreeDataPartWriterWide>(
shared_from_this(), columns_list,
metadata_snapshot, indices_to_recalc, stats_to_recalc_,
getMarksFileExtension(),
data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, columns_list,
metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_,
marks_file_extension_,
default_codec_, writer_settings, computed_index_granularity);
}

View File

@ -35,15 +35,6 @@ public:
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const override;
MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity) override;
bool isStoredOnDisk() const override { return true; }
bool isStoredOnRemoteDisk() const override;

View File

@ -10,32 +10,41 @@ namespace ErrorCodes
}
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const MergeTreeMutableDataPartPtr & data_part_,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const Statistics & stats_to_recalc,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: MergeTreeDataPartWriterOnDisk(data_part_, columns_list_, metadata_snapshot_,
: MergeTreeDataPartWriterOnDisk(
data_part_name_, logger_name_, serializations_,
data_part_storage_, index_granularity_info_, storage_settings_,
columns_list_, metadata_snapshot_, virtual_columns_,
indices_to_recalc_, stats_to_recalc, marks_file_extension_,
default_codec_, settings_, index_granularity_)
, plain_file(data_part_->getDataPartStorage().writeFile(
, plain_file(getDataPartStorage().writeFile(
MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
settings.max_compress_block_size,
settings_.query_write_settings))
, plain_hashing(*plain_file)
{
marks_file = data_part_->getDataPartStorage().writeFile(
marks_file = getDataPartStorage().writeFile(
MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
4096,
settings_.query_write_settings);
marks_file_hashing = std::make_unique<HashingWriteBuffer>(*marks_file);
if (data_part_->index_granularity_info.mark_type.compressed)
if (index_granularity_info.mark_type.compressed)
{
marks_compressor = std::make_unique<CompressedWriteBuffer>(
*marks_file_hashing,
@ -45,10 +54,9 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
marks_source_hashing = std::make_unique<HashingWriteBuffer>(*marks_compressor);
}
auto storage_snapshot = std::make_shared<StorageSnapshot>(data_part->storage, metadata_snapshot);
for (const auto & column : columns_list)
{
auto compression = storage_snapshot->getCodecDescOrDefault(column.name, default_codec);
auto compression = getCodecDescOrDefault(column.name, default_codec);
addStreams(column, nullptr, compression);
}
}
@ -59,12 +67,11 @@ void MergeTreeDataPartWriterCompact::initDynamicStreamsIfNeeded(const Block & bl
return;
is_dynamic_streams_initialized = true;
auto storage_snapshot = std::make_shared<StorageSnapshot>(data_part->storage, metadata_snapshot);
for (const auto & column : columns_list)
{
if (column.type->hasDynamicSubcolumns())
{
auto compression = storage_snapshot->getCodecDescOrDefault(column.name, default_codec);
auto compression = getCodecDescOrDefault(column.name, default_codec);
addStreams(column, block.getByName(column.name).column, compression);
}
}
@ -98,7 +105,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & name_and
compressed_streams.emplace(stream_name, stream);
};
data_part->getSerialization(name_and_type.name)->enumerateStreams(callback, name_and_type.type, column);
getSerialization(name_and_type.name)->enumerateStreams(callback, name_and_type.type, column);
}
namespace
@ -251,7 +258,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
writeColumnSingleGranule(
block.getByName(name_and_type->name), data_part->getSerialization(name_and_type->name),
block.getByName(name_and_type->name), getSerialization(name_and_type->name),
stream_getter, granule.start_row, granule.rows_to_write);
/// Each type always have at least one substream
@ -262,7 +269,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
}
}
void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums)
void MergeTreeDataPartWriterCompact::fillDataChecksums(MergeTreeDataPartChecksums & checksums)
{
if (columns_buffer.size() != 0)
{
@ -432,7 +439,7 @@ size_t MergeTreeDataPartWriterCompact::ColumnsBuffer::size() const
return accumulated_columns.at(0)->size();
}
void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/)
void MergeTreeDataPartWriterCompact::fillChecksums(MergeTreeDataPartChecksums & checksums, NameSet & /*checksums_to_remove*/)
{
// If we don't have anything to write, skip finalization.
if (!columns_list.empty())

View File

@ -11,9 +11,15 @@ class MergeTreeDataPartWriterCompact : public MergeTreeDataPartWriterOnDisk
{
public:
MergeTreeDataPartWriterCompact(
const MergeTreeMutableDataPartPtr & data_part,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc,
const String & marks_file_extension,
@ -23,12 +29,12 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override;
void fillChecksums(MergeTreeDataPartChecksums & checksums, NameSet & checksums_to_remove) override;
void finish(bool sync) override;
private:
/// Finish serialization of the data. Flush rows in buffer to disk, compute checksums.
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums);
void fillDataChecksums(MergeTreeDataPartChecksums & checksums);
void finishDataSerialization(bool sync);
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;

View File

@ -140,16 +140,24 @@ void MergeTreeDataPartWriterOnDisk::Stream<only_plain_file>::addToChecksums(Merg
MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
const MergeTreeMutableDataPartPtr & data_part_,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const MergeTreeIndices & indices_to_recalc_,
const Statistics & stats_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(data_part_, columns_list_, metadata_snapshot_, settings_, index_granularity_)
: IMergeTreeDataPartWriter(
data_part_name_, serializations_, data_part_storage_, index_granularity_info_,
storage_settings_, columns_list_, metadata_snapshot_, virtual_columns_, settings_, index_granularity_)
, skip_indices(indices_to_recalc_)
, stats(stats_to_recalc_)
, marks_file_extension(marks_file_extension_)
@ -157,14 +165,14 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
, compute_granularity(index_granularity.empty())
, compress_primary_key(settings.compress_primary_key)
, execution_stats(skip_indices.size(), stats.size())
, log(getLogger(storage.getLogName() + " (DataPartWriter)"))
, log(getLogger(logger_name_ + " (DataPartWriter)"))
{
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Can't take information about index granularity from blocks, when non empty index_granularity array specified");
if (!data_part->getDataPartStorage().exists())
data_part->getDataPartStorage().createDirectories();
if (!getDataPartStorage().exists())
getDataPartStorage().createDirectories();
if (settings.rewrite_primary_key)
initPrimaryIndex();
@ -223,7 +231,6 @@ static size_t computeIndexGranularityImpl(
size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block) const
{
const auto storage_settings = storage.getSettings();
return computeIndexGranularityImpl(
block,
storage_settings->index_granularity_bytes,
@ -237,7 +244,7 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
if (metadata_snapshot->hasPrimaryKey())
{
String index_name = "primary" + getIndexExtension(compress_primary_key);
index_file_stream = data_part->getDataPartStorage().writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
index_file_stream = getDataPartStorage().writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
index_file_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
if (compress_primary_key)
@ -256,7 +263,7 @@ void MergeTreeDataPartWriterOnDisk::initStatistics()
String stats_name = stat_ptr->getFileName();
stats_streams.emplace_back(std::make_unique<MergeTreeDataPartWriterOnDisk::Stream<true>>(
stats_name,
data_part->getDataPartStoragePtr(),
data_part_storage,
stats_name, STAT_FILE_SUFFIX,
default_codec, settings.max_compress_block_size,
settings.query_write_settings));
@ -275,7 +282,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
skip_indices_streams.emplace_back(
std::make_unique<MergeTreeDataPartWriterOnDisk::Stream<false>>(
stream_name,
data_part->getDataPartStoragePtr(),
data_part_storage,
stream_name, skip_index->getSerializedFileExtension(),
stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,
@ -285,7 +292,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
GinIndexStorePtr store = nullptr;
if (typeid_cast<const MergeTreeIndexFullText *>(&*skip_index) != nullptr)
{
store = std::make_shared<GinIndexStore>(stream_name, data_part->getDataPartStoragePtr(), data_part->getDataPartStoragePtr(), storage.getSettings()->max_digestion_size_per_segment);
store = std::make_shared<GinIndexStore>(stream_name, data_part_storage, data_part_storage, storage_settings->max_digestion_size_per_segment);
gin_index_stores[stream_name] = store;
}
skip_indices_aggregators.push_back(skip_index->createIndexAggregatorForPart(store, settings));
@ -498,7 +505,7 @@ void MergeTreeDataPartWriterOnDisk::finishStatisticsSerialization(bool sync)
}
for (size_t i = 0; i < stats.size(); ++i)
LOG_DEBUG(log, "Spent {} ms calculating statistics {} for the part {}", execution_stats.statistics_build_us[i] / 1000, stats[i]->columnName(), data_part->name);
LOG_DEBUG(log, "Spent {} ms calculating statistics {} for the part {}", execution_stats.statistics_build_us[i] / 1000, stats[i]->columnName(), data_part_name);
}
void MergeTreeDataPartWriterOnDisk::fillStatisticsChecksums(MergeTreeData::DataPart::Checksums & checksums)
@ -524,7 +531,7 @@ void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(bool sync)
store.second->finalize();
for (size_t i = 0; i < skip_indices.size(); ++i)
LOG_DEBUG(log, "Spent {} ms calculating index {} for the part {}", execution_stats.skip_indices_build_us[i] / 1000, skip_indices[i]->index.name, data_part->name);
LOG_DEBUG(log, "Spent {} ms calculating index {} for the part {}", execution_stats.skip_indices_build_us[i] / 1000, skip_indices[i]->index.name, data_part_name);
gin_index_stores.clear();
skip_indices_streams.clear();

View File

@ -5,9 +5,6 @@
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseQuery.h>
#include <Storages/Statistics/Statistics.h>
@ -97,16 +94,22 @@ public:
void sync() const;
void addToChecksums(IMergeTreeDataPart::Checksums & checksums);
void addToChecksums(MergeTreeDataPartChecksums & checksums);
};
using StreamPtr = std::unique_ptr<Stream<false>>;
using StatisticStreamPtr = std::unique_ptr<Stream<true>>;
MergeTreeDataPartWriterOnDisk(
const MergeTreeMutableDataPartPtr & data_part_,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension,
@ -133,13 +136,13 @@ protected:
void calculateAndSerializeStatistics(const Block & stats_block);
/// Finishes primary index serialization: write final primary index row (if required) and compute checksums
void fillPrimaryIndexChecksums(MergeTreeData::DataPart::Checksums & checksums);
void fillPrimaryIndexChecksums(MergeTreeDataPartChecksums & checksums);
void finishPrimaryIndexSerialization(bool sync);
/// Finishes skip indices serialization: write all accumulated data to disk and compute checksums
void fillSkipIndicesChecksums(MergeTreeData::DataPart::Checksums & checksums);
void fillSkipIndicesChecksums(MergeTreeDataPartChecksums & checksums);
void finishSkipIndicesSerialization(bool sync);
void fillStatisticsChecksums(MergeTreeData::DataPart::Checksums & checksums);
void fillStatisticsChecksums(MergeTreeDataPartChecksums & checksums);
void finishStatisticsSerialization(bool sync);
/// Get global number of the current which we are writing (or going to start to write)

View File

@ -76,23 +76,31 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
}
MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
const MergeTreeMutableDataPartPtr & data_part_,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const Statistics & stats_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: MergeTreeDataPartWriterOnDisk(data_part_, columns_list_, metadata_snapshot_,
: MergeTreeDataPartWriterOnDisk(
data_part_name_, logger_name_, serializations_,
data_part_storage_, index_granularity_info_, storage_settings_,
columns_list_, metadata_snapshot_, virtual_columns_,
indices_to_recalc_, stats_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
auto storage_snapshot = std::make_shared<StorageSnapshot>(data_part->storage, metadata_snapshot);
for (const auto & column : columns_list)
{
auto compression = storage_snapshot->getCodecDescOrDefault(column.name, default_codec);
auto compression = getCodecDescOrDefault(column.name, default_codec);
addStreams(column, nullptr, compression);
}
}
@ -104,12 +112,11 @@ void MergeTreeDataPartWriterWide::initDynamicStreamsIfNeeded(const DB::Block & b
is_dynamic_streams_initialized = true;
block_sample = block.cloneEmpty();
auto storage_snapshot = std::make_shared<StorageSnapshot>(data_part->storage, metadata_snapshot);
for (const auto & column : columns_list)
{
if (column.type->hasDynamicSubcolumns())
{
auto compression = storage_snapshot->getCodecDescOrDefault(column.name, default_codec);
auto compression = getCodecDescOrDefault(column.name, default_codec);
addStreams(column, block_sample.getByName(column.name).column, compression);
}
}
@ -124,7 +131,6 @@ void MergeTreeDataPartWriterWide::addStreams(
{
assert(!substream_path.empty());
auto storage_settings = storage.getSettings();
auto full_stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
String stream_name;
@ -168,7 +174,7 @@ void MergeTreeDataPartWriterWide::addStreams(
column_streams[stream_name] = std::make_unique<Stream<false>>(
stream_name,
data_part->getDataPartStoragePtr(),
data_part_storage,
stream_name, DATA_FILE_EXTENSION,
stream_name, marks_file_extension,
compression_codec,
@ -182,7 +188,7 @@ void MergeTreeDataPartWriterWide::addStreams(
};
ISerialization::SubstreamPath path;
data_part->getSerialization(name_and_type.name)->enumerateStreams(callback, name_and_type.type, column);
getSerialization(name_and_type.name)->enumerateStreams(callback, name_and_type.type, column);
}
const String & MergeTreeDataPartWriterWide::getStreamName(
@ -286,7 +292,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
{
auto & column = block_to_write.getByName(it->name);
if (data_part->getSerialization(it->name)->getKind() != ISerialization::Kind::SPARSE)
if (getSerialization(it->name)->getKind() != ISerialization::Kind::SPARSE)
column.column = recursiveRemoveSparse(column.column);
if (permutation)
@ -358,7 +364,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
min_compress_block_size = value->safeGet<UInt64>();
if (!min_compress_block_size)
min_compress_block_size = settings.min_compress_block_size;
data_part->getSerialization(name_and_type.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
getSerialization(name_and_type.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
auto stream_name = getStreamName(name_and_type, substream_path);
@ -392,7 +398,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
ISerialization::SerializeBinaryBulkSettings & serialize_settings,
const Granule & granule)
{
const auto & serialization = data_part->getSerialization(name_and_type.name);
const auto & serialization = getSerialization(name_and_type.name);
serialization->serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.rows_to_write, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
@ -422,7 +428,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
const auto & [name, type] = name_and_type;
auto [it, inserted] = serialization_states.emplace(name, nullptr);
auto serialization = data_part->getSerialization(name_and_type.name);
auto serialization = getSerialization(name_and_type.name);
if (inserted)
{
@ -431,11 +437,10 @@ void MergeTreeDataPartWriterWide::writeColumn(
serialization->serializeBinaryBulkStatePrefix(column, serialize_settings, it->second);
}
const auto & global_settings = storage.getContext()->getSettingsRef();
ISerialization::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name_and_type, offset_columns);
serialize_settings.low_cardinality_max_dictionary_size = global_settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part;
for (const auto & granule : granules)
{
@ -484,7 +489,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePair & name_type)
{
const auto & [name, type] = name_type;
const auto & serialization = data_part->getSerialization(name_type.name);
const auto & serialization = getSerialization(name_type.name);
if (!type->isValueRepresentedByNumber() || type->haveSubtypes() || serialization->getKind() != ISerialization::Kind::DEFAULT)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot validate column of non fixed type {}", type->getName());
@ -494,21 +499,21 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
String bin_path = escaped_name + DATA_FILE_EXTENSION;
/// Some columns may be removed because of ttl. Skip them.
if (!data_part->getDataPartStorage().exists(mrk_path))
if (!getDataPartStorage().exists(mrk_path))
return;
auto mrk_file_in = data_part->getDataPartStorage().readFile(mrk_path, {}, std::nullopt, std::nullopt);
auto mrk_file_in = getDataPartStorage().readFile(mrk_path, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBuffer> mrk_in;
if (data_part->index_granularity_info.mark_type.compressed)
if (index_granularity_info.mark_type.compressed)
mrk_in = std::make_unique<CompressedReadBufferFromFile>(std::move(mrk_file_in));
else
mrk_in = std::move(mrk_file_in);
DB::CompressedReadBufferFromFile bin_in(data_part->getDataPartStorage().readFile(bin_path, {}, std::nullopt, std::nullopt));
DB::CompressedReadBufferFromFile bin_in(getDataPartStorage().readFile(bin_path, {}, std::nullopt, std::nullopt));
bool must_be_last = false;
UInt64 offset_in_compressed_file = 0;
UInt64 offset_in_decompressed_block = 0;
UInt64 index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;
UInt64 index_granularity_rows = index_granularity_info.fixed_index_granularity;
size_t mark_num;
@ -524,7 +529,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
if (settings.can_use_adaptive_granularity)
readBinaryLittleEndian(index_granularity_rows, *mrk_in);
else
index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;
index_granularity_rows = index_granularity_info.fixed_index_granularity;
if (must_be_last)
{
@ -557,7 +562,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
ErrorCodes::LOGICAL_ERROR,
"Incorrect mark rows for part {} for mark #{}"
" (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}",
data_part->getDataPartStorage().getFullPath(),
getDataPartStorage().getFullPath(),
mark_num, offset_in_compressed_file, offset_in_decompressed_block,
index_granularity.getMarkRows(mark_num), index_granularity_rows,
index_granularity.getMarksCount());
@ -617,12 +622,11 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
}
}
void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove)
void MergeTreeDataPartWriterWide::fillDataChecksums(MergeTreeDataPartChecksums & checksums, NameSet & checksums_to_remove)
{
const auto & global_settings = storage.getContext()->getSettingsRef();
ISerialization::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = global_settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part;
WrittenOffsetColumns offset_columns;
if (rows_written_in_last_mark > 0)
{
@ -646,7 +650,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
{
serialize_settings.getter = createStreamGetter(*it, written_offset_columns ? *written_offset_columns : offset_columns);
serialize_settings.dynamic_write_statistics = ISerialization::SerializeBinaryBulkSettings::DynamicStatisticsMode::SUFFIX;
data_part->getSerialization(it->name)->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
getSerialization(it->name)->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
}
if (write_final_mark)
@ -689,7 +693,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync)
{
if (column.type->isValueRepresentedByNumber()
&& !column.type->haveSubtypes()
&& data_part->getSerialization(column.name)->getKind() == ISerialization::Kind::DEFAULT)
&& getSerialization(column.name)->getKind() == ISerialization::Kind::DEFAULT)
{
validateColumnOfFixedSize(column);
}
@ -698,7 +702,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync)
}
void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove)
void MergeTreeDataPartWriterWide::fillChecksums(MergeTreeDataPartChecksums & checksums, NameSet & checksums_to_remove)
{
// If we don't have anything to write, skip finalization.
if (!columns_list.empty())
@ -732,7 +736,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
{
writeSingleMark(name_and_type, offset_columns, 0);
/// Memoize information about offsets
data_part->getSerialization(name_and_type.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
getSerialization(name_and_type.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets)

View File

@ -21,9 +21,15 @@ class MergeTreeDataPartWriterWide : public MergeTreeDataPartWriterOnDisk
{
public:
MergeTreeDataPartWriterWide(
const MergeTreeMutableDataPartPtr & data_part,
const String & data_part_name_,
const String & logger_name_,
const SerializationByName & serializations_,
MutableDataPartStoragePtr data_part_storage_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const MergeTreeSettingsPtr & storage_settings_,
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const VirtualsDescriptionPtr & virtual_columns_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const String & marks_file_extension,
@ -33,14 +39,14 @@ public:
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) final;
void fillChecksums(MergeTreeDataPartChecksums & checksums, NameSet & checksums_to_remove) final;
void finish(bool sync) final;
private:
/// Finish serialization of data: write final mark if required and compute checksums
/// Also validate written data in debug mode
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove);
void fillDataChecksums(MergeTreeDataPartChecksums & checksums, NameSet & checksums_to_remove);
void finishDataSerialization(bool sync);
/// Write data of one column.

View File

@ -600,7 +600,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
indices,
MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot->getColumns()),
compression_codec,
context->getCurrentTransaction(),
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
false,
false,
context->getWriteSettings());
@ -738,7 +738,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
MergeTreeIndices{},
Statistics{}, /// TODO(hanfei): It should be helpful to write statistics for projection result.
compression_codec,
NO_TRANSACTION_PTR,
Tx::PrehistoricTID,
false, false, data.getContext()->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);

View File

@ -74,6 +74,8 @@ struct MergeTreeWriterSettings
, blocks_are_granules_size(blocks_are_granules_size_)
, query_write_settings(query_write_settings_)
, max_threads_for_annoy_index_creation(global_settings.max_threads_for_annoy_index_creation)
, low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size)
, low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0)
{
}
@ -93,6 +95,9 @@ struct MergeTreeWriterSettings
WriteSettings query_write_settings;
size_t max_threads_for_annoy_index_creation;
size_t low_cardinality_max_dictionary_size;
bool low_cardinality_use_single_dictionary_for_part;
};
}

View File

@ -413,12 +413,12 @@ void MergeTreePartition::load(const MergeTreeData & storage, const PartMetadataM
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->deserializeBinary(value[i], *file, {});
}
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const MergeTreeData & storage, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums) const
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(
StorageMetadataPtr metadata_snapshot, ContextPtr storage_context,
IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & context = storage.getContext();
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
return store(partition_key_sample, data_part_storage, checksums, context->getWriteSettings());
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage_context).sample_block;
return store(partition_key_sample, data_part_storage, checksums, storage_context->getWriteSettings());
}
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block & partition_key_sample, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const

View File

@ -44,7 +44,9 @@ public:
/// Store functions return write buffer with written but not finalized data.
/// User must call finish() for returned object.
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const MergeTreeData & storage, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(
StorageMetadataPtr metadata_snapshot, ContextPtr storage_context,
IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const Block & partition_key_sample, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const;
void assign(const MergeTreePartition & other) { value = other.value; }

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <IO/HashingWriteBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Parsers/queryToString.h>
@ -21,35 +22,39 @@ MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeIndices & skip_indices,
const Statistics & statistics,
CompressionCodecPtr default_codec_,
const MergeTreeTransactionPtr & txn,
TransactionID tid,
bool reset_columns_,
bool blocks_are_granules_size,
const WriteSettings & write_settings_,
const MergeTreeIndexGranularity & computed_index_granularity)
: IMergedBlockOutputStream(data_part, metadata_snapshot_, columns_list_, reset_columns_)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, reset_columns_)
, columns_list(columns_list_)
, default_codec(default_codec_)
, write_settings(write_settings_)
{
MergeTreeWriterSettings writer_settings(
storage.getContext()->getSettings(),
data_part->storage.getContext()->getSettings(),
write_settings,
storage.getSettings(),
storage_settings,
data_part->index_granularity_info.mark_type.adaptive,
/* rewrite_primary_key = */ true,
blocks_are_granules_size);
/// TODO: looks like isStoredOnDisk() is always true for MergeTreeDataPart
if (data_part->isStoredOnDisk())
data_part_storage->createDirectories();
/// We should write version metadata on part creation to distinguish it from parts that were created without transaction.
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
/// NOTE do not pass context for writing to system.transactions_info_log,
/// because part may have temporary name (with temporary block numbers). Will write it later.
data_part->version.setCreationTID(tid, nullptr);
data_part->storeVersionMetadata();
writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, statistics, default_codec, writer_settings, computed_index_granularity);
writer = createMergeTreeDataPartWriter(data_part->getType(),
data_part->name, data_part->storage.getLogName(), data_part->getSerializations(),
data_part_storage, data_part->index_granularity_info,
storage_settings,
columns_list, data_part->getColumnPositions(), metadata_snapshot, data_part->storage.getVirtualsPtr(),
skip_indices, statistics, data_part->getMarksFileExtension(), default_codec, writer_settings, computed_index_granularity);
}
/// If data is pre-sorted.
@ -208,7 +213,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
if (new_part->isProjectionPart())
{
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
if (new_part->storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
{
auto count_out = new_part->getDataPartStorage().writeFile("count.txt", 4096, write_settings);
HashingWriteBuffer count_out_hashing(*count_out);
@ -234,14 +239,16 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
written_files.emplace_back(std::move(out));
}
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
if (new_part->storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
if (auto file = new_part->partition.store(storage, new_part->getDataPartStorage(), checksums))
if (auto file = new_part->partition.store(
new_part->storage.getInMemoryMetadataPtr(), new_part->storage.getContext(),
new_part->getDataPartStorage(), checksums))
written_files.emplace_back(std::move(file));
if (new_part->minmax_idx->initialized)
{
auto files = new_part->minmax_idx->store(storage, new_part->getDataPartStorage(), checksums);
auto files = new_part->minmax_idx->store(new_part->storage, new_part->getDataPartStorage(), checksums);
for (auto & file : files)
written_files.emplace_back(std::move(file));
}

View File

@ -22,7 +22,7 @@ public:
const MergeTreeIndices & skip_indices,
const Statistics & statistics,
CompressionCodecPtr default_codec_,
const MergeTreeTransactionPtr & txn,
TransactionID tid,
bool reset_columns_ = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {},

View File

@ -20,11 +20,10 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
WrittenOffsetColumns * offset_columns_,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part, metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true)
, header(header_)
{
const auto & global_settings = data_part->storage.getContext()->getSettings();
const auto & storage_settings = data_part->storage.getSettings();
MergeTreeWriterSettings writer_settings(
global_settings,
@ -33,11 +32,18 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */ false);
writer = data_part->getWriter(
writer = createMergeTreeDataPartWriter(
data_part->getType(),
data_part->name, data_part->storage.getLogName(), data_part->getSerializations(),
data_part_storage, data_part->index_granularity_info,
storage_settings,
header.getNamesAndTypesList(),
data_part->getColumnPositions(),
metadata_snapshot_,
data_part->storage.getVirtualsPtr(),
indices_to_recalc,
stats_to_recalc_,
data_part->getMarksFileExtension(),
default_codec,
writer_settings,
index_granularity);

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MutateTask.h>
#include <IO/HashingWriteBuffer.h>
#include <Common/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
@ -1675,7 +1676,7 @@ private:
skip_indices,
stats_to_rewrite,
ctx->compression_codec,
ctx->txn,
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings(),

View File

@ -136,7 +136,7 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
return std::make_shared<S3ObjectStorage>(
std::move(client), std::move(s3_settings), url, s3_capabilities,
key_generator, "StorageS3", false, headers_from_ast);
key_generator, "StorageS3", false);
}
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection)

View File

@ -302,6 +302,8 @@ void StorageBuffer::read(
auto src_table_query_info = query_info;
if (src_table_query_info.prewhere_info)
{
src_table_query_info.prewhere_info = src_table_query_info.prewhere_info->clone();
auto actions_dag = ActionsDAG::makeConvertingActions(
header_after_adding_defaults.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),

View File

@ -1,4 +1,4 @@
<clickhouse>
<aggregate_function_group_array_max_element_size>10</aggregate_function_group_array_max_element_size>
<aggregate_function_group_array_has_limit_size>false</aggregate_function_group_array_has_limit_size>
<aggregate_function_group_array_action_when_limit_is_reached>throw</aggregate_function_group_array_action_when_limit_is_reached>
</clickhouse>

View File

@ -80,8 +80,8 @@ def test_limit_size(started_cluster):
node2.replace_in_config(
"/etc/clickhouse-server/config.d/group_array_max_element_size.xml",
"false",
"true",
"throw",
"discard",
)
node2.restart_clickhouse()
@ -91,8 +91,8 @@ def test_limit_size(started_cluster):
node2.replace_in_config(
"/etc/clickhouse-server/config.d/group_array_max_element_size.xml",
"true",
"false",
"discard",
"throw",
)
node2.restart_clickhouse()

View File

@ -2,8 +2,14 @@ DROP TABLE IF EXISTS buffer_table1__fuzz_28;
DROP TABLE IF EXISTS merge_tree_table1;
CREATE TABLE merge_tree_table1 (`x` UInt32) ENGINE = MergeTree ORDER BY x;
CREATE TABLE buffer_table1__fuzz_24 (`s` Nullable(Int128), `x` Nullable(FixedString(17))) ENGINE = Buffer(currentDatabase(), 'merge_tree_table1', 16, 10, 60, 10, 1000, 1048576, 2097152);
SELECT s FROM buffer_table1__fuzz_24 PREWHERE factorial(toNullable(10)); -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
INSERT INTO merge_tree_table1 VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
SELECT s FROM buffer_table1__fuzz_24 PREWHERE factorial(toNullable(10)); -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
SET send_logs_level='error';
CREATE TABLE buffer_table1__fuzz_28 (`x` Nullable(UInt32)) ENGINE = Buffer(currentDatabase(), 'merge_tree_table1', 16, 10, 60, 10, 1000, 1048576, 2097152);

View File

@ -0,0 +1,19 @@
------------------- Distributed ------------------
1
---------- merge() over distributed --------------
2
---------- merge() over local --------------------
1
1
1
---------- remote() over Merge -------------------
2
---------- Distributed over Merge ----------------
1
---------- remote() over Merge -------------------
2
---------- Merge over Distributed -----------------
1
1
1
2

View File

@ -0,0 +1,83 @@
-- https://github.com/ClickHouse/ClickHouse/issues/64211
DROP TABLE IF EXISTS test_merge;
DROP TABLE IF EXISTS test_merge_distributed;
DROP TABLE IF EXISTS test_distributed_merge;
DROP TABLE IF EXISTS test_distributed;
DROP TABLE IF EXISTS test_local;
CREATE TABLE test_local (name String)
ENGINE = MergeTree
ORDER BY name as select 'x';
CREATE TABLE test_distributed as test_local
ENGINE = Distributed(test_shard_localhost, currentDatabase(), test_local);
CREATE TABLE test_merge as test_local
ENGINE = Merge(currentDatabase(), 'test_local');
CREATE TABLE test_merge_distributed as test_local
ENGINE = Distributed(test_shard_localhost, currentDatabase(), test_merge);
CREATE TABLE test_distributed_merge as test_local
ENGINE = Merge(currentDatabase(), 'test_distributed');
SELECT '------------------- Distributed ------------------';
SELECT count()
FROM test_distributed
WHERE name GLOBAL IN (SELECT name FROM test_distributed);
SELECT '---------- merge() over distributed --------------';
SELECT count()
FROM merge(currentDatabase(), 'test_distributed')
WHERE name GLOBAL IN (SELECT name FROM test_distributed);
SELECT '---------- merge() over local --------------------';
SELECT count()
FROM merge(currentDatabase(), 'test_local')
WHERE name GLOBAL IN (SELECT name FROM test_distributed);
SELECT count()
FROM merge(currentDatabase(), 'test_local')
WHERE name GLOBAL IN (SELECT name FROM merge(currentDatabase(), 'test_local'));
SELECT count()
FROM merge(currentDatabase(), 'test_local')
WHERE name GLOBAL IN (SELECT name FROM remote('127.0.0.{1,2}', currentDatabase(), test_merge));
SELECT '---------- remote() over Merge -------------------';
SELECT count()
FROM remote('127.0.0.{1,2}', currentDatabase(), test_merge)
WHERE name GLOBAL IN (SELECT name FROM test_distributed);
SELECT '---------- Distributed over Merge ----------------';
SELECT count()
FROM test_merge_distributed
WHERE name GLOBAL IN (SELECT name FROM test_merge_distributed);
SELECT '---------- remote() over Merge -------------------';
SELECT count()
FROM remote('127.0.0.{1,2}', currentDatabase(), test_merge)
WHERE name GLOBAL IN (SELECT name FROM remote('127.0.0.{1,2}', currentDatabase(), test_merge));
SELECT '---------- Merge over Distributed -----------------';
SELECT count()
FROM test_distributed_merge
WHERE name GLOBAL IN (SELECT name FROM remote('127.0.0.{1,2}', currentDatabase(), test_merge));
SELECT count()
FROM test_distributed_merge
WHERE name GLOBAL IN (SELECT name FROM remote('127.0.0.{1,2}', currentDatabase(), test_distributed_merge));
SELECT count()
FROM test_distributed_merge
WHERE name GLOBAL IN (SELECT name FROM test_distributed_merge);
SELECT count()
FROM remote('127.0.0.{1,2}', currentDatabase(), test_distributed_merge)
WHERE name GLOBAL IN (SELECT name FROM remote('127.0.0.{1,2}', currentDatabase(), test_merge));
DROP TABLE test_merge;
DROP TABLE test_merge_distributed;
DROP TABLE test_distributed_merge;
DROP TABLE test_distributed;
DROP TABLE test_local;

View File

@ -11,7 +11,7 @@ REPLICA=$($CLICKHOUSE_CLIENT --query "Select getMacro('replica')")
# Check that if we have one inactive replica and a huge number of INSERTs to active replicas,
# the number of nodes in ZooKeeper does not grow unbounded.
SCALE=5000
SCALE=1000
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS r1;

View File

@ -11,14 +11,24 @@ CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS t"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE t (x Int8) ENGINE = MergeTree ORDER BY tuple()"
for _ in {1..100}; do
function thread_ops()
{
local TIMELIMIT=$((SECONDS+$1))
local it=0
while [ $SECONDS -lt "$TIMELIMIT" ] && [ $it -lt 100 ];
do
it=$((it+1))
${CLICKHOUSE_CLIENT} --query="INSERT INTO t VALUES (0)"
${CLICKHOUSE_CLIENT} --query="INSERT INTO t VALUES (0)"
${CLICKHOUSE_CLIENT} --query="OPTIMIZE TABLE t FINAL" 2>/dev/null &
${CLICKHOUSE_CLIENT} --query="ALTER TABLE t DETACH PARTITION tuple()"
${CLICKHOUSE_CLIENT} --query="SELECT count() FROM t HAVING count() > 0"
done
}
export -f thread_ops
TIMEOUT=60
thread_ops $TIMEOUT &
wait
$CLICKHOUSE_CLIENT -q "DROP TABLE t"

View File

@ -1,16 +1,16 @@
-- Tags: long, no-parallel
SET insert_keeper_fault_injection_probability=0; -- to succeed this test can require too many retries due to 1024 partitions, so disable fault injections
SET insert_keeper_fault_injection_probability=0; -- to succeed this test can require too many retries due to 100 partitions, so disable fault injections
-- regression for MEMORY_LIMIT_EXCEEDED error because of deferred final part flush
drop table if exists data_02228;
create table data_02228 (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024;
insert into data_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=0;
insert into data_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
create table data_02228 (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 100;
insert into data_02228 select number, 1, number from numbers_mt(10_000) settings max_memory_usage='30Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=0;
insert into data_02228 select number, 1, number from numbers_mt(10_000) settings max_memory_usage='30Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=1000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_02228;
drop table if exists data_rep_02228 SYNC;
create table data_rep_02228 (key1 UInt32, sign Int8, s UInt64) engine = ReplicatedCollapsingMergeTree('/clickhouse/{database}', 'r1', sign) order by (key1) partition by key1 % 1024;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=0;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
create table data_rep_02228 (key1 UInt32, sign Int8, s UInt64) engine = ReplicatedCollapsingMergeTree('/clickhouse/{database}', 'r1', sign) order by (key1) partition by key1 % 100;
insert into data_rep_02228 select number, 1, number from numbers_mt(10_000) settings max_memory_usage='30Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=0;
insert into data_rep_02228 select number, 1, number from numbers_mt(10_000) settings max_memory_usage='30Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=1000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_rep_02228 SYNC;

View File

@ -1,4 +1,4 @@
-- Tags: no-parallel, long, no-debug, no-tsan
-- Tags: no-parallel, long, no-debug, no-tsan, no-msan, no-asan
create table data_02344 (key Int) engine=Null;
-- 3e9 rows is enough to fill the socket buffer and cause INSERT hung.

View File

@ -0,0 +1,6 @@
Test (1)
1
2
Test (2)
4
4

View File

@ -0,0 +1,70 @@
-- Tags: no-parallel
-- Tag no-parallel: Messes with internal cache
-- Tests that the key of the query cache is not only formed by the query AST but also by
-- (1) the current database (`USE db`, issue #64136),
-- (2) the query settings
SELECT 'Test (1)';
SYSTEM DROP QUERY CACHE;
DROP DATABASE IF EXISTS db1;
DROP DATABASE IF EXISTS db2;
CREATE DATABASE db1;
CREATE DATABASE db2;
CREATE TABLE db1.tab(a UInt64, PRIMARY KEY a);
CREATE TABLE db2.tab(a UInt64, PRIMARY KEY a);
INSERT INTO db1.tab values(1);
INSERT INTO db2.tab values(2);
USE db1;
SELECT * FROM tab SETTINGS use_query_cache=1;
USE db2;
SELECT * FROM tab SETTINGS use_query_cache=1;
DROP DATABASE db1;
DROP DATABASE db2;
SYSTEM DROP QUERY CACHE;
SELECT 'Test (2)';
-- test with query-level settings
SELECT 1 SETTINGS use_query_cache = 1, limit = 1, use_skip_indexes = 0 Format Null;
SELECT 1 SETTINGS use_query_cache = 1, use_skip_indexes = 0 Format Null;
SELECT 1 SETTINGS use_query_cache = 1, use_skip_indexes = 1 Format Null;
SELECT 1 SETTINGS use_query_cache = 1, max_block_size = 1 Format Null;
-- 4x the same query but with different settings each. There should yield four entries in the query cache.
SELECT count(query) FROM system.query_cache;
SYSTEM DROP QUERY CACHE;
-- test with mixed session-level/query-level settings
SET use_query_cache = 1;
SET limit = 1;
SELECT 1 SETTINGS use_skip_indexes = 0 Format Null;
SET limit = default;
SET use_skip_indexes = 0;
SELECT 1 Format Null;
SET use_skip_indexes = 1;
SELECT 1 SETTINGS use_skip_indexes = 1 Format Null;
SET use_skip_indexes = default;
SET max_block_size = 1;
SELECT 1 Format Null;
SET max_block_size = default;
SET use_query_cache = default;
-- 4x the same query but with different settings each. There should yield four entries in the query cache.
SELECT count(query) FROM system.query_cache;
SYSTEM DROP QUERY CACHE;

View File

@ -15,11 +15,17 @@ ${CLICKHOUSE_CLIENT} --query "CREATE TABLE tab (a UInt64) ENGINE=MergeTree() ORD
${CLICKHOUSE_CLIENT} --query "INSERT INTO tab VALUES (1) (2) (3)"
${CLICKHOUSE_CLIENT} --query "INSERT INTO tab VALUES (3) (4) (5)"
SETTINGS="SETTINGS use_query_cache=1, max_threads=1, allow_experimental_analyzer=0, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0.0"
SETTINGS_NO_ANALYZER="SETTINGS use_query_cache=1, max_threads=1, allow_experimental_analyzer=0, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0.0"
SETTINGS_ANALYZER="SETTINGS use_query_cache=1, max_threads=1, allow_experimental_analyzer=1, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0.0"
# Verify that the first query does two aggregations and the second query zero aggregations. Since query cache is currently not integrated
# with EXPLAIN PLAN, we need to check the logs.
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS" 2>&1 | grep "Aggregated. " | wc -l
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS" 2>&1 | grep "Aggregated. " | wc -l
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_NO_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_NO_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
${CLICKHOUSE_CLIENT} --query "SYSTEM DROP QUERY CACHE"
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
${CLICKHOUSE_CLIENT} --query "SYSTEM DROP QUERY CACHE"

View File

@ -1,30 +0,0 @@
-- Tags: no-parallel, no-fasttest
-- Tag no-fasttest: Depends on OpenSSL
-- Tag no-parallel: Messes with internal cache
-- Test for issue #64136
SYSTEM DROP QUERY CACHE;
DROP DATABASE IF EXISTS db1;
DROP DATABASE IF EXISTS db2;
CREATE DATABASE db1;
CREATE DATABASE db2;
CREATE TABLE db1.tab(a UInt64, PRIMARY KEY a);
CREATE TABLE db2.tab(a UInt64, PRIMARY KEY a);
INSERT INTO db1.tab values(1);
INSERT INTO db2.tab values(2);
USE db1;
SELECT * FROM tab SETTINGS use_query_cache=1;
USE db2;
SELECT * FROM tab SETTINGS use_query_cache=1;
DROP DATABASE db1;
DROP DATABASE db2;
SYSTEM DROP QUERY CACHE;

View File

@ -41,7 +41,7 @@ create temporary table basic_types_02735 as select * from generateRandom('
decimal128 Decimal128(20),
decimal256 Decimal256(40),
ipv4 IPv4,
ipv6 IPv6') limit 10101;
ipv6 IPv6') limit 1011;
insert into function file(basic_types_02735.parquet) select * from basic_types_02735;
desc file(basic_types_02735.parquet);
select (select sum(cityHash64(*)) from basic_types_02735) - (select sum(cityHash64(*)) from file(basic_types_02735.parquet));
@ -59,7 +59,7 @@ create temporary table nullables_02735 as select * from generateRandom('
fstr Nullable(FixedString(12)),
i256 Nullable(Int256),
decimal256 Nullable(Decimal256(40)),
ipv6 Nullable(IPv6)') limit 10000;
ipv6 Nullable(IPv6)') limit 1000;
insert into function file(nullables_02735.parquet) select * from nullables_02735;
select (select sum(cityHash64(*)) from nullables_02735) - (select sum(cityHash64(*)) from file(nullables_02735.parquet));
drop table nullables_02735;
@ -83,7 +83,7 @@ create table arrays_02735 engine = Memory as select * from generateRandom('
decimal64 Array(Decimal64(10)),
ipv4 Array(IPv4),
msi Map(String, Int16),
tup Tuple(FixedString(3), Array(String), Map(Int8, Date))') limit 10000;
tup Tuple(FixedString(3), Array(String), Map(Int8, Date))') limit 1000;
insert into function file(arrays_02735.parquet) select * from arrays_02735;
create temporary table arrays_out_02735 as arrays_02735;
insert into arrays_out_02735 select * from file(arrays_02735.parquet);
@ -107,7 +107,7 @@ create temporary table madness_02735 as select * from generateRandom('
mln Map(LowCardinality(String), Nullable(Int8)),
t Tuple(Map(FixedString(5), Tuple(Array(UInt16), Nullable(UInt16), Array(Tuple(Int8, Decimal64(10))))), Tuple(kitchen UInt64, sink String)),
n Nested(hello UInt64, world Tuple(first String, second FixedString(1)))
') limit 10000;
') limit 1000;
insert into function file(madness_02735.parquet) select * from madness_02735;
insert into function file(a.csv) select * from madness_02735 order by tuple(*);
insert into function file(b.csv) select aa, aaa, an, aan, l, ln, arrayMap(x->reinterpret(x, 'UInt128'), al) as al_, aaln, mln, t, n.hello, n.world from file(madness_02735.parquet) order by tuple(aa, aaa, an, aan, l, ln, al_, aaln, mln, t, n.hello, n.world);

View File

@ -0,0 +1,9 @@
-- generateSnowflakeID
1
0
0
1
100
-- generateSnowflakeIDThreadMonotonic
1
100

View File

@ -0,0 +1,29 @@
SELECT '-- generateSnowflakeID';
SELECT bitAnd(bitShiftRight(toUInt64(generateSnowflakeID()), 63), 1) = 0; -- check first bit is zero
SELECT generateSnowflakeID(1) = generateSnowflakeID(2); -- disabled common subexpression elimination --> lhs != rhs
SELECT generateSnowflakeID() = generateSnowflakeID(1); -- same as ^^
SELECT generateSnowflakeID(1) = generateSnowflakeID(1); -- enabled common subexpression elimination
SELECT generateSnowflakeID(1, 2); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
SELECT count(*)
FROM
(
SELECT DISTINCT generateSnowflakeID()
FROM numbers(100)
);
SELECT '-- generateSnowflakeIDThreadMonotonic';
SELECT bitAnd(bitShiftRight(toUInt64(generateSnowflakeIDThreadMonotonic()), 63), 1) = 0; -- check first bit is zero
SELECT generateSnowflakeIDThreadMonotonic(1, 2); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
SELECT count(*)
FROM
(
SELECT DISTINCT generateSnowflakeIDThreadMonotonic()
FROM numbers(100)
);

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-ordinary-database, long
# Tags: no-fasttest, no-parallel, no-ordinary-database, long, no-debug, no-asan, no-tsan, no-msan
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1618,6 +1618,8 @@ gcem
generateRandom
generateRandomStructure
generateSeries
generateSnowflakeID
generateSnowflakeIDThreadMonotonic
generateULID
generateUUIDv
geoDistance