mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-03 21:12:28 +00:00
Merge branch 'master' into add-compression-sorts-optimization
This commit is contained in:
commit
a2bcb4d9db
2
.github/workflows/master.yml
vendored
2
.github/workflows/master.yml
vendored
@ -136,7 +136,7 @@ jobs:
|
||||
|
||||
MarkReleaseReady:
|
||||
if: ${{ !failure() && !cancelled() }}
|
||||
needs: [RunConfig, Builds_1]
|
||||
needs: [RunConfig, Builds_1, Builds_2]
|
||||
runs-on: [self-hosted, style-checker-aarch64]
|
||||
steps:
|
||||
- name: Debug
|
||||
|
5
.github/workflows/reusable_build.yml
vendored
5
.github/workflows/reusable_build.yml
vendored
@ -33,6 +33,10 @@ name: Build ClickHouse
|
||||
additional_envs:
|
||||
description: additional ENV variables to setup the job
|
||||
type: string
|
||||
secrets:
|
||||
secret_envs:
|
||||
description: if given, it's passed to the environments
|
||||
required: false
|
||||
|
||||
jobs:
|
||||
Build:
|
||||
@ -54,6 +58,7 @@ jobs:
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
${{inputs.additional_envs}}
|
||||
${{secrets.secret_envs}}
|
||||
DOCKER_TAG<<DOCKER_JSON
|
||||
${{ toJson(fromJson(inputs.data).docker_data.images) }}
|
||||
DOCKER_JSON
|
||||
|
6
.github/workflows/reusable_build_stage.yml
vendored
6
.github/workflows/reusable_build_stage.yml
vendored
@ -13,6 +13,10 @@ name: BuildStageWF
|
||||
description: ci data
|
||||
type: string
|
||||
required: true
|
||||
secrets:
|
||||
secret_envs:
|
||||
description: if given, it's passed to the environments
|
||||
required: false
|
||||
|
||||
jobs:
|
||||
s:
|
||||
@ -30,3 +34,5 @@ jobs:
|
||||
# for now let's do I deep checkout for builds
|
||||
checkout_depth: 0
|
||||
data: ${{ inputs.data }}
|
||||
secrets:
|
||||
secret_envs: ${{ secrets.secret_envs }}
|
||||
|
6
.github/workflows/reusable_test_stage.yml
vendored
6
.github/workflows/reusable_test_stage.yml
vendored
@ -10,6 +10,10 @@ name: StageWF
|
||||
description: ci data
|
||||
type: string
|
||||
required: true
|
||||
secrets:
|
||||
secret_envs:
|
||||
description: if given, it's passed to the environments
|
||||
required: false
|
||||
|
||||
jobs:
|
||||
s:
|
||||
@ -23,3 +27,5 @@ jobs:
|
||||
test_name: ${{ matrix.job_name_and_runner_type.job_name }}
|
||||
runner_type: ${{ matrix.job_name_and_runner_type.runner_type }}
|
||||
data: ${{ inputs.data }}
|
||||
secrets:
|
||||
secret_envs: ${{ secrets.secret_envs }}
|
||||
|
@ -1975,143 +1975,3 @@ Result:
|
||||
│ 2,"good" │
|
||||
└───────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime
|
||||
|
||||
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime](../data-types/datetime.md) format.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
snowflakeToDateTime(value[, time_zone])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- The timestamp component of `value` as a [DateTime](../data-types/datetime.md) value.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC');
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
|
||||
┌─snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC')─┐
|
||||
│ 2021-08-15 10:57:56 │
|
||||
└──────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime64
|
||||
|
||||
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime64](../data-types/datetime64.md) format.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
snowflakeToDateTime64(value[, time_zone])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- The timestamp component of `value` as a [DateTime64](../data-types/datetime64.md) with scale = 3, i.e. millisecond precision.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC');
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
|
||||
┌─snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC')─┐
|
||||
│ 2021-08-15 10:58:19.841 │
|
||||
└────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTimeToSnowflake
|
||||
|
||||
Converts a [DateTime](../data-types/datetime.md) value to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
dateTimeToSnowflake(value)
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Date with time. [DateTime](../data-types/datetime.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt SELECT dateTimeToSnowflake(dt);
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
┌─dateTimeToSnowflake(dt)─┐
|
||||
│ 1426860702823350272 │
|
||||
└─────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTime64ToSnowflake
|
||||
|
||||
Convert a [DateTime64](../data-types/datetime64.md) to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
dateTime64ToSnowflake(value)
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Date with time. [DateTime64](../data-types/datetime64.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64 SELECT dateTime64ToSnowflake(dt64);
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
┌─dateTime64ToSnowflake(dt64)─┐
|
||||
│ 1426860704886947840 │
|
||||
└─────────────────────────────┘
|
||||
```
|
||||
|
@ -668,7 +668,7 @@ Result:
|
||||
└──────────────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## serverUUID()
|
||||
## serverUUID
|
||||
|
||||
Returns the random UUID generated during the first start of the ClickHouse server. The UUID is stored in file `uuid` in the ClickHouse server directory (e.g. `/var/lib/clickhouse/`) and retained between server restarts.
|
||||
|
||||
@ -682,6 +682,275 @@ serverUUID()
|
||||
|
||||
- The UUID of the server. [UUID](../data-types/uuid.md).
|
||||
|
||||
## generateSnowflakeID
|
||||
|
||||
Generates a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID).
|
||||
|
||||
The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond.
|
||||
For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes.
|
||||
In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0.
|
||||
|
||||
Function `generateSnowflakeID` guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.
|
||||
|
||||
```
|
||||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
├─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|
||||
|0| timestamp |
|
||||
├─┼ ┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|
||||
| | machine_id | machine_seq_num |
|
||||
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
|
||||
```
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
generateSnowflakeID([expr])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `expr` — An arbitrary [expression](../../sql-reference/syntax.md#syntax-expressions) used to bypass [common subexpression elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) if the function is called multiple times in a query. The value of the expression has no effect on the returned Snowflake ID. Optional.
|
||||
|
||||
**Returned value**
|
||||
|
||||
A value of type UInt64.
|
||||
|
||||
**Example**
|
||||
|
||||
First, create a table with a column of type UInt64, then insert a generated Snowflake ID into the table.
|
||||
|
||||
``` sql
|
||||
CREATE TABLE tab (id UInt64) ENGINE = Memory;
|
||||
|
||||
INSERT INTO tab SELECT generateSnowflakeID();
|
||||
|
||||
SELECT * FROM tab;
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
┌──────────────────id─┐
|
||||
│ 7199081390080409600 │
|
||||
└─────────────────────┘
|
||||
```
|
||||
|
||||
**Example with multiple Snowflake IDs generated per row**
|
||||
|
||||
```sql
|
||||
SELECT generateSnowflakeID(1), generateSnowflakeID(2);
|
||||
|
||||
┌─generateSnowflakeID(1)─┬─generateSnowflakeID(2)─┐
|
||||
│ 7199081609652224000 │ 7199081609652224001 │
|
||||
└────────────────────────┴────────────────────────┘
|
||||
```
|
||||
|
||||
## generateSnowflakeIDThreadMonotonic
|
||||
|
||||
Generates a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID).
|
||||
|
||||
The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond.
|
||||
For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes.
|
||||
In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0.
|
||||
|
||||
This function behaves like `generateSnowflakeID` but gives no guarantee on counter monotony across different simultaneous requests.
|
||||
Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate Snowflake IDs.
|
||||
|
||||
```
|
||||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
├─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|
||||
|0| timestamp |
|
||||
├─┼ ┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|
||||
| | machine_id | machine_seq_num |
|
||||
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
|
||||
```
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
generateSnowflakeIDThreadMonotonic([expr])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `expr` — An arbitrary [expression](../../sql-reference/syntax.md#syntax-expressions) used to bypass [common subexpression elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) if the function is called multiple times in a query. The value of the expression has no effect on the returned Snowflake ID. Optional.
|
||||
|
||||
**Returned value**
|
||||
|
||||
A value of type UInt64.
|
||||
|
||||
**Example**
|
||||
|
||||
First, create a table with a column of type UInt64, then insert a generated Snowflake ID into the table.
|
||||
|
||||
``` sql
|
||||
CREATE TABLE tab (id UInt64) ENGINE = Memory;
|
||||
|
||||
INSERT INTO tab SELECT generateSnowflakeIDThreadMonotonic();
|
||||
|
||||
SELECT * FROM tab;
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
┌──────────────────id─┐
|
||||
│ 7199082832006627328 │
|
||||
└─────────────────────┘
|
||||
```
|
||||
|
||||
**Example with multiple Snowflake IDs generated per row**
|
||||
|
||||
```sql
|
||||
SELECT generateSnowflakeIDThreadMonotonic(1), generateSnowflakeIDThreadMonotonic(2);
|
||||
|
||||
┌─generateSnowflakeIDThreadMonotonic(1)─┬─generateSnowflakeIDThreadMonotonic(2)─┐
|
||||
│ 7199082940311945216 │ 7199082940316139520 │
|
||||
└───────────────────────────────────────┴───────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime
|
||||
|
||||
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime](../data-types/datetime.md) format.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
snowflakeToDateTime(value[, time_zone])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- The timestamp component of `value` as a [DateTime](../data-types/datetime.md) value.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC');
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
|
||||
┌─snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC')─┐
|
||||
│ 2021-08-15 10:57:56 │
|
||||
└──────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime64
|
||||
|
||||
Extracts the timestamp component of a [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) in [DateTime64](../data-types/datetime64.md) format.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
snowflakeToDateTime64(value[, time_zone])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- The timestamp component of `value` as a [DateTime64](../data-types/datetime64.md) with scale = 3, i.e. millisecond precision.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC');
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
|
||||
┌─snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC')─┐
|
||||
│ 2021-08-15 10:58:19.841 │
|
||||
└────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTimeToSnowflake
|
||||
|
||||
Converts a [DateTime](../data-types/datetime.md) value to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
dateTimeToSnowflake(value)
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Date with time. [DateTime](../data-types/datetime.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt SELECT dateTimeToSnowflake(dt);
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
┌─dateTimeToSnowflake(dt)─┐
|
||||
│ 1426860702823350272 │
|
||||
└─────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTime64ToSnowflake
|
||||
|
||||
Convert a [DateTime64](../data-types/datetime64.md) to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the giving time.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
dateTime64ToSnowflake(value)
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `value` — Date with time. [DateTime64](../data-types/datetime64.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64 SELECT dateTime64ToSnowflake(dt64);
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```response
|
||||
┌─dateTime64ToSnowflake(dt64)─┐
|
||||
│ 1426860704886947840 │
|
||||
└─────────────────────────────┘
|
||||
```
|
||||
|
||||
## See also
|
||||
|
||||
- [dictGetUUID](../functions/ext-dict-functions.md#ext_dict_functions-other)
|
||||
|
@ -10,6 +10,7 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int KEEPER_EXCEPTION;
|
||||
}
|
||||
|
||||
@ -441,7 +442,7 @@ void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient
|
||||
new_members = query->args[1].safeGet<String>();
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE();
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected operation: {}", operation);
|
||||
}
|
||||
|
||||
auto response = client->zookeeper->reconfig(joining, leaving, new_members);
|
||||
|
@ -155,8 +155,8 @@ auto instructionFailToString(InstructionFail fail)
|
||||
ret("AVX2");
|
||||
case InstructionFail::AVX512:
|
||||
ret("AVX512");
|
||||
#undef ret
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,15 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <Server/IServer.h>
|
||||
|
||||
#include <Daemon/BaseDaemon.h>
|
||||
#include <Server/HTTP/HTTPContext.h>
|
||||
#include <Server/TCPProtocolStackFactory.h>
|
||||
#include <Server/ServerType.h>
|
||||
#include <Poco/Net/HTTPServerParams.h>
|
||||
|
||||
/** Server provides three interfaces:
|
||||
* 1. HTTP - simple interface for any applications.
|
||||
* 1. HTTP, GRPC - simple interfaces for any applications.
|
||||
* 2. TCP - interface for native clickhouse-client and for server to server internal communications.
|
||||
* More rich and efficient, but less compatible
|
||||
* - data is transferred by columns;
|
||||
@ -18,43 +13,21 @@
|
||||
* 3. Interserver HTTP - for replication.
|
||||
*/
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
namespace Net
|
||||
{
|
||||
class ServerSocket;
|
||||
}
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class AsynchronousMetrics;
|
||||
class ProtocolServerAdapter;
|
||||
|
||||
class Server : public BaseDaemon, public IServer
|
||||
{
|
||||
public:
|
||||
using ServerApplication::run;
|
||||
|
||||
Poco::Util::LayeredConfiguration & config() const override
|
||||
{
|
||||
return BaseDaemon::config();
|
||||
}
|
||||
Poco::Util::LayeredConfiguration & config() const override { return BaseDaemon::config(); }
|
||||
|
||||
Poco::Logger & logger() const override
|
||||
{
|
||||
return BaseDaemon::logger();
|
||||
}
|
||||
Poco::Logger & logger() const override { return BaseDaemon::logger(); }
|
||||
|
||||
ContextMutablePtr context() const override
|
||||
{
|
||||
return global_context;
|
||||
}
|
||||
ContextMutablePtr context() const override { return global_context; }
|
||||
|
||||
bool isCancelled() const override
|
||||
{
|
||||
return BaseDaemon::isCancelled();
|
||||
}
|
||||
bool isCancelled() const override { return BaseDaemon::isCancelled(); }
|
||||
|
||||
void defineOptions(Poco::Util::OptionSet & _options) override;
|
||||
|
||||
@ -73,64 +46,6 @@ private:
|
||||
ContextMutablePtr global_context;
|
||||
/// Updated/recent config, to compare http_handlers
|
||||
ConfigurationPtr latest_config;
|
||||
|
||||
HTTPContextPtr httpContext() const;
|
||||
|
||||
Poco::Net::SocketAddress socketBindListen(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
Poco::Net::ServerSocket & socket,
|
||||
const std::string & host,
|
||||
UInt16 port,
|
||||
[[maybe_unused]] bool secure = false) const;
|
||||
|
||||
std::unique_ptr<TCPProtocolStackFactory> buildProtocolStackFromConfig(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & protocol,
|
||||
Poco::Net::HTTPServerParams::Ptr http_params,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool & is_secure);
|
||||
|
||||
using CreateServerFunc = std::function<ProtocolServerAdapter(UInt16)>;
|
||||
void createServer(
|
||||
Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & listen_host,
|
||||
const char * port_name,
|
||||
bool listen_try,
|
||||
bool start_server,
|
||||
std::vector<ProtocolServerAdapter> & servers,
|
||||
CreateServerFunc && func) const;
|
||||
|
||||
void createServers(
|
||||
Poco::Util::AbstractConfiguration & config,
|
||||
const Strings & listen_hosts,
|
||||
bool listen_try,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
std::vector<ProtocolServerAdapter> & servers,
|
||||
bool start_servers = false,
|
||||
const ServerType & server_type = ServerType(ServerType::Type::QUERIES_ALL));
|
||||
|
||||
void createInterserverServers(
|
||||
Poco::Util::AbstractConfiguration & config,
|
||||
const Strings & interserver_listen_hosts,
|
||||
bool listen_try,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
std::vector<ProtocolServerAdapter> & servers,
|
||||
bool start_servers = false,
|
||||
const ServerType & server_type = ServerType(ServerType::Type::QUERIES_ALL));
|
||||
|
||||
void updateServers(
|
||||
Poco::Util::AbstractConfiguration & config,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
std::vector<ProtocolServerAdapter> & servers,
|
||||
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables);
|
||||
|
||||
void stopServers(
|
||||
std::vector<ProtocolServerAdapter> & servers,
|
||||
const ServerType & server_type
|
||||
) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -144,8 +144,7 @@ AccessEntityPtr deserializeAccessEntity(const String & definition, const String
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage("Could not parse " + file_path);
|
||||
e.rethrow();
|
||||
UNREACHABLE();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -258,7 +258,7 @@ namespace
|
||||
case TABLE_LEVEL: return AccessFlags::allFlagsGrantableOnTableLevel();
|
||||
case COLUMN_LEVEL: return AccessFlags::allFlagsGrantableOnColumnLevel();
|
||||
}
|
||||
UNREACHABLE();
|
||||
chassert(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -257,8 +257,7 @@ std::vector<UUID> IAccessStorage::insert(const std::vector<AccessEntityPtr> & mu
|
||||
}
|
||||
e.addMessage("After successfully inserting {}/{}: {}", successfully_inserted.size(), multiple_entities.size(), successfully_inserted_str);
|
||||
}
|
||||
e.rethrow();
|
||||
UNREACHABLE();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@ -361,8 +360,7 @@ std::vector<UUID> IAccessStorage::remove(const std::vector<UUID> & ids, bool thr
|
||||
}
|
||||
e.addMessage("After successfully removing {}/{}: {}", removed_names.size(), ids.size(), removed_names_str);
|
||||
}
|
||||
e.rethrow();
|
||||
UNREACHABLE();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@ -458,8 +456,7 @@ std::vector<UUID> IAccessStorage::update(const std::vector<UUID> & ids, const Up
|
||||
}
|
||||
e.addMessage("After successfully updating {}/{}: {}", names_of_updated.size(), ids.size(), names_of_updated_str);
|
||||
}
|
||||
e.rethrow();
|
||||
UNREACHABLE();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,14 +60,13 @@ struct GroupArrayTrait
|
||||
template <typename Trait>
|
||||
constexpr const char * getNameByTrait()
|
||||
{
|
||||
if (Trait::last)
|
||||
if constexpr (Trait::last)
|
||||
return "groupArrayLast";
|
||||
if (Trait::sampler == Sampler::NONE)
|
||||
return "groupArray";
|
||||
else if (Trait::sampler == Sampler::RNG)
|
||||
return "groupArraySample";
|
||||
|
||||
UNREACHABLE();
|
||||
switch (Trait::sampler)
|
||||
{
|
||||
case Sampler::NONE: return "groupArray";
|
||||
case Sampler::RNG: return "groupArraySample";
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
@ -414,7 +414,6 @@ public:
|
||||
break;
|
||||
return (i == events_size) ? base - i : unmatched_idx;
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
|
||||
|
@ -463,7 +463,6 @@ public:
|
||||
return "sumWithOverflow";
|
||||
else if constexpr (Type == AggregateFunctionTypeSumKahan)
|
||||
return "sumKahan";
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
explicit AggregateFunctionSum(const DataTypes & argument_types_)
|
||||
|
@ -236,6 +236,7 @@ add_object_library(clickhouse_client Client)
|
||||
add_object_library(clickhouse_bridge BridgeHelper)
|
||||
add_object_library(clickhouse_server Server)
|
||||
add_object_library(clickhouse_server_http Server/HTTP)
|
||||
add_object_library(clickhouse_server_manager Server/ServersManager)
|
||||
add_object_library(clickhouse_formats Formats)
|
||||
add_object_library(clickhouse_processors Processors)
|
||||
add_object_library(clickhouse_processors_executors Processors/Executors)
|
||||
|
@ -41,7 +41,6 @@ UInt8 getDayOfWeek(const cctz::civil_day & date)
|
||||
case cctz::weekday::saturday: return 6;
|
||||
case cctz::weekday::sunday: return 7;
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
inline cctz::time_point<cctz::seconds> lookupTz(const cctz::time_zone & cctz_time_zone, const cctz::civil_day & date)
|
||||
|
@ -34,8 +34,6 @@ Int64 IntervalKind::toAvgNanoseconds() const
|
||||
default:
|
||||
return toAvgSeconds() * NANOSECONDS_PER_SECOND;
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
Int32 IntervalKind::toAvgSeconds() const
|
||||
@ -54,7 +52,6 @@ Int32 IntervalKind::toAvgSeconds() const
|
||||
case IntervalKind::Kind::Quarter: return 7889238; /// Exactly 1/4 of a year.
|
||||
case IntervalKind::Kind::Year: return 31556952; /// The average length of a Gregorian year is equal to 365.2425 days
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
Float64 IntervalKind::toSeconds() const
|
||||
@ -80,7 +77,6 @@ Float64 IntervalKind::toSeconds() const
|
||||
default:
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not possible to get precise number of seconds in non-precise interval");
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool IntervalKind::isFixedLength() const
|
||||
@ -99,7 +95,6 @@ bool IntervalKind::isFixedLength() const
|
||||
case IntervalKind::Kind::Quarter:
|
||||
case IntervalKind::Kind::Year: return false;
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
IntervalKind IntervalKind::fromAvgSeconds(Int64 num_seconds)
|
||||
@ -141,7 +136,6 @@ const char * IntervalKind::toKeyword() const
|
||||
case IntervalKind::Kind::Quarter: return "QUARTER";
|
||||
case IntervalKind::Kind::Year: return "YEAR";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
@ -161,7 +155,6 @@ const char * IntervalKind::toLowercasedKeyword() const
|
||||
case IntervalKind::Kind::Quarter: return "quarter";
|
||||
case IntervalKind::Kind::Year: return "year";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
@ -192,7 +185,6 @@ const char * IntervalKind::toDateDiffUnit() const
|
||||
case IntervalKind::Kind::Year:
|
||||
return "year";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
@ -223,7 +215,6 @@ const char * IntervalKind::toNameOfFunctionToIntervalDataType() const
|
||||
case IntervalKind::Kind::Year:
|
||||
return "toIntervalYear";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
@ -257,7 +248,6 @@ const char * IntervalKind::toNameOfFunctionExtractTimePart() const
|
||||
case IntervalKind::Kind::Year:
|
||||
return "toYear";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
|
@ -54,8 +54,6 @@ String toString(TargetArch arch)
|
||||
case TargetArch::AMXTILE: return "amxtile";
|
||||
case TargetArch::AMXINT8: return "amxint8";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -75,7 +75,6 @@ const char * TasksStatsCounters::metricsProviderString(MetricsProvider provider)
|
||||
case MetricsProvider::Netlink:
|
||||
return "netlink";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool TasksStatsCounters::checkIfAvailable()
|
||||
|
@ -146,8 +146,6 @@ const char * errorMessage(Error code)
|
||||
case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
|
||||
case Error::ZNOTREADONLY: return "State-changing request is passed to read-only server";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool isHardwareError(Error zk_return_code)
|
||||
|
@ -466,7 +466,6 @@ void CompressionCodecDeflateQpl::doDecompressData(const char * source, UInt32 so
|
||||
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
|
||||
return;
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void CompressionCodecDeflateQpl::flushAsynchronousDecompressRequests()
|
||||
|
@ -21,6 +21,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
/** NOTE DoubleDelta is surprisingly bad name. The only excuse is that it comes from an academic paper.
|
||||
* Most people will think that "double delta" is just applying delta transform twice.
|
||||
* But in fact it is something more than applying delta transform twice.
|
||||
@ -142,9 +147,9 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_COMPRESS;
|
||||
extern const int CANNOT_DECOMPRESS;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
|
||||
extern const int ILLEGAL_CODEC_PARAMETER;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -163,9 +168,8 @@ inline Int64 getMaxValueForByteSize(Int8 byte_size)
|
||||
case sizeof(UInt64):
|
||||
return std::numeric_limits<Int64>::max();
|
||||
default:
|
||||
assert(false && "only 1, 2, 4 and 8 data sizes are supported");
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "only 1, 2, 4 and 8 data sizes are supported");
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
struct WriteSpec
|
||||
|
@ -5,6 +5,12 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
ClusterUpdateActions joiningToClusterUpdates(const ClusterConfigPtr & cfg, std::string_view joining)
|
||||
{
|
||||
ClusterUpdateActions out;
|
||||
@ -79,7 +85,7 @@ String serializeClusterConfig(const ClusterConfigPtr & cfg, const ClusterUpdateA
|
||||
new_config.emplace_back(RaftServerConfig{*cfg->get_server(priority->id)});
|
||||
}
|
||||
else
|
||||
UNREACHABLE();
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected update");
|
||||
}
|
||||
|
||||
for (const auto & item : cfg->get_servers())
|
||||
|
@ -990,7 +990,7 @@ KeeperServer::ConfigUpdateState KeeperServer::applyConfigUpdate(
|
||||
raft_instance->set_priority(update->id, update->priority, /*broadcast on live leader*/true);
|
||||
return Accepted;
|
||||
}
|
||||
UNREACHABLE();
|
||||
std::unreachable();
|
||||
}
|
||||
|
||||
ClusterUpdateActions KeeperServer::getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
|
||||
|
@ -667,8 +667,6 @@ public:
|
||||
case Types::AggregateFunctionState: return f(field.template get<AggregateFunctionStateData>());
|
||||
case Types::CustomType: return f(field.template get<CustomType>());
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
String dump() const;
|
||||
|
@ -36,7 +36,6 @@ String ISerialization::kindToString(Kind kind)
|
||||
case Kind::SPARSE:
|
||||
return "Sparse";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
ISerialization::Kind ISerialization::stringToKind(const String & str)
|
||||
|
@ -140,7 +140,6 @@ private:
|
||||
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
|
||||
return "REMOTE_FS_READ_AND_PUT_IN_CACHE";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
size_t first_offset = 0;
|
||||
|
@ -17,7 +17,6 @@ std::string toString(MetadataStorageTransactionState state)
|
||||
case MetadataStorageTransactionState::PARTIALLY_ROLLED_BACK:
|
||||
return "PARTIALLY_ROLLED_BACK";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -112,7 +112,6 @@ DiskPtr VolumeJBOD::getDisk(size_t /* index */) const
|
||||
return disks_by_size.top().disk;
|
||||
}
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
|
||||
@ -164,7 +163,6 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
|
||||
return reservation;
|
||||
}
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool VolumeJBOD::areMergesAvoided() const
|
||||
|
@ -62,7 +62,6 @@ String escapingRuleToString(FormatSettings::EscapingRule escaping_rule)
|
||||
case FormatSettings::EscapingRule::Raw:
|
||||
return "Raw";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)
|
||||
|
@ -21,8 +21,6 @@ namespace ErrorCodes
|
||||
|
||||
const ColumnConst * checkAndGetColumnConstStringOrFixedString(const IColumn * column)
|
||||
{
|
||||
if (!column)
|
||||
return {};
|
||||
if (!isColumnConst(*column))
|
||||
return {};
|
||||
|
||||
|
@ -149,8 +149,6 @@ struct IntegerRoundingComputation
|
||||
return x;
|
||||
}
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
static ALWAYS_INLINE T compute(T x, T scale)
|
||||
@ -163,8 +161,6 @@ struct IntegerRoundingComputation
|
||||
case ScaleMode::Negative:
|
||||
return computeImpl(x, scale);
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
static ALWAYS_INLINE void compute(const T * __restrict in, size_t scale, T * __restrict out) requires std::integral<T>
|
||||
@ -247,8 +243,6 @@ inline float roundWithMode(float x, RoundingMode mode)
|
||||
case RoundingMode::Ceil: return ceilf(x);
|
||||
case RoundingMode::Trunc: return truncf(x);
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
inline double roundWithMode(double x, RoundingMode mode)
|
||||
@ -260,8 +254,6 @@ inline double roundWithMode(double x, RoundingMode mode)
|
||||
case RoundingMode::Ceil: return ceil(x);
|
||||
case RoundingMode::Trunc: return trunc(x);
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
@ -232,7 +232,6 @@ struct TimeWindowImpl<TUMBLE>
|
||||
default:
|
||||
throw Exception(ErrorCodes::SYNTAX_ERROR, "Fraction seconds are unsupported by windows yet");
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
template <typename ToType, IntervalKind::Kind unit>
|
||||
@ -422,7 +421,6 @@ struct TimeWindowImpl<HOP>
|
||||
default:
|
||||
throw Exception(ErrorCodes::SYNTAX_ERROR, "Fraction seconds are unsupported by windows yet");
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
template <typename ToType, IntervalKind::Kind kind>
|
||||
|
@ -381,8 +381,6 @@ bool PointInPolygonWithGrid<CoordinateType>::contains(CoordinateType x, Coordina
|
||||
case CellType::complexPolygon:
|
||||
return boost::geometry::within(Point(x, y), polygons[cell.index_of_inner_polygon]);
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
|
@ -35,7 +35,6 @@ namespace
|
||||
case UserDefinedSQLObjectType::Function:
|
||||
return "function_";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
constexpr std::string_view sql_extension = ".sql";
|
||||
|
255
src/Functions/generateSnowflakeID.cpp
Normal file
255
src/Functions/generateSnowflakeID.cpp
Normal file
@ -0,0 +1,255 @@
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionsRandom.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <Core/ServerUUID.h>
|
||||
#include <Common/Logger.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include "base/types.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
/* Snowflake ID
|
||||
https://en.wikipedia.org/wiki/Snowflake_ID
|
||||
|
||||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
├─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|
||||
|0| timestamp |
|
||||
├─┼ ┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┼─┤
|
||||
| | machine_id | machine_seq_num |
|
||||
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
|
||||
|
||||
- The first 41 (+ 1 top zero bit) bits is the timestamp (millisecond since Unix epoch 1 Jan 1970)
|
||||
- The middle 10 bits are the machine ID
|
||||
- The last 12 bits are a counter to disambiguate multiple snowflakeIDs generated within the same millisecond by different processes
|
||||
*/
|
||||
|
||||
/// bit counts
|
||||
constexpr auto timestamp_bits_count = 41;
|
||||
constexpr auto machine_id_bits_count = 10;
|
||||
constexpr auto machine_seq_num_bits_count = 12;
|
||||
|
||||
/// bits masks for Snowflake ID components
|
||||
constexpr uint64_t machine_id_mask = ((1ull << machine_id_bits_count) - 1) << machine_seq_num_bits_count;
|
||||
constexpr uint64_t machine_seq_num_mask = (1ull << machine_seq_num_bits_count) - 1;
|
||||
|
||||
/// max values
|
||||
constexpr uint64_t max_machine_seq_num = machine_seq_num_mask;
|
||||
|
||||
uint64_t getTimestamp()
|
||||
{
|
||||
auto now = std::chrono::system_clock::now();
|
||||
auto ticks_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
|
||||
return static_cast<uint64_t>(ticks_since_epoch) & ((1ull << timestamp_bits_count) - 1);
|
||||
}
|
||||
|
||||
uint64_t getMachineIdImpl()
|
||||
{
|
||||
UUID server_uuid = ServerUUID::get();
|
||||
/// hash into 64 bits
|
||||
uint64_t hi = UUIDHelpers::getHighBytes(server_uuid);
|
||||
uint64_t lo = UUIDHelpers::getLowBytes(server_uuid);
|
||||
/// return only 10 bits
|
||||
return (((hi * 11) ^ (lo * 17)) & machine_id_mask) >> machine_seq_num_bits_count;
|
||||
}
|
||||
|
||||
uint64_t getMachineId()
|
||||
{
|
||||
static uint64_t machine_id = getMachineIdImpl();
|
||||
return machine_id;
|
||||
}
|
||||
|
||||
struct SnowflakeId
|
||||
{
|
||||
uint64_t timestamp;
|
||||
uint64_t machine_id;
|
||||
uint64_t machine_seq_num;
|
||||
};
|
||||
|
||||
SnowflakeId toSnowflakeId(uint64_t snowflake)
|
||||
{
|
||||
return {.timestamp = (snowflake >> (machine_id_bits_count + machine_seq_num_bits_count)),
|
||||
.machine_id = ((snowflake & machine_id_mask) >> machine_seq_num_bits_count),
|
||||
.machine_seq_num = (snowflake & machine_seq_num_mask)};
|
||||
}
|
||||
|
||||
uint64_t fromSnowflakeId(SnowflakeId components)
|
||||
{
|
||||
return (components.timestamp << (machine_id_bits_count + machine_seq_num_bits_count) |
|
||||
components.machine_id << (machine_seq_num_bits_count) |
|
||||
components.machine_seq_num);
|
||||
}
|
||||
|
||||
struct SnowflakeIdRange
|
||||
{
|
||||
SnowflakeId begin; /// inclusive
|
||||
SnowflakeId end; /// exclusive
|
||||
};
|
||||
|
||||
/// To get the range of `input_rows_count` Snowflake IDs from `max(available, now)`:
|
||||
/// 1. calculate Snowflake ID by current timestamp (`now`)
|
||||
/// 2. `begin = max(available, now)`
|
||||
/// 3. Calculate `end = begin + input_rows_count` handling `machine_seq_num` overflow
|
||||
SnowflakeIdRange getRangeOfAvailableIds(const SnowflakeId & available, size_t input_rows_count)
|
||||
{
|
||||
/// 1. `now`
|
||||
SnowflakeId begin = {.timestamp = getTimestamp(), .machine_id = getMachineId(), .machine_seq_num = 0};
|
||||
|
||||
/// 2. `begin`
|
||||
if (begin.timestamp <= available.timestamp)
|
||||
{
|
||||
begin.timestamp = available.timestamp;
|
||||
begin.machine_seq_num = available.machine_seq_num;
|
||||
}
|
||||
|
||||
/// 3. `end = begin + input_rows_count`
|
||||
SnowflakeId end;
|
||||
const uint64_t seq_nums_in_current_timestamp_left = (max_machine_seq_num - begin.machine_seq_num + 1);
|
||||
if (input_rows_count >= seq_nums_in_current_timestamp_left)
|
||||
/// if sequence numbers in current timestamp is not enough for rows --> depending on how many elements input_rows_count overflows, forward timestamp by at least 1 tick
|
||||
end.timestamp = begin.timestamp + 1 + (input_rows_count - seq_nums_in_current_timestamp_left) / (max_machine_seq_num + 1);
|
||||
else
|
||||
end.timestamp = begin.timestamp;
|
||||
|
||||
end.machine_id = begin.machine_id;
|
||||
end.machine_seq_num = (begin.machine_seq_num + input_rows_count) & machine_seq_num_mask;
|
||||
|
||||
return {begin, end};
|
||||
}
|
||||
|
||||
struct GlobalCounterPolicy
|
||||
{
|
||||
static constexpr auto name = "generateSnowflakeID";
|
||||
static constexpr auto description = R"(Generates a Snowflake ID. The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond. For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0. Function generateSnowflakeID guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.)";
|
||||
|
||||
/// Guarantee counter monotonicity within one timestamp across all threads generating Snowflake IDs simultaneously.
|
||||
struct Data
|
||||
{
|
||||
static inline std::atomic<uint64_t> lowest_available_snowflake_id = 0;
|
||||
|
||||
SnowflakeId reserveRange(size_t input_rows_count)
|
||||
{
|
||||
uint64_t available_snowflake_id = lowest_available_snowflake_id.load();
|
||||
SnowflakeIdRange range;
|
||||
do
|
||||
{
|
||||
range = getRangeOfAvailableIds(toSnowflakeId(available_snowflake_id), input_rows_count);
|
||||
}
|
||||
while (!lowest_available_snowflake_id.compare_exchange_weak(available_snowflake_id, fromSnowflakeId(range.end)));
|
||||
/// if CAS failed --> another thread updated `lowest_available_snowflake_id` and we re-try
|
||||
/// else --> our thread reserved ID range [begin, end) and return the beginning of the range
|
||||
|
||||
return range.begin;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
struct ThreadLocalCounterPolicy
|
||||
{
|
||||
static constexpr auto name = "generateSnowflakeIDThreadMonotonic";
|
||||
static constexpr auto description = R"(Generates a Snowflake ID. The generated Snowflake ID contains the current Unix timestamp in milliseconds 41 (+ 1 top zero bit) bits, followed by machine id (10 bits), a counter (12 bits) to distinguish IDs within a millisecond. For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0. This function behaves like generateSnowflakeID but gives no guarantee on counter monotony across different simultaneous requests. Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate Snowflake IDs.)";
|
||||
|
||||
/// Guarantee counter monotonicity within one timestamp within the same thread. Faster than GlobalCounterPolicy if a query uses multiple threads.
|
||||
struct Data
|
||||
{
|
||||
static inline thread_local uint64_t lowest_available_snowflake_id = 0;
|
||||
|
||||
SnowflakeId reserveRange(size_t input_rows_count)
|
||||
{
|
||||
SnowflakeIdRange range = getRangeOfAvailableIds(toSnowflakeId(lowest_available_snowflake_id), input_rows_count);
|
||||
lowest_available_snowflake_id = fromSnowflakeId(range.end);
|
||||
return range.begin;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
template <typename FillPolicy>
|
||||
class FunctionGenerateSnowflakeID : public IFunction, public FillPolicy
|
||||
{
|
||||
public:
|
||||
static FunctionPtr create(ContextPtr /*context*/) { return std::make_shared<FunctionGenerateSnowflakeID>(); }
|
||||
|
||||
String getName() const override { return FillPolicy::name; }
|
||||
size_t getNumberOfArguments() const override { return 0; }
|
||||
bool isDeterministic() const override { return false; }
|
||||
bool isDeterministicInScopeOfQuery() const override { return false; }
|
||||
bool useDefaultImplementationForNulls() const override { return false; }
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
|
||||
bool isVariadic() const override { return true; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
|
||||
{
|
||||
FunctionArgumentDescriptors mandatory_args;
|
||||
FunctionArgumentDescriptors optional_args{
|
||||
{"expr", nullptr, nullptr, "Arbitrary expression"}
|
||||
};
|
||||
validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
|
||||
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & /*arguments*/, const DataTypePtr &, size_t input_rows_count) const override
|
||||
{
|
||||
auto col_res = ColumnVector<UInt64>::create();
|
||||
typename ColumnVector<UInt64>::Container & vec_to = col_res->getData();
|
||||
|
||||
if (input_rows_count != 0)
|
||||
{
|
||||
vec_to.resize(input_rows_count);
|
||||
|
||||
typename FillPolicy::Data data;
|
||||
SnowflakeId snowflake_id = data.reserveRange(input_rows_count); /// returns begin of available snowflake ids range
|
||||
|
||||
for (UInt64 & to_row : vec_to)
|
||||
{
|
||||
to_row = fromSnowflakeId(snowflake_id);
|
||||
if (snowflake_id.machine_seq_num == max_machine_seq_num)
|
||||
{
|
||||
/// handle overflow
|
||||
snowflake_id.machine_seq_num = 0;
|
||||
++snowflake_id.timestamp;
|
||||
}
|
||||
else
|
||||
{
|
||||
++snowflake_id.machine_seq_num;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return col_res;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
template<typename FillPolicy>
|
||||
void registerSnowflakeIDGenerator(auto & factory)
|
||||
{
|
||||
static constexpr auto doc_syntax_format = "{}([expression])";
|
||||
static constexpr auto example_format = "SELECT {}()";
|
||||
static constexpr auto multiple_example_format = "SELECT {f}(1), {f}(2)";
|
||||
|
||||
FunctionDocumentation::Description description = FillPolicy::description;
|
||||
FunctionDocumentation::Syntax syntax = fmt::format(doc_syntax_format, FillPolicy::name);
|
||||
FunctionDocumentation::Arguments arguments = {{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."}};
|
||||
FunctionDocumentation::ReturnedValue returned_value = "A value of type UInt64";
|
||||
FunctionDocumentation::Examples examples = {{"single", fmt::format(example_format, FillPolicy::name), ""}, {"multiple", fmt::format(multiple_example_format, fmt::arg("f", FillPolicy::name)), ""}};
|
||||
FunctionDocumentation::Categories categories = {"Snowflake ID"};
|
||||
|
||||
factory.template registerFunction<FunctionGenerateSnowflakeID<FillPolicy>>({description, syntax, arguments, returned_value, examples, categories}, FunctionFactory::CaseInsensitive);
|
||||
}
|
||||
|
||||
REGISTER_FUNCTION(GenerateSnowflakeID)
|
||||
{
|
||||
registerSnowflakeIDGenerator<GlobalCounterPolicy>(factory);
|
||||
registerSnowflakeIDGenerator<ThreadLocalCounterPolicy>(factory);
|
||||
}
|
||||
|
||||
}
|
@ -76,7 +76,7 @@ void setVariant(UUID & uuid)
|
||||
struct FillAllRandomPolicy
|
||||
{
|
||||
static constexpr auto name = "generateUUIDv7NonMonotonic";
|
||||
static constexpr auto doc_description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), and a random field (74 bit, including a 2-bit variant field "2") to distinguish UUIDs within a millisecond. This function is the fastest generateUUIDv7* function but it gives no monotonicity guarantees within a timestamp.)";
|
||||
static constexpr auto description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), and a random field (74 bit, including a 2-bit variant field "2") to distinguish UUIDs within a millisecond. This function is the fastest generateUUIDv7* function but it gives no monotonicity guarantees within a timestamp.)";
|
||||
struct Data
|
||||
{
|
||||
void generate(UUID & uuid, uint64_t ts)
|
||||
@ -136,7 +136,7 @@ struct CounterFields
|
||||
struct GlobalCounterPolicy
|
||||
{
|
||||
static constexpr auto name = "generateUUIDv7";
|
||||
static constexpr auto doc_description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. Function generateUUIDv7 guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.)";
|
||||
static constexpr auto description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. Function generateUUIDv7 guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.)";
|
||||
|
||||
/// Guarantee counter monotonicity within one timestamp across all threads generating UUIDv7 simultaneously.
|
||||
struct Data
|
||||
@ -159,7 +159,7 @@ struct GlobalCounterPolicy
|
||||
struct ThreadLocalCounterPolicy
|
||||
{
|
||||
static constexpr auto name = "generateUUIDv7ThreadMonotonic";
|
||||
static constexpr auto doc_description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. This function behaves like generateUUIDv7 but gives no guarantee on counter monotony across different simultaneous requests. Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate UUIDs.)";
|
||||
static constexpr auto description = R"(Generates a UUID of version 7. The generated UUID contains the current Unix timestamp in milliseconds (48 bits), followed by version "7" (4 bits), a counter (42 bit, including a variant field "2", 2 bit) to distinguish UUIDs within a millisecond, and a random field (32 bits). For any given timestamp (unix_ts_ms), the counter starts at a random value and is incremented by 1 for each new UUID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to a random new start value. This function behaves like generateUUIDv7 but gives no guarantee on counter monotony across different simultaneous requests. Monotonicity within one timestamp is guaranteed only within the same thread calling this function to generate UUIDs.)";
|
||||
|
||||
/// Guarantee counter monotonicity within one timestamp within the same thread. Faster than GlobalCounterPolicy if a query uses multiple threads.
|
||||
struct Data
|
||||
@ -186,7 +186,6 @@ class FunctionGenerateUUIDv7Base : public IFunction, public FillPolicy
|
||||
{
|
||||
public:
|
||||
String getName() const final { return FillPolicy::name; }
|
||||
|
||||
size_t getNumberOfArguments() const final { return 0; }
|
||||
bool isDeterministic() const override { return false; }
|
||||
bool isDeterministicInScopeOfQuery() const final { return false; }
|
||||
@ -198,7 +197,7 @@ public:
|
||||
{
|
||||
FunctionArgumentDescriptors mandatory_args;
|
||||
FunctionArgumentDescriptors optional_args{
|
||||
{"expr", nullptr, nullptr, "Arbitrary Expression"}
|
||||
{"expr", nullptr, nullptr, "Arbitrary expression"}
|
||||
};
|
||||
validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
|
||||
|
||||
@ -264,20 +263,20 @@ private:
|
||||
};
|
||||
|
||||
template<typename FillPolicy>
|
||||
void registerUUIDv7Generator(auto& factory)
|
||||
void registerUUIDv7Generator(auto & factory)
|
||||
{
|
||||
static constexpr auto doc_syntax_format = "{}([expression])";
|
||||
static constexpr auto example_format = "SELECT {}()";
|
||||
static constexpr auto multiple_example_format = "SELECT {f}(1), {f}(2)";
|
||||
|
||||
FunctionDocumentation::Description doc_description = FillPolicy::doc_description;
|
||||
FunctionDocumentation::Syntax doc_syntax = fmt::format(doc_syntax_format, FillPolicy::name);
|
||||
FunctionDocumentation::Arguments doc_arguments = {{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."}};
|
||||
FunctionDocumentation::ReturnedValue doc_returned_value = "A value of type UUID version 7.";
|
||||
FunctionDocumentation::Examples doc_examples = {{"uuid", fmt::format(example_format, FillPolicy::name), ""}, {"multiple", fmt::format(multiple_example_format, fmt::arg("f", FillPolicy::name)), ""}};
|
||||
FunctionDocumentation::Categories doc_categories = {"UUID"};
|
||||
FunctionDocumentation::Description description = FillPolicy::description;
|
||||
FunctionDocumentation::Syntax syntax = fmt::format(doc_syntax_format, FillPolicy::name);
|
||||
FunctionDocumentation::Arguments arguments = {{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."}};
|
||||
FunctionDocumentation::ReturnedValue returned_value = "A value of type UUID version 7.";
|
||||
FunctionDocumentation::Examples examples = {{"single", fmt::format(example_format, FillPolicy::name), ""}, {"multiple", fmt::format(multiple_example_format, fmt::arg("f", FillPolicy::name)), ""}};
|
||||
FunctionDocumentation::Categories categories = {"UUID"};
|
||||
|
||||
factory.template registerFunction<FunctionGenerateUUIDv7Base<FillPolicy>>({doc_description, doc_syntax, doc_arguments, doc_returned_value, doc_examples, doc_categories}, FunctionFactory::CaseInsensitive);
|
||||
factory.template registerFunction<FunctionGenerateUUIDv7Base<FillPolicy>>({description, syntax, arguments, returned_value, examples, categories}, FunctionFactory::CaseInsensitive);
|
||||
}
|
||||
|
||||
REGISTER_FUNCTION(GenerateUUIDv7)
|
||||
|
@ -52,7 +52,6 @@ std::string toContentEncodingName(CompressionMethod method)
|
||||
case CompressionMethod::None:
|
||||
return "";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
CompressionMethod chooseHTTPCompressionMethod(const std::string & list)
|
||||
|
@ -88,7 +88,6 @@ public:
|
||||
case Status::TOO_LARGE_COMPRESSED_BLOCK:
|
||||
return "TOO_LARGE_COMPRESSED_BLOCK";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
explicit HadoopSnappyReadBuffer(
|
||||
|
@ -117,8 +117,6 @@ size_t AggregatedDataVariants::size() const
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
size_t AggregatedDataVariants::sizeWithoutOverflowRow() const
|
||||
@ -136,8 +134,6 @@ size_t AggregatedDataVariants::sizeWithoutOverflowRow() const
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
const char * AggregatedDataVariants::getMethodName() const
|
||||
@ -155,8 +151,6 @@ const char * AggregatedDataVariants::getMethodName() const
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool AggregatedDataVariants::isTwoLevel() const
|
||||
@ -174,8 +168,6 @@ bool AggregatedDataVariants::isTwoLevel() const
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool AggregatedDataVariants::isConvertibleToTwoLevel() const
|
||||
|
@ -799,7 +799,6 @@ String FileSegment::stateToString(FileSegment::State state)
|
||||
case FileSegment::State::DETACHED:
|
||||
return "DETACHED";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool FileSegment::assertCorrectness() const
|
||||
|
@ -126,6 +126,11 @@ bool astContainsSystemTables(ASTPtr ast, ContextPtr context)
|
||||
namespace
|
||||
{
|
||||
|
||||
bool isQueryCacheRelatedSetting(const String & setting_name)
|
||||
{
|
||||
return setting_name.starts_with("query_cache_") || setting_name.ends_with("_query_cache");
|
||||
}
|
||||
|
||||
class RemoveQueryCacheSettingsMatcher
|
||||
{
|
||||
public:
|
||||
@ -141,7 +146,7 @@ public:
|
||||
|
||||
auto is_query_cache_related_setting = [](const auto & change)
|
||||
{
|
||||
return change.name.starts_with("query_cache_") || change.name.ends_with("_query_cache");
|
||||
return isQueryCacheRelatedSetting(change.name);
|
||||
};
|
||||
|
||||
std::erase_if(set_clause->changes, is_query_cache_related_setting);
|
||||
@ -177,11 +182,11 @@ ASTPtr removeQueryCacheSettings(ASTPtr ast)
|
||||
return transformed_ast;
|
||||
}
|
||||
|
||||
IAST::Hash calculateAstHash(ASTPtr ast, const String & current_database)
|
||||
IAST::Hash calculateAstHash(ASTPtr ast, const String & current_database, const Settings & settings)
|
||||
{
|
||||
ast = removeQueryCacheSettings(ast);
|
||||
|
||||
/// Hash the AST, it must consider aliases (issue #56258)
|
||||
/// Hash the AST, we must consider aliases (issue #56258)
|
||||
SipHash hash;
|
||||
ast->updateTreeHash(hash, /*ignore_aliases=*/ false);
|
||||
|
||||
@ -189,6 +194,25 @@ IAST::Hash calculateAstHash(ASTPtr ast, const String & current_database)
|
||||
/// tables (issue #64136)
|
||||
hash.update(current_database);
|
||||
|
||||
/// Finally, hash the (changed) settings as they might affect the query result (e.g. think of settings `additional_table_filters` and `limit`).
|
||||
/// Note: allChanged() returns the settings in random order. Also, update()-s of the composite hash must be done in deterministic order.
|
||||
/// Therefore, collect and sort the settings first, then hash them.
|
||||
Settings::Range changed_settings = settings.allChanged();
|
||||
std::vector<std::pair<String, String>> changed_settings_sorted; /// (name, value)
|
||||
for (const auto & setting : changed_settings)
|
||||
{
|
||||
const String & name = setting.getName();
|
||||
const String & value = setting.getValueString();
|
||||
if (!isQueryCacheRelatedSetting(name)) /// see removeQueryCacheSettings() why this is a good idea
|
||||
changed_settings_sorted.push_back({name, value});
|
||||
}
|
||||
std::sort(changed_settings_sorted.begin(), changed_settings_sorted.end(), [](auto & lhs, auto & rhs) { return lhs.first < rhs.first; });
|
||||
for (const auto & setting : changed_settings_sorted)
|
||||
{
|
||||
hash.update(setting.first);
|
||||
hash.update(setting.second);
|
||||
}
|
||||
|
||||
return getSipHash128AsPair(hash);
|
||||
}
|
||||
|
||||
@ -204,12 +228,13 @@ String queryStringFromAST(ASTPtr ast)
|
||||
QueryCache::Key::Key(
|
||||
ASTPtr ast_,
|
||||
const String & current_database,
|
||||
const Settings & settings,
|
||||
Block header_,
|
||||
std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_,
|
||||
bool is_shared_,
|
||||
std::chrono::time_point<std::chrono::system_clock> expires_at_,
|
||||
bool is_compressed_)
|
||||
: ast_hash(calculateAstHash(ast_, current_database))
|
||||
: ast_hash(calculateAstHash(ast_, current_database, settings))
|
||||
, header(header_)
|
||||
, user_id(user_id_)
|
||||
, current_user_roles(current_user_roles_)
|
||||
@ -220,8 +245,8 @@ QueryCache::Key::Key(
|
||||
{
|
||||
}
|
||||
|
||||
QueryCache::Key::Key(ASTPtr ast_, const String & current_database, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_)
|
||||
: QueryCache::Key(ast_, current_database, {}, user_id_, current_user_roles_, false, std::chrono::system_clock::from_time_t(1), false) /// dummy values for everything != AST, current database, user name/roles
|
||||
QueryCache::Key::Key(ASTPtr ast_, const String & current_database, const Settings & settings, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_)
|
||||
: QueryCache::Key(ast_, current_database, settings, {}, user_id_, current_user_roles_, false, std::chrono::system_clock::from_time_t(1), false) /// dummy values for everything != AST, current database, user name/roles
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,8 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct Settings;
|
||||
|
||||
/// Does AST contain non-deterministic functions like rand() and now()?
|
||||
bool astContainsNonDeterministicFunctions(ASTPtr ast, ContextPtr context);
|
||||
|
||||
@ -89,6 +91,7 @@ public:
|
||||
/// Ctor to construct a Key for writing into query cache.
|
||||
Key(ASTPtr ast_,
|
||||
const String & current_database,
|
||||
const Settings & settings,
|
||||
Block header_,
|
||||
std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_,
|
||||
bool is_shared_,
|
||||
@ -96,7 +99,7 @@ public:
|
||||
bool is_compressed);
|
||||
|
||||
/// Ctor to construct a Key for reading from query cache (this operation only needs the AST + user name).
|
||||
Key(ASTPtr ast_, const String & current_database, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_);
|
||||
Key(ASTPtr ast_, const String & current_database, const Settings & settings, std::optional<UUID> user_id_, const std::vector<UUID> & current_user_roles_);
|
||||
|
||||
bool operator==(const Key & other) const;
|
||||
};
|
||||
|
@ -309,7 +309,6 @@ ComparisonGraphCompareResult ComparisonGraph<Node>::pathToCompareResult(Path pat
|
||||
case Path::GREATER: return inverse ? ComparisonGraphCompareResult::LESS : ComparisonGraphCompareResult::GREATER;
|
||||
case Path::GREATER_OR_EQUAL: return inverse ? ComparisonGraphCompareResult::LESS_OR_EQUAL : ComparisonGraphCompareResult::GREATER_OR_EQUAL;
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
template <ComparisonGraphNodeType Node>
|
||||
|
@ -26,7 +26,6 @@ static String typeToString(FilesystemCacheLogElement::CacheType type)
|
||||
case FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE:
|
||||
return "WRITE_THROUGH_CACHE";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
ColumnsDescription FilesystemCacheLogElement::getColumnsDescription()
|
||||
|
@ -705,7 +705,6 @@ namespace
|
||||
APPLY_FOR_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
@ -2641,8 +2640,6 @@ private:
|
||||
default:
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys (type: {})", parent.data->type);
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
template <typename Map>
|
||||
|
@ -322,8 +322,6 @@ public:
|
||||
APPLY_FOR_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
size_t getTotalByteCountImpl(Type which) const
|
||||
@ -338,8 +336,6 @@ public:
|
||||
APPLY_FOR_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
size_t getBufferSizeInCells(Type which) const
|
||||
@ -354,8 +350,6 @@ public:
|
||||
APPLY_FOR_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
/// NOLINTEND(bugprone-macro-parentheses)
|
||||
};
|
||||
|
@ -33,7 +33,6 @@ BlockIO InterpreterTransactionControlQuery::execute()
|
||||
case ASTTransactionControl::SET_SNAPSHOT:
|
||||
return executeSetSnapshot(session_context, tcl.snapshot);
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
BlockIO InterpreterTransactionControlQuery::executeBegin(ContextMutablePtr session_context)
|
||||
|
@ -41,8 +41,6 @@ size_t SetVariantsTemplate<Variant>::getTotalRowCount() const
|
||||
APPLY_FOR_SET_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
template <typename Variant>
|
||||
@ -57,8 +55,6 @@ size_t SetVariantsTemplate<Variant>::getTotalByteCount() const
|
||||
APPLY_FOR_SET_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
template <typename Variant>
|
||||
|
@ -1093,6 +1093,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
&& (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>());
|
||||
QueryCache::Usage query_cache_usage = QueryCache::Usage::None;
|
||||
|
||||
/// If the query runs with "use_query_cache = 1", we first probe if the query cache already contains the query result (if yes:
|
||||
/// return result from cache). If doesn't, we execute the query normally and write the result into the query cache. Both steps use a
|
||||
/// hash of the AST, the current database and the settings as cache key. Unfortunately, the settings are in some places internally
|
||||
/// modified between steps 1 and 2 (= during query execution) - this is silly but hard to forbid. As a result, the hashes no longer
|
||||
/// match and the cache is rendered ineffective. Therefore make a copy of the settings and use it for steps 1 and 2.
|
||||
std::optional<Settings> settings_copy;
|
||||
if (can_use_query_cache)
|
||||
settings_copy = settings;
|
||||
|
||||
if (!async_insert)
|
||||
{
|
||||
/// If it is a non-internal SELECT, and passive (read) use of the query cache is enabled, and the cache knows the query, then set
|
||||
@ -1101,7 +1110,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
{
|
||||
if (can_use_query_cache && settings.enable_reads_from_query_cache)
|
||||
{
|
||||
QueryCache::Key key(ast, context->getCurrentDatabase(), context->getUserID(), context->getCurrentRoles());
|
||||
QueryCache::Key key(ast, context->getCurrentDatabase(), *settings_copy, context->getUserID(), context->getCurrentRoles());
|
||||
QueryCache::Reader reader = query_cache->createReader(key);
|
||||
if (reader.hasCacheEntryForKey())
|
||||
{
|
||||
@ -1224,7 +1233,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
&& (!ast_contains_system_tables || system_table_handling == QueryCacheSystemTableHandling::Save))
|
||||
{
|
||||
QueryCache::Key key(
|
||||
ast, context->getCurrentDatabase(), res.pipeline.getHeader(),
|
||||
ast, context->getCurrentDatabase(), *settings_copy, res.pipeline.getHeader(),
|
||||
context->getUserID(), context->getCurrentRoles(),
|
||||
settings.query_cache_share_between_users,
|
||||
std::chrono::system_clock::now() + std::chrono::seconds(settings.query_cache_ttl),
|
||||
|
@ -40,8 +40,6 @@ public:
|
||||
case TableOverride: return "EXPLAIN TABLE OVERRIDE";
|
||||
case CurrentTransaction: return "EXPLAIN CURRENT TRANSACTION";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
static ExplainKind fromString(const String & str)
|
||||
|
@ -42,7 +42,7 @@ Token quotedString(const char *& pos, const char * const token_begin, const char
|
||||
continue;
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
chassert(false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -538,8 +538,6 @@ const char * getTokenName(TokenType type)
|
||||
APPLY_FOR_TOKENS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
|
@ -657,7 +657,6 @@ DataTypePtr MsgPackSchemaReader::getDataType(const msgpack::object & object)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Msgpack extension type {:x} is not supported", object_ext.type());
|
||||
}
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
std::optional<DataTypes> MsgPackSchemaReader::readRowAndGetDataTypes()
|
||||
|
@ -36,8 +36,6 @@ std::string IProcessor::statusToName(Status status)
|
||||
case Status::ExpandPipeline:
|
||||
return "ExpandPipeline";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1136,8 +1136,6 @@ static void addMergingFinal(
|
||||
return std::make_shared<GraphiteRollupSortedTransform>(header, num_outputs,
|
||||
sort_description, max_block_size_rows, /*max_block_size_bytes=*/0, merging_params.graphite_params, now);
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
};
|
||||
|
||||
pipe.addTransform(get_merging_processor());
|
||||
@ -2125,8 +2123,6 @@ static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
|
||||
case ReadFromMergeTree::IndexType::Skip:
|
||||
return "Skip";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
static const char * readTypeToString(ReadFromMergeTree::ReadType type)
|
||||
@ -2142,8 +2138,6 @@ static const char * readTypeToString(ReadFromMergeTree::ReadType type)
|
||||
case ReadFromMergeTree::ReadType::ParallelReplicas:
|
||||
return "Parallel";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
|
||||
|
@ -86,8 +86,6 @@ static String totalsModeToString(TotalsMode totals_mode, double auto_include_thr
|
||||
case TotalsMode::AFTER_HAVING_AUTO:
|
||||
return "after_having_auto threshold " + std::to_string(auto_include_threshold);
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void TotalsHavingStep::describeActions(FormatSettings & settings) const
|
||||
|
@ -67,7 +67,6 @@ static FillColumnDescription::StepFunction getStepFunction(
|
||||
FOR_EACH_INTERVAL_KIND(DECLARE_CASE)
|
||||
#undef DECLARE_CASE
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
static bool tryConvertFields(FillColumnDescription & descr, const DataTypePtr & type)
|
||||
|
@ -898,8 +898,6 @@ static std::exception_ptr addStorageToException(std::exception_ptr ptr, const St
|
||||
{
|
||||
return std::current_exception();
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void FinalizingViewsTransform::work()
|
||||
|
268
src/Server/ServersManager/IServersManager.cpp
Normal file
268
src/Server/ServersManager/IServersManager.cpp
Normal file
@ -0,0 +1,268 @@
|
||||
#include <Server/ServersManager/IServersManager.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/waitServersToFinish.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/Config/AbstractConfigurationComparison.h>
|
||||
#include <Common/getMultipleKeysFromConfig.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/makeSocketAddress.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NETWORK_ERROR;
|
||||
extern const int INVALID_CONFIG_PARAMETER;
|
||||
}
|
||||
|
||||
IServersManager::IServersManager(ContextMutablePtr global_context_, Poco::Logger * logger_)
|
||||
: global_context(global_context_), logger(logger_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
bool IServersManager::empty() const
|
||||
{
|
||||
return servers.empty();
|
||||
}
|
||||
|
||||
std::vector<ProtocolServerMetrics> IServersManager::getMetrics() const
|
||||
{
|
||||
std::vector<ProtocolServerMetrics> metrics;
|
||||
metrics.reserve(servers.size());
|
||||
for (const auto & server : servers)
|
||||
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
|
||||
return metrics;
|
||||
}
|
||||
|
||||
void IServersManager::startServers()
|
||||
{
|
||||
for (auto & server : servers)
|
||||
{
|
||||
server.start();
|
||||
LOG_INFO(logger, "Listening for {}", server.getDescription());
|
||||
}
|
||||
}
|
||||
|
||||
void IServersManager::stopServers(const ServerType & server_type)
|
||||
{
|
||||
/// Remove servers once all their connections are closed
|
||||
auto check_server = [&](const char prefix[], auto & server)
|
||||
{
|
||||
if (!server.isStopping())
|
||||
return false;
|
||||
size_t current_connections = server.currentConnections();
|
||||
LOG_DEBUG(
|
||||
logger,
|
||||
"Server {}{}: {} ({} connections)",
|
||||
server.getDescription(),
|
||||
prefix,
|
||||
!current_connections ? "finished" : "waiting",
|
||||
current_connections);
|
||||
return !current_connections;
|
||||
};
|
||||
|
||||
std::erase_if(servers, std::bind_front(check_server, " (from one of previous remove)"));
|
||||
|
||||
for (auto & server : servers)
|
||||
{
|
||||
if (!server.isStopping() && server_type.shouldStop(server.getPortName()))
|
||||
server.stop();
|
||||
}
|
||||
|
||||
std::erase_if(servers, std::bind_front(check_server, ""));
|
||||
}
|
||||
|
||||
void IServersManager::updateServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & iserver,
|
||||
std::mutex & servers_lock,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
ConfigurationPtr latest_config)
|
||||
{
|
||||
stopServersForUpdate(config, latest_config);
|
||||
createServers(config, iserver, servers_lock, server_pool, async_metrics, true, ServerType(ServerType::Type::QUERIES_ALL));
|
||||
}
|
||||
|
||||
Poco::Net::SocketAddress IServersManager::socketBindListen(
|
||||
const Poco::Util::AbstractConfiguration & config, Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port) const
|
||||
{
|
||||
auto address = makeSocketAddress(host, port, logger);
|
||||
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config.getBool("listen_reuse_port", false));
|
||||
/// If caller requests any available port from the OS, discover it after binding.
|
||||
if (port == 0)
|
||||
{
|
||||
address = socket.address();
|
||||
LOG_DEBUG(logger, "Requested any available port (port == 0), actual port is {:d}", address.port());
|
||||
}
|
||||
|
||||
socket.listen(/* backlog = */ config.getUInt("listen_backlog", 4096));
|
||||
return address;
|
||||
}
|
||||
|
||||
void IServersManager::createServer(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & listen_host,
|
||||
const char * port_name,
|
||||
bool start_server,
|
||||
CreateServerFunc && func)
|
||||
{
|
||||
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
|
||||
if (config.getString(port_name, "").empty())
|
||||
return;
|
||||
|
||||
/// If we already have an active server for this listen_host/port_name, don't create it again
|
||||
for (const auto & server : servers)
|
||||
{
|
||||
if (!server.isStopping() && server.getListenHost() == listen_host && server.getPortName() == port_name)
|
||||
return;
|
||||
}
|
||||
|
||||
auto port = config.getInt(port_name);
|
||||
try
|
||||
{
|
||||
servers.push_back(func(port));
|
||||
if (start_server)
|
||||
{
|
||||
servers.back().start();
|
||||
LOG_INFO(logger, "Listening for {}", servers.back().getDescription());
|
||||
}
|
||||
global_context->registerServerPort(port_name, port);
|
||||
}
|
||||
catch (const Poco::Exception &)
|
||||
{
|
||||
if (!getListenTry(config))
|
||||
{
|
||||
throw Exception(ErrorCodes::NETWORK_ERROR, "Listen [{}]:{} failed: {}", listen_host, port, getCurrentExceptionMessage(false));
|
||||
}
|
||||
LOG_WARNING(
|
||||
logger,
|
||||
"Listen [{}]:{} failed: {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, "
|
||||
"then consider to "
|
||||
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
|
||||
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
|
||||
" Example for disabled IPv4: <listen_host>::</listen_host>",
|
||||
listen_host,
|
||||
port,
|
||||
getCurrentExceptionMessage(false));
|
||||
}
|
||||
}
|
||||
|
||||
void IServersManager::stopServersForUpdate(const Poco::Util::AbstractConfiguration & config, ConfigurationPtr latest_config)
|
||||
{
|
||||
/// Remove servers once all their connections are closed
|
||||
auto check_server = [&](const char prefix[], auto & server)
|
||||
{
|
||||
if (!server.isStopping())
|
||||
return false;
|
||||
size_t current_connections = server.currentConnections();
|
||||
LOG_DEBUG(
|
||||
logger,
|
||||
"Server {}{}: {} ({} connections)",
|
||||
server.getDescription(),
|
||||
prefix,
|
||||
!current_connections ? "finished" : "waiting",
|
||||
current_connections);
|
||||
return !current_connections;
|
||||
};
|
||||
|
||||
std::erase_if(servers, std::bind_front(check_server, " (from one of previous reload)"));
|
||||
|
||||
const auto listen_hosts = getListenHosts(config);
|
||||
const Poco::Util::AbstractConfiguration & previous_config = latest_config ? *latest_config : config;
|
||||
|
||||
for (auto & server : servers)
|
||||
{
|
||||
if (server.isStopping())
|
||||
return;
|
||||
std::string port_name = server.getPortName();
|
||||
bool has_host = false;
|
||||
bool is_http = false;
|
||||
if (port_name.starts_with("protocols."))
|
||||
{
|
||||
std::string protocol = port_name.substr(0, port_name.find_last_of('.'));
|
||||
has_host = config.has(protocol + ".host");
|
||||
|
||||
std::string conf_name = protocol;
|
||||
std::string prefix = protocol + ".";
|
||||
std::unordered_set<std::string> pset{conf_name};
|
||||
while (true)
|
||||
{
|
||||
if (config.has(prefix + "type"))
|
||||
{
|
||||
std::string type = config.getString(prefix + "type");
|
||||
if (type == "http")
|
||||
{
|
||||
is_http = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!config.has(prefix + "impl"))
|
||||
break;
|
||||
|
||||
conf_name = "protocols." + config.getString(prefix + "impl");
|
||||
prefix = conf_name + ".";
|
||||
|
||||
if (!pset.insert(conf_name).second)
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// NOTE: better to compare using getPortName() over using
|
||||
/// dynamic_cast<> since HTTPServer is also used for prometheus and
|
||||
/// internal replication communications.
|
||||
is_http = server.getPortName() == "http_port" || server.getPortName() == "https_port";
|
||||
}
|
||||
|
||||
if (!has_host)
|
||||
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end();
|
||||
bool has_port = !config.getString(port_name, "").empty();
|
||||
bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers");
|
||||
if (force_restart)
|
||||
LOG_TRACE(logger, "<http_handlers> had been changed, will reload {}", server.getDescription());
|
||||
|
||||
if (!has_host || !has_port || config.getInt(server.getPortName()) != server.portNumber() || force_restart)
|
||||
{
|
||||
server.stop();
|
||||
LOG_INFO(logger, "Stopped listening for {}", server.getDescription());
|
||||
}
|
||||
}
|
||||
|
||||
std::erase_if(servers, std::bind_front(check_server, ""));
|
||||
}
|
||||
|
||||
Strings IServersManager::getListenHosts(const Poco::Util::AbstractConfiguration & config) const
|
||||
{
|
||||
auto listen_hosts = DB::getMultipleValuesFromConfig(config, "", "listen_host");
|
||||
if (listen_hosts.empty())
|
||||
{
|
||||
listen_hosts.emplace_back("::1");
|
||||
listen_hosts.emplace_back("127.0.0.1");
|
||||
}
|
||||
return listen_hosts;
|
||||
}
|
||||
|
||||
bool IServersManager::getListenTry(const Poco::Util::AbstractConfiguration & config) const
|
||||
{
|
||||
bool listen_try = config.getBool("listen_try", false);
|
||||
if (!listen_try)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys protocols;
|
||||
config.keys("protocols", protocols);
|
||||
listen_try = DB::getMultipleValuesFromConfig(config, "", "listen_host").empty()
|
||||
&& std::none_of(
|
||||
protocols.begin(),
|
||||
protocols.end(),
|
||||
[&](const auto & protocol)
|
||||
{ return config.has("protocols." + protocol + ".host") && config.has("protocols." + protocol + ".port"); });
|
||||
}
|
||||
return listen_try;
|
||||
}
|
||||
|
||||
}
|
74
src/Server/ServersManager/IServersManager.h
Normal file
74
src/Server/ServersManager/IServersManager.h
Normal file
@ -0,0 +1,74 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Server/IServer.h>
|
||||
#include <Server/ProtocolServerAdapter.h>
|
||||
#include <Server/ServerType.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/Net/ServerSocket.h>
|
||||
#include <Poco/ThreadPool.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/AsynchronousMetrics.h>
|
||||
#include <Common/Config/ConfigProcessor.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IServersManager
|
||||
{
|
||||
public:
|
||||
IServersManager(ContextMutablePtr global_context_, Poco::Logger * logger_);
|
||||
virtual ~IServersManager() = default;
|
||||
|
||||
bool empty() const;
|
||||
std::vector<ProtocolServerMetrics> getMetrics() const;
|
||||
|
||||
virtual void createServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
std::mutex & servers_lock,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool start_servers,
|
||||
const ServerType & server_type)
|
||||
= 0;
|
||||
|
||||
void startServers();
|
||||
|
||||
void stopServers(const ServerType & server_type);
|
||||
virtual size_t stopServers(const ServerSettings & server_settings, std::mutex & servers_lock) = 0;
|
||||
|
||||
virtual void updateServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
std::mutex & servers_lock,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
ConfigurationPtr latest_config);
|
||||
|
||||
protected:
|
||||
ContextMutablePtr global_context;
|
||||
Poco::Logger * logger;
|
||||
|
||||
std::vector<ProtocolServerAdapter> servers;
|
||||
|
||||
Poco::Net::SocketAddress socketBindListen(
|
||||
const Poco::Util::AbstractConfiguration & config, Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port) const;
|
||||
|
||||
using CreateServerFunc = std::function<ProtocolServerAdapter(UInt16)>;
|
||||
void createServer(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & listen_host,
|
||||
const char * port_name,
|
||||
bool start_server,
|
||||
CreateServerFunc && func);
|
||||
|
||||
void stopServersForUpdate(const Poco::Util::AbstractConfiguration & config, ConfigurationPtr latest_config);
|
||||
|
||||
Strings getListenHosts(const Poco::Util::AbstractConfiguration & config) const;
|
||||
bool getListenTry(const Poco::Util::AbstractConfiguration & config) const;
|
||||
};
|
||||
|
||||
}
|
327
src/Server/ServersManager/InterServersManager.cpp
Normal file
327
src/Server/ServersManager/InterServersManager.cpp
Normal file
@ -0,0 +1,327 @@
|
||||
#include <Server/ServersManager/InterServersManager.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPServer.h>
|
||||
#include <Server/HTTPHandlerFactory.h>
|
||||
#include <Server/KeeperReadinessHandler.h>
|
||||
#include <Server/waitServersToFinish.h>
|
||||
#include <Poco/Net/HTTPServerParams.h>
|
||||
#include <Common/Config/AbstractConfigurationComparison.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/getMultipleKeysFromConfig.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#if USE_SSL
|
||||
# include <Poco/Net/SecureServerSocket.h>
|
||||
#endif
|
||||
|
||||
#if USE_NURAFT
|
||||
# include <Coordination/FourLetterCommand.h>
|
||||
# include <Server/KeeperTCPHandlerFactory.h>
|
||||
#endif
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event InterfaceInterserverSendBytes;
|
||||
extern const Event InterfaceInterserverReceiveBytes;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
}
|
||||
|
||||
void InterServersManager::createServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
std::mutex & servers_lock,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool start_servers,
|
||||
const ServerType & server_type)
|
||||
{
|
||||
if (config.has("keeper_server.server_id"))
|
||||
{
|
||||
#if USE_NURAFT
|
||||
//// If we don't have configured connection probably someone trying to use clickhouse-server instead
|
||||
//// of clickhouse-keeper, so start synchronously.
|
||||
bool can_initialize_keeper_async = false;
|
||||
|
||||
if (zkutil::hasZooKeeperConfig(config)) /// We have configured connection to some zookeeper cluster
|
||||
{
|
||||
/// If we cannot connect to some other node from our cluster then we have to wait our Keeper start
|
||||
/// synchronously.
|
||||
can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
|
||||
}
|
||||
/// Initialize keeper RAFT.
|
||||
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
|
||||
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
|
||||
|
||||
auto config_getter = [this]() -> const Poco::Util::AbstractConfiguration & { return global_context->getConfigRef(); };
|
||||
|
||||
for (const auto & listen_host : getListenHosts(config))
|
||||
{
|
||||
/// TCP Keeper
|
||||
constexpr auto port_name = "keeper_server.tcp_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
/* start_server = */ false,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(
|
||||
Poco::Timespan(config.getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
|
||||
socket.setSendTimeout(
|
||||
Poco::Timespan(config.getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"Keeper (tcp): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new KeeperTCPHandlerFactory(
|
||||
config_getter,
|
||||
global_context->getKeeperDispatcher(),
|
||||
global_context->getSettingsRef().receive_timeout.totalSeconds(),
|
||||
global_context->getSettingsRef().send_timeout.totalSeconds(),
|
||||
false),
|
||||
server_pool,
|
||||
socket));
|
||||
});
|
||||
|
||||
constexpr auto secure_port_name = "keeper_server.tcp_port_secure";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
secure_port_name,
|
||||
/* start_server = */ false,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
# if USE_SSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(
|
||||
Poco::Timespan(config.getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
|
||||
socket.setSendTimeout(
|
||||
Poco::Timespan(config.getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
secure_port_name,
|
||||
"Keeper with secure protocol (tcp_secure): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new KeeperTCPHandlerFactory(
|
||||
config_getter,
|
||||
global_context->getKeeperDispatcher(),
|
||||
global_context->getSettingsRef().receive_timeout.totalSeconds(),
|
||||
global_context->getSettingsRef().send_timeout.totalSeconds(),
|
||||
true),
|
||||
server_pool,
|
||||
socket));
|
||||
# else
|
||||
UNUSED(port);
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
|
||||
# endif
|
||||
});
|
||||
|
||||
/// HTTP control endpoints
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
/* port_name = */ "keeper_server.http_control.port",
|
||||
/* start_server = */ false,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
auto http_context = std::make_shared<HTTPContext>(global_context);
|
||||
Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
|
||||
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
|
||||
http_params->setTimeout(http_context->getReceiveTimeout());
|
||||
http_params->setKeepAliveTimeout(keep_alive_timeout);
|
||||
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(http_context->getReceiveTimeout());
|
||||
socket.setSendTimeout(http_context->getSendTimeout());
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"HTTP Control: http://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::move(http_context),
|
||||
createKeeperHTTPControlMainHandlerFactory(
|
||||
config_getter(), global_context->getKeeperDispatcher(), "KeeperHTTPControlHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
});
|
||||
}
|
||||
#else
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination.");
|
||||
#endif
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(servers_lock);
|
||||
/// We should start interserver communications before (and more important shutdown after) tables.
|
||||
/// Because server can wait for a long-running queries (for example in tcp_handler) after interserver handler was already shut down.
|
||||
/// In this case we will have replicated tables which are unable to send any parts to other replicas, but still can
|
||||
/// communicate with zookeeper, execute merges, etc.
|
||||
createInterserverServers(config, server, server_pool, async_metrics, start_servers, server_type);
|
||||
startServers();
|
||||
}
|
||||
}
|
||||
|
||||
size_t InterServersManager::stopServers(const ServerSettings & server_settings, std::mutex & servers_lock)
|
||||
{
|
||||
if (servers.empty())
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
LOG_DEBUG(logger, "Waiting for current connections to servers for tables to finish.");
|
||||
|
||||
size_t current_connections = 0;
|
||||
{
|
||||
std::lock_guard lock(servers_lock);
|
||||
for (auto & server : servers)
|
||||
{
|
||||
server.stop();
|
||||
current_connections += server.currentConnections();
|
||||
}
|
||||
}
|
||||
|
||||
if (current_connections)
|
||||
LOG_INFO(logger, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
|
||||
else
|
||||
LOG_INFO(logger, "Closed all listening sockets.");
|
||||
|
||||
if (current_connections > 0)
|
||||
current_connections = waitServersToFinish(servers, servers_lock, server_settings.shutdown_wait_unfinished);
|
||||
|
||||
if (current_connections)
|
||||
LOG_INFO(
|
||||
logger,
|
||||
"Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections "
|
||||
"after context shutdown.",
|
||||
current_connections);
|
||||
else
|
||||
LOG_INFO(logger, "Closed connections to servers for tables.");
|
||||
return current_connections;
|
||||
}
|
||||
|
||||
void InterServersManager::updateServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & iserver,
|
||||
std::mutex & /*servers_lock*/,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
ConfigurationPtr latest_config)
|
||||
{
|
||||
stopServersForUpdate(config, latest_config);
|
||||
createInterserverServers(config, iserver, server_pool, async_metrics, true, ServerType(ServerType::Type::QUERIES_ALL));
|
||||
}
|
||||
|
||||
Strings InterServersManager::getInterserverListenHosts(const Poco::Util::AbstractConfiguration & config) const
|
||||
{
|
||||
auto interserver_listen_hosts = DB::getMultipleValuesFromConfig(config, "", "interserver_listen_host");
|
||||
if (!interserver_listen_hosts.empty())
|
||||
return interserver_listen_hosts;
|
||||
|
||||
/// Use more general restriction in case of emptiness
|
||||
return getListenHosts(config);
|
||||
}
|
||||
|
||||
void InterServersManager::createInterserverServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool start_servers,
|
||||
const ServerType & server_type)
|
||||
{
|
||||
const Settings & settings = global_context->getSettingsRef();
|
||||
|
||||
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
|
||||
http_params->setTimeout(settings.http_receive_timeout);
|
||||
http_params->setKeepAliveTimeout(global_context->getServerSettings().keep_alive_timeout);
|
||||
|
||||
/// Now iterate over interserver_listen_hosts
|
||||
for (const auto & interserver_listen_host : getInterserverListenHosts(config))
|
||||
{
|
||||
if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTP))
|
||||
{
|
||||
/// Interserver IO HTTP
|
||||
constexpr auto port_name = "interserver_http_port";
|
||||
createServer(
|
||||
config,
|
||||
interserver_listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, interserver_listen_host, port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
interserver_listen_host,
|
||||
port_name,
|
||||
"replica communication (interserver): http://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
createHandlerFactory(server, config, async_metrics, "InterserverIOHTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params,
|
||||
ProfileEvents::InterfaceInterserverReceiveBytes,
|
||||
ProfileEvents::InterfaceInterserverSendBytes));
|
||||
});
|
||||
}
|
||||
|
||||
if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTPS))
|
||||
{
|
||||
constexpr auto port_name = "interserver_https_port";
|
||||
createServer(
|
||||
config,
|
||||
interserver_listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
#if USE_SSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, interserver_listen_host, port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
interserver_listen_host,
|
||||
port_name,
|
||||
"secure replica communication (interserver): https://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
createHandlerFactory(server, config, async_metrics, "InterserverIOHTTPSHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params,
|
||||
ProfileEvents::InterfaceInterserverReceiveBytes,
|
||||
ProfileEvents::InterfaceInterserverSendBytes));
|
||||
#else
|
||||
UNUSED(port);
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
|
||||
#endif
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
44
src/Server/ServersManager/InterServersManager.h
Normal file
44
src/Server/ServersManager/InterServersManager.h
Normal file
@ -0,0 +1,44 @@
|
||||
#pragma once
|
||||
|
||||
#include <Server/ServersManager/IServersManager.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class InterServersManager : public IServersManager
|
||||
{
|
||||
public:
|
||||
using IServersManager::IServersManager;
|
||||
|
||||
void createServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
std::mutex & servers_lock,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool start_servers,
|
||||
const ServerType & server_type) override;
|
||||
|
||||
size_t stopServers(const ServerSettings & server_settings, std::mutex & servers_lock) override;
|
||||
|
||||
void updateServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & iserver,
|
||||
std::mutex & servers_lock,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
ConfigurationPtr latest_config) override;
|
||||
|
||||
private:
|
||||
Strings getInterserverListenHosts(const Poco::Util::AbstractConfiguration & config) const;
|
||||
|
||||
void createInterserverServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool start_servers,
|
||||
const ServerType & server_type);
|
||||
};
|
||||
|
||||
}
|
523
src/Server/ServersManager/ProtocolServersManager.cpp
Normal file
523
src/Server/ServersManager/ProtocolServersManager.cpp
Normal file
@ -0,0 +1,523 @@
|
||||
#include <Server/ServersManager/ProtocolServersManager.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/ProcessList.h>
|
||||
#include <Server/HTTP/HTTPServer.h>
|
||||
#include <Server/HTTP/HTTPServerConnectionFactory.h>
|
||||
#include <Server/HTTPHandlerFactory.h>
|
||||
#include <Server/MySQLHandlerFactory.h>
|
||||
#include <Server/PostgreSQLHandlerFactory.h>
|
||||
#include <Server/ProxyV1HandlerFactory.h>
|
||||
#include <Server/TCPHandlerFactory.h>
|
||||
#include <Server/TLSHandlerFactory.h>
|
||||
#include <Server/waitServersToFinish.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/getMultipleKeysFromConfig.h>
|
||||
#include <Common/makeSocketAddress.h>
|
||||
|
||||
#if USE_SSL
|
||||
# include <Poco/Net/SecureServerSocket.h>
|
||||
#endif
|
||||
|
||||
#if USE_GRPC
|
||||
# include <Server/GRPCServer.h>
|
||||
#endif
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event InterfaceNativeSendBytes;
|
||||
extern const Event InterfaceNativeReceiveBytes;
|
||||
extern const Event InterfaceHTTPSendBytes;
|
||||
extern const Event InterfaceHTTPReceiveBytes;
|
||||
extern const Event InterfacePrometheusSendBytes;
|
||||
extern const Event InterfacePrometheusReceiveBytes;
|
||||
extern const Event InterfaceMySQLSendBytes;
|
||||
extern const Event InterfaceMySQLReceiveBytes;
|
||||
extern const Event InterfacePostgreSQLSendBytes;
|
||||
extern const Event InterfacePostgreSQLReceiveBytes;
|
||||
extern const Event InterfaceInterserverSendBytes;
|
||||
extern const Event InterfaceInterserverReceiveBytes;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
extern const int INVALID_CONFIG_PARAMETER;
|
||||
}
|
||||
|
||||
void ProtocolServersManager::createServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
std::mutex & /*servers_lock*/,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool start_servers,
|
||||
const ServerType & server_type)
|
||||
{
|
||||
auto listen_hosts = getListenHosts(config);
|
||||
const Settings & settings = global_context->getSettingsRef();
|
||||
|
||||
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
|
||||
http_params->setTimeout(settings.http_receive_timeout);
|
||||
http_params->setKeepAliveTimeout(global_context->getServerSettings().keep_alive_timeout);
|
||||
|
||||
Poco::Util::AbstractConfiguration::Keys protocols;
|
||||
config.keys("protocols", protocols);
|
||||
|
||||
for (const auto & protocol : protocols)
|
||||
{
|
||||
if (!server_type.shouldStart(ServerType::Type::CUSTOM, protocol))
|
||||
continue;
|
||||
|
||||
std::string prefix = "protocols." + protocol + ".";
|
||||
std::string port_name = prefix + "port";
|
||||
std::string description{"<undefined> protocol"};
|
||||
if (config.has(prefix + "description"))
|
||||
description = config.getString(prefix + "description");
|
||||
|
||||
if (!config.has(prefix + "port"))
|
||||
continue;
|
||||
|
||||
std::vector<std::string> hosts;
|
||||
if (config.has(prefix + "host"))
|
||||
hosts.push_back(config.getString(prefix + "host"));
|
||||
else
|
||||
hosts = listen_hosts;
|
||||
|
||||
for (const auto & host : hosts)
|
||||
{
|
||||
bool is_secure = false;
|
||||
auto stack = buildProtocolStackFromConfig(config, server, protocol, http_params, async_metrics, is_secure);
|
||||
|
||||
if (stack->empty())
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' stack empty", protocol);
|
||||
|
||||
createServer(
|
||||
config,
|
||||
host,
|
||||
port_name.c_str(),
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, host, port);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
host,
|
||||
port_name.c_str(),
|
||||
description + ": " + address.toString(),
|
||||
std::make_unique<TCPServer>(stack.release(), server_pool, socket, new Poco::Net::TCPServerParams));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & listen_host : listen_hosts)
|
||||
{
|
||||
if (server_type.shouldStart(ServerType::Type::HTTP))
|
||||
{
|
||||
/// HTTP
|
||||
constexpr auto port_name = "http_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"http://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
createHandlerFactory(server, config, async_metrics, "HTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params,
|
||||
ProfileEvents::InterfaceHTTPReceiveBytes,
|
||||
ProfileEvents::InterfaceHTTPSendBytes));
|
||||
});
|
||||
}
|
||||
|
||||
if (server_type.shouldStart(ServerType::Type::HTTPS))
|
||||
{
|
||||
/// HTTPS
|
||||
constexpr auto port_name = "https_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
#if USE_SSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"https://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
createHandlerFactory(server, config, async_metrics, "HTTPSHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params,
|
||||
ProfileEvents::InterfaceHTTPReceiveBytes,
|
||||
ProfileEvents::InterfaceHTTPSendBytes));
|
||||
#else
|
||||
UNUSED(port);
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"HTTPS protocol is disabled because Poco library was built without NetSSL support.");
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
if (server_type.shouldStart(ServerType::Type::TCP))
|
||||
{
|
||||
/// TCP
|
||||
constexpr auto port_name = "tcp_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"native protocol (tcp): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new TCPHandlerFactory(
|
||||
server, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
});
|
||||
}
|
||||
|
||||
if (server_type.shouldStart(ServerType::Type::TCP_WITH_PROXY))
|
||||
{
|
||||
/// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt
|
||||
constexpr auto port_name = "tcp_with_proxy_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"native protocol (tcp) with PROXY: " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new TCPHandlerFactory(
|
||||
server, false, true, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
});
|
||||
}
|
||||
|
||||
if (server_type.shouldStart(ServerType::Type::TCP_SECURE))
|
||||
{
|
||||
/// TCP with SSL
|
||||
constexpr auto port_name = "tcp_port_secure";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
#if USE_SSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"secure native protocol (tcp_secure): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new TCPHandlerFactory(
|
||||
server, true, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
#else
|
||||
UNUSED(port);
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
if (server_type.shouldStart(ServerType::Type::MYSQL))
|
||||
{
|
||||
constexpr auto port_name = "mysql_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(Poco::Timespan());
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"MySQL compatibility protocol: " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new MySQLHandlerFactory(
|
||||
server, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
});
|
||||
}
|
||||
|
||||
if (server_type.shouldStart(ServerType::Type::POSTGRESQL))
|
||||
{
|
||||
constexpr auto port_name = "postgresql_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(Poco::Timespan());
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"PostgreSQL compatibility protocol: " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new PostgreSQLHandlerFactory(
|
||||
server, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
});
|
||||
}
|
||||
|
||||
#if USE_GRPC
|
||||
if (server_type.shouldStart(ServerType::Type::GRPC))
|
||||
{
|
||||
constexpr auto port_name = "grpc_port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::SocketAddress server_address(listen_host, port);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"gRPC protocol: " + server_address.toString(),
|
||||
std::make_unique<GRPCServer>(server, makeSocketAddress(listen_host, port, logger)));
|
||||
});
|
||||
}
|
||||
#endif
|
||||
if (server_type.shouldStart(ServerType::Type::PROMETHEUS))
|
||||
{
|
||||
/// Prometheus (if defined and not setup yet with http_port)
|
||||
constexpr auto port_name = "prometheus.port";
|
||||
createServer(
|
||||
config,
|
||||
listen_host,
|
||||
port_name,
|
||||
start_servers,
|
||||
[&](UInt16 port) -> ProtocolServerAdapter
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(config, socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
return ProtocolServerAdapter(
|
||||
listen_host,
|
||||
port_name,
|
||||
"Prometheus: http://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
createHandlerFactory(server, config, async_metrics, "PrometheusHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params,
|
||||
ProfileEvents::InterfacePrometheusReceiveBytes,
|
||||
ProfileEvents::InterfacePrometheusSendBytes));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t ProtocolServersManager::stopServers(const ServerSettings & server_settings, std::mutex & servers_lock)
|
||||
{
|
||||
if (servers.empty())
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
LOG_DEBUG(logger, "Waiting for current connections to close.");
|
||||
|
||||
size_t current_connections = 0;
|
||||
{
|
||||
std::lock_guard lock(servers_lock);
|
||||
for (auto & server : servers)
|
||||
{
|
||||
server.stop();
|
||||
current_connections += server.currentConnections();
|
||||
}
|
||||
}
|
||||
|
||||
if (current_connections)
|
||||
LOG_WARNING(logger, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
|
||||
else
|
||||
LOG_INFO(logger, "Closed all listening sockets.");
|
||||
|
||||
/// Wait for unfinished backups and restores.
|
||||
/// This must be done after closing listening sockets (no more backups/restores) but before ProcessList::killAllQueries
|
||||
/// (because killAllQueries() will cancel all running backups/restores).
|
||||
if (server_settings.shutdown_wait_backups_and_restores)
|
||||
global_context->waitAllBackupsAndRestores();
|
||||
/// Killing remaining queries.
|
||||
if (!server_settings.shutdown_wait_unfinished_queries)
|
||||
global_context->getProcessList().killAllQueries();
|
||||
|
||||
if (current_connections)
|
||||
current_connections = waitServersToFinish(servers, servers_lock, server_settings.shutdown_wait_unfinished);
|
||||
|
||||
if (current_connections)
|
||||
LOG_WARNING(
|
||||
logger,
|
||||
"Closed connections. But {} remain."
|
||||
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>",
|
||||
current_connections);
|
||||
else
|
||||
LOG_INFO(logger, "Closed connections.");
|
||||
return current_connections;
|
||||
}
|
||||
|
||||
std::unique_ptr<TCPProtocolStackFactory> ProtocolServersManager::buildProtocolStackFromConfig(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
const std::string & protocol,
|
||||
Poco::Net::HTTPServerParams::Ptr http_params,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool & is_secure) const
|
||||
{
|
||||
auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr
|
||||
{
|
||||
if (type == "tcp")
|
||||
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(
|
||||
server, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes));
|
||||
|
||||
if (type == "tls")
|
||||
#if USE_SSL
|
||||
return TCPServerConnectionFactory::Ptr(new TLSHandlerFactory(server, conf_name));
|
||||
#else
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
|
||||
#endif
|
||||
|
||||
if (type == "proxy1")
|
||||
return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(server, conf_name));
|
||||
if (type == "mysql")
|
||||
return TCPServerConnectionFactory::Ptr(
|
||||
new MySQLHandlerFactory(server, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes));
|
||||
if (type == "postgres")
|
||||
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(
|
||||
server, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes));
|
||||
if (type == "http")
|
||||
return TCPServerConnectionFactory::Ptr(new HTTPServerConnectionFactory(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
http_params,
|
||||
createHandlerFactory(server, config, async_metrics, "HTTPHandler-factory"),
|
||||
ProfileEvents::InterfaceHTTPReceiveBytes,
|
||||
ProfileEvents::InterfaceHTTPSendBytes));
|
||||
if (type == "prometheus")
|
||||
return TCPServerConnectionFactory::Ptr(new HTTPServerConnectionFactory(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
http_params,
|
||||
createHandlerFactory(server, config, async_metrics, "PrometheusHandler-factory"),
|
||||
ProfileEvents::InterfacePrometheusReceiveBytes,
|
||||
ProfileEvents::InterfacePrometheusSendBytes));
|
||||
if (type == "interserver")
|
||||
return TCPServerConnectionFactory::Ptr(new HTTPServerConnectionFactory(
|
||||
std::make_shared<HTTPContext>(global_context),
|
||||
http_params,
|
||||
createHandlerFactory(server, config, async_metrics, "InterserverIOHTTPHandler-factory"),
|
||||
ProfileEvents::InterfaceInterserverReceiveBytes,
|
||||
ProfileEvents::InterfaceInterserverSendBytes));
|
||||
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type);
|
||||
};
|
||||
|
||||
std::string conf_name = "protocols." + protocol;
|
||||
std::string prefix = conf_name + ".";
|
||||
std::unordered_set<std::string> pset{conf_name};
|
||||
|
||||
auto stack = std::make_unique<TCPProtocolStackFactory>(server, conf_name);
|
||||
|
||||
while (true)
|
||||
{
|
||||
// if there is no "type" - it's a reference to another protocol and this is just an endpoint
|
||||
if (config.has(prefix + "type"))
|
||||
{
|
||||
std::string type = config.getString(prefix + "type");
|
||||
if (type == "tls")
|
||||
{
|
||||
if (is_secure)
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' contains more than one TLS layer", protocol);
|
||||
is_secure = true;
|
||||
}
|
||||
|
||||
stack->append(create_factory(type, conf_name));
|
||||
}
|
||||
|
||||
if (!config.has(prefix + "impl"))
|
||||
break;
|
||||
|
||||
conf_name = "protocols." + config.getString(prefix + "impl");
|
||||
prefix = conf_name + ".";
|
||||
|
||||
if (!pset.insert(conf_name).second)
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name);
|
||||
}
|
||||
|
||||
return stack;
|
||||
}
|
||||
|
||||
}
|
37
src/Server/ServersManager/ProtocolServersManager.h
Normal file
37
src/Server/ServersManager/ProtocolServersManager.h
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
|
||||
#include <Server/ServersManager/IServersManager.h>
|
||||
#include <Server/TCPProtocolStackFactory.h>
|
||||
#include <Poco/Net/HTTPServerParams.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ProtocolServersManager : public IServersManager
|
||||
{
|
||||
public:
|
||||
using IServersManager::IServersManager;
|
||||
|
||||
void createServers(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
std::mutex & servers_lock,
|
||||
Poco::ThreadPool & server_pool,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool start_servers,
|
||||
const ServerType & server_type) override;
|
||||
|
||||
using IServersManager::stopServers;
|
||||
size_t stopServers(const ServerSettings & server_settings, std::mutex & servers_lock) override;
|
||||
|
||||
private:
|
||||
std::unique_ptr<TCPProtocolStackFactory> buildProtocolStackFromConfig(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
IServer & server,
|
||||
const std::string & protocol,
|
||||
Poco::Net::HTTPServerParams::Ptr http_params,
|
||||
AsynchronousMetrics & async_metrics,
|
||||
bool & is_secure) const;
|
||||
};
|
||||
|
||||
}
|
@ -93,7 +93,6 @@ String BackgroundJobsAssignee::toString(Type type)
|
||||
case Type::Moving:
|
||||
return "Moving";
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void BackgroundJobsAssignee::start()
|
||||
|
@ -2964,8 +2964,6 @@ String KeyCondition::RPNElement::toString(std::string_view column_name, bool pri
|
||||
case ALWAYS_TRUE:
|
||||
return "true";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
||||
|
@ -1177,8 +1177,6 @@ String MergeTreeData::MergingParams::getModeName() const
|
||||
case Graphite: return "Graphite";
|
||||
case VersionedCollapsing: return "VersionedCollapsing";
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
Int64 MergeTreeData::getMaxBlockNumber() const
|
||||
|
@ -361,8 +361,6 @@ Block MergeTreeDataWriter::mergeBlock(
|
||||
return std::make_shared<GraphiteRollupSortedAlgorithm>(
|
||||
block, 1, sort_description, block_size + 1, /*block_size_bytes=*/0, merging_params.graphite_params, time(nullptr));
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
};
|
||||
|
||||
auto merging_algorithm = get_merging_algorithm();
|
||||
|
@ -616,8 +616,6 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void PartMovesBetweenShardsOrchestrator::removePins(const Entry & entry, zkutil::ZooKeeperPtr zk)
|
||||
|
@ -297,7 +297,6 @@ namespace
|
||||
CASE_WINDOW_KIND(Year)
|
||||
#undef CASE_WINDOW_KIND
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
class AddingAggregatedChunkInfoTransform : public ISimpleTransform
|
||||
@ -920,7 +919,6 @@ UInt32 StorageWindowView::getWindowLowerBound(UInt32 time_sec)
|
||||
CASE_WINDOW_KIND(Year)
|
||||
#undef CASE_WINDOW_KIND
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
|
||||
@ -948,7 +946,6 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
|
||||
CASE_WINDOW_KIND(Year)
|
||||
#undef CASE_WINDOW_KIND
|
||||
}
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
void StorageWindowView::addFireSignal(std::set<UInt32> & signals)
|
||||
|
@ -0,0 +1,6 @@
|
||||
Test (1)
|
||||
1
|
||||
2
|
||||
Test (2)
|
||||
4
|
||||
4
|
70
tests/queries/0_stateless/02494_query_cache_key.sql
Normal file
70
tests/queries/0_stateless/02494_query_cache_key.sql
Normal file
@ -0,0 +1,70 @@
|
||||
-- Tags: no-parallel
|
||||
-- Tag no-parallel: Messes with internal cache
|
||||
|
||||
-- Tests that the key of the query cache is not only formed by the query AST but also by
|
||||
-- (1) the current database (`USE db`, issue #64136),
|
||||
-- (2) the query settings
|
||||
|
||||
|
||||
SELECT 'Test (1)';
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
||||
DROP DATABASE IF EXISTS db1;
|
||||
DROP DATABASE IF EXISTS db2;
|
||||
|
||||
CREATE DATABASE db1;
|
||||
CREATE DATABASE db2;
|
||||
|
||||
CREATE TABLE db1.tab(a UInt64, PRIMARY KEY a);
|
||||
CREATE TABLE db2.tab(a UInt64, PRIMARY KEY a);
|
||||
|
||||
INSERT INTO db1.tab values(1);
|
||||
INSERT INTO db2.tab values(2);
|
||||
|
||||
USE db1;
|
||||
SELECT * FROM tab SETTINGS use_query_cache=1;
|
||||
|
||||
USE db2;
|
||||
SELECT * FROM tab SETTINGS use_query_cache=1;
|
||||
|
||||
DROP DATABASE db1;
|
||||
DROP DATABASE db2;
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
||||
|
||||
SELECT 'Test (2)';
|
||||
|
||||
-- test with query-level settings
|
||||
SELECT 1 SETTINGS use_query_cache = 1, limit = 1, use_skip_indexes = 0 Format Null;
|
||||
SELECT 1 SETTINGS use_query_cache = 1, use_skip_indexes = 0 Format Null;
|
||||
SELECT 1 SETTINGS use_query_cache = 1, use_skip_indexes = 1 Format Null;
|
||||
SELECT 1 SETTINGS use_query_cache = 1, max_block_size = 1 Format Null;
|
||||
|
||||
-- 4x the same query but with different settings each. There should yield four entries in the query cache.
|
||||
SELECT count(query) FROM system.query_cache;
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
||||
-- test with mixed session-level/query-level settings
|
||||
SET use_query_cache = 1;
|
||||
SET limit = 1;
|
||||
SELECT 1 SETTINGS use_skip_indexes = 0 Format Null;
|
||||
SET limit = default;
|
||||
SET use_skip_indexes = 0;
|
||||
SELECT 1 Format Null;
|
||||
SET use_skip_indexes = 1;
|
||||
SELECT 1 SETTINGS use_skip_indexes = 1 Format Null;
|
||||
SET use_skip_indexes = default;
|
||||
SET max_block_size = 1;
|
||||
SELECT 1 Format Null;
|
||||
SET max_block_size = default;
|
||||
|
||||
SET use_query_cache = default;
|
||||
|
||||
-- 4x the same query but with different settings each. There should yield four entries in the query cache.
|
||||
SELECT count(query) FROM system.query_cache;
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
@ -1,2 +1,4 @@
|
||||
2
|
||||
0
|
||||
1
|
||||
0
|
||||
|
@ -15,11 +15,17 @@ ${CLICKHOUSE_CLIENT} --query "CREATE TABLE tab (a UInt64) ENGINE=MergeTree() ORD
|
||||
${CLICKHOUSE_CLIENT} --query "INSERT INTO tab VALUES (1) (2) (3)"
|
||||
${CLICKHOUSE_CLIENT} --query "INSERT INTO tab VALUES (3) (4) (5)"
|
||||
|
||||
SETTINGS="SETTINGS use_query_cache=1, max_threads=1, allow_experimental_analyzer=0, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0.0"
|
||||
SETTINGS_NO_ANALYZER="SETTINGS use_query_cache=1, max_threads=1, allow_experimental_analyzer=0, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0.0"
|
||||
SETTINGS_ANALYZER="SETTINGS use_query_cache=1, max_threads=1, allow_experimental_analyzer=1, merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0.0"
|
||||
|
||||
# Verify that the first query does two aggregations and the second query zero aggregations. Since query cache is currently not integrated
|
||||
# with EXPLAIN PLAN, we need to check the logs.
|
||||
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS" 2>&1 | grep "Aggregated. " | wc -l
|
||||
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS" 2>&1 | grep "Aggregated. " | wc -l
|
||||
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_NO_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
|
||||
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_NO_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query "SYSTEM DROP QUERY CACHE"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
|
||||
${CLICKHOUSE_CLIENT} --send_logs_level=trace --query "SELECT count(a) / (SELECT sum(a) FROM tab) FROM tab $SETTINGS_ANALYZER" 2>&1 | grep "Aggregated. " | wc -l
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query "SYSTEM DROP QUERY CACHE"
|
||||
|
@ -1,2 +0,0 @@
|
||||
1
|
||||
2
|
@ -1,30 +0,0 @@
|
||||
-- Tags: no-parallel, no-fasttest
|
||||
-- Tag no-fasttest: Depends on OpenSSL
|
||||
-- Tag no-parallel: Messes with internal cache
|
||||
|
||||
-- Test for issue #64136
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
||||
DROP DATABASE IF EXISTS db1;
|
||||
DROP DATABASE IF EXISTS db2;
|
||||
|
||||
CREATE DATABASE db1;
|
||||
CREATE DATABASE db2;
|
||||
|
||||
CREATE TABLE db1.tab(a UInt64, PRIMARY KEY a);
|
||||
CREATE TABLE db2.tab(a UInt64, PRIMARY KEY a);
|
||||
|
||||
INSERT INTO db1.tab values(1);
|
||||
INSERT INTO db2.tab values(2);
|
||||
|
||||
USE db1;
|
||||
SELECT * FROM tab SETTINGS use_query_cache=1;
|
||||
|
||||
USE db2;
|
||||
SELECT * FROM tab SETTINGS use_query_cache=1;
|
||||
|
||||
DROP DATABASE db1;
|
||||
DROP DATABASE db2;
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
@ -0,0 +1,9 @@
|
||||
-- generateSnowflakeID
|
||||
1
|
||||
0
|
||||
0
|
||||
1
|
||||
100
|
||||
-- generateSnowflakeIDThreadMonotonic
|
||||
1
|
||||
100
|
29
tests/queries/0_stateless/03130_generateSnowflakeId.sql
Normal file
29
tests/queries/0_stateless/03130_generateSnowflakeId.sql
Normal file
@ -0,0 +1,29 @@
|
||||
SELECT '-- generateSnowflakeID';
|
||||
|
||||
SELECT bitAnd(bitShiftRight(toUInt64(generateSnowflakeID()), 63), 1) = 0; -- check first bit is zero
|
||||
|
||||
SELECT generateSnowflakeID(1) = generateSnowflakeID(2); -- disabled common subexpression elimination --> lhs != rhs
|
||||
SELECT generateSnowflakeID() = generateSnowflakeID(1); -- same as ^^
|
||||
SELECT generateSnowflakeID(1) = generateSnowflakeID(1); -- enabled common subexpression elimination
|
||||
|
||||
SELECT generateSnowflakeID(1, 2); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
|
||||
|
||||
SELECT count(*)
|
||||
FROM
|
||||
(
|
||||
SELECT DISTINCT generateSnowflakeID()
|
||||
FROM numbers(100)
|
||||
);
|
||||
|
||||
SELECT '-- generateSnowflakeIDThreadMonotonic';
|
||||
|
||||
SELECT bitAnd(bitShiftRight(toUInt64(generateSnowflakeIDThreadMonotonic()), 63), 1) = 0; -- check first bit is zero
|
||||
|
||||
SELECT generateSnowflakeIDThreadMonotonic(1, 2); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
|
||||
|
||||
SELECT count(*)
|
||||
FROM
|
||||
(
|
||||
SELECT DISTINCT generateSnowflakeIDThreadMonotonic()
|
||||
FROM numbers(100)
|
||||
);
|
@ -1618,6 +1618,8 @@ gcem
|
||||
generateRandom
|
||||
generateRandomStructure
|
||||
generateSeries
|
||||
generateSnowflakeID
|
||||
generateSnowflakeIDThreadMonotonic
|
||||
generateULID
|
||||
generateUUIDv
|
||||
geoDistance
|
||||
|
Loading…
Reference in New Issue
Block a user