Merge pull request #66374 from Zawa-ll/66010-add-machine-id-to-snowflakeid-clean

Add `machine_id` parameter to `generateSnowflakeID`
This commit is contained in:
Robert Schulze 2024-07-17 09:28:17 +00:00 committed by GitHub
commit 93abd4a6e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 54 additions and 16 deletions

View File

@ -567,12 +567,13 @@ While no standard or recommendation exists for the epoch of Snowflake IDs, imple
**Syntax**
``` sql
generateSnowflakeID([expr])
generateSnowflakeID([expr, [machine_id]])
```
**Arguments**
- `expr` — An arbitrary [expression](../../sql-reference/syntax.md#syntax-expressions) used to bypass [common subexpression elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) if the function is called multiple times in a query. The value of the expression has no effect on the returned Snowflake ID. Optional.
- `machine_id` — A machine ID, the lowest 10 bits are used. [Int64](../data-types/int-uint.md). Optional.
**Returned value**
@ -608,6 +609,16 @@ SELECT generateSnowflakeID(1), generateSnowflakeID(2);
└────────────────────────┴────────────────────────┘
```
**Example with expression and a machine ID**
```
SELECT generateSnowflakeID('expr', 1);
┌─generateSnowflakeID('expr', 1)─┐
│ 7201148511606784002 │
└────────────────────────────────┘
```
## snowflakeToDateTime
:::warning

View File

@ -4,10 +4,10 @@
#include <Functions/FunctionHelpers.h>
#include <Core/ServerUUID.h>
#include <Common/Logger.h>
#include <Common/ErrorCodes.h>
#include <Common/logger_useful.h>
#include "base/types.h"
namespace DB
{
@ -96,10 +96,11 @@ struct SnowflakeIdRange
/// 1. calculate Snowflake ID by current timestamp (`now`)
/// 2. `begin = max(available, now)`
/// 3. Calculate `end = begin + input_rows_count` handling `machine_seq_num` overflow
SnowflakeIdRange getRangeOfAvailableIds(const SnowflakeId & available, size_t input_rows_count)
SnowflakeIdRange getRangeOfAvailableIds(const SnowflakeId & available, uint64_t machine_id, size_t input_rows_count)
{
/// 1. `now`
SnowflakeId begin = {.timestamp = getTimestamp(), .machine_id = getMachineId(), .machine_seq_num = 0};
SnowflakeId begin = {.timestamp = getTimestamp(), .machine_id = machine_id, .machine_seq_num = 0};
/// 2. `begin`
if (begin.timestamp <= available.timestamp)
@ -128,13 +129,13 @@ struct Data
/// Guarantee counter monotonicity within one timestamp across all threads generating Snowflake IDs simultaneously.
static inline std::atomic<uint64_t> lowest_available_snowflake_id = 0;
SnowflakeId reserveRange(size_t input_rows_count)
SnowflakeId reserveRange(uint64_t machine_id, size_t input_rows_count)
{
uint64_t available_snowflake_id = lowest_available_snowflake_id.load();
SnowflakeIdRange range;
do
{
range = getRangeOfAvailableIds(toSnowflakeId(available_snowflake_id), input_rows_count);
range = getRangeOfAvailableIds(toSnowflakeId(available_snowflake_id), machine_id, input_rows_count);
}
while (!lowest_available_snowflake_id.compare_exchange_weak(available_snowflake_id, fromSnowflakeId(range.end)));
/// CAS failed --> another thread updated `lowest_available_snowflake_id` and we re-try
@ -165,24 +166,32 @@ public:
{
FunctionArgumentDescriptors mandatory_args;
FunctionArgumentDescriptors optional_args{
{"expr", nullptr, nullptr, "Arbitrary expression"}
{"expr", nullptr, nullptr, "Arbitrary expression"},
{"machine_id", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isNativeUInt), static_cast<FunctionArgumentDescriptor::ColumnValidator>(&isColumnConst), "const UInt*"}
};
validateFunctionArguments(*this, arguments, mandatory_args, optional_args);
return std::make_shared<DataTypeUInt64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & /*arguments*/, const DataTypePtr &, size_t input_rows_count) const override
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
auto col_res = ColumnVector<UInt64>::create();
typename ColumnVector<UInt64>::Container & vec_to = col_res->getData();
if (input_rows_count != 0)
if (input_rows_count > 0)
{
vec_to.resize(input_rows_count);
uint64_t machine_id = getMachineId();
if (arguments.size() == 2)
{
machine_id = arguments[1].column->getUInt(0);
machine_id &= (1ull << machine_id_bits_count) - 1;
}
Data data;
SnowflakeId snowflake_id = data.reserveRange(input_rows_count); /// returns begin of available snowflake ids range
SnowflakeId snowflake_id = data.reserveRange(machine_id, input_rows_count);
for (UInt64 & to_row : vec_to)
{
@ -208,10 +217,13 @@ public:
REGISTER_FUNCTION(GenerateSnowflakeID)
{
FunctionDocumentation::Description description = R"(Generates a Snowflake ID. The generated Snowflake ID contains the current Unix timestamp in milliseconds (41 + 1 top zero bits), followed by a machine id (10 bits), and a counter (12 bits) to distinguish IDs within a millisecond. For any given timestamp (unix_ts_ms), the counter starts at 0 and is incremented by 1 for each new Snowflake ID until the timestamp changes. In case the counter overflows, the timestamp field is incremented by 1 and the counter is reset to 0. Function generateSnowflakeID guarantees that the counter field within a timestamp increments monotonically across all function invocations in concurrently running threads and queries.)";
FunctionDocumentation::Syntax syntax = "generateSnowflakeID([expression])";
FunctionDocumentation::Arguments arguments = {{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."}};
FunctionDocumentation::Syntax syntax = "generateSnowflakeID([expression, [machine_id]])";
FunctionDocumentation::Arguments arguments = {
{"expression", "The expression is used to bypass common subexpression elimination if the function is called multiple times in a query but otherwise ignored. Optional."},
{"machine_id", "A machine ID, the lowest 10 bits are used. Optional."}
};
FunctionDocumentation::ReturnedValue returned_value = "A value of type UInt64";
FunctionDocumentation::Examples examples = {{"single", "SELECT generateSnowflakeID()", "7201148511606784000"}, {"multiple", "SELECT generateSnowflakeID(1), generateSnowflakeID(2)", ""}};
FunctionDocumentation::Examples examples = {{"no_arguments", "SELECT generateSnowflakeID()", "7201148511606784000"}, {"with_machine_id", "SELECT generateSnowflakeID(1)", "7201148511606784001"}, {"with_expression_and_machine_id", "SELECT generateSnowflakeID('some_expression', 1)", "7201148511606784002"}};
FunctionDocumentation::Categories categories = {"Snowflake ID"};
factory.registerFunction<FunctionGenerateSnowflakeID>({description, syntax, arguments, returned_value, examples, categories});

View File

@ -1,5 +1,11 @@
Negative tests
The first bit must be zero
1
Test disabling of common subexpression elimination via first parameter
0
0
1
Test user-provided machine ID
1
Generated Snowflake IDs are unique
100

View File

@ -1,13 +1,22 @@
-- Test SQL function 'generateSnowflakeID'
SELECT bitAnd(bitShiftRight(toUInt64(generateSnowflakeID()), 63), 1) = 0; -- check first bit is zero
SELECT 'Negative tests';
SELECT generateSnowflakeID(1, 2, 3); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
SELECT generateSnowflakeID(1, 'not_an_int'); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT generateSnowflakeID(1, materialize(2)); -- { serverError ILLEGAL_COLUMN }
SELECT 'The first bit must be zero';
SELECT bitAnd(bitShiftRight(generateSnowflakeID(), 63), 1) = 0;
SELECT 'Test disabling of common subexpression elimination via first parameter';
SELECT generateSnowflakeID(1) = generateSnowflakeID(2); -- disabled common subexpression elimination --> lhs != rhs
SELECT generateSnowflakeID() = generateSnowflakeID(1); -- same as ^^
SELECT generateSnowflakeID(1) = generateSnowflakeID(1); -- enabled common subexpression elimination
SELECT generateSnowflakeID(1) = generateSnowflakeID(1); -- with common subexpression elimination
SELECT generateSnowflakeID(1, 2); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
SELECT 'Test user-provided machine ID';
SELECT bitAnd(bitShiftRight(generateSnowflakeID(1, 123), 12), 1024 - 1) = 123; -- the machine id is actually set in the generated snowflake ID (1024 = 2^10)
SELECT 'Generated Snowflake IDs are unique';
SELECT count(*)
FROM
(