From 0f039f83590339ab636e9b2e59c25ed22227d246 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 21 Mar 2023 09:09:57 +0100 Subject: [PATCH 01/52] Add Google Cloud Storage S3 compatible table function --- src/TableFunctions/TableFunctionS3.h | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/TableFunctions/TableFunctionS3.h b/src/TableFunctions/TableFunctionS3.h index 859da9e9201..a2e93476448 100644 --- a/src/TableFunctions/TableFunctionS3.h +++ b/src/TableFunctions/TableFunctionS3.h @@ -93,4 +93,18 @@ private: } +class TableFunctionGCS : public TableFunctionS3 +{ +public: + static constexpr auto name = "gcs"; + std::string getName() const override + { + return name; + } +private: + const char * getStorageTypeName() const override { return "GCS"; } +}; + +} + #endif From e6ddfc3486985393040222aec24ea70a4c60e7b8 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 21 Mar 2023 09:51:37 +0100 Subject: [PATCH 02/52] Update GCS table function docs --- docs/en/sql-reference/table-functions/gcs.md | 184 +++++++++++++++++++ src/TableFunctions/TableFunctionS3.h | 8 + 2 files changed, 192 insertions(+) create mode 100644 docs/en/sql-reference/table-functions/gcs.md diff --git a/docs/en/sql-reference/table-functions/gcs.md b/docs/en/sql-reference/table-functions/gcs.md new file mode 100644 index 00000000000..8427a2db224 --- /dev/null +++ b/docs/en/sql-reference/table-functions/gcs.md @@ -0,0 +1,184 @@ +--- +slug: /en/sql-reference/table-functions/gcs +sidebar_position: 45 +sidebar_label: s3 +keywords: [gcs, bucket] +--- + +# gcs Table Function + +Provides a table-like interface to select/insert files in [Google Cloud Storage](https://cloud.google.com/storage/). + +**Syntax** + +``` sql +gcs(path [,hmac_key, hmac_secret] [,format] [,structure] [,compression]) +``` + +:::tip GCS +The GCS Table Function integrates with Google Cloud Storage by using the GCS XML API and HMAC keys. See the [Google interoperability docs]( https://cloud.google.com/storage/docs/interoperability) for more details about the endpoint and HMAC. + +::: + +**Arguments** + +- `path` — Bucket url with path to file. Supports following wildcards in readonly mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. For more information see [here](../../engines/table-engines/integrations/gcs.md#wildcards-in-path). + + :::note GCS + The GCS path is in this format as the endpoint for the Google XML API is different than the JSON API: + ``` + https://storage.googleapis.com/// + ``` + and not ~~https://storage.cloud.google.com~~. + ::: + +- `format` — The [format](../../interfaces/formats.md#formats) of the file. +- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`. +- `compression` — Parameter is optional. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, it will autodetect compression by file extension. + +**Returned value** + +A table with the specified structure for reading or writing data in the specified file. 
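The examples below omit credentials. For a bucket that is not publicly readable, the HMAC key and secret go right after the path, as in the syntax above. A minimal sketch of that form, using placeholder credentials and the same sample schema as the examples:

``` sql
SELECT count()
FROM gcs(
    'https://storage.googleapis.com/my-test-bucket-768/data.csv',
    'hmac_key_placeholder',      -- hypothetical HMAC access key
    'hmac_secret_placeholder',   -- hypothetical HMAC secret
    'CSV',
    'column1 UInt32, column2 UInt32, column3 UInt32');
```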
+ +**Examples** + +Selecting the first two rows from the table from S3 file `https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/data.csv`: + +``` sql +SELECT * +FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/data.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32') +LIMIT 2; +``` + +``` text +┌─column1─┬─column2─┬─column3─┐ +│ 1 │ 2 │ 3 │ +│ 3 │ 2 │ 1 │ +└─────────┴─────────┴─────────┘ +``` + +The similar but from file with `gzip` compression: + +``` sql +SELECT * +FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/data.csv.gz', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32', 'gzip') +LIMIT 2; +``` + +``` text +┌─column1─┬─column2─┬─column3─┐ +│ 1 │ 2 │ 3 │ +│ 3 │ 2 │ 1 │ +└─────────┴─────────┴─────────┘ +``` + +## Usage + +Suppose that we have several files with following URIs on S3: + +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_1.csv' +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_2.csv' +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_3.csv' +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_4.csv' +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_1.csv' +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_2.csv' +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_3.csv' +- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_4.csv' + +Count the amount of rows in files ending with numbers from 1 to 3: + +``` sql +SELECT count(*) +FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'name String, value UInt32') +``` + +``` text +┌─count()─┐ +│ 18 │ +└─────────┘ +``` + +Count the total amount of rows in all files in these two directories: + +``` sql +SELECT count(*) +FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'name String, value UInt32') +``` + +``` text +┌─count()─┐ +│ 24 │ +└─────────┘ +``` + +:::warning +If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`. +::: + +Count the total amount of rows in files named `file-000.csv`, `file-001.csv`, … , `file-999.csv`: + +``` sql +SELECT count(*) +FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'name String, value UInt32'); +``` + +``` text +┌─count()─┐ +│ 12 │ +└─────────┘ +``` + +Insert data into file `test-data.csv.gz`: + +``` sql +INSERT INTO FUNCTION gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') +VALUES ('test-data', 1), ('test-data-2', 2); +``` + +Insert data into file `test-data.csv.gz` from existing table: + +``` sql +INSERT INTO FUNCTION gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') +SELECT name, value FROM existing_table; +``` + +Glob ** can be used for recursive directory traversal. 
Consider the below example, it will fetch all files from `my-test-bucket-768` directory recursively: + +``` sql +SELECT * FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip'); +``` + +The below get data from all `test-data.csv.gz` files from any folder inside `my-test-bucket` directory recursively: + +``` sql +SELECT * FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip'); +``` + +## Partitioned Write + +If you specify `PARTITION BY` expression when inserting data into `S3` table, a separate file is created for each partition value. Splitting the data into separate files helps to improve reading operations efficiency. + +**Examples** + +1. Using partition ID in a key creates separate files: + +```sql +INSERT INTO TABLE FUNCTION + gcs('http://bucket.amazonaws.com/my_bucket/file_{_partition_id}.csv', 'CSV', 'a String, b UInt32, c UInt32') + PARTITION BY a VALUES ('x', 2, 3), ('x', 4, 5), ('y', 11, 12), ('y', 13, 14), ('z', 21, 22), ('z', 23, 24); +``` +As a result, the data is written into three files: `file_x.csv`, `file_y.csv`, and `file_z.csv`. + +2. Using partition ID in a bucket name creates files in different buckets: + +```sql +INSERT INTO TABLE FUNCTION + gcs('http://bucket.amazonaws.com/my_bucket_{_partition_id}/file.csv', 'CSV', 'a UInt32, b UInt32, c UInt32') + PARTITION BY a VALUES (1, 2, 3), (1, 4, 5), (10, 11, 12), (10, 13, 14), (20, 21, 22), (20, 23, 24); +``` +As a result, the data is written into three files in different buckets: `my_bucket_1/file.csv`, `my_bucket_10/file.csv`, and `my_bucket_20/file.csv`. + +**See Also** + +- [S3 table function](s3.md) +- [S3 engine](../../engines/table-engines/integrations/s3.md) diff --git a/src/TableFunctions/TableFunctionS3.h b/src/TableFunctions/TableFunctionS3.h index a2e93476448..ed8cd3bd41a 100644 --- a/src/TableFunctions/TableFunctionS3.h +++ b/src/TableFunctions/TableFunctionS3.h @@ -97,6 +97,14 @@ class TableFunctionGCS : public TableFunctionS3 { public: static constexpr auto name = "gcs"; + static constexpr auto signature = " - url\n" + " - url, format\n" + " - url, format, structure\n" + " - url, hmac_key, hmac_secret\n" + " - url, format, structure, compression_method\n" + " - url, hmac_key, hmac_secret, format\n" + " - url, hmac_key, hmac_secret, format, structure\n" + " - url, hmac_key, hmac_secret, format, structure, compression_method"; std::string getName() const override { return name; From e2c32c3bc072e2290620d975ada42c37bcabcc52 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 21 Mar 2023 13:46:37 +0100 Subject: [PATCH 03/52] Update GCS table function docs --- docs/en/sql-reference/table-functions/gcs.md | 40 ++++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/en/sql-reference/table-functions/gcs.md b/docs/en/sql-reference/table-functions/gcs.md index 8427a2db224..dcf49a5108b 100644 --- a/docs/en/sql-reference/table-functions/gcs.md +++ b/docs/en/sql-reference/table-functions/gcs.md @@ -42,11 +42,11 @@ A table with the specified structure for reading or writing data in the specifie **Examples** -Selecting the first two rows from the table from S3 file `https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/data.csv`: +Selecting the first two rows from the table from GCS file `https://storage.googleapis.com/my-test-bucket-768/data.csv`: ``` sql SELECT * -FROM 
gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/data.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32') +FROM gcs('https://storage.googleapis.com/my-test-bucket-768/data.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32') LIMIT 2; ``` @@ -61,7 +61,7 @@ The similar but from file with `gzip` compression: ``` sql SELECT * -FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/data.csv.gz', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32', 'gzip') +FROM gcs('https://storage.googleapis.com/my-test-bucket-768/data.csv.gz', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32', 'gzip') LIMIT 2; ``` @@ -74,22 +74,22 @@ LIMIT 2; ## Usage -Suppose that we have several files with following URIs on S3: +Suppose that we have several files with following URIs on GCS: -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_1.csv' -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_2.csv' -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_3.csv' -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/some_prefix/some_file_4.csv' -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_1.csv' -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_2.csv' -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_3.csv' -- 'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/another_prefix/some_file_4.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_1.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_2.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_3.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_4.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_1.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_2.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_3.csv' +- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_4.csv' Count the amount of rows in files ending with numbers from 1 to 3: ``` sql SELECT count(*) -FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'name String, value UInt32') +FROM gcs('https://storage.googleapis.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'name String, value UInt32') ``` ``` text @@ -102,7 +102,7 @@ Count the total amount of rows in all files in these two directories: ``` sql SELECT count(*) -FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'name String, value UInt32') +FROM gcs('https://storage.googleapis.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'name String, value UInt32') ``` ``` text @@ -119,7 +119,7 @@ Count the total amount of rows in files named `file-000.csv`, `file-001.csv`, ``` sql SELECT count(*) -FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'name String, value UInt32'); +FROM gcs('https://storage.googleapis.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'name 
String, value UInt32'); ``` ``` text @@ -131,32 +131,32 @@ FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768 Insert data into file `test-data.csv.gz`: ``` sql -INSERT INTO FUNCTION gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') +INSERT INTO FUNCTION gcs('https://storage.googleapis.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') VALUES ('test-data', 1), ('test-data-2', 2); ``` Insert data into file `test-data.csv.gz` from existing table: ``` sql -INSERT INTO FUNCTION gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') +INSERT INTO FUNCTION gcs('https://storage.googleapis.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') SELECT name, value FROM existing_table; ``` Glob ** can be used for recursive directory traversal. Consider the below example, it will fetch all files from `my-test-bucket-768` directory recursively: ``` sql -SELECT * FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip'); +SELECT * FROM gcs('https://storage.googleapis.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip'); ``` The below get data from all `test-data.csv.gz` files from any folder inside `my-test-bucket` directory recursively: ``` sql -SELECT * FROM gcs('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip'); +SELECT * FROM gcs('https://storage.googleapis.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip'); ``` ## Partitioned Write -If you specify `PARTITION BY` expression when inserting data into `S3` table, a separate file is created for each partition value. Splitting the data into separate files helps to improve reading operations efficiency. +If you specify `PARTITION BY` expression when inserting data into `GCS` table, a separate file is created for each partition value. Splitting the data into separate files helps to improve reading operations efficiency. **Examples** From d0a54ab21b2107ee6893a7480533c79c2919fd75 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 21 Mar 2023 14:45:58 +0100 Subject: [PATCH 04/52] Update GCS table function docs --- docs/en/sql-reference/table-functions/gcs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/table-functions/gcs.md b/docs/en/sql-reference/table-functions/gcs.md index dcf49a5108b..bfa7f36fa48 100644 --- a/docs/en/sql-reference/table-functions/gcs.md +++ b/docs/en/sql-reference/table-functions/gcs.md @@ -22,7 +22,7 @@ The GCS Table Function integrates with Google Cloud Storage by using the GCS XML **Arguments** -- `path` — Bucket url with path to file. Supports following wildcards in readonly mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. For more information see [here](../../engines/table-engines/integrations/gcs.md#wildcards-in-path). +- `path` — Bucket url with path to file. Supports following wildcards in readonly mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. 
:::note GCS The GCS path is in this format as the endpoint for the Google XML API is different than the JSON API: From 576efc1da3384148664202acef1cdbd27ddc8a08 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Wed, 22 Mar 2023 06:58:09 +0100 Subject: [PATCH 05/52] register GCP function in factory --- src/TableFunctions/TableFunctionS3.cpp | 5 +++++ src/TableFunctions/registerTableFunctions.cpp | 1 + src/TableFunctions/registerTableFunctions.h | 1 + 3 files changed, 7 insertions(+) diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp index f082b192ee0..6f4e6acec8a 100644 --- a/src/TableFunctions/TableFunctionS3.cpp +++ b/src/TableFunctions/TableFunctionS3.cpp @@ -183,6 +183,11 @@ void registerTableFunctionOSS(TableFunctionFactory & factory) factory.registerFunction(); } +void registerTableFunctionGCS(TableFunctionFactory & factory) +{ + factory.registerFunction(); +} + } #endif diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index 7b2b989e724..c692173e689 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -28,6 +28,7 @@ void registerTableFunctions() registerTableFunctionS3Cluster(factory); registerTableFunctionCOS(factory); registerTableFunctionOSS(factory); + registerTableFunctionGCS(factory); registerTableFunctionHudi(factory); registerTableFunctionDeltaLake(factory); #if USE_AVRO diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index 911aae199e2..af1b7129ec4 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -25,6 +25,7 @@ void registerTableFunctionS3(TableFunctionFactory & factory); void registerTableFunctionS3Cluster(TableFunctionFactory & factory); void registerTableFunctionCOS(TableFunctionFactory & factory); void registerTableFunctionOSS(TableFunctionFactory & factory); +void registerTableFunctionGCS(TableFunctionFactory & factory); void registerTableFunctionHudi(TableFunctionFactory & factory); void registerTableFunctionDeltaLake(TableFunctionFactory & factory); #if USE_AVRO From ae100defa279c8e3343482551041758f6a7c925c Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Thu, 20 Apr 2023 15:51:10 +0000 Subject: [PATCH 06/52] Add Array data type to MongoDB --- src/Processors/Sources/MongoDBSource.cpp | 204 +++++++++++++++++- src/Processors/Sources/MongoDBSource.h | 9 + .../integration/test_storage_mongodb/test.py | 75 +++++++ 3 files changed, 284 insertions(+), 4 deletions(-) diff --git a/src/Processors/Sources/MongoDBSource.cpp b/src/Processors/Sources/MongoDBSource.cpp index a8bfefdf8a6..8ebedc3e877 100644 --- a/src/Processors/Sources/MongoDBSource.cpp +++ b/src/Processors/Sources/MongoDBSource.cpp @@ -6,7 +6,9 @@ #include #include #include +#include +#include #include #include #include @@ -17,6 +19,9 @@ #include #include +#include +#include + // only after poco // naming conflict: // Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value); @@ -33,6 +38,11 @@ namespace ErrorCodes extern const int MONGODB_ERROR; } +namespace +{ + void prepareMongoDBArrayInfo( + std::unordered_map & array_info, size_t column_idx, const DataTypePtr data_type); +} std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select) { @@ -58,6 +68,10 @@ MongoDBSource::MongoDBSource( , max_block_size{max_block_size_} { description.init(sample_block); + 
+ for (const auto idx : collections::range(0, description.sample_block.columns())) + if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray) + prepareMongoDBArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type); } @@ -68,6 +82,7 @@ namespace { using ValueType = ExternalResultDescription::ValueType; using ObjectId = Poco::MongoDB::ObjectId; + using MongoArray = Poco::MongoDB::Array; template void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name) @@ -103,7 +118,129 @@ namespace } } - void insertValue(IColumn & column, const ValueType type, const Poco::MongoDB::Element & value, const std::string & name) + template + Field getNumber(const Poco::MongoDB::Element & value, const std::string & name) + { + switch (value.type()) + { + case Poco::MongoDB::ElementTraits::TypeId: + return static_cast(static_cast &>(value).value()); + case Poco::MongoDB::ElementTraits::TypeId: + return static_cast(static_cast &>(value).value()); + case Poco::MongoDB::ElementTraits::TypeId: + return static_cast(static_cast &>(value).value()); + case Poco::MongoDB::ElementTraits::TypeId: + return static_cast(static_cast &>(value).value()); + case Poco::MongoDB::ElementTraits::TypeId: + return Field(); + case Poco::MongoDB::ElementTraits::TypeId: + return parse(static_cast &>(value).value()); + default: + throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected a number, got type id = {} for column {}", + toString(value.type()), name); + } + } + + void prepareMongoDBArrayInfo( + std::unordered_map & array_info, size_t column_idx, const DataTypePtr data_type) + { + const auto * array_type = typeid_cast(data_type.get()); + auto nested = array_type->getNestedType(); + + size_t count_dimensions = 1; + while (isArray(nested)) + { + ++count_dimensions; + nested = typeid_cast(nested.get())->getNestedType(); + } + + Field default_value = nested->getDefault(); + if (nested->isNullable()) + nested = static_cast(nested.get())->getNestedType(); + + WhichDataType which(nested); + std::function parser; + + if (which.isUInt8()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isUInt16()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isUInt32()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isUInt64()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isInt8()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isInt16()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isInt32()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isInt64()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isFloat32()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isFloat64()) + parser = [](const Poco::MongoDB::Element & value, const 
std::string & name) -> Field { return getNumber(value, name); }; + else if (which.isString() || which.isFixedString()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field + { + if (value.type() == Poco::MongoDB::ElementTraits::TypeId) + { + String string_id = value.toString(); + return Field(string_id.data(), string_id.size()); + } + else if (value.type() == Poco::MongoDB::ElementTraits::TypeId) + { + String string = static_cast &>(value).value(); + return Field(string.data(), string.size()); + } + + throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String, got type id = {} for column {}", + toString(value.type()), name); + }; + else if (which.isDate()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field + { + if (value.type() != Poco::MongoDB::ElementTraits::TypeId) + throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}", + toString(value.type()), name); + + return static_cast(DateLUT::instance().toDayNum( + static_cast &>(value).value().epochTime())); + }; + else if (which.isDateTime()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field + { + if (value.type() != Poco::MongoDB::ElementTraits::TypeId) + throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}", + toString(value.type()), name); + + return static_cast(static_cast &>(value).value().epochTime()); + }; + else if (which.isUUID()) + parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field + { + if (value.type() != Poco::MongoDB::ElementTraits::TypeId) + throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String (UUID), got type id = {} for column {}", + toString(value.type()), name); + + String string = static_cast &>(value).value(); + return parse(string); + }; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName()); + + array_info[column_idx] = {count_dimensions, default_value, parser}; + + } + + void insertValue( + IColumn & column, + const ValueType type, + const Poco::MongoDB::Element & value, + const std::string & name, + std::unordered_map & array_info, + size_t idx) { switch (type) { @@ -192,8 +329,67 @@ namespace toString(value.type()), name); break; } + case ValueType::vtArray: + { + if (value.type() != Poco::MongoDB::ElementTraits::TypeId) + throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Array, got type id = {} for column {}", + toString(value.type()), name); + + size_t max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions; + const auto parse_value = array_info[idx].parser; + std::vector dimensions(expected_dimensions + 1); + + auto array = static_cast &>(value).value(); + + std::vector> arrays; + arrays.emplace_back(&value, 0); + + while (!arrays.empty()) + { + size_t dimension = arrays.size(); + max_dimension = std::max(max_dimension, dimension); + + auto [element, i] = arrays.back(); + + auto parent = static_cast &>(*element).value(); + + if (i >= parent->size()) + { + dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end())); + dimensions[dimension + 1].clear(); + + arrays.pop_back(); + continue; + } + + Poco::MongoDB::Element::Ptr child = parent->get(static_cast(i)); + + if (child->type() == Poco::MongoDB::ElementTraits::TypeId) + { + if (dimension + 1 > expected_dimensions) + throw 
Exception(ErrorCodes::BAD_ARGUMENTS, "Got more dimensions than expected"); + + arrays.back().second += 1; + arrays.emplace_back(child.get(), 0); + } + else + { + dimensions[dimension].emplace_back(parse_value(*child, name)); + } + } + + if (max_dimension < expected_dimensions) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Got less dimensions than expected. ({} instead of {})", max_dimension, expected_dimensions); + + // TODO: default value + + assert_cast(column).insert(Array(dimensions[1].begin(), dimensions[1].end())); + break; + + } default: - throw Exception(ErrorCodes::UNKNOWN_TYPE, "Value of unsupported type:{}", column.getName()); + throw Exception(ErrorCodes::UNKNOWN_TYPE, "Value of unsupported type: {}", column.getName()); } } @@ -252,11 +448,11 @@ Chunk MongoDBSource::generate() if (is_nullable) { ColumnNullable & column_nullable = assert_cast(*columns[idx]); - insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name); + insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name, array_info, idx); column_nullable.getNullMapData().emplace_back(0); } else - insertValue(*columns[idx], description.types[idx].first, *value, name); + insertValue(*columns[idx], description.types[idx].first, *value, name, array_info, idx); } } } diff --git a/src/Processors/Sources/MongoDBSource.h b/src/Processors/Sources/MongoDBSource.h index d03a7a45477..ec73f00f378 100644 --- a/src/Processors/Sources/MongoDBSource.h +++ b/src/Processors/Sources/MongoDBSource.h @@ -19,6 +19,13 @@ namespace MongoDB namespace DB { +struct MongoDBArrayInfo +{ + size_t num_dimensions; + Field default_value; + std::function parser; +}; + void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password); std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select); @@ -45,6 +52,8 @@ private: const UInt64 max_block_size; ExternalResultDescription description; bool all_read = false; + + std::unordered_map array_info; }; } diff --git a/tests/integration/test_storage_mongodb/test.py b/tests/integration/test_storage_mongodb/test.py index 74b2b15fda0..cf843ddd489 100644 --- a/tests/integration/test_storage_mongodb/test.py +++ b/tests/integration/test_storage_mongodb/test.py @@ -70,6 +70,81 @@ def test_simple_select(started_cluster): simple_mongo_table.drop() +@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"]) +def test_arrays(started_cluster): + mongo_connection = get_mongo_connection(started_cluster) + db = mongo_connection["test"] + db.add_user("root", "clickhouse") + simple_mongo_table = db["simple_table"] + data = [] + for i in range(0, 100): + data.append({ + "key": i, + "arr_int64": [- (i + 1), - (i + 2), - (i + 3)], + "arr_int32": [- (i + 1), - (i + 2), - (i + 3)], + "arr_int16": [- (i + 1), - (i + 2), - (i + 3)], + "arr_int8": [- (i + 1), - (i + 2), - (i + 3)], + "arr_uint64": [i + 1, i + 2, i + 3], + "arr_uint32": [i + 1, i + 2, i + 3], + "arr_uint16": [i + 1, i + 2, i + 3], + "arr_uint8": [i + 1, i + 2, i + 3], + "arr_float32": [i + 1.125, i + 2.5, i + 3.750], + "arr_float64": [i + 1.125, i + 2.5, i + 3.750], + "arr_date": ['2023-11-01', '2023-06-19'], + "arr_datetime": ['2023-03-31 06:03:12', '2023-02-01 12:46:34'], + "arr_string": [str(i + 1), str(i + 2), str(i + 3)], + "arr_uuid": ['f0e77736-91d1-48ce-8f01-15123ca1c7ed', '93376a07-c044-4281-a76e-ad27cf6973c5'], + "arr_arr_bool": 
[[True, False, True]] + }) + + simple_mongo_table.insert_many(data) + + node = started_cluster.instances["node"] + node.query( + "CREATE TABLE simple_mongo_table(" + "key UInt64," + "arr_int64 Array(Int64)," + "arr_int32 Array(Int32)," + "arr_int16 Array(Int16)," + "arr_int8 Array(Int8)," + "arr_uint64 Array(UInt64)," + "arr_uint32 Array(UInt32)," + "arr_uint16 Array(UInt16)," + "arr_uint8 Array(UInt8)," + "arr_float32 Array(Float32)," + "arr_float64 Array(Float64)," + "arr_date Array(Date)," + "arr_datetime Array(DateTime)," + "arr_string Array(String)," + "arr_uuid Array(UUID)," + "arr_arr_bool Array(Array(Bool))" + ") ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')" + ) + + assert node.query("SELECT COUNT() FROM simple_mongo_table") == "100\n" + + for column_name in ["arr_int64", "arr_int32", "arr_int16", "arr_int8"]: + assert ( + node.query(f"SELECT {column_name} from simple_mongo_table where key = 42") + == "[-43,-44,-45]\n" + ) + + for column_name in ["arr_uint64", "arr_uint32", "arr_uint16", "arr_uint8"]: + assert ( + node.query(f"SELECT {column_name} from simple_mongo_table where key = 42") + == "[43,44,45]\n" + ) + + for column_name in ["arr_float32", "arr_float64"]: + assert ( + node.query(f"SELECT {column_name} from simple_mongo_table where key = 42") + == "[43,44,45]\n" + ) + + node.query("DROP TABLE simple_mongo_table") + simple_mongo_table.drop() + + @pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"]) def test_complex_data_type(started_cluster): mongo_connection = get_mongo_connection(started_cluster) From 2d0812e3c745c1c589a489c824db2f21785733fb Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 24 Apr 2023 14:55:31 +0000 Subject: [PATCH 07/52] Refactor ColumnLowCardinality::cutAndCompact to avoid calling IColumn::assumeMutable. --- src/Columns/ColumnLowCardinality.cpp | 27 ++++++++++++--------------- src/Columns/ColumnLowCardinality.h | 8 +++++--- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/Columns/ColumnLowCardinality.cpp b/src/Columns/ColumnLowCardinality.cpp index 11d02b023d6..4f9ab8215be 100644 --- a/src/Columns/ColumnLowCardinality.cpp +++ b/src/Columns/ColumnLowCardinality.cpp @@ -485,13 +485,8 @@ void ColumnLowCardinality::setSharedDictionary(const ColumnPtr & column_unique) ColumnLowCardinality::MutablePtr ColumnLowCardinality::cutAndCompact(size_t start, size_t length) const { auto sub_positions = IColumn::mutate(idx.getPositions()->cut(start, length)); - /// Create column with new indexes and old dictionary. - /// Dictionary is shared, but will be recreated after compactInplace call. - auto column = ColumnLowCardinality::create(getDictionary().assumeMutable(), std::move(sub_positions)); - /// Will create new dictionary. 
- column->compactInplace(); - - return column; + auto new_column_unique = Dictionary::compact(dictionary.getColumnUnique(), sub_positions); + return ColumnLowCardinality::create(std::move(new_column_unique), std::move(sub_positions)); } void ColumnLowCardinality::compactInplace() @@ -589,7 +584,7 @@ size_t ColumnLowCardinality::Index::getSizeOfIndexType(const IColumn & column, s column.getName()); } -void ColumnLowCardinality::Index::attachPositions(ColumnPtr positions_) +void ColumnLowCardinality::Index::attachPositions(MutableColumnPtr positions_) { positions = std::move(positions_); updateSizeOfType(); @@ -820,21 +815,23 @@ void ColumnLowCardinality::Dictionary::setShared(const ColumnPtr & column_unique shared = true; } -void ColumnLowCardinality::Dictionary::compact(ColumnPtr & positions) +void ColumnLowCardinality::Dictionary::compact(MutableColumnPtr & positions) { - auto new_column_unique = column_unique->cloneEmpty(); + column_unique = compact(getColumnUnique(), positions); + shared = false; +} - auto & unique = getColumnUnique(); +MutableColumnPtr ColumnLowCardinality::Dictionary::compact(const IColumnUnique & unique, MutableColumnPtr & positions) +{ + auto new_column_unique = unique.cloneEmpty(); auto & new_unique = static_cast(*new_column_unique); - auto indexes = mapUniqueIndex(positions->assumeMutableRef()); + auto indexes = mapUniqueIndex(*positions); auto sub_keys = unique.getNestedColumn()->index(*indexes, 0); auto new_indexes = new_unique.uniqueInsertRangeFrom(*sub_keys, 0, sub_keys->size()); positions = IColumn::mutate(new_indexes->index(*positions, 0)); - column_unique = std::move(new_column_unique); - - shared = false; + return new_column_unique; } ColumnPtr ColumnLowCardinality::cloneWithDefaultOnNull() const diff --git a/src/Columns/ColumnLowCardinality.h b/src/Columns/ColumnLowCardinality.h index e7f4b92d733..df707039b03 100644 --- a/src/Columns/ColumnLowCardinality.h +++ b/src/Columns/ColumnLowCardinality.h @@ -301,8 +301,8 @@ public: void checkSizeOfType(); - ColumnPtr detachPositions() { return std::move(positions); } - void attachPositions(ColumnPtr positions_); + MutableColumnPtr detachPositions() { return IColumn::mutate(std::move(positions)); } + void attachPositions(MutableColumnPtr positions_); void countKeys(ColumnUInt64::Container & counts) const; @@ -350,7 +350,9 @@ private: bool isShared() const { return shared; } /// Create new dictionary with only keys that are mentioned in positions. 
- void compact(ColumnPtr & positions); + void compact(MutableColumnPtr & positions); + + static MutableColumnPtr compact(const IColumnUnique & column_unique, MutableColumnPtr & positions); private: WrappedPtr column_unique; From 38622d07703804570643cc3ab0b5c764efbf675c Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 24 Apr 2023 18:21:49 +0000 Subject: [PATCH 08/52] add settings to delay or throw in case of too many mutations --- src/Common/ErrorCodes.cpp | 1 + src/Common/ProfileEvents.cpp | 3 + src/Core/Settings.h | 2 + src/Storages/MergeTree/MergeTreeData.cpp | 49 +++++++++++++ src/Storages/MergeTree/MergeTreeData.h | 9 ++- src/Storages/MergeTree/MergeTreeSettings.h | 4 ++ .../MergeTree/ReplicatedMergeTreeQueue.cpp | 20 ++++-- .../MergeTree/ReplicatedMergeTreeQueue.h | 2 + src/Storages/StorageMergeTree.cpp | 27 +++++++- src/Storages/StorageMergeTree.h | 2 + src/Storages/StorageReplicatedMergeTree.cpp | 12 +++- src/Storages/StorageReplicatedMergeTree.h | 2 + .../02724_delay_mutations.reference | 8 +++ .../0_stateless/02724_delay_mutations.sh | 59 ++++++++++++++++ .../02724_limit_num_mutations.reference | 9 +++ .../0_stateless/02724_limit_num_mutations.sh | 69 +++++++++++++++++++ 16 files changed, 269 insertions(+), 9 deletions(-) create mode 100644 tests/queries/0_stateless/02724_delay_mutations.reference create mode 100755 tests/queries/0_stateless/02724_delay_mutations.sh create mode 100644 tests/queries/0_stateless/02724_limit_num_mutations.reference create mode 100755 tests/queries/0_stateless/02724_limit_num_mutations.sh diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 9abf3bba8ff..d570eab8f18 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -650,6 +650,7 @@ M(679, IO_URING_SUBMIT_ERROR) \ M(690, MIXED_ACCESS_PARAMETER_TYPES) \ M(691, UNKNOWN_ELEMENT_OF_ENUM) \ + M(692, TOO_MANY_MUTATIONS) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index da096085d5b..387eafdc145 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -102,6 +102,9 @@ M(DelayedInserts, "Number of times the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \ M(RejectedInserts, "Number of times the INSERT of a block to a MergeTree table was rejected with 'Too many parts' exception due to high number of active data parts for partition.") \ M(DelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \ + M(DelayedMutations, "Number of times the mutation of a MergeTree table was throttled due to high number of unfinished mutations for table.") \ + M(RejectedMutations, "Number of times the mutation of a MergeTree table was rejected with 'Too many mutations' exception due to high number of unfinished mutations for table.") \ + M(DelayedMutationsMilliseconds, "Total number of milliseconds spent while the mutation of a MergeTree table was throttled due to high number of unfinished mutations for table.") \ M(DistributedDelayedInserts, "Number of times the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \ M(DistributedRejectedInserts, "Number of times the INSERT of a block to a Distributed table was rejected with 'Too many bytes' exception due to high number of pending bytes.") \ M(DistributedDelayedInsertsMilliseconds, "Total number of 
milliseconds spent while the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 26409e98763..8fd2af5fa23 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -275,6 +275,8 @@ class IColumn; \ M(UInt64, parts_to_delay_insert, 0, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \ M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \ + M(UInt64, number_of_mutations_to_delay, 0, "If the mutated table contains at least that many unfinished mutations, artificially slow down mutations of table. 0 - disabled", 0) \ + M(UInt64, number_of_mutations_to_throw, 0, "If the mutated table contains at least that many unfinished mutations, throw 'Too many mutations ...' exception. 0 - disabled", 0) \ M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \ M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \ M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \ diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index f5f12660223..e9e3548f66f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -114,6 +114,9 @@ namespace ProfileEvents extern const Event MergedIntoWideParts; extern const Event MergedIntoCompactParts; extern const Event MergedIntoInMemoryParts; + extern const Event RejectedMutations; + extern const Event DelayedMutations; + extern const Event DelayedMutationsMilliseconds; } namespace CurrentMetrics @@ -171,6 +174,7 @@ namespace ErrorCodes extern const int SERIALIZATION_ERROR; extern const int NETWORK_ERROR; extern const int SOCKET_TIMEOUT; + extern const int TOO_MANY_MUTATIONS; } @@ -4296,6 +4300,51 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(delay_milliseconds))); } +void MergeTreeData::delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const +{ + const auto settings = getSettings(); + const auto & query_settings = query_context->getSettingsRef(); + + size_t num_mutations_to_delay = query_settings.number_of_mutations_to_delay + ? query_settings.number_of_mutations_to_delay + : settings->number_of_mutations_to_delay; + + size_t num_mutations_to_throw = query_settings.number_of_mutations_to_throw + ? 
query_settings.number_of_mutations_to_throw + : settings->number_of_mutations_to_throw; + + if (!num_mutations_to_delay && !num_mutations_to_throw) + return; + + size_t num_unfinished_mutations = getNumberOfUnfinishedMutations(); + if (num_mutations_to_throw && num_unfinished_mutations >= num_mutations_to_throw) + { + ProfileEvents::increment(ProfileEvents::RejectedMutations); + throw Exception(ErrorCodes::TOO_MANY_MUTATIONS, + "Too many unfinished mutations ({}) in table {}", + num_unfinished_mutations, getLogName()); + } + + if (num_mutations_to_delay && num_unfinished_mutations >= num_mutations_to_delay) + { + if (!num_mutations_to_throw) + num_mutations_to_throw = num_mutations_to_delay * 2; + + size_t mutations_over_threshold = num_unfinished_mutations - num_mutations_to_delay; + size_t allowed_mutations_over_threshold = num_mutations_to_throw - num_mutations_to_delay; + + double delay_factor = std::min(static_cast(mutations_over_threshold) / allowed_mutations_over_threshold, 1.0); + size_t delay_milliseconds = static_cast(std::lerp(settings->min_delay_to_mutate_ms, settings->max_delay_to_mutate_ms, delay_factor)); + + ProfileEvents::increment(ProfileEvents::DelayedMutations); + ProfileEvents::increment(ProfileEvents::DelayedMutationsMilliseconds, delay_milliseconds); + + if (until) + until->tryWait(delay_milliseconds); + else + std::this_thread::sleep_for(std::chrono::milliseconds(delay_milliseconds)); + } +} MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart( const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index b03b7d4a71e..cc5deb7c786 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -540,7 +540,6 @@ public: /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition. std::optional getMinPartDataVersion() const; - /// Returns all detached parts DetachedPartsInfo getDetachedParts() const; @@ -551,11 +550,17 @@ public: MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part, ContextPtr context, PartsTemporaryRename & renamed_parts); - /// If the table contains too many active parts, sleep for a while to give them time to merge. /// If until is non-null, wake up from the sleep earlier if the event happened. void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const; + /// If the table contains too many unfinished mutations, sleep for a while to give them time to execute. + /// If until is non-null, wake up from the sleep earlier if the event happened. + void delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const; + + /// Returns number of unfinished mutations (is_done = 0). + virtual size_t getNumberOfUnfinishedMutations() const = 0; + /// Renames temporary part to a permanent part and adds it to the parts set. /// It is assumed that the part does not intersect with existing parts. /// Adds the part in the PreActive state (the part will be added to the active set later with out_transaction->commit()). 
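The delay is interpolated linearly (std::lerp) between min_delay_to_mutate_ms and max_delay_to_mutate_ms as the number of unfinished mutations climbs from the delay threshold towards the throw threshold, beyond which TOO_MANY_MUTATIONS is raised. A hedged sketch of how these MergeTree-level settings could be applied, with an illustrative table and values only:

```sql
-- Mutations start being slowed down once 2 are unfinished
-- and are rejected with TOO_MANY_MUTATIONS once 10 are unfinished.
CREATE TABLE t_throttled_mutations (id UInt64, v UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS
    number_of_mutations_to_delay = 2,
    number_of_mutations_to_throw = 10,
    min_delay_to_mutate_ms = 10,
    max_delay_to_mutate_ms = 1000;

ALTER TABLE t_throttled_mutations UPDATE v = v + 1 WHERE 1;  -- delayed or rejected depending on the backlog
```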
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index ad55c9d47f3..b7b94359ccf 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -83,6 +83,10 @@ struct Settings; M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \ M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \ M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \ + M(UInt64, number_of_mutations_to_delay, 0, "If table has at least that many unfinished mutations, artificially slow down mutations of table. Disabled if set to 0", 0) \ + M(UInt64, number_of_mutations_to_throw, 0, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \ + M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \ + M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \ \ /* Part removal settings. */ \ M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \ diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 1006bd5ab49..1762c7aabe9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1727,18 +1727,30 @@ size_t ReplicatedMergeTreeQueue::countMutations() const return mutations_by_znode.size(); } - size_t ReplicatedMergeTreeQueue::countFinishedMutations() const { std::lock_guard lock(state_mutex); size_t count = 0; - for (const auto & pair : mutations_by_znode) + for (const auto & [_, status] : mutations_by_znode) { - const auto & mutation = pair.second; - if (!mutation.is_done) + if (!status.is_done) break; + ++count; + } + return count; +} + +size_t ReplicatedMergeTreeQueue::countUnfinishedMutations() const +{ + std::lock_guard lock(state_mutex); + + size_t count = 0; + for (const auto & [_, status] : mutations_by_znode | std::views::reverse) + { + if (status.is_done) + break; ++count; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 72796ddd4eb..368f2d4bc1f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -386,6 +386,8 @@ public: /// Count the total number of active mutations that are finished (is_done = true). size_t countFinishedMutations() const; + /// Count the total number of active mutations that are not finished (is_done = false). 
+ size_t countUnfinishedMutations() const; /// Returns functor which used by MergeTreeMergerMutator to select parts for merge ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 5513603bca6..5592004d599 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -3,6 +3,7 @@ #include "Storages/MergeTree/IMergeTreeDataPart.h" #include +#include #include #include @@ -313,7 +314,11 @@ void StorageMergeTree::alter( StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); StorageInMemoryMetadata old_metadata = getInMemoryMetadata(); + auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, local_context->getSettingsRef().materialize_ttl_after_modify, local_context); + if (!maybe_mutation_commands.empty()) + delayMutationOrThrowIfNeeded(nullptr, local_context); + Int64 mutation_version = -1; commands.apply(new_metadata, local_context); @@ -321,7 +326,6 @@ void StorageMergeTree::alter( if (commands.isSettingsAlter()) { changeSettings(new_metadata.settings_changes, table_lock_holder); - DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata); } else @@ -587,11 +591,12 @@ void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn) void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context) { + delayMutationOrThrowIfNeeded(nullptr, query_context); + /// Validate partition IDs (if any) before starting mutation getPartitionIdsAffectedByCommands(commands, query_context); Int64 version = startMutation(commands, query_context); - if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction()) waitForMutation(version); } @@ -1332,6 +1337,24 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign return scheduled; } +size_t StorageMergeTree::getNumberOfUnfinishedMutations() const +{ + size_t count = 0; + for (const auto & [version, _] : current_mutations_by_version | std::views::reverse) + { + auto status = getIncompleteMutationsStatus(version); + if (!status) + continue; + + if (status->is_done) + break; + + ++count; + } + + return count; +} + UInt64 StorageMergeTree::getCurrentMutationVersion( const DataPartPtr & part, std::unique_lock & /*currently_processing_in_background_mutex_lock*/) const diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 6f8acf9965a..78bd6e3f374 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -113,6 +113,8 @@ public: bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override; + size_t getNumberOfUnfinishedMutations() const override; + MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); } private: diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 9b4972ade59..7bca3cbf581 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5215,7 +5215,10 @@ void StorageReplicatedMergeTree::alter( alter_entry->create_time = time(nullptr); auto maybe_mutation_commands = commands.getMutationCommands( - *current_metadata, query_context->getSettingsRef().materialize_ttl_after_modify, query_context); + *current_metadata, + query_context->getSettingsRef().materialize_ttl_after_modify, + 
            query_context);
+
         bool have_mutation = !maybe_mutation_commands.empty();
         alter_entry->have_mutation = have_mutation;
@@ -5226,6 +5229,7 @@ void StorageReplicatedMergeTree::alter(
         PartitionBlockNumbersHolder partition_block_numbers_holder;
         if (have_mutation)
         {
+            delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context);
             const String mutations_path(fs::path(zookeeper_path) / "mutations");
 
             ReplicatedMergeTreeMutationEntry mutation_entry;
@@ -6406,6 +6410,8 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
     /// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
     /// the version of this mutation), the mutation is considered done and can be deleted.
 
+    delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context);
+
     ReplicatedMergeTreeMutationEntry mutation_entry;
     mutation_entry.source_replica = replica_name;
     mutation_entry.commands = commands;
@@ -8036,6 +8042,10 @@ String StorageReplicatedMergeTree::getTableSharedID() const
     return toString(table_shared_id);
 }
 
+size_t StorageReplicatedMergeTree::getNumberOfUnfinishedMutations() const
+{
+    return queue.countUnfinishedMutations();
+}
 
 void StorageReplicatedMergeTree::createTableSharedID() const
 {
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index ade4e4f0b4b..e81be299144 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -311,6 +311,8 @@ public:
     // Return table id, common for different replicas
     String getTableSharedID() const override;
 
+    size_t getNumberOfUnfinishedMutations() const override;
+
     /// Returns the same as getTableSharedID(), but extracts it from a create query.
     static std::optional tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context);
 
diff --git a/tests/queries/0_stateless/02724_delay_mutations.reference b/tests/queries/0_stateless/02724_delay_mutations.reference
new file mode 100644
index 00000000000..16bd972a06d
--- /dev/null
+++ b/tests/queries/0_stateless/02724_delay_mutations.reference
@@ -0,0 +1,8 @@
+1 2
+4
+1 6
+0
+ALTER TABLE t_delay_mutations UPDATE v = 3 WHERE 1; 0 0
+ALTER TABLE t_delay_mutations UPDATE v = 4 WHERE 1; 0 0
+ALTER TABLE t_delay_mutations UPDATE v = 5 WHERE 1; 1 1
+ALTER TABLE t_delay_mutations UPDATE v = 6 WHERE 1; 1 1
diff --git a/tests/queries/0_stateless/02724_delay_mutations.sh b/tests/queries/0_stateless/02724_delay_mutations.sh
new file mode 100755
index 00000000000..f349e29253a
--- /dev/null
+++ b/tests/queries/0_stateless/02724_delay_mutations.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+# shellcheck source=./mergetree_mutations.lib
+. "$CURDIR"/mergetree_mutations.lib
+
+${CLICKHOUSE_CLIENT} -n --query "
+DROP TABLE IF EXISTS t_delay_mutations SYNC;
+
+CREATE TABLE t_delay_mutations (id UInt64, v UInt64)
+ENGINE = MergeTree ORDER BY id
+SETTINGS
+    number_of_mutations_to_delay = 2,
+    number_of_mutations_to_throw = 10,
+    min_delay_to_mutate_ms = 10,
+    max_delay_to_mutate_ms = 1000;
+
+SET mutations_sync = 0;
+SYSTEM STOP MERGES t_delay_mutations;
+
+INSERT INTO t_delay_mutations VALUES (1, 2);
+
+ALTER TABLE t_delay_mutations UPDATE v = 3 WHERE 1;
+ALTER TABLE t_delay_mutations UPDATE v = 4 WHERE 1;
+
+ALTER TABLE t_delay_mutations UPDATE v = 5 WHERE 1;
+ALTER TABLE t_delay_mutations UPDATE v = 6 WHERE 1;
+
+SELECT * FROM t_delay_mutations ORDER BY id;
+SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_delay_mutations' AND NOT is_done;
+"
+
+${CLICKHOUSE_CLIENT} --query "SYSTEM START MERGES t_delay_mutations"
+wait_for_mutation "t_delay_mutations" "mutation_5.txt"
+
+${CLICKHOUSE_CLIENT} -n --query "
+SELECT * FROM t_delay_mutations ORDER BY id;
+SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_delay_mutations' AND NOT is_done;
+
+DROP TABLE IF EXISTS t_delay_mutations SYNC;
+"
+
+${CLICKHOUSE_CLIENT} -n --query "
+SYSTEM FLUSH LOGS;
+
+SELECT
+    query,
+    ProfileEvents['DelayedMutations'],
+    ProfileEvents['DelayedMutationsMilliseconds'] BETWEEN 10 AND 1000
+FROM system.query_log
+WHERE
+    type = 'QueryFinish' AND
+    current_database = '$CLICKHOUSE_DATABASE' AND
+    query ILIKE 'ALTER TABLE t_delay_mutations UPDATE%'
+ORDER BY query;
+"
diff --git a/tests/queries/0_stateless/02724_limit_num_mutations.reference b/tests/queries/0_stateless/02724_limit_num_mutations.reference
new file mode 100644
index 00000000000..ecd1ce23ca2
--- /dev/null
+++ b/tests/queries/0_stateless/02724_limit_num_mutations.reference
@@ -0,0 +1,9 @@
+1 2
+2
+CREATE TABLE default.t_limit_mutations\n(\n    `id` UInt64,\n    `v` UInt64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/t_limit_mutations/\', \'1\')\nORDER BY id\nSETTINGS number_of_mutations_to_throw = 2, index_granularity = 8192
+1 2
+4
+CREATE TABLE default.t_limit_mutations\n(\n    `id` UInt64,\n    `v` String\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/t_limit_mutations/\', \'1\')\nORDER BY id\nSETTINGS number_of_mutations_to_throw = 2, index_granularity = 8192
+1 6
+0
+CREATE TABLE default.t_limit_mutations\n(\n    `id` UInt64,\n    `v` String\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/t_limit_mutations/\', \'1\')\nORDER BY id\nSETTINGS number_of_mutations_to_throw = 2, index_granularity = 8192
diff --git a/tests/queries/0_stateless/02724_limit_num_mutations.sh b/tests/queries/0_stateless/02724_limit_num_mutations.sh
new file mode 100755
index 00000000000..98bfdbbb551
--- /dev/null
+++ b/tests/queries/0_stateless/02724_limit_num_mutations.sh
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+# shellcheck source=./mergetree_mutations.lib
+. "$CURDIR"/mergetree_mutations.lib
+
+function wait_for_alter()
+{
+    type=$1
+    for i in {1..100}; do
+        sleep 0.1
+        ${CLICKHOUSE_CLIENT} --query "SHOW CREATE TABLE t_limit_mutations" | grep -q "\`v\` $type" && break;
+
+        if [[ $i -eq 100 ]]; then
+            echo "Timed out while waiting for alter to execute"
+        fi
+    done
+}
+
+${CLICKHOUSE_CLIENT} -n --query "
+DROP TABLE IF EXISTS t_limit_mutations SYNC;
+
+CREATE TABLE t_limit_mutations (id UInt64, v UInt64)
+ENGINE = ReplicatedMergeTree('/clickhouse/tables/t_limit_mutations/', '1') ORDER BY id
+SETTINGS number_of_mutations_to_throw = 2;
+
+SET mutations_sync = 0;
+SYSTEM STOP MERGES t_limit_mutations;
+
+INSERT INTO t_limit_mutations VALUES (1, 2);
+
+ALTER TABLE t_limit_mutations UPDATE v = 3 WHERE 1;
+ALTER TABLE t_limit_mutations UPDATE v = 4 WHERE 1;
+
+ALTER TABLE t_limit_mutations UPDATE v = 5 WHERE 1; -- { serverError TOO_MANY_MUTATIONS }
+ALTER TABLE t_limit_mutations MODIFY COLUMN v String; -- { serverError TOO_MANY_MUTATIONS }
+
+SELECT * FROM t_limit_mutations ORDER BY id;
+SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_limit_mutations' AND NOT is_done;
+SHOW CREATE TABLE t_limit_mutations;
+"
+
+${CLICKHOUSE_CLIENT} -n --query "
+ALTER TABLE t_limit_mutations UPDATE v = 6 WHERE 1 SETTINGS number_of_mutations_to_throw = 100;
+ALTER TABLE t_limit_mutations MODIFY COLUMN v String SETTINGS number_of_mutations_to_throw = 100, alter_sync = 0;
+"
+
+wait_for_alter "String"
+
+${CLICKHOUSE_CLIENT} -n --query "
+SELECT * FROM t_limit_mutations ORDER BY id;
+SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_limit_mutations' AND NOT is_done;
+SHOW CREATE TABLE t_limit_mutations;
+"
+
+${CLICKHOUSE_CLIENT} --query "SYSTEM START MERGES t_limit_mutations"
+
+wait_for_mutation "t_limit_mutations" "0000000003"
+
+${CLICKHOUSE_CLIENT} -n --query "
+SELECT * FROM t_limit_mutations ORDER BY id;
+SELECT count() FROM system.mutations WHERE database = currentDatabase() AND table = 't_limit_mutations' AND NOT is_done;
+SHOW CREATE TABLE t_limit_mutations;
+
+DROP TABLE IF EXISTS t_limit_mutations SYNC;
+"
From 30f1bef6e8d1a2086d8ef240601cf99e0e47887e Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Thu, 27 Apr 2023 16:57:42 +0300
Subject: [PATCH 09/52] Update TableFunctionS3.h

---
 src/TableFunctions/TableFunctionS3.h | 44 ----------------------------
 1 file changed, 44 deletions(-)

diff --git a/src/TableFunctions/TableFunctionS3.h b/src/TableFunctions/TableFunctionS3.h
index ed8cd3bd41a..70c4a020669 100644
--- a/src/TableFunctions/TableFunctionS3.h
+++ b/src/TableFunctions/TableFunctionS3.h
@@ -67,52 +67,8 @@ protected:
     ColumnsDescription structure_hint;
 };
 
-class TableFunctionCOS : public TableFunctionS3
-{
-public:
-    static constexpr auto name = "cosn";
-    std::string getName() const override
-    {
-        return name;
-    }
-private:
-    const char * getStorageTypeName() const override { return "COSN"; }
-};
-
-class TableFunctionOSS : public TableFunctionS3
-{
-public:
-    static constexpr auto name = "oss";
-    std::string getName() const override
-    {
-        return name;
-    }
-private:
-    const char * getStorageTypeName() const override { return "OSS"; }
-};
-
 }
 
-class TableFunctionGCS : public TableFunctionS3
-{
-public:
-    static constexpr auto name = "gcs";
-    static constexpr auto signature = " - url\n"
-                                      " - url, format\n"
-                                      " - url, format, structure\n"
-                                      " - url, hmac_key, hmac_secret\n"
-                                      " - url, format, structure, compression_method\n"
-                                      " - url, hmac_key, hmac_secret, format\n"
-                                      " - url, hmac_key, hmac_secret, format, structure\n"
-                                      " - url, hmac_key, hmac_secret, format, structure, compression_method";
-    std::string getName() const override
-    {
-        return name;
-    }
-private:
-    const char * getStorageTypeName() const override { return "GCS"; }
-};
-
 }
 
 #endif
From aec8f17614371f8dc4ea161098736adafeddc70b Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Thu, 27 Apr 2023 16:58:18 +0300
Subject: [PATCH 10/52] Update TableFunctionS3.cpp

---
 src/TableFunctions/TableFunctionS3.cpp | 47 +++++++++++++++++++++++---
 1 file changed, 42 insertions(+), 5 deletions(-)

diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp
index 4153c6a81c9..841f5a91bc8 100644
--- a/src/TableFunctions/TableFunctionS3.cpp
+++ b/src/TableFunctions/TableFunctionS3.cpp
@@ -215,6 +215,48 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
 
 }
 
+class TableFunctionGCS : public TableFunctionS3
+{
+public:
+    static constexpr auto name = "gcs";
+    std::string getName() const override
+    {
+        return name;
+    }
+private:
+    const char * getStorageTypeName() const override { return "GCS"; }
+};
+
+class TableFunctionCOS : public TableFunctionS3
+{
+public:
+    static constexpr auto name = "cosn";
+    std::string getName() const override
+    {
+        return name;
+    }
+private:
+    const char * getStorageTypeName() const override { return "COSN"; }
+};
+
+class TableFunctionOSS : public TableFunctionS3
+{
+public:
+    static constexpr auto name = "oss";
+    std::string getName() const override
+    {
+        return name;
+    }
+private:
+    const char * getStorageTypeName() const override { return "OSS"; }
+};
+
+
+void registerTableFunctionGCS(TableFunctionFactory & factory)
+{
+    factory.registerFunction();
+}
+
 void registerTableFunctionS3(TableFunctionFactory & factory)
 {
     factory.registerFunction();
@@ -230,11 +272,6 @@ void registerTableFunctionOSS(TableFunctionFactory & factory)
 {
     factory.registerFunction();
 }
-void registerTableFunctionGCS(TableFunctionFactory & factory)
-{
-    factory.registerFunction();
-}
-
 }
 
 #endif
From 536720605797f37a2870c79855a633cca3cd5faa Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Thu, 27 Apr 2023 23:24:36 +0300
Subject: [PATCH 11/52] Update TableFunctionS3.h

---
 src/TableFunctions/TableFunctionS3.h | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/TableFunctions/TableFunctionS3.h b/src/TableFunctions/TableFunctionS3.h
index 70c4a020669..4724684712b 100644
--- a/src/TableFunctions/TableFunctionS3.h
+++ b/src/TableFunctions/TableFunctionS3.h
@@ -69,6 +69,4 @@ protected:
 
 }
 
-}
-
 #endif
From 2da33b96eba3698dd347d7a0c5d487cfa031f85d Mon Sep 17 00:00:00 2001
From: xmy
Date: Fri, 28 Apr 2023 10:31:49 +0800
Subject: [PATCH 12/52] Allow Int* type argument for groupBitAnd/GroupBitOr/groupBitXor

---
 .../aggregate-functions/reference/groupbitand.md | 4 ++--
 .../aggregate-functions/reference/groupbitor.md  | 4 ++--
 .../aggregate-functions/reference/groupbitxor.md | 4 ++--
 .../AggregateFunctionBitwise.cpp                 | 2 +-
 .../AggregateFunctionSequenceMatch.cpp           | 2 +-
 .../AggregateFunctionWindowFunnel.cpp            | 2 +-
 src/AggregateFunctions/Helpers.h                 | 14 ++++++++++++--
 7 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md b/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md
index f89e3796aaa..5fd5029751a 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md
@@ -13,11 +13,11 @@ groupBitAnd(expr)
 
 **Arguments**
 
-`expr` – An expression that results in `UInt*` type.
+`expr` – An expression that results in `UInt*` or `Int*` type.
 
 **Return value**
 
-Value of the `UInt*` type.
+Value of the `UInt*` or `Int*` type.
 
 **Example**
 
diff --git a/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md b/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md
index 75b34d9c5a3..08a5c15da46 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md
@@ -13,11 +13,11 @@ groupBitOr(expr)
 
 **Arguments**
 
-`expr` – An expression that results in `UInt*` type.
+`expr` – An expression that results in `UInt*` or `Int*` type.
 
 **Returned value**
 
-Value of the `UInt*` type.
+Value of the `UInt*` or `Int*` type.
 
 **Example**
 
diff --git a/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md b/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md
index ca6fb9f8352..f33e375953c 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md
@@ -13,11 +13,11 @@ groupBitXor(expr)
 
 **Arguments**
 
-`expr` – An expression that results in `UInt*` type.
+`expr` – An expression that results in `UInt*` or `Int*` type.
 
 **Return value**
 
-Value of the `UInt*` type.
+Value of the `UInt*` or `Int*` type.
 
 **Example**
 
diff --git a/src/AggregateFunctions/AggregateFunctionBitwise.cpp b/src/AggregateFunctions/AggregateFunctionBitwise.cpp
index b87e899a685..82cb3b327f0 100644
--- a/src/AggregateFunctions/AggregateFunctionBitwise.cpp
+++ b/src/AggregateFunctions/AggregateFunctionBitwise.cpp
@@ -27,7 +27,7 @@ AggregateFunctionPtr createAggregateFunctionBitwise(const std::string & name, co
             "is illegal, because it cannot be used in bitwise operations",
             argument_types[0]->getName(), name);
 
-    AggregateFunctionPtr res(createWithUnsignedIntegerType(*argument_types[0], argument_types[0]));
+    AggregateFunctionPtr res(createWithOptionSignedIntegerType(*argument_types[0], argument_types[0]));
 
     if (!res)
         throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
diff --git a/src/AggregateFunctions/AggregateFunctionSequenceMatch.cpp b/src/AggregateFunctions/AggregateFunctionSequenceMatch.cpp
index 3dd9a8b658d..f2fe9014ceb 100644
--- a/src/AggregateFunctions/AggregateFunctionSequenceMatch.cpp
+++ b/src/AggregateFunctions/AggregateFunctionSequenceMatch.cpp
@@ -53,7 +53,7 @@ AggregateFunctionPtr createAggregateFunctionSequenceBase(
 
     String pattern = params.front().safeGet();
 
-    AggregateFunctionPtr res(createWithUnsignedIntegerType(*argument_types[0], argument_types, params, pattern));
+    AggregateFunctionPtr res(createWithOptionSignedIntegerType(*argument_types[0], argument_types, params, pattern));
     if (res)
         return res;
 
diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp b/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp
index d80d683fd04..71c675f7a3b 100644
--- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp
+++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.cpp
@@ -48,7 +48,7 @@ createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes &
                 cond_arg->getName(), toString(i + 1), name);
     }
 
-    AggregateFunctionPtr res(createWithUnsignedIntegerType(*arguments[0], arguments, params));
+    AggregateFunctionPtr res(createWithOptionSignedIntegerType(*arguments[0], arguments, params));
     WhichDataType which(arguments.front().get());
     if (res)
         return res;
diff --git a/src/AggregateFunctions/Helpers.h b/src/AggregateFunctions/Helpers.h
index 19904dd9215..4131279a897 100644
--- a/src/AggregateFunctions/Helpers.h
+++ b/src/AggregateFunctions/Helpers.h
@@ -87,8 +87,8 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
     return nullptr;
 }
 
-template