mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge branch 'master' into deprecate-in-memory-parts
This commit is contained in:
commit
75a2589a42
@ -8,7 +8,7 @@
|
||||
|
||||
/*
|
||||
* (all numbers are written in big-endian manner: the least significant digit on the right)
|
||||
* (only bit representations are used - no hex or octal, leading zeroes are ommited)
|
||||
* (only bit representations are used - no hex or octal, leading zeroes are omitted)
|
||||
*
|
||||
* Consistent hashing scheme:
|
||||
*
|
||||
|
@ -1,10 +0,0 @@
|
||||
#include <string.h>
|
||||
|
||||
int main()
|
||||
{
|
||||
// We can't test "char *p = strerror_r()" because that only causes a
|
||||
// compiler warning when strerror_r returns an integer.
|
||||
char *buf = 0;
|
||||
int i = strerror_r(0, buf, 100);
|
||||
return i;
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
FUNCTION(AUTO_SOURCES RETURN_VALUE PATTERN SOURCE_SUBDIRS)
|
||||
|
||||
IF ("${SOURCE_SUBDIRS}" STREQUAL "RECURSE")
|
||||
SET(PATH ".")
|
||||
IF (${ARGC} EQUAL 4)
|
||||
LIST(GET ARGV 3 PATH)
|
||||
ENDIF ()
|
||||
ENDIF()
|
||||
|
||||
IF ("${SOURCE_SUBDIRS}" STREQUAL "RECURSE")
|
||||
UNSET(${RETURN_VALUE})
|
||||
FILE(GLOB SUBDIR_FILES "${PATH}/${PATTERN}")
|
||||
LIST(APPEND ${RETURN_VALUE} ${SUBDIR_FILES})
|
||||
|
||||
FILE(GLOB SUBDIRS RELATIVE ${PATH} ${PATH}/*)
|
||||
|
||||
FOREACH(DIR ${SUBDIRS})
|
||||
IF (IS_DIRECTORY ${PATH}/${DIR})
|
||||
IF (NOT "${DIR}" STREQUAL "CMAKEFILES")
|
||||
FILE(GLOB_RECURSE SUBDIR_FILES "${PATH}/${DIR}/${PATTERN}")
|
||||
LIST(APPEND ${RETURN_VALUE} ${SUBDIR_FILES})
|
||||
ENDIF()
|
||||
ENDIF()
|
||||
ENDFOREACH()
|
||||
ELSE ()
|
||||
FILE(GLOB ${RETURN_VALUE} "${PATTERN}")
|
||||
|
||||
FOREACH (PATH ${SOURCE_SUBDIRS})
|
||||
FILE(GLOB SUBDIR_FILES "${PATH}/${PATTERN}")
|
||||
LIST(APPEND ${RETURN_VALUE} ${SUBDIR_FILES})
|
||||
ENDFOREACH(PATH ${SOURCE_SUBDIRS})
|
||||
ENDIF ()
|
||||
|
||||
IF (${FILTER_OUT})
|
||||
LIST(REMOVE_ITEM ${RETURN_VALUE} ${FILTER_OUT})
|
||||
ENDIF()
|
||||
|
||||
SET(${RETURN_VALUE} ${${RETURN_VALUE}} PARENT_SCOPE)
|
||||
ENDFUNCTION(AUTO_SOURCES)
|
||||
|
||||
FUNCTION(CONTAINS_STRING FILE SEARCH RETURN_VALUE)
|
||||
FILE(STRINGS ${FILE} FILE_CONTENTS REGEX ".*${SEARCH}.*")
|
||||
IF (FILE_CONTENTS)
|
||||
SET(${RETURN_VALUE} TRUE PARENT_SCOPE)
|
||||
ENDIF()
|
||||
ENDFUNCTION(CONTAINS_STRING)
|
@ -1,44 +0,0 @@
|
||||
OPTION(ENABLE_SSE "enable SSE4.2 builtin function" ON)
|
||||
|
||||
INCLUDE (CheckFunctionExists)
|
||||
CHECK_FUNCTION_EXISTS(dladdr HAVE_DLADDR)
|
||||
CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP)
|
||||
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-strict-aliasing")
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-strict-aliasing")
|
||||
|
||||
IF(ENABLE_SSE STREQUAL ON AND ARCH_AMD64)
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2")
|
||||
ENDIF()
|
||||
|
||||
IF(NOT TEST_HDFS_PREFIX)
|
||||
SET(TEST_HDFS_PREFIX "./" CACHE STRING "default directory prefix used for test." FORCE)
|
||||
ENDIF(NOT TEST_HDFS_PREFIX)
|
||||
|
||||
ADD_DEFINITIONS(-DTEST_HDFS_PREFIX="${TEST_HDFS_PREFIX}")
|
||||
ADD_DEFINITIONS(-D__STDC_FORMAT_MACROS)
|
||||
ADD_DEFINITIONS(-D_GNU_SOURCE)
|
||||
ADD_DEFINITIONS(-D_GLIBCXX_USE_NANOSLEEP)
|
||||
|
||||
TRY_COMPILE(STRERROR_R_RETURN_INT
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/CMake/CMakeTestCompileStrerror.c"
|
||||
CMAKE_FLAGS "-DCMAKE_CXX_LINK_EXECUTABLE='echo not linking now...'"
|
||||
OUTPUT_VARIABLE OUTPUT)
|
||||
|
||||
MESSAGE(STATUS "Checking whether strerror_r returns an int")
|
||||
|
||||
IF(STRERROR_R_RETURN_INT)
|
||||
MESSAGE(STATUS "Checking whether strerror_r returns an int -- yes")
|
||||
ELSE(STRERROR_R_RETURN_INT)
|
||||
MESSAGE(STATUS "Checking whether strerror_r returns an int -- no")
|
||||
ENDIF(STRERROR_R_RETURN_INT)
|
||||
|
||||
set(HAVE_STEADY_CLOCK 1)
|
||||
set(HAVE_NESTED_EXCEPTION 1)
|
||||
|
||||
SET(HAVE_BOOST_CHRONO 0)
|
||||
SET(HAVE_BOOST_ATOMIC 0)
|
||||
|
||||
SET(HAVE_STD_CHRONO 1)
|
||||
SET(HAVE_STD_ATOMIC 1)
|
@ -1,42 +0,0 @@
|
||||
IF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
|
||||
SET(OS_LINUX true CACHE INTERNAL "Linux operating system")
|
||||
ELSEIF(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
|
||||
SET(OS_MACOSX true CACHE INTERNAL "Mac Darwin operating system")
|
||||
ELSE(CMAKE_SYSTEM_NAME STREQUAL "Linux")
|
||||
MESSAGE(FATAL_ERROR "Unsupported OS: \"${CMAKE_SYSTEM_NAME}\"")
|
||||
ENDIF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
|
||||
|
||||
IF(CMAKE_COMPILER_IS_GNUCXX)
|
||||
EXECUTE_PROCESS(COMMAND ${CMAKE_CXX_COMPILER} -dumpfullversion OUTPUT_VARIABLE GCC_COMPILER_VERSION)
|
||||
|
||||
IF (NOT GCC_COMPILER_VERSION)
|
||||
EXECUTE_PROCESS(COMMAND ${CMAKE_CXX_COMPILER} -dumpversion OUTPUT_VARIABLE GCC_COMPILER_VERSION)
|
||||
|
||||
IF (NOT GCC_COMPILER_VERSION)
|
||||
MESSAGE(FATAL_ERROR "Cannot get gcc version")
|
||||
ENDIF (NOT GCC_COMPILER_VERSION)
|
||||
ENDIF (NOT GCC_COMPILER_VERSION)
|
||||
|
||||
STRING(REGEX MATCHALL "[0-9]+" GCC_COMPILER_VERSION ${GCC_COMPILER_VERSION})
|
||||
|
||||
LIST(LENGTH GCC_COMPILER_VERSION GCC_COMPILER_VERSION_LENGTH)
|
||||
LIST(GET GCC_COMPILER_VERSION 0 GCC_COMPILER_VERSION_MAJOR)
|
||||
if (GCC_COMPILER_VERSION_LENGTH GREATER 1)
|
||||
LIST(GET GCC_COMPILER_VERSION 1 GCC_COMPILER_VERSION_MINOR)
|
||||
else ()
|
||||
set (GCC_COMPILER_VERSION_MINOR 0)
|
||||
endif ()
|
||||
|
||||
SET(GCC_COMPILER_VERSION_MAJOR ${GCC_COMPILER_VERSION_MAJOR} CACHE INTERNAL "gcc major version")
|
||||
SET(GCC_COMPILER_VERSION_MINOR ${GCC_COMPILER_VERSION_MINOR} CACHE INTERNAL "gcc minor version")
|
||||
|
||||
MESSAGE(STATUS "checking compiler: GCC (${GCC_COMPILER_VERSION_MAJOR}.${GCC_COMPILER_VERSION_MINOR}.${GCC_COMPILER_VERSION_PATCH})")
|
||||
ELSE(CMAKE_COMPILER_IS_GNUCXX)
|
||||
EXECUTE_PROCESS(COMMAND ${CMAKE_C_COMPILER} --version OUTPUT_VARIABLE COMPILER_OUTPUT)
|
||||
IF(COMPILER_OUTPUT MATCHES "clang")
|
||||
SET(CMAKE_COMPILER_IS_CLANG true CACHE INTERNAL "using clang as compiler")
|
||||
MESSAGE(STATUS "checking compiler: CLANG")
|
||||
ELSE(COMPILER_OUTPUT MATCHES "clang")
|
||||
MESSAGE(FATAL_ERROR "Unsupported compiler: \"${CMAKE_CXX_COMPILER}\"")
|
||||
ENDIF(COMPILER_OUTPUT MATCHES "clang")
|
||||
ENDIF(CMAKE_COMPILER_IS_GNUCXX)
|
@ -21,10 +21,17 @@ set(HDFS3_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/libhdfs3")
|
||||
set(HDFS3_SOURCE_DIR "${HDFS3_ROOT_DIR}/src")
|
||||
set(HDFS3_COMMON_DIR "${HDFS3_SOURCE_DIR}/common")
|
||||
|
||||
# module
|
||||
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
|
||||
include(Platform)
|
||||
include(Options)
|
||||
ADD_DEFINITIONS(-DTEST_HDFS_PREFIX="${TEST_HDFS_PREFIX}")
|
||||
ADD_DEFINITIONS(-D__STDC_FORMAT_MACROS)
|
||||
ADD_DEFINITIONS(-D_GNU_SOURCE)
|
||||
ADD_DEFINITIONS(-D_GLIBCXX_USE_NANOSLEEP)
|
||||
ADD_DEFINITIONS(-DHAVE_NANOSLEEP)
|
||||
set(HAVE_STEADY_CLOCK 1)
|
||||
set(HAVE_NESTED_EXCEPTION 1)
|
||||
SET(HAVE_BOOST_CHRONO 0)
|
||||
SET(HAVE_BOOST_ATOMIC 0)
|
||||
SET(HAVE_STD_CHRONO 1)
|
||||
SET(HAVE_STD_ATOMIC 1)
|
||||
|
||||
# source
|
||||
set(PROTO_FILES
|
||||
|
@ -148,7 +148,7 @@ Valid values:
|
||||
- `all` (default) - a universal rule, used when `rule_type` is omitted.
|
||||
- `plain` - a rule for plain metrics. The field `regexp` is processed as regular expression.
|
||||
- `tagged` - a rule for tagged metrics (metrics are stored in DB in the format of `someName?tag1=value1&tag2=value2&tag3=value3`). Regular expression must be sorted by tags' names, first tag must be `__name__` if exists. The field `regexp` is processed as regular expression.
|
||||
- `tag_list` - a rule for tagged matrics, a simple DSL for easier metric description in graphite format `someName;tag1=value1;tag2=value2`, `someName`, or `tag1=value1;tag2=value2`. The field `regexp` is translated into a `tagged` rule. The sorting by tags' names is unnecessary, ti will be done automatically. A tag's value (but not a name) can be set as a regular expression, e.g. `env=(dev|staging)`.
|
||||
- `tag_list` - a rule for tagged metrics, a simple DSL for easier metric description in graphite format `someName;tag1=value1;tag2=value2`, `someName`, or `tag1=value1;tag2=value2`. The field `regexp` is translated into a `tagged` rule. The sorting by tags' names is unnecessary, ti will be done automatically. A tag's value (but not a name) can be set as a regular expression, e.g. `env=(dev|staging)`.
|
||||
- `regexp` – A pattern for the metric name (a regular or DSL).
|
||||
- `age` – The minimum age of the data in seconds.
|
||||
- `precision`– How precisely to define the age of the data in seconds. Should be a divisor for 86400 (seconds in a day).
|
||||
|
@ -727,7 +727,7 @@ TTL d + INTERVAL 1 MONTH RECOMPRESS CODEC(ZSTD(17)), d + INTERVAL 1 YEAR RECOMPR
|
||||
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0;
|
||||
```
|
||||
|
||||
Creating a table, where expired rows are aggregated. In result rows `x` contains the maximum value accross the grouped rows, `y` — the minimum value, and `d` — any occasional value from grouped rows.
|
||||
Creating a table, where expired rows are aggregated. In result rows `x` contains the maximum value across the grouped rows, `y` — the minimum value, and `d` — any occasional value from grouped rows.
|
||||
|
||||
``` sql
|
||||
CREATE TABLE table_for_aggregation
|
||||
|
@ -242,7 +242,7 @@ When querying a `Distributed` table, `SELECT` queries are sent to all shards and
|
||||
|
||||
When the `max_parallel_replicas` option is enabled, query processing is parallelized across all replicas within a single shard. For more information, see the section [max_parallel_replicas](../../../operations/settings/settings.md#settings-max_parallel_replicas).
|
||||
|
||||
To learn more about how distibuted `in` and `global in` queries are processed, refer to [this](../../../sql-reference/operators/in.md#select-distributed-subqueries) documentation.
|
||||
To learn more about how distributed `in` and `global in` queries are processed, refer to [this](../../../sql-reference/operators/in.md#select-distributed-subqueries) documentation.
|
||||
|
||||
## Virtual Columns {#virtual-columns}
|
||||
|
||||
|
@ -120,7 +120,7 @@ Some comments about the `sentiment` table:
|
||||
- The `TabSeparated` format means our Python script needs to generate rows of raw data that contain tab-separated values
|
||||
- The query selects two columns from `hackernews`. The Python script will need to parse out those column values from the incoming rows
|
||||
|
||||
Here is the defintion of `sentiment.py`:
|
||||
Here is the definition of `sentiment.py`:
|
||||
|
||||
```python
|
||||
#!/usr/local/bin/python3.9
|
||||
|
@ -14,7 +14,7 @@ Syntax: `URL(URL [,Format] [,CompressionMethod])`
|
||||
|
||||
- The `Format` must be one that ClickHouse can use in `SELECT` queries and, if necessary, in `INSERTs`. For the full list of supported formats, see [Formats](../../../interfaces/formats.md#formats).
|
||||
|
||||
If this argument is not specified, ClickHouse detectes the format automatically from the suffix of the `URL` parameter. If the suffix of `URL` parameter does not match any supported formats, it fails to create table. For example, for engine expression `URL('http://localhost/test.json')`, `JSON` format is applied.
|
||||
If this argument is not specified, ClickHouse detects the format automatically from the suffix of the `URL` parameter. If the suffix of `URL` parameter does not match any supported formats, it fails to create table. For example, for engine expression `URL('http://localhost/test.json')`, `JSON` format is applied.
|
||||
|
||||
- `CompressionMethod` indicates that whether the HTTP body should be compressed. If the compression is enabled, the HTTP packets sent by the URL engine contain 'Content-Encoding' header to indicate which compression method is used.
|
||||
|
||||
|
@ -308,7 +308,7 @@ To build a Superset dashboard using the OpenCelliD dataset you should:
|
||||
![Choose clickhouse connect as database type](@site/docs/en/getting-started/example-datasets/images/superset-choose-a-database.png)
|
||||
|
||||
:::note
|
||||
If **ClickHouse Connect** is not one of your options, then you will need to install it. The comand is `pip install clickhouse-connect`, and more info is [available here](https://pypi.org/project/clickhouse-connect/).
|
||||
If **ClickHouse Connect** is not one of your options, then you will need to install it. The command is `pip install clickhouse-connect`, and more info is [available here](https://pypi.org/project/clickhouse-connect/).
|
||||
:::
|
||||
|
||||
#### Add your connection details:
|
||||
|
@ -261,5 +261,5 @@ The results look like
|
||||
```
|
||||
|
||||
:::note
|
||||
As mentioned in the [GitHub repo](https://github.com/GoogleCloudPlatform/covid-19-open-data), the datset is no longer updated as of September 15, 2022.
|
||||
As mentioned in the [GitHub repo](https://github.com/GoogleCloudPlatform/covid-19-open-data), the dataset is no longer updated as of September 15, 2022.
|
||||
:::
|
@ -208,7 +208,7 @@ Default value: `3600` (1 hour).
|
||||
## database_catalog_unused_dir_rm_timeout_sec {#database_catalog_unused_dir_rm_timeout_sec}
|
||||
|
||||
Parameter of a task that cleans up garbage from `store/` directory.
|
||||
If some subdirectory is not used by clickhouse-server and it was previousely "hidden"
|
||||
If some subdirectory is not used by clickhouse-server and it was previously "hidden"
|
||||
(see [database_catalog_unused_dir_hide_timeout_sec](../../operations/server-configuration-parameters/settings.md#database_catalog_unused_dir_hide_timeout_sec))
|
||||
and this directory was not modified for last
|
||||
`database_catalog_unused_dir_rm_timeout_sec` seconds, the task will remove this directory.
|
||||
|
@ -1027,7 +1027,7 @@ Timeout to close idle TCP connections after specified number of seconds.
|
||||
|
||||
Possible values:
|
||||
|
||||
- Positive integer (0 - close immediatly, after 0 seconds).
|
||||
- Positive integer (0 - close immediately, after 0 seconds).
|
||||
|
||||
Default value: 3600.
|
||||
|
||||
@ -1733,7 +1733,7 @@ Possible values:
|
||||
|
||||
Default value: 1.
|
||||
|
||||
By default, async inserts are inserted into replicated tables by the `INSERT` statement enabling [async_isnert](#async-insert) are deduplicated (see [Data Replication](../../engines/table-engines/mergetree-family/replication.md)).
|
||||
By default, async inserts are inserted into replicated tables by the `INSERT` statement enabling [async_insert](#async-insert) are deduplicated (see [Data Replication](../../engines/table-engines/mergetree-family/replication.md)).
|
||||
For the replicated tables, by default, only 10000 of the most recent inserts for each partition are deduplicated (see [replicated_deduplication_window_for_async_inserts](merge-tree-settings.md/#replicated-deduplication-window-async-inserts), [replicated_deduplication_window_seconds_for_async_inserts](merge-tree-settings.md/#replicated-deduplication-window-seconds-async-inserts)).
|
||||
We recommend enabling the [async_block_ids_cache](merge-tree-settings.md/#use-async-block-ids-cache) to increase the efficiency of deduplication.
|
||||
This function does not work for non-replicated tables.
|
||||
@ -1939,8 +1939,8 @@ Do not merge aggregation states from different servers for distributed query pro
|
||||
Possible values:
|
||||
|
||||
- `0` — Disabled (final query processing is done on the initiator node).
|
||||
- `1` - Do not merge aggregation states from different servers for distributed query processing (query completelly processed on the shard, initiator only proxy the data), can be used in case it is for certain that there are different keys on different shards.
|
||||
- `2` - Same as `1` but applies `ORDER BY` and `LIMIT` (it is not possible when the query processed completelly on the remote node, like for `distributed_group_by_no_merge=1`) on the initiator (can be used for queries with `ORDER BY` and/or `LIMIT`).
|
||||
- `1` - Do not merge aggregation states from different servers for distributed query processing (query completely processed on the shard, initiator only proxy the data), can be used in case it is for certain that there are different keys on different shards.
|
||||
- `2` - Same as `1` but applies `ORDER BY` and `LIMIT` (it is not possible when the query processed completely on the remote node, like for `distributed_group_by_no_merge=1`) on the initiator (can be used for queries with `ORDER BY` and/or `LIMIT`).
|
||||
|
||||
Default value: `0`
|
||||
|
||||
@ -4110,7 +4110,7 @@ Enabled by default.
|
||||
|
||||
## use_hedged_requests {#use_hedged_requests}
|
||||
|
||||
Enables hadged requests logic for remote queries. It allows to establish many connections with different replicas for query.
|
||||
Enables hedged requests logic for remote queries. It allows to establish many connections with different replicas for query.
|
||||
New connection is enabled in case existent connection(s) with replica(s) were not established within `hedged_connection_timeout`
|
||||
or no data was received within `receive_data_timeout`. Query uses the first connection which send non empty progress packet (or data packet, if `allow_changing_replica_until_first_data_packet`);
|
||||
other connections are cancelled. Queries with `max_parallel_replicas > 1` are supported.
|
||||
|
@ -183,7 +183,7 @@ Arguments:
|
||||
- `-S`, `--structure` — table structure for input data.
|
||||
- `--input-format` — input format, `TSV` by default.
|
||||
- `-f`, `--file` — path to data, `stdin` by default.
|
||||
- `-q`, `--query` — queries to execute with `;` as delimeter. You must specify either `query` or `queries-file` option.
|
||||
- `-q`, `--query` — queries to execute with `;` as delimiter. You must specify either `query` or `queries-file` option.
|
||||
- `--queries-file` - file path with queries to execute. You must specify either `query` or `queries-file` option.
|
||||
- `-N`, `--table` — table name where to put output data, `table` by default.
|
||||
- `--format`, `--output-format` — output format, `TSV` by default.
|
||||
|
@ -13,11 +13,11 @@ groupBitAnd(expr)
|
||||
|
||||
**Arguments**
|
||||
|
||||
`expr` – An expression that results in `UInt*` type.
|
||||
`expr` – An expression that results in `UInt*` or `Int*` type.
|
||||
|
||||
**Return value**
|
||||
|
||||
Value of the `UInt*` type.
|
||||
Value of the `UInt*` or `Int*` type.
|
||||
|
||||
**Example**
|
||||
|
||||
|
@ -13,11 +13,11 @@ groupBitOr(expr)
|
||||
|
||||
**Arguments**
|
||||
|
||||
`expr` – An expression that results in `UInt*` type.
|
||||
`expr` – An expression that results in `UInt*` or `Int*` type.
|
||||
|
||||
**Returned value**
|
||||
|
||||
Value of the `UInt*` type.
|
||||
Value of the `UInt*` or `Int*` type.
|
||||
|
||||
**Example**
|
||||
|
||||
|
@ -13,11 +13,11 @@ groupBitXor(expr)
|
||||
|
||||
**Arguments**
|
||||
|
||||
`expr` – An expression that results in `UInt*` type.
|
||||
`expr` – An expression that results in `UInt*` or `Int*` type.
|
||||
|
||||
**Return value**
|
||||
|
||||
Value of the `UInt*` type.
|
||||
Value of the `UInt*` or `Int*` type.
|
||||
|
||||
**Example**
|
||||
|
||||
|
@ -23,7 +23,7 @@ Alias: `medianDeterministic`.
|
||||
|
||||
- `level` — Level of quantile. Optional parameter. Constant floating-point number from 0 to 1. We recommend using a `level` value in the range of `[0.01, 0.99]`. Default value: 0.5. At `level=0.5` the function calculates [median](https://en.wikipedia.org/wiki/Median).
|
||||
- `expr` — Expression over the column values resulting in numeric [data types](../../../sql-reference/data-types/index.md#data_types), [Date](../../../sql-reference/data-types/date.md) or [DateTime](../../../sql-reference/data-types/datetime.md).
|
||||
- `determinator` — Number whose hash is used instead of a random number generator in the reservoir sampling algorithm to make the result of sampling deterministic. As a determinator you can use any deterministic positive number, for example, a user id or an event id. If the same determinator value occures too often, the function works incorrectly.
|
||||
- `determinator` — Number whose hash is used instead of a random number generator in the reservoir sampling algorithm to make the result of sampling deterministic. As a determinator you can use any deterministic positive number, for example, a user id or an event id. If the same determinator value occurs too often, the function works incorrectly.
|
||||
|
||||
**Returned value**
|
||||
|
||||
|
@ -949,7 +949,7 @@ SOURCE(ODBC(... invalidate_query 'SELECT update_time FROM dictionary_source wher
|
||||
...
|
||||
```
|
||||
|
||||
For `Cache`, `ComplexKeyCache`, `SSDCache`, and `SSDComplexKeyCache` dictionaries both synchronious and asynchronious updates are supported.
|
||||
For `Cache`, `ComplexKeyCache`, `SSDCache`, and `SSDComplexKeyCache` dictionaries both synchronious and asynchronous updates are supported.
|
||||
|
||||
It is also possible for `Flat`, `Hashed`, `ComplexKeyHashed` dictionaries to only request data that was changed after the previous update. If `update_field` is specified as part of the dictionary source configuration, value of the previous update time in seconds will be added to the data request. Depends on source type (Executable, HTTP, MySQL, PostgreSQL, ClickHouse, or ODBC) different logic will be applied to `update_field` before request data from an external source.
|
||||
|
||||
|
@ -314,7 +314,7 @@ SELECT bitTestAny(number, index1, index2, index3, index4, ...)
|
||||
|
||||
**Returned values**
|
||||
|
||||
Returns result of logical disjuction.
|
||||
Returns result of logical disjunction.
|
||||
|
||||
Type: `UInt8`.
|
||||
|
||||
|
@ -256,7 +256,7 @@ Result:
|
||||
|
||||
## bitmapCardinality
|
||||
|
||||
Rerturn the cardinality of a bitmap.
|
||||
Returns the cardinality of a bitmap.
|
||||
|
||||
**Syntax**
|
||||
|
||||
|
@ -14,7 +14,7 @@ The following types can be compared:
|
||||
- dates
|
||||
- dates with times
|
||||
|
||||
Only values within the same group can be compared (e.g. UInt16 and UInt64) but not accross groups (e.g. UInt16 and DateTime).
|
||||
Only values within the same group can be compared (e.g. UInt16 and UInt64) but not across groups (e.g. UInt16 and DateTime).
|
||||
|
||||
Strings are compared byte-by-byte. Note that this may lead to unexpected results if one of the strings contains UTF-8 encoded multi-byte characters.
|
||||
|
||||
|
@ -289,7 +289,7 @@ Aliases: `DAYOFMONTH`, `DAY`.
|
||||
|
||||
Converts a date or date with time to the number of the day in the week as UInt8 value.
|
||||
|
||||
The two-argument form of `toDayOfWeek()` enables you to specify whether the week starts on Monday or Sunday, and whether the return value should be in the range from 0 to 6 or 1 to 7. If the mode argument is ommited, the default mode is 0. The time zone of the date can be specified as the third argument.
|
||||
The two-argument form of `toDayOfWeek()` enables you to specify whether the week starts on Monday or Sunday, and whether the return value should be in the range from 0 to 6 or 1 to 7. If the mode argument is omitted, the default mode is 0. The time zone of the date can be specified as the third argument.
|
||||
|
||||
| Mode | First day of week | Range |
|
||||
|------|-------------------|------------------------------------------------|
|
||||
|
@ -84,7 +84,7 @@ Result:
|
||||
|
||||
## s2GetNeighbors
|
||||
|
||||
Returns S2 neighbor indixes corresponding to the provided [S2](#s2index). Each cell in the S2 system is a quadrilateral bounded by four geodesics. So, each cell has 4 neighbors.
|
||||
Returns S2 neighbor indexes corresponding to the provided [S2](#s2index). Each cell in the S2 system is a quadrilateral bounded by four geodesics. So, each cell has 4 neighbors.
|
||||
|
||||
**Syntax**
|
||||
|
||||
@ -206,7 +206,7 @@ s2CapUnion(center1, radius1, center2, radius2)
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `center1`, `center2` — S2 point indixes corresponding to the two input caps. [UInt64](../../../sql-reference/data-types/int-uint.md).
|
||||
- `center1`, `center2` — S2 point indexes corresponding to the two input caps. [UInt64](../../../sql-reference/data-types/int-uint.md).
|
||||
- `radius1`, `radius2` — Radius of the two input caps in degrees. [Float64](../../../sql-reference/data-types/float.md).
|
||||
|
||||
**Returned values**
|
||||
|
@ -64,7 +64,7 @@ This is a cryptographic hash function. It works at least three times faster than
|
||||
The function [interprets](/docs/en/sql-reference/functions/type-conversion-functions.md/#type_conversion_functions-reinterpretAsString) all the input parameters as strings and calculates the hash value for each of them. It then combines the hashes by the following algorithm:
|
||||
|
||||
1. The first and the second hash value are concatenated to an array which is hashed.
|
||||
2. The previously calculated hash value and the hash of the third input paramter are hashed in a similar way.
|
||||
2. The previously calculated hash value and the hash of the third input parameter are hashed in a similar way.
|
||||
3. This calculation is repeated for all remaining hash values of the original input.
|
||||
|
||||
**Arguments**
|
||||
|
@ -84,7 +84,7 @@ Alias: The [OR Operator](../../sql-reference/operators/index.md#logical-or-opera
|
||||
|
||||
**Returned value**
|
||||
|
||||
- `1`, if at least one argument evalutes to `true`,
|
||||
- `1`, if at least one argument evaluates to `true`,
|
||||
- `0`, if all arguments evaluate to `false`,
|
||||
- `NULL`, if all arguments evaluate to `false` and at least one argument is `NULL`.
|
||||
|
||||
@ -173,7 +173,7 @@ xor(val1, val2...)
|
||||
**Returned value**
|
||||
|
||||
- `1`, for two values: if one of the values evaluates to `false` and other does not,
|
||||
- `0`, for two values: if both values evalute to `false` or to both `true`,
|
||||
- `0`, for two values: if both values evaluate to `false` or to both `true`,
|
||||
- `NULL`, if at least one of the inputs is `NULL`
|
||||
|
||||
Type: [UInt8](../../sql-reference/data-types/int-uint.md) or [Nullable](../../sql-reference/data-types/nullable.md)([UInt8](../../sql-reference/data-types/int-uint.md)).
|
||||
|
@ -187,7 +187,7 @@ detectLanguageMixed('text_to_be_analyzed')
|
||||
|
||||
**Returned value**
|
||||
|
||||
- `Map(String, Float32)`: The keys are 2-letter ISO codes and the values are a perentage of text found for that language
|
||||
- `Map(String, Float32)`: The keys are 2-letter ISO codes and the values are a percentage of text found for that language
|
||||
|
||||
|
||||
**Examples**
|
||||
|
@ -306,7 +306,7 @@ You can use this function in table engine parameters in a CREATE TABLE query whe
|
||||
|
||||
## currentUser()
|
||||
|
||||
Returns the login of current user. Login of user, that initiated query, will be returned in case distibuted query.
|
||||
Returns the login of current user. Login of user, that initiated query, will be returned in case distributed query.
|
||||
|
||||
``` sql
|
||||
SELECT currentUser();
|
||||
@ -317,7 +317,7 @@ Alias: `user()`, `USER()`.
|
||||
**Returned values**
|
||||
|
||||
- Login of current user.
|
||||
- Login of user that initiated query in case of disributed query.
|
||||
- Login of user that initiated query in case of distributed query.
|
||||
|
||||
Type: `String`.
|
||||
|
||||
|
@ -19,13 +19,13 @@ The random numbers are generated by non-cryptographic algorithms.
|
||||
|
||||
## rand, rand32
|
||||
|
||||
Returns a random UInt32 number, evenly distributed accross the range of all possible UInt32 numbers.
|
||||
Returns a random UInt32 number, evenly distributed across the range of all possible UInt32 numbers.
|
||||
|
||||
Uses a linear congruential generator.
|
||||
|
||||
## rand64
|
||||
|
||||
Returns a random UInt64 number, evenly distributed accross the range of all possible UInt64 numbers.
|
||||
Returns a random UInt64 number, evenly distributed across the range of all possible UInt64 numbers.
|
||||
|
||||
Uses a linear congruential generator.
|
||||
|
||||
|
@ -310,7 +310,7 @@ SELECT toValidUTF8('\x61\xF0\x80\x80\x80b');
|
||||
|
||||
## repeat
|
||||
|
||||
Conatenates a string as many times with itself as specified.
|
||||
Concatenates a string as many times with itself as specified.
|
||||
|
||||
**Syntax**
|
||||
|
||||
|
@ -133,7 +133,7 @@ Tuples should have the same type of the elements.
|
||||
|
||||
- The Hamming distance.
|
||||
|
||||
Type: The result type is calculed the same way it is for [Arithmetic functions](../../sql-reference/functions/arithmetic-functions.md), based on the number of elements in the input tuples.
|
||||
Type: The result type is calculated the same way it is for [Arithmetic functions](../../sql-reference/functions/arithmetic-functions.md), based on the number of elements in the input tuples.
|
||||
|
||||
``` sql
|
||||
SELECT
|
||||
@ -223,7 +223,7 @@ Result:
|
||||
└───────────────────────────────────────┘
|
||||
```
|
||||
|
||||
It is possible to transform colums to rows using this function:
|
||||
It is possible to transform columns to rows using this function:
|
||||
|
||||
``` sql
|
||||
CREATE TABLE tupletest (col Tuple(CPU Float64, Memory Float64, Disk Float64)) ENGINE = Memory;
|
||||
|
@ -449,7 +449,7 @@ mapExtractKeyLike(map, pattern)
|
||||
|
||||
**Returned value**
|
||||
|
||||
- A map contained elements the key of which matchs the specified pattern. If there are no elements matched the pattern, it will return an empty map.
|
||||
- A map contained elements the key of which matches the specified pattern. If there are no elements matched the pattern, it will return an empty map.
|
||||
|
||||
**Example**
|
||||
|
||||
|
@ -116,7 +116,7 @@ The column description can specify a default value expression in the form of `DE
|
||||
|
||||
The expression `expr` is optional. If it is omitted, the column type must be specified explicitly and the default value will be `0` for numeric columns, `''` (the empty string) for string columns, `[]` (the empty array) for array columns, `1970-01-01` for date columns, or `NULL` for nullable columns.
|
||||
|
||||
The column type of a default value column can be omitted in which case it is infered from `expr`'s type. For example the type of column `EventDate DEFAULT toDate(EventTime)` will be date.
|
||||
The column type of a default value column can be omitted in which case it is inferred from `expr`'s type. For example the type of column `EventDate DEFAULT toDate(EventTime)` will be date.
|
||||
|
||||
If both a data type and a default value expression are specified, an implicit type casting function inserted which converts the expression to the specified type. Example: `Hits UInt32 DEFAULT 0` is internally represented as `Hits UInt32 DEFAULT toUInt32(0)`.
|
||||
|
||||
|
@ -34,7 +34,7 @@ If the `alter_sync` is set to `2` and some replicas are not active for more than
|
||||
|
||||
## BY expression
|
||||
|
||||
If you want to perform deduplication on custom set of columns rather than on all, you can specify list of columns explicitly or use any combination of [`*`](../../sql-reference/statements/select/index.md#asterisk), [`COLUMNS`](../../sql-reference/statements/select/index.md#columns-expression) or [`EXCEPT`](../../sql-reference/statements/select/index.md#except-modifier) expressions. The explictly written or implicitly expanded list of columns must include all columns specified in row ordering expression (both primary and sorting keys) and partitioning expression (partitioning key).
|
||||
If you want to perform deduplication on custom set of columns rather than on all, you can specify list of columns explicitly or use any combination of [`*`](../../sql-reference/statements/select/index.md#asterisk), [`COLUMNS`](../../sql-reference/statements/select/index.md#columns-expression) or [`EXCEPT`](../../sql-reference/statements/select/index.md#except-modifier) expressions. The explicitly written or implicitly expanded list of columns must include all columns specified in row ordering expression (both primary and sorting keys) and partitioning expression (partitioning key).
|
||||
|
||||
:::note
|
||||
Notice that `*` behaves just like in `SELECT`: [MATERIALIZED](../../sql-reference/statements/create/table.md#materialized) and [ALIAS](../../sql-reference/statements/create/table.md#alias) columns are not used for expansion.
|
||||
|
184
docs/en/sql-reference/table-functions/gcs.md
Normal file
184
docs/en/sql-reference/table-functions/gcs.md
Normal file
@ -0,0 +1,184 @@
|
||||
---
|
||||
slug: /en/sql-reference/table-functions/gcs
|
||||
sidebar_position: 45
|
||||
sidebar_label: s3
|
||||
keywords: [gcs, bucket]
|
||||
---
|
||||
|
||||
# gcs Table Function
|
||||
|
||||
Provides a table-like interface to select/insert files in [Google Cloud Storage](https://cloud.google.com/storage/).
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
gcs(path [,hmac_key, hmac_secret] [,format] [,structure] [,compression])
|
||||
```
|
||||
|
||||
:::tip GCS
|
||||
The GCS Table Function integrates with Google Cloud Storage by using the GCS XML API and HMAC keys. See the [Google interoperability docs]( https://cloud.google.com/storage/docs/interoperability) for more details about the endpoint and HMAC.
|
||||
|
||||
:::
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `path` — Bucket url with path to file. Supports following wildcards in readonly mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings.
|
||||
|
||||
:::note GCS
|
||||
The GCS path is in this format as the endpoint for the Google XML API is different than the JSON API:
|
||||
```
|
||||
https://storage.googleapis.com/<bucket>/<folder>/<filename(s)>
|
||||
```
|
||||
and not ~~https://storage.cloud.google.com~~.
|
||||
:::
|
||||
|
||||
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
|
||||
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
|
||||
- `compression` — Parameter is optional. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, it will autodetect compression by file extension.
|
||||
|
||||
**Returned value**
|
||||
|
||||
A table with the specified structure for reading or writing data in the specified file.
|
||||
|
||||
**Examples**
|
||||
|
||||
Selecting the first two rows from the table from GCS file `https://storage.googleapis.com/my-test-bucket-768/data.csv`:
|
||||
|
||||
``` sql
|
||||
SELECT *
|
||||
FROM gcs('https://storage.googleapis.com/my-test-bucket-768/data.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
|
||||
LIMIT 2;
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─column1─┬─column2─┬─column3─┐
|
||||
│ 1 │ 2 │ 3 │
|
||||
│ 3 │ 2 │ 1 │
|
||||
└─────────┴─────────┴─────────┘
|
||||
```
|
||||
|
||||
The similar but from file with `gzip` compression:
|
||||
|
||||
``` sql
|
||||
SELECT *
|
||||
FROM gcs('https://storage.googleapis.com/my-test-bucket-768/data.csv.gz', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32', 'gzip')
|
||||
LIMIT 2;
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─column1─┬─column2─┬─column3─┐
|
||||
│ 1 │ 2 │ 3 │
|
||||
│ 3 │ 2 │ 1 │
|
||||
└─────────┴─────────┴─────────┘
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
Suppose that we have several files with following URIs on GCS:
|
||||
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_1.csv'
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_2.csv'
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_3.csv'
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/some_prefix/some_file_4.csv'
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_1.csv'
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_2.csv'
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_3.csv'
|
||||
- 'https://storage.googleapis.com/my-test-bucket-768/another_prefix/some_file_4.csv'
|
||||
|
||||
Count the amount of rows in files ending with numbers from 1 to 3:
|
||||
|
||||
``` sql
|
||||
SELECT count(*)
|
||||
FROM gcs('https://storage.googleapis.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'name String, value UInt32')
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─count()─┐
|
||||
│ 18 │
|
||||
└─────────┘
|
||||
```
|
||||
|
||||
Count the total amount of rows in all files in these two directories:
|
||||
|
||||
``` sql
|
||||
SELECT count(*)
|
||||
FROM gcs('https://storage.googleapis.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'name String, value UInt32')
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─count()─┐
|
||||
│ 24 │
|
||||
└─────────┘
|
||||
```
|
||||
|
||||
:::warning
|
||||
If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
|
||||
:::
|
||||
|
||||
Count the total amount of rows in files named `file-000.csv`, `file-001.csv`, … , `file-999.csv`:
|
||||
|
||||
``` sql
|
||||
SELECT count(*)
|
||||
FROM gcs('https://storage.googleapis.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'name String, value UInt32');
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─count()─┐
|
||||
│ 12 │
|
||||
└─────────┘
|
||||
```
|
||||
|
||||
Insert data into file `test-data.csv.gz`:
|
||||
|
||||
``` sql
|
||||
INSERT INTO FUNCTION gcs('https://storage.googleapis.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
|
||||
VALUES ('test-data', 1), ('test-data-2', 2);
|
||||
```
|
||||
|
||||
Insert data into file `test-data.csv.gz` from existing table:
|
||||
|
||||
``` sql
|
||||
INSERT INTO FUNCTION gcs('https://storage.googleapis.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
|
||||
SELECT name, value FROM existing_table;
|
||||
```
|
||||
|
||||
Glob ** can be used for recursive directory traversal. Consider the below example, it will fetch all files from `my-test-bucket-768` directory recursively:
|
||||
|
||||
``` sql
|
||||
SELECT * FROM gcs('https://storage.googleapis.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip');
|
||||
```
|
||||
|
||||
The below get data from all `test-data.csv.gz` files from any folder inside `my-test-bucket` directory recursively:
|
||||
|
||||
``` sql
|
||||
SELECT * FROM gcs('https://storage.googleapis.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');
|
||||
```
|
||||
|
||||
## Partitioned Write
|
||||
|
||||
If you specify `PARTITION BY` expression when inserting data into `GCS` table, a separate file is created for each partition value. Splitting the data into separate files helps to improve reading operations efficiency.
|
||||
|
||||
**Examples**
|
||||
|
||||
1. Using partition ID in a key creates separate files:
|
||||
|
||||
```sql
|
||||
INSERT INTO TABLE FUNCTION
|
||||
gcs('http://bucket.amazonaws.com/my_bucket/file_{_partition_id}.csv', 'CSV', 'a String, b UInt32, c UInt32')
|
||||
PARTITION BY a VALUES ('x', 2, 3), ('x', 4, 5), ('y', 11, 12), ('y', 13, 14), ('z', 21, 22), ('z', 23, 24);
|
||||
```
|
||||
As a result, the data is written into three files: `file_x.csv`, `file_y.csv`, and `file_z.csv`.
|
||||
|
||||
2. Using partition ID in a bucket name creates files in different buckets:
|
||||
|
||||
```sql
|
||||
INSERT INTO TABLE FUNCTION
|
||||
gcs('http://bucket.amazonaws.com/my_bucket_{_partition_id}/file.csv', 'CSV', 'a UInt32, b UInt32, c UInt32')
|
||||
PARTITION BY a VALUES (1, 2, 3), (1, 4, 5), (10, 11, 12), (10, 13, 14), (20, 21, 22), (20, 23, 24);
|
||||
```
|
||||
As a result, the data is written into three files in different buckets: `my_bucket_1/file.csv`, `my_bucket_10/file.csv`, and `my_bucket_20/file.csv`.
|
||||
|
||||
**See Also**
|
||||
|
||||
- [S3 table function](s3.md)
|
||||
- [S3 engine](../../engines/table-engines/integrations/s3.md)
|
@ -6,7 +6,7 @@ sidebar_label: hdfsCluster
|
||||
|
||||
# hdfsCluster Table Function
|
||||
|
||||
Allows processing files from HDFS in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster, discloses asterics in HDFS file path, and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
|
||||
Allows processing files from HDFS in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster, discloses asterisks in HDFS file path, and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
|
||||
|
||||
**Syntax**
|
||||
|
||||
|
@ -53,7 +53,7 @@ The `remote` table function can be useful in the following cases:
|
||||
- Infrequent distributed requests that are made manually.
|
||||
- Distributed requests where the set of servers is re-defined each time.
|
||||
|
||||
### Adresses
|
||||
### Addresses
|
||||
|
||||
``` text
|
||||
example01-01-1
|
||||
|
@ -5,7 +5,7 @@ sidebar_label: s3Cluster
|
||||
title: "s3Cluster Table Function"
|
||||
---
|
||||
|
||||
Allows processing files from [Amazon S3](https://aws.amazon.com/s3/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster, discloses asterics in S3 file path, and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
|
||||
Allows processing files from [Amazon S3](https://aws.amazon.com/s3/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster, discloses asterisks in S3 file path, and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
|
||||
|
||||
**Syntax**
|
||||
|
||||
|
@ -80,7 +80,7 @@ WINDOW window_name as ([[PARTITION BY grouping_column] [ORDER BY sorting_column]
|
||||
- `PARTITION BY` - defines how to break a resultset into groups.
|
||||
- `ORDER BY` - defines how to order rows inside the group during calculation aggregate_function.
|
||||
- `ROWS or RANGE` - defines bounds of a frame, aggregate_function is calculated within a frame.
|
||||
- `WINDOW` - allows to reuse a window definition with multiple exressions.
|
||||
- `WINDOW` - allows to reuse a window definition with multiple expressions.
|
||||
|
||||
### Functions
|
||||
|
||||
|
@ -107,7 +107,7 @@ SELECT comment, hex(secret) FROM encryption_test WHERE comment LIKE '%gcm%';
|
||||
|
||||
## aes_encrypt_mysql {#aes_encrypt_mysql}
|
||||
|
||||
Совместима с шифрованием myqsl, результат может быть расшифрован функцией [AES_DECRYPT](https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-decrypt).
|
||||
Совместима с шифрованием mysql, результат может быть расшифрован функцией [AES_DECRYPT](https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-decrypt).
|
||||
|
||||
При одинаковых входящих значениях зашифрованный текст будет совпадать с результатом, возвращаемым функцией `encrypt`. Однако если `key` или `iv` длиннее, чем должны быть, `aes_encrypt_mysql` будет работать аналогично функции `aes_encrypt` в MySQL: свернет ключ и проигнорирует лишнюю часть `iv`.
|
||||
|
||||
@ -298,7 +298,7 @@ SELECT comment, decrypt('aes-256-ofb', secret, '12345678910121314151617181920212
|
||||
|
||||
## aes_decrypt_mysql {#aes_decrypt_mysql}
|
||||
|
||||
Совместима с шифрованием myqsl и может расшифровать данные, зашифрованные функцией [AES_ENCRYPT](https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-encrypt).
|
||||
Совместима с шифрованием mysql и может расшифровать данные, зашифрованные функцией [AES_ENCRYPT](https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-encrypt).
|
||||
|
||||
При одинаковых входящих значениях расшифрованный текст будет совпадать с результатом, возвращаемым функцией `decrypt`. Однако если `key` или `iv` длиннее, чем должны быть, `aes_decrypt_mysql` будет работать аналогично функции `aes_decrypt` в MySQL: свернет ключ и проигнорирует лишнюю часть `iv`.
|
||||
|
||||
|
@ -778,7 +778,7 @@ TCP端口,用于与客户端进行安全通信。 使用它与 [OpenSSL](#serv
|
||||
|
||||
## zookeeper {#server-settings_zookeeper}
|
||||
|
||||
包含允许ClickHouse与 [zookpeer](http://zookeeper.apache.org/) 集群。
|
||||
包含允许ClickHouse与 [zookeeper](http://zookeeper.apache.org/) 集群。
|
||||
|
||||
ClickHouse使用ZooKeeper存储复制表副本的元数据。 如果未使用复制的表,则可以省略此部分参数。
|
||||
|
||||
|
@ -517,11 +517,12 @@
|
||||
let previous_query = '';
|
||||
|
||||
const current_url = new URL(window.location);
|
||||
const opened_locally = location.protocol == 'file:';
|
||||
|
||||
const server_address = current_url.searchParams.get('url');
|
||||
if (server_address) {
|
||||
document.getElementById('url').value = server_address;
|
||||
} else if (location.protocol != 'file:') {
|
||||
} else if (!opened_locally) {
|
||||
/// Substitute the address of the server where the page is served.
|
||||
document.getElementById('url').value = location.origin;
|
||||
}
|
||||
@ -532,6 +533,19 @@
|
||||
document.getElementById('user').value = user_from_url;
|
||||
}
|
||||
|
||||
const pass_from_url = current_url.searchParams.get('password');
|
||||
if (pass_from_url) {
|
||||
document.getElementById('password').value = pass_from_url;
|
||||
/// Browsers don't allow manipulating history for the 'file:' protocol.
|
||||
if (!opened_locally) {
|
||||
let replaced_pass = current_url.searchParams;
|
||||
replaced_pass.delete('password');
|
||||
window.history.replaceState(null, '',
|
||||
window.location.origin + window.location.pathname + '?'
|
||||
+ replaced_pass.toString() + window.location.hash);
|
||||
}
|
||||
}
|
||||
|
||||
function postImpl(posted_request_num, query)
|
||||
{
|
||||
const user = document.getElementById('user').value;
|
||||
@ -548,7 +562,7 @@
|
||||
'&max_result_rows=1000&max_result_bytes=10000000&result_overflow_mode=break';
|
||||
|
||||
// If play.html is opened locally, append username and password to the URL parameter to avoid CORS issue.
|
||||
if (document.location.href.startsWith("file://")) {
|
||||
if (opened_locally) {
|
||||
url += '&user=' + encodeURIComponent(user) +
|
||||
'&password=' + encodeURIComponent(password)
|
||||
}
|
||||
@ -557,7 +571,7 @@
|
||||
|
||||
xhr.open('POST', url, true);
|
||||
// If play.html is open normally, use Basic auth to prevent username and password being exposed in URL parameters
|
||||
if (!document.location.href.startsWith("file://")) {
|
||||
if (!opened_locally) {
|
||||
xhr.setRequestHeader("Authorization", "Basic " + btoa(user+":"+password));
|
||||
}
|
||||
xhr.onreadystatechange = function()
|
||||
|
@ -27,7 +27,7 @@ AggregateFunctionPtr createAggregateFunctionBitwise(const std::string & name, co
|
||||
"is illegal, because it cannot be used in bitwise operations",
|
||||
argument_types[0]->getName(), name);
|
||||
|
||||
AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunctionBitwise, Data>(*argument_types[0], argument_types[0]));
|
||||
AggregateFunctionPtr res(createWithIntegerType<AggregateFunctionBitwise, Data>(*argument_types[0], argument_types[0]));
|
||||
|
||||
if (!res)
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <AggregateFunctions/FactoryHelpers.h>
|
||||
#include <AggregateFunctions/Helpers.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
|
||||
// TODO include this last because of a broken roaring header. See the comment inside.
|
||||
|
@ -100,6 +100,28 @@ static IAggregateFunction * createWithUnsignedIntegerType(const IDataType & argu
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data, typename... TArgs>
|
||||
static IAggregateFunction * createWithSignedIntegerType(const IDataType & argument_type, TArgs && ... args)
|
||||
{
|
||||
WhichDataType which(argument_type);
|
||||
if (which.idx == TypeIndex::Int8) return new AggregateFunctionTemplate<Int8, Data<Int8>>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Int16) return new AggregateFunctionTemplate<Int16, Data<Int16>>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Int32) return new AggregateFunctionTemplate<Int32, Data<Int32>>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Int64) return new AggregateFunctionTemplate<Int64, Data<Int64>>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Int128) return new AggregateFunctionTemplate<Int128, Data<Int128>>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Int256) return new AggregateFunctionTemplate<Int256, Data<Int256>>(std::forward<TArgs>(args)...);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data, typename... TArgs>
|
||||
static IAggregateFunction * createWithIntegerType(const IDataType & argument_type, TArgs && ... args)
|
||||
{
|
||||
IAggregateFunction * f = createWithUnsignedIntegerType<AggregateFunctionTemplate, Data>(argument_type, std::forward<TArgs>(args)...);
|
||||
if (f)
|
||||
return f;
|
||||
return createWithSignedIntegerType<AggregateFunctionTemplate, Data>(argument_type, std::forward<TArgs>(args)...);
|
||||
}
|
||||
|
||||
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data, typename... TArgs>
|
||||
static IAggregateFunction * createWithBasicNumberOrDateOrDateTime(const IDataType & argument_type, TArgs &&... args)
|
||||
{
|
||||
|
@ -60,13 +60,7 @@ bool ColumnFixedString::isDefaultAt(size_t index) const
|
||||
void ColumnFixedString::insert(const Field & x)
|
||||
{
|
||||
const String & s = x.get<const String &>();
|
||||
|
||||
if (s.size() > n)
|
||||
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Too large string '{}' for FixedString column", s);
|
||||
|
||||
size_t old_size = chars.size();
|
||||
chars.resize_fill(old_size + n);
|
||||
memcpy(chars.data() + old_size, s.data(), s.size());
|
||||
insertData(s.data(), s.size());
|
||||
}
|
||||
|
||||
void ColumnFixedString::insertFrom(const IColumn & src_, size_t index)
|
||||
@ -87,8 +81,9 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
|
||||
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Too large string for FixedString column");
|
||||
|
||||
size_t old_size = chars.size();
|
||||
chars.resize_fill(old_size + n);
|
||||
chars.resize(old_size + n);
|
||||
memcpy(chars.data() + old_size, pos, length);
|
||||
memset(chars.data() + old_size + length, 0, n - length);
|
||||
}
|
||||
|
||||
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
|
||||
@ -278,7 +273,7 @@ void ColumnFixedString::expand(const IColumn::Filter & mask, bool inverted)
|
||||
|
||||
ssize_t index = mask.size() - 1;
|
||||
ssize_t from = size() - 1;
|
||||
chars.resize_fill(mask.size() * n, 0);
|
||||
chars.resize_fill(mask.size() * n);
|
||||
while (index >= 0)
|
||||
{
|
||||
if (!!mask[index] ^ inverted)
|
||||
|
@ -485,13 +485,8 @@ void ColumnLowCardinality::setSharedDictionary(const ColumnPtr & column_unique)
|
||||
ColumnLowCardinality::MutablePtr ColumnLowCardinality::cutAndCompact(size_t start, size_t length) const
|
||||
{
|
||||
auto sub_positions = IColumn::mutate(idx.getPositions()->cut(start, length));
|
||||
/// Create column with new indexes and old dictionary.
|
||||
/// Dictionary is shared, but will be recreated after compactInplace call.
|
||||
auto column = ColumnLowCardinality::create(getDictionary().assumeMutable(), std::move(sub_positions));
|
||||
/// Will create new dictionary.
|
||||
column->compactInplace();
|
||||
|
||||
return column;
|
||||
auto new_column_unique = Dictionary::compact(dictionary.getColumnUnique(), sub_positions);
|
||||
return ColumnLowCardinality::create(std::move(new_column_unique), std::move(sub_positions));
|
||||
}
|
||||
|
||||
void ColumnLowCardinality::compactInplace()
|
||||
@ -589,7 +584,7 @@ size_t ColumnLowCardinality::Index::getSizeOfIndexType(const IColumn & column, s
|
||||
column.getName());
|
||||
}
|
||||
|
||||
void ColumnLowCardinality::Index::attachPositions(ColumnPtr positions_)
|
||||
void ColumnLowCardinality::Index::attachPositions(MutableColumnPtr positions_)
|
||||
{
|
||||
positions = std::move(positions_);
|
||||
updateSizeOfType();
|
||||
@ -820,21 +815,23 @@ void ColumnLowCardinality::Dictionary::setShared(const ColumnPtr & column_unique
|
||||
shared = true;
|
||||
}
|
||||
|
||||
void ColumnLowCardinality::Dictionary::compact(ColumnPtr & positions)
|
||||
void ColumnLowCardinality::Dictionary::compact(MutableColumnPtr & positions)
|
||||
{
|
||||
auto new_column_unique = column_unique->cloneEmpty();
|
||||
column_unique = compact(getColumnUnique(), positions);
|
||||
shared = false;
|
||||
}
|
||||
|
||||
auto & unique = getColumnUnique();
|
||||
MutableColumnPtr ColumnLowCardinality::Dictionary::compact(const IColumnUnique & unique, MutableColumnPtr & positions)
|
||||
{
|
||||
auto new_column_unique = unique.cloneEmpty();
|
||||
auto & new_unique = static_cast<IColumnUnique &>(*new_column_unique);
|
||||
|
||||
auto indexes = mapUniqueIndex(positions->assumeMutableRef());
|
||||
auto indexes = mapUniqueIndex(*positions);
|
||||
auto sub_keys = unique.getNestedColumn()->index(*indexes, 0);
|
||||
auto new_indexes = new_unique.uniqueInsertRangeFrom(*sub_keys, 0, sub_keys->size());
|
||||
|
||||
positions = IColumn::mutate(new_indexes->index(*positions, 0));
|
||||
column_unique = std::move(new_column_unique);
|
||||
|
||||
shared = false;
|
||||
return new_column_unique;
|
||||
}
|
||||
|
||||
ColumnPtr ColumnLowCardinality::cloneWithDefaultOnNull() const
|
||||
|
@ -160,7 +160,9 @@ public:
|
||||
|
||||
void reserve(size_t n) override { idx.reserve(n); }
|
||||
|
||||
size_t byteSize() const override { return idx.getPositions()->byteSize() + getDictionary().byteSize(); }
|
||||
/// Don't count the dictionary size as it can be shared between different blocks.
|
||||
size_t byteSize() const override { return idx.getPositions()->byteSize(); }
|
||||
|
||||
size_t byteSizeAt(size_t n) const override { return getDictionary().byteSizeAt(getIndexes().getUInt(n)); }
|
||||
size_t allocatedBytes() const override { return idx.getPositions()->allocatedBytes() + getDictionary().allocatedBytes(); }
|
||||
|
||||
@ -301,8 +303,8 @@ public:
|
||||
|
||||
void checkSizeOfType();
|
||||
|
||||
ColumnPtr detachPositions() { return std::move(positions); }
|
||||
void attachPositions(ColumnPtr positions_);
|
||||
MutableColumnPtr detachPositions() { return IColumn::mutate(std::move(positions)); }
|
||||
void attachPositions(MutableColumnPtr positions_);
|
||||
|
||||
void countKeys(ColumnUInt64::Container & counts) const;
|
||||
|
||||
@ -350,7 +352,9 @@ private:
|
||||
bool isShared() const { return shared; }
|
||||
|
||||
/// Create new dictionary with only keys that are mentioned in positions.
|
||||
void compact(ColumnPtr & positions);
|
||||
void compact(MutableColumnPtr & positions);
|
||||
|
||||
static MutableColumnPtr compact(const IColumnUnique & column_unique, MutableColumnPtr & positions);
|
||||
|
||||
private:
|
||||
WrappedPtr column_unique;
|
||||
|
@ -214,7 +214,7 @@ void ColumnNullable::insertFromNotNullable(const IColumn & src, size_t n)
|
||||
void ColumnNullable::insertRangeFromNotNullable(const IColumn & src, size_t start, size_t length)
|
||||
{
|
||||
getNestedColumn().insertRangeFrom(src, start, length);
|
||||
getNullMapData().resize_fill(getNullMapData().size() + length, 0);
|
||||
getNullMapData().resize_fill(getNullMapData().size() + length);
|
||||
}
|
||||
|
||||
void ColumnNullable::insertManyFromNotNullable(const IColumn & src, size_t position, size_t length)
|
||||
|
@ -176,7 +176,7 @@ void ColumnString::expand(const IColumn::Filter & mask, bool inverted)
|
||||
/// (if not, one of exceptions below will throw) and we can calculate the resulting chars size.
|
||||
UInt64 last_offset = offsets_data[from] + (mask.size() - offsets_data.size());
|
||||
offsets_data.resize(mask.size());
|
||||
chars_data.resize_fill(last_offset, 0);
|
||||
chars_data.resize_fill(last_offset);
|
||||
while (index >= 0)
|
||||
{
|
||||
offsets_data[index] = last_offset;
|
||||
|
@ -649,6 +649,8 @@
|
||||
M(679, IO_URING_SUBMIT_ERROR) \
|
||||
M(690, MIXED_ACCESS_PARAMETER_TYPES) \
|
||||
M(691, UNKNOWN_ELEMENT_OF_ENUM) \
|
||||
M(692, TOO_MANY_MUTATIONS) \
|
||||
M(693, AWS_ERROR) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
@ -103,6 +103,9 @@
|
||||
M(DelayedInserts, "Number of times the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
|
||||
M(RejectedInserts, "Number of times the INSERT of a block to a MergeTree table was rejected with 'Too many parts' exception due to high number of active data parts for partition.") \
|
||||
M(DelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
|
||||
M(DelayedMutations, "Number of times the mutation of a MergeTree table was throttled due to high number of unfinished mutations for table.") \
|
||||
M(RejectedMutations, "Number of times the mutation of a MergeTree table was rejected with 'Too many mutations' exception due to high number of unfinished mutations for table.") \
|
||||
M(DelayedMutationsMilliseconds, "Total number of milliseconds spent while the mutation of a MergeTree table was throttled due to high number of unfinished mutations for table.") \
|
||||
M(DistributedDelayedInserts, "Number of times the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
|
||||
M(DistributedRejectedInserts, "Number of times the INSERT of a block to a Distributed table was rejected with 'Too many bytes' exception due to high number of pending bytes.") \
|
||||
M(DistributedDelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
|
||||
@ -250,8 +253,8 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(DNSError, "Total count of errors in DNS resolution") \
|
||||
\
|
||||
M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (note that this is a sum).") \
|
||||
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
|
||||
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
|
||||
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user mode. This include time CPU pipeline was stalled due to main memory access, cache misses, branch mispredictions, hyper-threading, etc.") \
|
||||
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel mode. This is time spent in syscalls, excluding waiting time during blocking syscalls.") \
|
||||
M(MemoryOvercommitWaitTimeMicroseconds, "Total time spent in waiting for memory to be freed in OvercommitTracker.") \
|
||||
M(MemoryAllocatorPurge, "Total number of times memory allocator purge was requested") \
|
||||
M(MemoryAllocatorPurgeTimeMicroseconds, "Total number of times memory allocator purge was requested") \
|
||||
|
@ -37,7 +37,7 @@ using RWLock = std::shared_ptr<RWLockImpl>;
|
||||
///
|
||||
/// NOTE: it is dangerous to acquire lock with NO_QUERY, because FastPath doesn't
|
||||
/// exist for this case and deadlock, described in previous note,
|
||||
/// may accur in case of recursive locking.
|
||||
/// may occur in case of recursive locking.
|
||||
class RWLockImpl : public std::enable_shared_from_this<RWLockImpl>
|
||||
{
|
||||
public:
|
||||
|
@ -1,7 +1,6 @@
|
||||
#if defined(OS_LINUX)
|
||||
#include <Common/TimerDescriptor.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
#include <sys/timerfd.h>
|
||||
#include <fcntl.h>
|
||||
|
@ -44,7 +44,7 @@ struct NetworkInterfaces
|
||||
std::optional<Poco::Net::IPAddress> interface_address;
|
||||
switch (family)
|
||||
{
|
||||
/// We interested only in IP-adresses
|
||||
/// We interested only in IP-addresses
|
||||
case AF_INET:
|
||||
{
|
||||
interface_address.emplace(*(iface->ifa_addr));
|
||||
|
@ -17,10 +17,10 @@ namespace Poco { class Logger; }
|
||||
|
||||
namespace
|
||||
{
|
||||
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; };
|
||||
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); };
|
||||
[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; };
|
||||
[[maybe_unused]] std::unique_ptr<LogFrequencyLimiterIml> getLogger(std::unique_ptr<LogFrequencyLimiterIml> && logger) { return logger; };
|
||||
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; }
|
||||
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); }
|
||||
[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; }
|
||||
[[maybe_unused]] std::unique_ptr<LogFrequencyLimiterIml> getLogger(std::unique_ptr<LogFrequencyLimiterIml> && logger) { return logger; }
|
||||
}
|
||||
|
||||
#define LOG_IMPL_FIRST_ARG(X, ...) X
|
||||
|
@ -43,7 +43,8 @@ void setThreadName(const char * name)
|
||||
#else
|
||||
if (0 != prctl(PR_SET_NAME, name, 0, 0, 0))
|
||||
#endif
|
||||
DB::throwFromErrno("Cannot set thread name with prctl(PR_SET_NAME, ...)", DB::ErrorCodes::PTHREAD_ERROR);
|
||||
if (errno != ENOSYS) /// It's ok if the syscall is unsupported in some environments.
|
||||
DB::throwFromErrno("Cannot set thread name with prctl(PR_SET_NAME, ...)", DB::ErrorCodes::PTHREAD_ERROR);
|
||||
|
||||
memcpy(thread_name, name, std::min<size_t>(1 + strlen(name), THREAD_NAME_SIZE - 1));
|
||||
}
|
||||
@ -62,7 +63,8 @@ const char * getThreadName()
|
||||
// throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Cannot get thread name with pthread_get_name_np()");
|
||||
#else
|
||||
if (0 != prctl(PR_GET_NAME, thread_name, 0, 0, 0))
|
||||
DB::throwFromErrno("Cannot get thread name with prctl(PR_GET_NAME)", DB::ErrorCodes::PTHREAD_ERROR);
|
||||
if (errno != ENOSYS) /// It's ok if the syscall is unsupported in some environments.
|
||||
DB::throwFromErrno("Cannot get thread name with prctl(PR_GET_NAME)", DB::ErrorCodes::PTHREAD_ERROR);
|
||||
#endif
|
||||
|
||||
return thread_name;
|
||||
|
@ -279,6 +279,8 @@ class IColumn;
|
||||
\
|
||||
M(UInt64, parts_to_delay_insert, 0, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \
|
||||
M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \
|
||||
M(UInt64, number_of_mutations_to_delay, 0, "If the mutated table contains at least that many unfinished mutations, artificially slow down mutations of table. 0 - disabled", 0) \
|
||||
M(UInt64, number_of_mutations_to_throw, 0, "If the mutated table contains at least that many unfinished mutations, throw 'Too many mutations ...' exception. 0 - disabled", 0) \
|
||||
M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \
|
||||
M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
|
||||
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \
|
||||
@ -711,7 +713,7 @@ class IColumn;
|
||||
\
|
||||
M(String, workload, "default", "Name of workload to be used to access resources", 0) \
|
||||
\
|
||||
M(Bool, parallelize_output_from_storages, false, "Parallelize output for reading step from storage. It allows parallelizing query processing right after reading from storage if possible", 0) \
|
||||
M(Bool, parallelize_output_from_storages, true, "Parallelize output for reading step from storage. It allows parallelizing query processing right after reading from storage if possible", 0) \
|
||||
\
|
||||
/** Experimental functions */ \
|
||||
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
|
||||
@ -824,7 +826,7 @@ class IColumn;
|
||||
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
|
||||
M(Bool, input_format_parquet_case_insensitive_column_matching, false, "Ignore case when matching Parquet columns with CH columns.", 0) \
|
||||
/* TODO: Consider unifying this with https://github.com/ClickHouse/ClickHouse/issues/38755 */ \
|
||||
M(Bool, input_format_parquet_preserve_order, true, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
|
||||
M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
|
||||
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
|
||||
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
|
||||
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \
|
||||
|
@ -80,8 +80,10 @@ namespace SettingsChangesHistory
|
||||
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
|
||||
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
|
||||
{
|
||||
{"23.4", {{"allow_suspicious_indices", true, false, "If true, index can defined with identical expressions"}}},
|
||||
{"23.4", {{"connect_timeout_with_failover_ms", 50, 1000, "Increase default connect timeout because of async connect"},
|
||||
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reade to reorder rows for better parallelism."},
|
||||
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."}}},
|
||||
{"23.4", {{"allow_suspicious_indices", true, false, "If true, index can defined with identical expressions"},
|
||||
{"connect_timeout_with_failover_ms", 50, 1000, "Increase default connect timeout because of async connect"},
|
||||
{"connect_timeout_with_failover_secure_ms", 100, 1000, "Increase default secure connect timeout because of async connect"},
|
||||
{"hedged_connection_timeout_ms", 100, 50, "Start new connection in hedged requests after 50 ms instead of 100 to correspond with previous connect timeout"}}},
|
||||
{"23.3", {{"output_format_parquet_version", "1.0", "2.latest", "Use latest Parquet format version for output format"},
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include <Daemon/BaseDaemon.h>
|
||||
#include <Daemon/SentryWriter.h>
|
||||
#include <Parsers/toOneLineQuery.h>
|
||||
#include <base/errnoToString.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
|
@ -319,7 +319,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
|
||||
{
|
||||
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, false);
|
||||
use_table_cache = named_collection->getOrDefault<UInt64>("use_tables_cache", 0);
|
||||
use_table_cache = named_collection->getOrDefault<UInt64>("use_table_cache", 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -7,8 +7,8 @@
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
#include <Core/ColumnsWithTypeAndName.h>
|
||||
#include <Core/callOnTypeIndex.h>
|
||||
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <IO/S3/Credentials.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
# include <aws/core/Version.h>
|
||||
# include <aws/core/platform/OSVersionInfo.h>
|
||||
# include <aws/core/auth/STSCredentialsProvider.h>
|
||||
@ -15,15 +16,24 @@
|
||||
# include <IO/S3/PocoHTTPClient.h>
|
||||
# include <IO/S3/PocoHTTPClientFactory.h>
|
||||
# include <IO/S3/Client.h>
|
||||
# include <IO/S3Common.h>
|
||||
|
||||
# include <fstream>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int AWS_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
namespace DB::S3
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
bool areCredentialsEmptyOrExpired(const Aws::Auth::AWSCredentials & credentials, uint64_t expiration_window_seconds)
|
||||
{
|
||||
if (credentials.IsEmpty())
|
||||
@ -33,7 +43,6 @@ bool areCredentialsEmptyOrExpired(const Aws::Auth::AWSCredentials & credentials,
|
||||
return now >= credentials.GetExpiration() - std::chrono::seconds(expiration_window_seconds);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
AWSEC2MetadataClient::AWSEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration, const char * endpoint_)
|
||||
@ -95,39 +104,22 @@ Aws::String AWSEC2MetadataClient::awsComputeUserAgentString()
|
||||
Aws::String AWSEC2MetadataClient::getDefaultCredentialsSecurely() const
|
||||
{
|
||||
String user_agent_string = awsComputeUserAgentString();
|
||||
String new_token;
|
||||
|
||||
auto [new_token, response_code] = getEC2MetadataToken(user_agent_string);
|
||||
if (response_code == Aws::Http::HttpResponseCode::BAD_REQUEST)
|
||||
return {};
|
||||
else if (response_code != Aws::Http::HttpResponseCode::OK || new_token.empty())
|
||||
{
|
||||
std::lock_guard locker(token_mutex);
|
||||
|
||||
Aws::StringStream ss;
|
||||
ss << endpoint << EC2_IMDS_TOKEN_RESOURCE;
|
||||
std::shared_ptr<Aws::Http::HttpRequest> token_request(Aws::Http::CreateHttpRequest(ss.str(), Aws::Http::HttpMethod::HTTP_PUT,
|
||||
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
|
||||
token_request->SetHeaderValue(EC2_IMDS_TOKEN_TTL_HEADER, EC2_IMDS_TOKEN_TTL_DEFAULT_VALUE);
|
||||
token_request->SetUserAgent(user_agent_string);
|
||||
LOG_TRACE(logger, "Calling EC2MetadataService to get token.");
|
||||
auto result = GetResourceWithAWSWebServiceResult(token_request);
|
||||
const String & token_string = result.GetPayload();
|
||||
new_token = Aws::Utils::StringUtils::Trim(token_string.c_str());
|
||||
|
||||
if (result.GetResponseCode() == Aws::Http::HttpResponseCode::BAD_REQUEST)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
else if (result.GetResponseCode() != Aws::Http::HttpResponseCode::OK || new_token.empty())
|
||||
{
|
||||
LOG_TRACE(logger, "Calling EC2MetadataService to get token failed, falling back to less secure way.");
|
||||
return getDefaultCredentials();
|
||||
}
|
||||
token = new_token;
|
||||
LOG_TRACE(logger, "Calling EC2MetadataService to get token failed, "
|
||||
"falling back to less secure way. HTTP response code: {}", response_code);
|
||||
return getDefaultCredentials();
|
||||
}
|
||||
|
||||
token = std::move(new_token);
|
||||
String url = endpoint + EC2_SECURITY_CREDENTIALS_RESOURCE;
|
||||
std::shared_ptr<Aws::Http::HttpRequest> profile_request(Aws::Http::CreateHttpRequest(url,
|
||||
Aws::Http::HttpMethod::HTTP_GET,
|
||||
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
|
||||
profile_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, new_token);
|
||||
profile_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, token);
|
||||
profile_request->SetUserAgent(user_agent_string);
|
||||
String profile_string = GetResourceWithAWSWebServiceResult(profile_request).GetPayload();
|
||||
|
||||
@ -148,12 +140,55 @@ Aws::String AWSEC2MetadataClient::getDefaultCredentialsSecurely() const
|
||||
std::shared_ptr<Aws::Http::HttpRequest> credentials_request(Aws::Http::CreateHttpRequest(ss.str(),
|
||||
Aws::Http::HttpMethod::HTTP_GET,
|
||||
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
|
||||
credentials_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, new_token);
|
||||
credentials_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, token);
|
||||
credentials_request->SetUserAgent(user_agent_string);
|
||||
LOG_DEBUG(logger, "Calling EC2MetadataService resource {} with token.", ss.str());
|
||||
return GetResourceWithAWSWebServiceResult(credentials_request).GetPayload();
|
||||
}
|
||||
|
||||
Aws::String AWSEC2MetadataClient::getCurrentAvailabilityZone() const
|
||||
{
|
||||
String user_agent_string = awsComputeUserAgentString();
|
||||
auto [new_token, response_code] = getEC2MetadataToken(user_agent_string);
|
||||
if (response_code != Aws::Http::HttpResponseCode::OK || new_token.empty())
|
||||
throw DB::Exception(ErrorCodes::AWS_ERROR,
|
||||
"Failed to make token request. HTTP response code: {}", response_code);
|
||||
|
||||
token = std::move(new_token);
|
||||
const String url = endpoint + EC2_AVAILABILITY_ZONE_RESOURCE;
|
||||
std::shared_ptr<Aws::Http::HttpRequest> profile_request(
|
||||
Aws::Http::CreateHttpRequest(url, Aws::Http::HttpMethod::HTTP_GET, Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
|
||||
|
||||
profile_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, token);
|
||||
profile_request->SetUserAgent(user_agent_string);
|
||||
|
||||
const auto result = GetResourceWithAWSWebServiceResult(profile_request);
|
||||
if (result.GetResponseCode() != Aws::Http::HttpResponseCode::OK)
|
||||
throw DB::Exception(ErrorCodes::AWS_ERROR,
|
||||
"Failed to get availability zone. HTTP response code: {}", result.GetResponseCode());
|
||||
|
||||
return Aws::Utils::StringUtils::Trim(result.GetPayload().c_str());
|
||||
}
|
||||
|
||||
std::pair<Aws::String, Aws::Http::HttpResponseCode> AWSEC2MetadataClient::getEC2MetadataToken(const std::string & user_agent_string) const
|
||||
{
|
||||
std::lock_guard locker(token_mutex);
|
||||
|
||||
Aws::StringStream ss;
|
||||
ss << endpoint << EC2_IMDS_TOKEN_RESOURCE;
|
||||
std::shared_ptr<Aws::Http::HttpRequest> token_request(
|
||||
Aws::Http::CreateHttpRequest(
|
||||
ss.str(), Aws::Http::HttpMethod::HTTP_PUT,
|
||||
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
|
||||
token_request->SetHeaderValue(EC2_IMDS_TOKEN_TTL_HEADER, EC2_IMDS_TOKEN_TTL_DEFAULT_VALUE);
|
||||
token_request->SetUserAgent(user_agent_string);
|
||||
|
||||
LOG_TRACE(logger, "Calling EC2MetadataService to get token.");
|
||||
const auto result = GetResourceWithAWSWebServiceResult(token_request);
|
||||
const auto & token_string = result.GetPayload();
|
||||
return { Aws::Utils::StringUtils::Trim(token_string.c_str()), result.GetResponseCode() };
|
||||
}
|
||||
|
||||
Aws::String AWSEC2MetadataClient::getCurrentRegion() const
|
||||
{
|
||||
return Aws::Region::AWS_GLOBAL;
|
||||
|
@ -3,13 +3,12 @@
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
# include <aws/core/client/ClientConfiguration.h>
|
||||
# include <aws/core/internal/AWSHttpResourceClient.h>
|
||||
# include <aws/core/config/AWSProfileConfigLoader.h>
|
||||
# include <aws/core/auth/AWSCredentialsProvider.h>
|
||||
# include <aws/core/auth/AWSCredentialsProviderChain.h>
|
||||
|
||||
|
||||
# include <IO/S3/PocoHTTPClient.h>
|
||||
|
||||
|
||||
@ -21,6 +20,7 @@ inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
|
||||
class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
|
||||
{
|
||||
static constexpr char EC2_SECURITY_CREDENTIALS_RESOURCE[] = "/latest/meta-data/iam/security-credentials";
|
||||
static constexpr char EC2_AVAILABILITY_ZONE_RESOURCE[] = "/latest/meta-data/placement/availability-zone";
|
||||
static constexpr char EC2_IMDS_TOKEN_RESOURCE[] = "/latest/api/token";
|
||||
static constexpr char EC2_IMDS_TOKEN_HEADER[] = "x-aws-ec2-metadata-token";
|
||||
static constexpr char EC2_IMDS_TOKEN_TTL_DEFAULT_VALUE[] = "21600";
|
||||
@ -49,7 +49,11 @@ public:
|
||||
|
||||
virtual Aws::String getCurrentRegion() const;
|
||||
|
||||
virtual Aws::String getCurrentAvailabilityZone() const;
|
||||
|
||||
private:
|
||||
std::pair<Aws::String, Aws::Http::HttpResponseCode> getEC2MetadataToken(const std::string & user_agent_string) const;
|
||||
|
||||
const Aws::String endpoint;
|
||||
mutable std::recursive_mutex token_mutex;
|
||||
mutable Aws::String token;
|
||||
@ -136,6 +140,7 @@ public:
|
||||
const Aws::Auth::AWSCredentials & credentials,
|
||||
CredentialsConfiguration credentials_configuration);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -3,7 +3,6 @@
|
||||
#include <array>
|
||||
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/HashTable/Hash.h>
|
||||
#include <Common/memcpySmall.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Core/Defines.h>
|
||||
|
@ -2102,6 +2102,7 @@ Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & a
|
||||
|
||||
std::optional<OutputBlockColumns> out_cols;
|
||||
std::optional<Sizes> shuffled_key_sizes;
|
||||
size_t rows_in_current_block = 0;
|
||||
|
||||
auto init_out_cols = [&]()
|
||||
{
|
||||
@ -2116,6 +2117,7 @@ Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & a
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
out_cols->aggregate_columns_data[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
|
||||
|
||||
++rows_in_current_block;
|
||||
data.getNullKeyData() = nullptr;
|
||||
data.hasNullKeyData() = false;
|
||||
}
|
||||
@ -2127,8 +2129,6 @@ Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & a
|
||||
// should be invoked at least once, because null data might be the only content of the `data`
|
||||
init_out_cols();
|
||||
|
||||
size_t rows_in_current_block = 0;
|
||||
|
||||
data.forEachValue(
|
||||
[&](const auto & key, auto & mapped)
|
||||
{
|
||||
|
@ -92,4 +92,4 @@ private:
|
||||
const size_t max_elements = 0;
|
||||
};
|
||||
|
||||
};
|
||||
}
|
||||
|
@ -170,4 +170,4 @@ size_t LRUFileCachePriority::LRUFileCacheIterator::use(const CacheGuard::Lock &)
|
||||
return ++queue_iter->hits;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
@ -65,4 +65,4 @@ private:
|
||||
mutable LRUFileCachePriority::LRUQueueIterator queue_iter;
|
||||
};
|
||||
|
||||
};
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
|
||||
{
|
||||
std::vector<Strings> shards;
|
||||
{
|
||||
std::map<size_t, Strings> replica_adresses;
|
||||
std::map<size_t, Strings> replica_addresses;
|
||||
|
||||
for (const auto & [_, node] : cluster_info.nodes_info)
|
||||
{
|
||||
@ -228,11 +228,11 @@ ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
|
||||
LOG_WARNING(log, "Node '{}' in cluster '{}' has different 'secure' value, skipping it", node.address, cluster_info.name);
|
||||
continue;
|
||||
}
|
||||
replica_adresses[node.shard_id].emplace_back(node.address);
|
||||
replica_addresses[node.shard_id].emplace_back(node.address);
|
||||
}
|
||||
|
||||
shards.reserve(replica_adresses.size());
|
||||
for (auto & [_, replicas] : replica_adresses)
|
||||
shards.reserve(replica_addresses.size());
|
||||
for (auto & [_, replicas] : replica_addresses)
|
||||
shards.emplace_back(std::move(replicas));
|
||||
}
|
||||
|
||||
|
@ -7,7 +7,6 @@
|
||||
#include <Common/RemoteHostFilter.h>
|
||||
#include <Common/ThreadPool_fwd.h>
|
||||
#include <Common/Throttler_fwd.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <Core/UUID.h>
|
||||
@ -25,12 +24,10 @@
|
||||
#include "config.h"
|
||||
|
||||
#include <boost/container/flat_set.hpp>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <thread>
|
||||
|
||||
|
||||
namespace Poco::Net { class IPAddress; }
|
||||
|
@ -800,7 +800,6 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
|
||||
return false;
|
||||
|
||||
ReplicatedTableStatus status;
|
||||
auto zookeeper = getContext()->getZooKeeper();
|
||||
storage_replicated->getStatus(status);
|
||||
|
||||
/// Do not allow to drop local replicas and active remote replicas
|
||||
@ -809,13 +808,7 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
|
||||
"We can't drop local replica, please use `DROP TABLE` if you want "
|
||||
"to clean the data and drop this replica");
|
||||
|
||||
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
|
||||
/// However, the main use case is to drop dead replica, which cannot become active.
|
||||
/// This check prevents only from accidental drop of some other replica.
|
||||
if (zookeeper->exists(status.zookeeper_path + "/replicas/" + query.replica + "/is_active"))
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't drop replica: {}, because it's active", query.replica);
|
||||
|
||||
storage_replicated->dropReplica(zookeeper, status.zookeeper_path, query.replica, log);
|
||||
storage_replicated->dropReplica(status.zookeeper_path, query.replica, log);
|
||||
LOG_TRACE(log, "Dropped replica {} of {}", query.replica, table->getStorageID().getNameForLogs());
|
||||
|
||||
return true;
|
||||
|
@ -1,7 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/HashTable/Hash.h>
|
||||
|
||||
#include <Core/Names.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
|
||||
|
@ -1,7 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/HashTable/Hash.h>
|
||||
|
||||
#include <Core/Names.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
|
||||
|
@ -1,8 +1,3 @@
|
||||
#include <sys/ioctl.h>
|
||||
#if defined(OS_SUNOS)
|
||||
# include <sys/termios.h>
|
||||
#endif
|
||||
#include <unistd.h>
|
||||
#include <Processors/Formats/Impl/PrettyBlockOutputFormat.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
@ -12,6 +7,7 @@
|
||||
#include <Common/UTF8Helpers.h>
|
||||
#include <Common/PODArray.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -19,9 +15,6 @@ PrettyBlockOutputFormat::PrettyBlockOutputFormat(
|
||||
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool mono_block_)
|
||||
: IOutputFormat(header_, out_), format_settings(format_settings_), serializations(header_.getSerializations()), mono_block(mono_block_)
|
||||
{
|
||||
struct winsize w;
|
||||
if (0 == ioctl(STDOUT_FILENO, TIOCGWINSZ, &w))
|
||||
terminal_width = w.ws_col;
|
||||
}
|
||||
|
||||
|
||||
|
@ -29,8 +29,6 @@ protected:
|
||||
void consumeExtremes(Chunk) override;
|
||||
|
||||
size_t total_rows = 0;
|
||||
size_t terminal_width = 0;
|
||||
|
||||
size_t row_number_width = 7; // "10000. "
|
||||
|
||||
const FormatSettings format_settings;
|
||||
|
@ -290,7 +290,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
|
||||
.callback = read_task_callback.value(),
|
||||
.count_participating_replicas = client_info.count_participating_replicas,
|
||||
.number_of_current_replica = client_info.number_of_current_replica,
|
||||
.colums_to_read = required_columns
|
||||
.columns_to_read = required_columns
|
||||
};
|
||||
|
||||
/// We have a special logic for local replica. It has to read less data, because in some cases it should
|
||||
@ -734,7 +734,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
|
||||
.callback = read_task_callback.value(),
|
||||
.count_participating_replicas = client_info.count_participating_replicas,
|
||||
.number_of_current_replica = client_info.number_of_current_replica,
|
||||
.colums_to_read = column_names
|
||||
.columns_to_read = column_names
|
||||
};
|
||||
|
||||
auto min_marks_for_concurrent_read = info.min_marks_for_concurrent_read;
|
||||
|
@ -6,7 +6,9 @@
|
||||
#include <Poco/MongoDB/Connection.h>
|
||||
#include <Poco/MongoDB/Cursor.h>
|
||||
#include <Poco/MongoDB/ObjectId.h>
|
||||
#include <Poco/MongoDB/Array.h>
|
||||
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
@ -16,6 +18,9 @@
|
||||
#include <base/range.h>
|
||||
#include <Poco/URI.h>
|
||||
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
|
||||
// only after poco
|
||||
// naming conflict:
|
||||
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
|
||||
@ -30,43 +35,130 @@ namespace ErrorCodes
|
||||
extern const int TYPE_MISMATCH;
|
||||
extern const int UNKNOWN_TYPE;
|
||||
extern const int MONGODB_ERROR;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
|
||||
{
|
||||
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
|
||||
|
||||
/// Looks like selecting _id column is implicit by default.
|
||||
if (!sample_block_to_select.has("_id"))
|
||||
cursor->query().returnFieldSelector().add("_id", 0);
|
||||
|
||||
for (const auto & column : sample_block_to_select)
|
||||
cursor->query().returnFieldSelector().add(column.name, 1);
|
||||
return cursor;
|
||||
}
|
||||
|
||||
MongoDBSource::MongoDBSource(
|
||||
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
|
||||
const Block & sample_block,
|
||||
UInt64 max_block_size_)
|
||||
: ISource(sample_block.cloneEmpty())
|
||||
, connection(connection_)
|
||||
, cursor{std::move(cursor_)}
|
||||
, max_block_size{max_block_size_}
|
||||
{
|
||||
description.init(sample_block);
|
||||
}
|
||||
|
||||
|
||||
MongoDBSource::~MongoDBSource() = default;
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
using ObjectId = Poco::MongoDB::ObjectId;
|
||||
using MongoArray = Poco::MongoDB::Array;
|
||||
|
||||
|
||||
template <typename T>
|
||||
Field getNumber(const Poco::MongoDB::Element & value, const std::string & name)
|
||||
{
|
||||
switch (value.type())
|
||||
{
|
||||
case Poco::MongoDB::ElementTraits<Int32>::TypeId:
|
||||
return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value());
|
||||
case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId:
|
||||
return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value());
|
||||
case Poco::MongoDB::ElementTraits<Float64>::TypeId:
|
||||
return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value());
|
||||
case Poco::MongoDB::ElementTraits<bool>::TypeId:
|
||||
return static_cast<T>(static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value());
|
||||
case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId:
|
||||
return Field();
|
||||
case Poco::MongoDB::ElementTraits<String>::TypeId:
|
||||
return parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value());
|
||||
default:
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected a number, got type id = {} for column {}",
|
||||
toString(value.type()), name);
|
||||
}
|
||||
}
|
||||
|
||||
void prepareMongoDBArrayInfo(
|
||||
std::unordered_map<size_t, MongoDBArrayInfo> & array_info, size_t column_idx, const DataTypePtr data_type)
|
||||
{
|
||||
const auto * array_type = assert_cast<const DataTypeArray *>(data_type.get());
|
||||
auto nested = array_type->getNestedType();
|
||||
|
||||
size_t count_dimensions = 1;
|
||||
while (isArray(nested))
|
||||
{
|
||||
++count_dimensions;
|
||||
nested = assert_cast<const DataTypeArray *>(nested.get())->getNestedType();
|
||||
}
|
||||
|
||||
Field default_value = nested->getDefault();
|
||||
if (nested->isNullable())
|
||||
nested = assert_cast<const DataTypeNullable *>(nested.get())->getNestedType();
|
||||
|
||||
WhichDataType which(nested);
|
||||
std::function<Field(const Poco::MongoDB::Element & value, const std::string & name)> parser;
|
||||
|
||||
if (which.isUInt8())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt8>(value, name); };
|
||||
else if (which.isUInt16())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt16>(value, name); };
|
||||
else if (which.isUInt32())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt32>(value, name); };
|
||||
else if (which.isUInt64())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<UInt64>(value, name); };
|
||||
else if (which.isInt8())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int8>(value, name); };
|
||||
else if (which.isInt16())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int16>(value, name); };
|
||||
else if (which.isInt32())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int32>(value, name); };
|
||||
else if (which.isInt64())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Int64>(value, name); };
|
||||
else if (which.isFloat32())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Float32>(value, name); };
|
||||
else if (which.isFloat64())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field { return getNumber<Float64>(value, name); };
|
||||
else if (which.isString() || which.isFixedString())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
|
||||
{
|
||||
if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)
|
||||
{
|
||||
String string_id = value.toString();
|
||||
return Field(string_id.data(), string_id.size());
|
||||
}
|
||||
else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)
|
||||
{
|
||||
String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
|
||||
return Field(string.data(), string.size());
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String, got type id = {} for column {}",
|
||||
toString(value.type()), name);
|
||||
};
|
||||
else if (which.isDate())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
|
||||
{
|
||||
if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",
|
||||
toString(value.type()), name);
|
||||
|
||||
return static_cast<UInt16>(DateLUT::instance().toDayNum(
|
||||
static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime()));
|
||||
};
|
||||
else if (which.isDateTime())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
|
||||
{
|
||||
if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Timestamp, got type id = {} for column {}",
|
||||
toString(value.type()), name);
|
||||
|
||||
return static_cast<UInt32>(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime());
|
||||
};
|
||||
else if (which.isUUID())
|
||||
parser = [](const Poco::MongoDB::Element & value, const std::string & name) -> Field
|
||||
{
|
||||
if (value.type() != Poco::MongoDB::ElementTraits<String>::TypeId)
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected String (UUID), got type id = {} for column {}",
|
||||
toString(value.type()), name);
|
||||
|
||||
String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
|
||||
return parse<UUID>(string);
|
||||
};
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName());
|
||||
|
||||
array_info[column_idx] = {count_dimensions, default_value, parser};
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name)
|
||||
@ -102,7 +194,13 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
void insertValue(IColumn & column, const ValueType type, const Poco::MongoDB::Element & value, const std::string & name)
|
||||
void insertValue(
|
||||
IColumn & column,
|
||||
const ValueType type,
|
||||
const Poco::MongoDB::Element & value,
|
||||
const std::string & name,
|
||||
std::unordered_map<size_t, MongoDBArrayInfo> & array_info,
|
||||
size_t idx)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
@ -191,8 +289,75 @@ namespace
|
||||
toString(value.type()), name);
|
||||
break;
|
||||
}
|
||||
case ValueType::vtArray:
|
||||
{
|
||||
if (value.type() != Poco::MongoDB::ElementTraits<MongoArray::Ptr>::TypeId)
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected Array, got type id = {} for column {}",
|
||||
toString(value.type()), name);
|
||||
|
||||
size_t expected_dimensions = array_info[idx].num_dimensions;
|
||||
const auto parse_value = array_info[idx].parser;
|
||||
std::vector<Row> dimensions(expected_dimensions + 1);
|
||||
|
||||
auto array = static_cast<const Poco::MongoDB::ConcreteElement<MongoArray::Ptr> &>(value).value();
|
||||
|
||||
std::vector<std::pair<const Poco::MongoDB::Element *, size_t>> arrays;
|
||||
arrays.emplace_back(&value, 0);
|
||||
|
||||
while (!arrays.empty())
|
||||
{
|
||||
size_t dimension_idx = arrays.size() - 1;
|
||||
|
||||
if (dimension_idx + 1 > expected_dimensions)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Got more dimensions than expected");
|
||||
|
||||
auto [parent_ptr, child_idx] = arrays.back();
|
||||
auto parent = static_cast<const Poco::MongoDB::ConcreteElement<MongoArray::Ptr> &>(*parent_ptr).value();
|
||||
|
||||
if (child_idx >= parent->size())
|
||||
{
|
||||
arrays.pop_back();
|
||||
|
||||
if (dimension_idx == 0)
|
||||
break;
|
||||
|
||||
dimensions[dimension_idx].emplace_back(Array(dimensions[dimension_idx + 1].begin(), dimensions[dimension_idx + 1].end()));
|
||||
dimensions[dimension_idx + 1].clear();
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
Poco::MongoDB::Element::Ptr child = parent->get(static_cast<int>(child_idx));
|
||||
arrays.back().second += 1;
|
||||
|
||||
if (child->type() == Poco::MongoDB::ElementTraits<MongoArray::Ptr>::TypeId)
|
||||
{
|
||||
arrays.emplace_back(child.get(), 0);
|
||||
}
|
||||
else if (child->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)
|
||||
{
|
||||
if (dimension_idx + 1 == expected_dimensions)
|
||||
dimensions[dimension_idx + 1].emplace_back(array_info[idx].default_value);
|
||||
else
|
||||
dimensions[dimension_idx + 1].emplace_back(Array());
|
||||
}
|
||||
else if (dimension_idx + 1 == expected_dimensions)
|
||||
{
|
||||
dimensions[dimension_idx + 1].emplace_back(parse_value(*child, name));
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Got less dimensions than expected. ({} instead of {})", dimension_idx + 1, expected_dimensions);
|
||||
}
|
||||
}
|
||||
|
||||
assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
|
||||
break;
|
||||
|
||||
}
|
||||
default:
|
||||
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Value of unsupported type:{}", column.getName());
|
||||
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Value of unsupported type: {}", column.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@ -200,6 +365,39 @@ namespace
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
|
||||
{
|
||||
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
|
||||
|
||||
/// Looks like selecting _id column is implicit by default.
|
||||
if (!sample_block_to_select.has("_id"))
|
||||
cursor->query().returnFieldSelector().add("_id", 0);
|
||||
|
||||
for (const auto & column : sample_block_to_select)
|
||||
cursor->query().returnFieldSelector().add(column.name, 1);
|
||||
return cursor;
|
||||
}
|
||||
|
||||
MongoDBSource::MongoDBSource(
|
||||
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
|
||||
const Block & sample_block,
|
||||
UInt64 max_block_size_)
|
||||
: ISource(sample_block.cloneEmpty())
|
||||
, connection(connection_)
|
||||
, cursor{std::move(cursor_)}
|
||||
, max_block_size{max_block_size_}
|
||||
{
|
||||
description.init(sample_block);
|
||||
|
||||
for (const auto idx : collections::range(0, description.sample_block.columns()))
|
||||
if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
|
||||
prepareMongoDBArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
|
||||
}
|
||||
|
||||
|
||||
MongoDBSource::~MongoDBSource() = default;
|
||||
|
||||
Chunk MongoDBSource::generate()
|
||||
{
|
||||
if (all_read)
|
||||
@ -251,11 +449,11 @@ Chunk MongoDBSource::generate()
|
||||
if (is_nullable)
|
||||
{
|
||||
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
|
||||
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name);
|
||||
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name, array_info, idx);
|
||||
column_nullable.getNullMapData().emplace_back(0);
|
||||
}
|
||||
else
|
||||
insertValue(*columns[idx], description.types[idx].first, *value, name);
|
||||
insertValue(*columns[idx], description.types[idx].first, *value, name, array_info, idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include <Processors/ISource.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
|
||||
#include <Core/Field.h>
|
||||
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
@ -19,6 +21,13 @@ namespace MongoDB
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct MongoDBArrayInfo
|
||||
{
|
||||
size_t num_dimensions;
|
||||
Field default_value;
|
||||
std::function<Field(const Poco::MongoDB::Element & value, const std::string & name)> parser;
|
||||
};
|
||||
|
||||
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password);
|
||||
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select);
|
||||
@ -45,6 +54,8 @@ private:
|
||||
const UInt64 max_block_size;
|
||||
ExternalResultDescription description;
|
||||
bool all_read = false;
|
||||
|
||||
std::unordered_map<size_t, MongoDBArrayInfo> array_info;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Common/CacheBase.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/HashTable/Hash.h>
|
||||
#include <Interpreters/AggregationCommon.h>
|
||||
#include <Formats/MarkInCompressedFile.h>
|
||||
|
||||
|
@ -34,7 +34,7 @@ struct ParallelReadingExtension
|
||||
/// This is needed to estimate the number of bytes
|
||||
/// between a pair of marks to perform one request
|
||||
/// over the network for a 1Gb of data.
|
||||
Names colums_to_read;
|
||||
Names columns_to_read;
|
||||
};
|
||||
|
||||
/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm
|
||||
|
@ -120,6 +120,9 @@ namespace ProfileEvents
|
||||
extern const Event InsertedCompactParts;
|
||||
extern const Event MergedIntoWideParts;
|
||||
extern const Event MergedIntoCompactParts;
|
||||
extern const Event RejectedMutations;
|
||||
extern const Event DelayedMutations;
|
||||
extern const Event DelayedMutationsMilliseconds;
|
||||
}
|
||||
|
||||
namespace CurrentMetrics
|
||||
@ -177,6 +180,7 @@ namespace ErrorCodes
|
||||
extern const int SERIALIZATION_ERROR;
|
||||
extern const int NETWORK_ERROR;
|
||||
extern const int SOCKET_TIMEOUT;
|
||||
extern const int TOO_MANY_MUTATIONS;
|
||||
}
|
||||
|
||||
static void checkSuspiciousIndices(const ASTFunction * index_function)
|
||||
@ -4367,6 +4371,51 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
|
||||
}
|
||||
|
||||
void MergeTreeData::delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
|
||||
{
|
||||
const auto settings = getSettings();
|
||||
const auto & query_settings = query_context->getSettingsRef();
|
||||
|
||||
size_t num_mutations_to_delay = query_settings.number_of_mutations_to_delay
|
||||
? query_settings.number_of_mutations_to_delay
|
||||
: settings->number_of_mutations_to_delay;
|
||||
|
||||
size_t num_mutations_to_throw = query_settings.number_of_mutations_to_throw
|
||||
? query_settings.number_of_mutations_to_throw
|
||||
: settings->number_of_mutations_to_throw;
|
||||
|
||||
if (!num_mutations_to_delay && !num_mutations_to_throw)
|
||||
return;
|
||||
|
||||
size_t num_unfinished_mutations = getNumberOfUnfinishedMutations();
|
||||
if (num_mutations_to_throw && num_unfinished_mutations >= num_mutations_to_throw)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedMutations);
|
||||
throw Exception(ErrorCodes::TOO_MANY_MUTATIONS,
|
||||
"Too many unfinished mutations ({}) in table {}",
|
||||
num_unfinished_mutations, getLogName());
|
||||
}
|
||||
|
||||
if (num_mutations_to_delay && num_unfinished_mutations >= num_mutations_to_delay)
|
||||
{
|
||||
if (!num_mutations_to_throw)
|
||||
num_mutations_to_throw = num_mutations_to_delay * 2;
|
||||
|
||||
size_t mutations_over_threshold = num_unfinished_mutations - num_mutations_to_delay;
|
||||
size_t allowed_mutations_over_threshold = num_mutations_to_throw - num_mutations_to_delay;
|
||||
|
||||
double delay_factor = std::min(static_cast<double>(mutations_over_threshold) / allowed_mutations_over_threshold, 1.0);
|
||||
size_t delay_milliseconds = static_cast<size_t>(std::lerp(settings->min_delay_to_mutate_ms, settings->max_delay_to_mutate_ms, delay_factor));
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DelayedMutations);
|
||||
ProfileEvents::increment(ProfileEvents::DelayedMutationsMilliseconds, delay_milliseconds);
|
||||
|
||||
if (until)
|
||||
until->tryWait(delay_milliseconds);
|
||||
else
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(delay_milliseconds));
|
||||
}
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
|
||||
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
|
||||
|
@ -543,7 +543,6 @@ public:
|
||||
/// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
|
||||
std::optional<Int64> getMinPartDataVersion() const;
|
||||
|
||||
|
||||
/// Returns all detached parts
|
||||
DetachedPartsInfo getDetachedParts() const;
|
||||
|
||||
@ -554,11 +553,19 @@ public:
|
||||
MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
|
||||
ContextPtr context, PartsTemporaryRename & renamed_parts);
|
||||
|
||||
|
||||
/// If the table contains too many active parts, sleep for a while to give them time to merge.
|
||||
/// If until is non-null, wake up from the sleep earlier if the event happened.
|
||||
/// The decision to delay or throw is made according to settings 'parts_to_delay_insert' and 'parts_to_throw_insert'.
|
||||
void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
|
||||
|
||||
/// If the table contains too many unfinished mutations, sleep for a while to give them time to execute.
|
||||
/// If until is non-null, wake up from the sleep earlier if the event happened.
|
||||
/// The decision to delay or throw is made according to settings 'number_of_mutations_to_delay' and 'number_of_mutations_to_throw'.
|
||||
void delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
|
||||
|
||||
/// Returns number of unfinished mutations (is_done = 0).
|
||||
virtual size_t getNumberOfUnfinishedMutations() const = 0;
|
||||
|
||||
/// Renames temporary part to a permanent part and adds it to the parts set.
|
||||
/// It is assumed that the part does not intersect with existing parts.
|
||||
/// Adds the part in the PreActive state (the part will be added to the active set later with out_transaction->commit()).
|
||||
|
@ -378,7 +378,7 @@ MergeTreeReadPoolParallelReplicas::~MergeTreeReadPoolParallelReplicas() = defaul
|
||||
|
||||
Block MergeTreeReadPoolParallelReplicas::getHeader() const
|
||||
{
|
||||
return storage_snapshot->getSampleBlockForColumns(extension.colums_to_read);
|
||||
return storage_snapshot->getSampleBlockForColumns(extension.columns_to_read);
|
||||
}
|
||||
|
||||
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t thread)
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <Compression/CachedCompressedReadBuffer.h>
|
||||
|
||||
#include <base/getThreadId.h>
|
||||
#include <base/range.h>
|
||||
#include <utility>
|
||||
|
||||
|
||||
|
@ -67,6 +67,10 @@ struct Settings;
|
||||
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
|
||||
M(CleanDeletedRows, clean_deleted_rows, CleanDeletedRows::Never, "Is the Replicated Merge cleanup has to be done automatically at each merge or manually (possible values are 'Always'/'Never' (default))", 0) \
|
||||
M(UInt64, replicated_max_mutations_in_one_entry, 10000, "Max number of mutation commands that can be merged together and executed in one MUTATE_PART entry (0 means unlimited)", 0) \
|
||||
M(UInt64, number_of_mutations_to_delay, 0, "If table has at least that many unfinished mutations, artificially slow down mutations of table. Disabled if set to 0", 0) \
|
||||
M(UInt64, number_of_mutations_to_throw, 0, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
|
||||
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
|
||||
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
|
||||
\
|
||||
/** Inserts settings. */ \
|
||||
M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
|
||||
|
@ -1727,18 +1727,30 @@ size_t ReplicatedMergeTreeQueue::countMutations() const
|
||||
return mutations_by_znode.size();
|
||||
}
|
||||
|
||||
|
||||
size_t ReplicatedMergeTreeQueue::countFinishedMutations() const
|
||||
{
|
||||
std::lock_guard lock(state_mutex);
|
||||
|
||||
size_t count = 0;
|
||||
for (const auto & pair : mutations_by_znode)
|
||||
for (const auto & [_, status] : mutations_by_znode)
|
||||
{
|
||||
const auto & mutation = pair.second;
|
||||
if (!mutation.is_done)
|
||||
if (!status.is_done)
|
||||
break;
|
||||
++count;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
size_t ReplicatedMergeTreeQueue::countUnfinishedMutations() const
|
||||
{
|
||||
std::lock_guard lock(state_mutex);
|
||||
|
||||
size_t count = 0;
|
||||
for (const auto & [_, status] : mutations_by_znode | std::views::reverse)
|
||||
{
|
||||
if (status.is_done)
|
||||
break;
|
||||
++count;
|
||||
}
|
||||
|
||||
|
@ -386,6 +386,8 @@ public:
|
||||
|
||||
/// Count the total number of active mutations that are finished (is_done = true).
|
||||
size_t countFinishedMutations() const;
|
||||
/// Count the total number of active mutations that are not finished (is_done = false).
|
||||
size_t countUnfinishedMutations() const;
|
||||
|
||||
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge
|
||||
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include "Storages/MergeTree/IMergeTreeDataPart.h"
|
||||
|
||||
#include <optional>
|
||||
#include <ranges>
|
||||
|
||||
#include <base/sort.h>
|
||||
#include <Backups/BackupEntriesCollector.h>
|
||||
@ -305,7 +306,11 @@ void StorageMergeTree::alter(
|
||||
|
||||
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
|
||||
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
|
||||
|
||||
auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, local_context->getSettingsRef().materialize_ttl_after_modify, local_context);
|
||||
if (!maybe_mutation_commands.empty())
|
||||
delayMutationOrThrowIfNeeded(nullptr, local_context);
|
||||
|
||||
Int64 mutation_version = -1;
|
||||
commands.apply(new_metadata, local_context);
|
||||
|
||||
@ -313,7 +318,6 @@ void StorageMergeTree::alter(
|
||||
if (commands.isSettingsAlter())
|
||||
{
|
||||
changeSettings(new_metadata.settings_changes, table_lock_holder);
|
||||
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
|
||||
}
|
||||
else
|
||||
@ -579,11 +583,12 @@ void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn)
|
||||
|
||||
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
|
||||
{
|
||||
delayMutationOrThrowIfNeeded(nullptr, query_context);
|
||||
|
||||
/// Validate partition IDs (if any) before starting mutation
|
||||
getPartitionIdsAffectedByCommands(commands, query_context);
|
||||
|
||||
Int64 version = startMutation(commands, query_context);
|
||||
|
||||
if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
|
||||
waitForMutation(version);
|
||||
}
|
||||
@ -1327,6 +1332,24 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
|
||||
return scheduled;
|
||||
}
|
||||
|
||||
size_t StorageMergeTree::getNumberOfUnfinishedMutations() const
|
||||
{
|
||||
size_t count = 0;
|
||||
for (const auto & [version, _] : current_mutations_by_version | std::views::reverse)
|
||||
{
|
||||
auto status = getIncompleteMutationsStatus(version);
|
||||
if (!status)
|
||||
continue;
|
||||
|
||||
if (status->is_done)
|
||||
break;
|
||||
|
||||
++count;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
UInt64 StorageMergeTree::getCurrentMutationVersion(
|
||||
const DataPartPtr & part,
|
||||
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/) const
|
||||
|
@ -112,6 +112,8 @@ public:
|
||||
|
||||
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
|
||||
|
||||
size_t getNumberOfUnfinishedMutations() const override;
|
||||
|
||||
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
|
||||
|
||||
private:
|
||||
|
@ -416,7 +416,7 @@ StoragePostgreSQL::Configuration StoragePostgreSQL::processNamedCollectionResult
|
||||
required_arguments.insert("table");
|
||||
|
||||
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(
|
||||
named_collection, required_arguments, {"schema", "on_conflict", "addresses_expr", "host", "hostname", "port", "use_tables_cache"});
|
||||
named_collection, required_arguments, {"schema", "on_conflict", "addresses_expr", "host", "hostname", "port", "use_table_cache"});
|
||||
|
||||
configuration.addresses_expr = named_collection.getOrDefault<String>("addresses_expr", "");
|
||||
if (configuration.addresses_expr.empty())
|
||||
|
@ -1063,6 +1063,20 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
|
||||
}
|
||||
}
|
||||
|
||||
void StorageReplicatedMergeTree::dropReplica(const String & drop_zookeeper_path, const String & drop_replica, Poco::Logger * logger)
|
||||
{
|
||||
zkutil::ZooKeeperPtr zookeeper = getZooKeeperIfTableShutDown();
|
||||
|
||||
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
|
||||
/// However, the main use case is to drop dead replica, which cannot become active.
|
||||
/// This check prevents only from accidental drop of some other replica.
|
||||
if (zookeeper->exists(drop_zookeeper_path + "/replicas/" + drop_replica + "/is_active"))
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't drop replica: {}, because it's active", drop_replica);
|
||||
|
||||
dropReplica(zookeeper, drop_zookeeper_path, drop_replica, logger);
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper,
|
||||
const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger)
|
||||
{
|
||||
@ -5231,7 +5245,10 @@ void StorageReplicatedMergeTree::alter(
|
||||
alter_entry->create_time = time(nullptr);
|
||||
|
||||
auto maybe_mutation_commands = commands.getMutationCommands(
|
||||
*current_metadata, query_context->getSettingsRef().materialize_ttl_after_modify, query_context);
|
||||
*current_metadata,
|
||||
query_context->getSettingsRef().materialize_ttl_after_modify,
|
||||
query_context);
|
||||
|
||||
bool have_mutation = !maybe_mutation_commands.empty();
|
||||
alter_entry->have_mutation = have_mutation;
|
||||
|
||||
@ -5242,6 +5259,7 @@ void StorageReplicatedMergeTree::alter(
|
||||
PartitionBlockNumbersHolder partition_block_numbers_holder;
|
||||
if (have_mutation)
|
||||
{
|
||||
delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context);
|
||||
const String mutations_path(fs::path(zookeeper_path) / "mutations");
|
||||
|
||||
ReplicatedMergeTreeMutationEntry mutation_entry;
|
||||
@ -6423,6 +6441,8 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
|
||||
/// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
|
||||
/// the version of this mutation), the mutation is considered done and can be deleted.
|
||||
|
||||
delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context);
|
||||
|
||||
ReplicatedMergeTreeMutationEntry mutation_entry;
|
||||
mutation_entry.source_replica = replica_name;
|
||||
mutation_entry.commands = commands;
|
||||
@ -8053,6 +8073,10 @@ String StorageReplicatedMergeTree::getTableSharedID() const
|
||||
return toString(table_shared_id);
|
||||
}
|
||||
|
||||
size_t StorageReplicatedMergeTree::getNumberOfUnfinishedMutations() const
|
||||
{
|
||||
return queue.countUnfinishedMutations();
|
||||
}
|
||||
|
||||
void StorageReplicatedMergeTree::createTableSharedID() const
|
||||
{
|
||||
|
@ -229,6 +229,8 @@ public:
|
||||
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
||||
Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);
|
||||
|
||||
void dropReplica(const String & drop_zookeeper_path, const String & drop_replica, Poco::Logger * logger);
|
||||
|
||||
/// Removes table from ZooKeeper after the last replica was dropped
|
||||
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
|
||||
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
|
||||
@ -317,6 +319,8 @@ public:
|
||||
// Return table id, common for different replicas
|
||||
String getTableSharedID() const override;
|
||||
|
||||
size_t getNumberOfUnfinishedMutations() const override;
|
||||
|
||||
/// Returns the same as getTableSharedID(), but extracts it from a create query.
|
||||
static std::optional<String> tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context);
|
||||
|
||||
|
@ -41,13 +41,19 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
StorageS3Cluster::StorageS3Cluster(
|
||||
const Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
ContextPtr context_,
|
||||
bool structure_argument_was_provided_)
|
||||
bool structure_argument_was_provided_,
|
||||
bool format_argument_was_provided_)
|
||||
: IStorageCluster(table_id_)
|
||||
, log(&Poco::Logger::get("StorageS3Cluster (" + table_id_.table_name + ")"))
|
||||
, s3_configuration{configuration_}
|
||||
@ -55,6 +61,7 @@ StorageS3Cluster::StorageS3Cluster(
|
||||
, format_name(configuration_.format)
|
||||
, compression_method(configuration_.compression_method)
|
||||
, structure_argument_was_provided(structure_argument_was_provided_)
|
||||
, format_argument_was_provided(format_argument_was_provided_)
|
||||
{
|
||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.url.uri);
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
@ -89,6 +96,28 @@ void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context)
|
||||
s3_configuration.update(local_context);
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
void addColumnsStructureToQueryWithS3ClusterEngine(ASTPtr & query, const String & structure, bool format_argument_was_provided, const String & function_name)
|
||||
{
|
||||
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
|
||||
if (!expression_list)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function {}, got '{}'", function_name, queryToString(query));
|
||||
|
||||
auto structure_literal = std::make_shared<ASTLiteral>(structure);
|
||||
|
||||
if (!format_argument_was_provided)
|
||||
{
|
||||
auto format_literal = std::make_shared<ASTLiteral>("auto");
|
||||
expression_list->children.push_back(format_literal);
|
||||
}
|
||||
|
||||
expression_list->children.push_back(structure_literal);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// The code executes on initiator
|
||||
Pipe StorageS3Cluster::read(
|
||||
const Names & column_names,
|
||||
@ -127,8 +156,8 @@ Pipe StorageS3Cluster::read(
|
||||
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
||||
|
||||
if (!structure_argument_was_provided)
|
||||
addColumnsStructureToQueryWithClusterEngine(
|
||||
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName());
|
||||
addColumnsStructureToQueryWithS3ClusterEngine(
|
||||
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), format_argument_was_provided, getName());
|
||||
|
||||
RestoreQualifiedNamesVisitor::Data data;
|
||||
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
|
||||
|
@ -32,7 +32,8 @@ public:
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
ContextPtr context_,
|
||||
bool structure_argument_was_provided_);
|
||||
bool structure_argument_was_provided_,
|
||||
bool format_argument_was_provided_);
|
||||
|
||||
std::string getName() const override { return "S3Cluster"; }
|
||||
|
||||
@ -59,6 +60,7 @@ private:
|
||||
NamesAndTypesList virtual_columns;
|
||||
Block virtual_block;
|
||||
bool structure_argument_was_provided;
|
||||
bool format_argument_was_provided;
|
||||
};
|
||||
|
||||
|
||||
|
@ -14,7 +14,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
static ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
|
||||
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
|
||||
{
|
||||
auto * select_query = query->as<ASTSelectQuery>();
|
||||
if (!select_query || !select_query->tables())
|
||||
|
@ -1,10 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <Parsers/IAST.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
|
||||
|
||||
/// Add structure argument for queries with s3Cluster/hdfsCluster table function.
|
||||
void addColumnsStructureToQueryWithClusterEngine(ASTPtr & query, const String & structure, size_t max_arguments, const String & function_name);
|
||||
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user