Merge remote-tracking branch 'origin/fix-pg-sync-tables-exception-broken-sync' into fix-pg-sync-tables-exception-broken-sync

This commit is contained in:
kssenii 2023-05-17 12:35:36 +02:00
commit a279e61d9e
355 changed files with 3716 additions and 2939 deletions

View File

@ -1341,6 +1341,40 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestReleaseAnalyzer:
needs: [BuilderDebRelease]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_analyzer
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (release, analyzer)
REPO_COPY=${{runner.temp}}/stateless_analyzer/ClickHouse
KILL_TIMEOUT=10800
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestAarch64:
needs: [BuilderDebAarch64]
runs-on: [self-hosted, func-tester-aarch64]

6
.gitmodules vendored
View File

@ -253,6 +253,9 @@
[submodule "contrib/qpl"]
path = contrib/qpl
url = https://github.com/intel/qpl
[submodule "contrib/idxd-config"]
path = contrib/idxd-config
url = https://github.com/intel/idxd-config
[submodule "contrib/wyhash"]
path = contrib/wyhash
url = https://github.com/wangyi-fudan/wyhash
@ -335,6 +338,9 @@
[submodule "contrib/liburing"]
path = contrib/liburing
url = https://github.com/axboe/liburing
[submodule "contrib/libfiu"]
path = contrib/libfiu
url = https://github.com/ClickHouse/libfiu.git
[submodule "contrib/isa-l"]
path = contrib/isa-l
url = https://github.com/ClickHouse/isa-l.git

View File

@ -395,6 +395,8 @@ if ((NOT OS_LINUX AND NOT OS_ANDROID) OR (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG"))
set(ENABLE_GWP_ASAN OFF)
endif ()
option (ENABLE_FIU "Enable Fiu" ON)
option(WERROR "Enable -Werror compiler option" ON)
if (WERROR)

View File

@ -21,11 +21,17 @@ curl https://clickhouse.com/ | sh
* [Contacts](https://clickhouse.com/company/contact) can help to get your questions answered if there are any.
## Upcoming Events
* [**ClickHouse Spring Meetup in Manhattan**](https://www.meetup.com/clickhouse-new-york-user-group/events/292517734) - April 26 - It's spring, and it's time to meet again in the city! Talks include: "Building a domain specific query language on top of Clickhouse", "A Galaxy of Information", "Our Journey to ClickHouse Cloud from Redshift", and a ClickHouse update!
* [**v23.4 Release Webinar**](https://clickhouse.com/company/events/v23-4-release-webinar?utm_source=github&utm_medium=social&utm_campaign=release-webinar-2023-04) - April 26 - 23.4 is rapidly approaching. Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release.
* [**ClickHouse Meetup in Berlin**](https://www.meetup.com/clickhouse-berlin-user-group/events/292892466) - May 16 - Save the date! ClickHouse is coming back to Berlin. We're excited to announce an upcoming ClickHouse Meetup that you won't want to miss. Join us as we gather together to discuss the latest in the world of ClickHouse and share user stories.
* [**ClickHouse Meetup in Berlin**](https://www.meetup.com/clickhouse-berlin-user-group/events/292892466) - May 16
* [**ClickHouse Meetup in Barcelona**](https://www.meetup.com/clickhouse-barcelona-user-group/events/292892669) - May 25
* [**ClickHouse Meetup in London**](https://www.meetup.com/clickhouse-london-user-group/events/292892824) - May 25
* [**ClickHouse Meetup in San Francisco**](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/293426725/) - Jun 7
* [**ClickHouse Meetup in Stockholm**](https://www.meetup.com/clickhouse-berlin-user-group/events/292892466) - Jun 13
Also, keep an eye out for upcoming meetups in Amsterdam, Boston, NYC, Beijing, and Toronto. Somewhere else you want us to be? Please feel free to reach out to tyler <at> clickhouse <dot> com.
## Recent Recordings
* **Recent Meetup Videos**: [Meetup Playlist](https://www.youtube.com/playlist?list=PL0Z2YDlm0b3iNDUzpY1S3L_iV4nARda_U) Whenever possible, recordings of the ClickHouse Community Meetups are edited and presented as individual talks. Currently featuring "Modern SQL in 2023", "Fast, Concurrent, and Consistent Asynchronous INSERTS in ClickHouse", and "Full-Text Indices: Design and Experiments"
* **Recording available**: [**v23.3 Release Webinar**](https://www.youtube.com/watch?v=ISaGUjvBNao) UNDROP TABLE, server settings introspection, nested dynamic disks, MySQL compatibility, parseDateTime, Lightweight Deletes, Parallel Replicas, integrations updates, and so much more! Watch it now!
* **Recording available**: [**v23.4 Release Webinar**](https://www.youtube.com/watch?v=4rrf6bk_mOg) UNDROP TABLE, server settings introspection, nested dynamic disks, MySQL compatibility, parseDateTime, Lightweight Deletes, Parallel Replicas, integrations updates, and so much more! Watch it now!
* **All release webinar recordings**: [YouTube playlist](https://www.youtube.com/playlist?list=PL0Z2YDlm0b3jAlSy1JxyP8zluvXaN3nxU)

View File

@ -314,7 +314,14 @@ struct integer<Bits, Signed>::_impl
const T alpha = t / static_cast<T>(max_int);
if (alpha <= static_cast<T>(max_int))
/** Here we have to use strict comparison.
* The max_int is 2^64 - 1.
* When cast to a floating-point type, it is rounded to the closest representable number,
* which is 2^64.
* But 2^64 is not representable in uint64_t,
* so the maximum representable number will be strictly less.
*/
if (alpha < static_cast<T>(max_int))
self = static_cast<uint64_t>(alpha);
else // max(double) / 2^64 will surely contain less than 52 precision bits, so speed up computations.
set_multiplier<double>(self, static_cast<double>(alpha));
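
The rounding the comment describes is easy to observe. A small sketch in ClickHouse SQL (an illustration added here, not part of the diff):

```sql
SELECT
    toUInt64(-1) AS max_int,              -- 18446744073709551615 = 2^64 - 1
    toFloat64(max_int) AS as_double,      -- rounds up to 1.8446744073709552e19
    toFloat64(max_int) = exp2(64) AS rounded_to_2_pow_64  -- 1: the cast yields exactly 2^64
```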

View File

@ -53,7 +53,7 @@ float logf(float x)
tmp = ix - OFF;
i = (tmp >> (23 - LOGF_TABLE_BITS)) % N;
k = (int32_t)tmp >> 23; /* arithmetic shift */
iz = ix - (tmp & 0x1ff << 23);
iz = ix - (tmp & 0xff800000);
invc = T[i].invc;
logc = T[i].logc;
z = (double_t)asfloat(iz);

View File

@ -9,27 +9,19 @@ if (CMAKE_CXX_COMPILER_LAUNCHER MATCHES "ccache" OR CMAKE_C_COMPILER_LAUNCHER MA
return()
endif()
set(ENABLE_CCACHE "default" CACHE STRING "Deprecated, use COMPILER_CACHE=(auto|ccache|sccache|disabled)")
if (NOT ENABLE_CCACHE STREQUAL "default")
message(WARNING "The -DENABLE_CCACHE is deprecated in favor of -DCOMPILER_CACHE")
endif()
set(COMPILER_CACHE "auto" CACHE STRING "Speedup re-compilations using the caching tools; valid options are 'auto' (ccache, then sccache), 'ccache', 'sccache', or 'disabled'")
# It has pretty complex logic, because the ENABLE_CCACHE is deprecated, but still should
# control the COMPILER_CACHE
# After it will be completely removed, the following block will be much simpler
if (COMPILER_CACHE STREQUAL "ccache" OR (ENABLE_CCACHE AND NOT ENABLE_CCACHE STREQUAL "default"))
find_program (CCACHE_EXECUTABLE ccache)
elseif(COMPILER_CACHE STREQUAL "disabled" OR NOT ENABLE_CCACHE STREQUAL "default")
message(STATUS "Using *ccache: no (disabled via configuration)")
return()
elseif(COMPILER_CACHE STREQUAL "auto")
if(COMPILER_CACHE STREQUAL "auto")
find_program (CCACHE_EXECUTABLE ccache sccache)
elseif (COMPILER_CACHE STREQUAL "ccache")
find_program (CCACHE_EXECUTABLE ccache)
elseif(COMPILER_CACHE STREQUAL "sccache")
find_program (CCACHE_EXECUTABLE sccache)
elseif(COMPILER_CACHE STREQUAL "disabled")
message(STATUS "Using *ccache: no (disabled via configuration)")
return()
else()
message(${RECONFIGURE_MESSAGE_LEVEL} "The COMPILER_CACHE must be one of (auto|ccache|sccache|disabled), given '${COMPILER_CACHE}'")
message(${RECONFIGURE_MESSAGE_LEVEL} "The COMPILER_CACHE must be one of (auto|ccache|sccache|disabled), value: '${COMPILER_CACHE}'")
endif()

View File

@ -21,7 +21,7 @@ set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_EXE_LINKER_FLAGS_INIT "-fuse-ld=bfd")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=bfd")
# Currently, lld does not work with the error:
# ld.lld: error: section size decrease is too large

View File

@ -105,6 +105,7 @@ add_contrib (libfarmhash)
add_contrib (icu-cmake icu)
add_contrib (h3-cmake h3)
add_contrib (mariadb-connector-c-cmake mariadb-connector-c)
add_contrib (libfiu-cmake libfiu)
if (ENABLE_TESTS)
add_contrib (googletest-cmake googletest)
@ -177,7 +178,19 @@ endif()
add_contrib (sqlite-cmake sqlite-amalgamation)
add_contrib (s2geometry-cmake s2geometry)
add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (ENABLE_QPL)
add_contrib (idxd-config-cmake idxd-config)
add_contrib (qpl-cmake qpl) # requires: idxd-config
else()
message(STATUS "Not using QPL")
endif ()
add_contrib (morton-nd-cmake morton-nd)
if (ARCH_S390X)
add_contrib(crc32-s390x-cmake crc32-s390x)

View File

@ -111,6 +111,8 @@ elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "mips")
set(ARCH "generic")
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "ppc64le")
set(ARCH "ppc64le")
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "riscv64")
set(ARCH "riscv64")
else()
message(FATAL_ERROR "Unknown processor:" ${CMAKE_SYSTEM_PROCESSOR})
endif()

1
contrib/idxd-config vendored Submodule

@ -0,0 +1 @@
Subproject commit f6605c41a735e3fdfef2d2d18655a33af6490b99

View File

@ -0,0 +1,23 @@
## accel_config is the utility library required by QPL-Deflate codec for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config-cmake/include")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(_accel-config ${SRCS})
target_compile_options(_accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(_accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
target_include_directories(_accel-config SYSTEM BEFORE
PUBLIC ${LIBACCEL_SOURCE_DIR}/accfg)
add_library(ch_contrib::accel-config ALIAS _accel-config)

1
contrib/libfiu vendored Submodule

@ -0,0 +1 @@
Subproject commit b85edbde4cf974b1b40d27828a56f0505f4e2ee5

View File

@ -0,0 +1,20 @@
if (NOT ENABLE_FIU)
message (STATUS "Not using fiu")
return ()
endif ()
set(FIU_DIR "${ClickHouse_SOURCE_DIR}/contrib/libfiu/")
set(FIU_SOURCES
${FIU_DIR}/libfiu/fiu.c
${FIU_DIR}/libfiu/fiu-rc.c
${FIU_DIR}/libfiu/backtrace.c
${FIU_DIR}/libfiu/wtable.c
)
set(FIU_HEADERS "${FIU_DIR}/libfiu")
add_library(_fiu ${FIU_SOURCES})
target_compile_definitions(_fiu PUBLIC DUMMY_BACKTRACE)
target_include_directories(_fiu PUBLIC ${FIU_HEADERS})
add_library(ch_contrib::fiu ALIAS _fiu)

View File

@ -1,36 +1,5 @@
## The Intel® QPL provides high performance implementations of data processing functions for existing hardware accelerator, and/or software path in case if hardware accelerator is not available.
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (NOT ENABLE_QPL)
message(STATUS "Not using QPL")
return()
endif()
## QPL has build dependency on libaccel-config. Here is to build libaccel-config which is required by QPL.
## libaccel-config is the utility library for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake/idxd-header")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(accel-config ${SRCS})
target_compile_options(accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
## QPL build start here.
set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl")
set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources")
set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl")
@ -342,12 +311,12 @@ target_compile_definitions(_qpl
PUBLIC -DENABLE_QPL_COMPRESSION)
target_link_libraries(_qpl
PRIVATE accel-config
PRIVATE ch_contrib::accel-config
PRIVATE ch_contrib::isal
PRIVATE ${CMAKE_DL_LIBS})
add_library (ch_contrib::qpl ALIAS _qpl)
target_include_directories(_qpl SYSTEM BEFORE
PUBLIC "${QPL_PROJECT_DIR}/include"
PUBLIC "${LIBACCEL_SOURCE_DIR}/accfg"
PUBLIC ${UUID_DIR})
add_library (ch_contrib::qpl ALIAS _qpl)

View File

@ -36,12 +36,10 @@ RUN arch=${TARGETARCH:-amd64} \
# repo versions doesn't work correctly with C++17
# also we push reports to s3, so we add index.html to subfolder urls
# https://github.com/ClickHouse-Extras/woboq_codebrowser/commit/37e15eaf377b920acb0b48dbe82471be9203f76b
RUN git clone https://github.com/ClickHouse/woboq_codebrowser \
&& cd woboq_codebrowser \
RUN git clone --depth=1 https://github.com/ClickHouse/woboq_codebrowser /woboq_codebrowser \
&& cd /woboq_codebrowser \
&& cmake . -G Ninja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang\+\+-${LLVM_VERSION} -DCMAKE_C_COMPILER=clang-${LLVM_VERSION} \
&& ninja \
&& cd .. \
&& rm -rf woboq_codebrowser
&& ninja
ENV CODEGEN=/woboq_codebrowser/generator/codebrowser_generator
ENV CODEINDEX=/woboq_codebrowser/indexgenerator/codebrowser_indexgenerator

View File

@ -147,6 +147,7 @@ function clone_submodules
contrib/xxHash
contrib/simdjson
contrib/liburing
contrib/libfiu
)
git submodule sync

View File

@ -90,15 +90,17 @@ SELECT * FROM mySecondReplacingMT FINAL;
### is_deleted
`is_deleted` — Name of the column with the type of row: `1` is a “deleted“ row, `0` is a “state“ row.
`is_deleted` — Name of a column used during a merge to determine whether the data in this row represents the state or is to be deleted; `1` is a “deleted“ row, `0` is a “state“ row.
Column data type — `Int8`.
Column data type — `UInt8`.
Can only be enabled when `ver` is used.
The row is deleted when use the `OPTIMIZE ... FINAL CLEANUP`, or `OPTIMIZE ... FINAL` if the engine settings `clean_deleted_rows` has been set to `Always`.
No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted one is the one kept.
:::note
`is_deleted` can only be enabled when `ver` is used.
The row is deleted by `OPTIMIZE ... FINAL CLEANUP`, or by `OPTIMIZE ... FINAL` if the engine setting `clean_deleted_rows` has been set to `Always`.
No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted row is the one kept.
:::
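A minimal end-to-end sketch of `is_deleted` (the table name and schema are assumptions for illustration, not from the original page):

```sql
CREATE TABLE hits
(
    uid UInt64,
    name String,
    version UInt32,
    is_deleted UInt8
)
ENGINE = ReplacingMergeTree(version, is_deleted)
ORDER BY uid;

INSERT INTO hits VALUES (1, 'first', 1, 0);
INSERT INTO hits VALUES (1, 'first', 2, 1);  -- higher version, marked as deleted

OPTIMIZE TABLE hits FINAL CLEANUP;  -- physically removes rows with is_deleted = 1

SELECT * FROM hits;  -- no row for uid = 1 remains
```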
## Query clauses

View File

@ -184,6 +184,15 @@ sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
```
For systems with `zypper` package manager (openSUSE, SLES):
``` bash
sudo zypper addrepo -r https://packages.clickhouse.com/rpm/clickhouse.repo -g
sudo zypper --gpg-auto-import-keys refresh clickhouse-stable
```
Later, any `yum install` can be replaced by `zypper install`. To specify a particular version, add `-$VERSION` to the end of the package name, e.g. `clickhouse-client-22.2.2.22`.
#### Install ClickHouse server and client
```bash

View File

@ -30,7 +30,7 @@ description: In order to effectively mitigate possible human errors, you should
```
:::note ALL
`ALL` is only applicable to the `RESTORE` command.
`ALL` is only applicable to the `RESTORE` command prior to version 23.4 of ClickHouse.
:::
## Background

View File

@ -1045,7 +1045,7 @@ Default value: `0`.
## background_pool_size {#background_pool_size}
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. For backward compatibility, this setting can also be applied at server startup from the `default` profile configuration. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. A smaller pool size utilizes less CPU and disk resources, but background processes advance more slowly, which might eventually impact query performance.
Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation).
@ -1063,8 +1063,8 @@ Default value: 16.
## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals 2 and
`background_pool_size` is set to 16, then ClickHouse can execute 32 background merges concurrently. This is possible because background operations can be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
As with the `background_pool_size` setting, `background_merges_mutations_concurrency_ratio` can be applied from the `default` profile for backward compatibility.
Possible values:
@ -1079,6 +1079,33 @@ Default value: 2.
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```
## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}
Sets the limit on how much RAM is allowed to be used for performing merge and mutation operations.
Zero means unlimited.
If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.
Possible values:
- Any positive integer.
**Example**
```xml
<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
```
## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}
The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`.
Default value: `0.5`.
**See also**
- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}
Algorithm used to select the next merge or mutation to be executed by the background thread pool. The policy can be changed at runtime without a server restart.

View File

@ -1125,6 +1125,12 @@ If unsuccessful, several attempts are made to connect to various replicas.
Default value: 1000.
## connect_timeout_with_failover_secure_ms
Connection timeout for selecting the first healthy replica (for secure connections).
Default value: 1000.
## connection_pool_max_wait_ms {#connection-pool-max-wait-ms}
The wait time in milliseconds for a connection when the connection pool is full.
@ -1410,8 +1416,8 @@ and [enable_writes_to_query_cache](#enable-writes-to-query-cache) control in mor
Possible values:
- 0 - Yes
- 1 - No
- 0 - Disabled
- 1 - Enabled
Default value: `0`.
@ -3562,7 +3568,7 @@ Default value: `1`.
If the setting is set to `0`, the table function does not make Nullable columns and inserts default values instead of NULL. This is also applicable for NULL values inside arrays.
## allow_experimental_projection_optimization {#allow-experimental-projection-optimization}
## optimize_use_projections {#optimize_use_projections}
Enables or disables [projection](../../engines/table-engines/mergetree-family/mergetree.md/#projections) optimization when processing `SELECT` queries.
@ -3575,7 +3581,7 @@ Default value: `1`.
## force_optimize_projection {#force-optimize-projection}
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md/#projections) in `SELECT` queries, when projection optimization is enabled (see [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting).
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md/#projections) in `SELECT` queries, when projection optimization is enabled (see [optimize_use_projections](#optimize_use_projections) setting).
Possible values:

View File

@ -0,0 +1,55 @@
---
slug: /en/sql-reference/aggregate-functions/reference/first_value
sidebar_position: 7
---
# first_value
Selects the first encountered value, similar to `any`, but can accept NULL.
## Examples
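The examples below use a `test_data` table whose definition the page omits; a plausible schema (an assumption, not from the original) is:

```sql
create table test_data (a Int64, b Nullable(Int64)) engine = Memory
```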
```sql
insert into test_data (a,b) values (1,null), (2,3), (4, 5), (6,null)
```
### Example 1
The NULL value is ignored by default.
```sql
select first_value(b) from test_data
```
```text
┌─first_value_ignore_nulls(b)─┐
│ 3 │
└─────────────────────────────┘
```
### Example 2
The NULL value is ignored.
```sql
select first_value(b) ignore nulls from test_data
```
```text
┌─first_value_ignore_nulls(b)─┐
│ 3 │
└─────────────────────────────┘
```
### Example 3
The NULL value is accepted.
```sql
select first_value(b) respect nulls from test_data
```
```text
┌─first_value_respect_nulls(b)─┐
│ ᴺᵁᴸᴸ │
└──────────────────────────────┘
```

View File

@ -26,6 +26,8 @@ ClickHouse-specific aggregate functions:
- [anyHeavy](../../../sql-reference/aggregate-functions/reference/anyheavy.md)
- [anyLast](../../../sql-reference/aggregate-functions/reference/anylast.md)
- [first_value](../../../sql-reference/aggregate-functions/reference/first_value.md)
- [last_value](../../../sql-reference/aggregate-functions/reference/last_value.md)
- [argMin](../../../sql-reference/aggregate-functions/reference/argmin.md)
- [argMax](../../../sql-reference/aggregate-functions/reference/argmax.md)
- [avgWeighted](../../../sql-reference/aggregate-functions/reference/avgweighted.md)

View File

@ -0,0 +1,53 @@
---
slug: /en/sql-reference/aggregate-functions/reference/last_value
sidebar_position: 8
---
# last_value
Selects the last encountered value, similar to `anyLast`, but can accept NULL.
## Examples
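As on the `first_value` page, the `test_data` definition is omitted; the same assumed schema works:

```sql
create table test_data (a Int64, b Nullable(Int64)) engine = Memory
```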
```sql
insert into test_data (a,b) values (1,null), (2,3), (4, 5), (6,null)
```
### Example 1
The NULL value is ignored by default.
```sql
select last_value(b) from test_data
```
```text
┌─last_value_ignore_nulls(b)─┐
│ 5 │
└────────────────────────────┘
```
### Example 2
The NULL value is ignored.
```sql
select last_value(b) ignore nulls from test_data
```
```text
┌─last_value_ignore_nulls(b)─┐
│ 5 │
└────────────────────────────┘
```
### Example 3
The NULL value is accepted.
```sql
select last_value(b) respect nulls from test_data
```
```text
┌─last_value_respect_nulls(b)─┐
│ ᴺᵁᴸᴸ │
└─────────────────────────────┘
```

View File

@ -8,10 +8,6 @@ sidebar_label: Interval
The family of data types representing time and date intervals. The resulting types of the [INTERVAL](../../../sql-reference/operators/index.md#operator-interval) operator.
:::note
`Interval` data type values can't be stored in tables.
:::
Structure:
- Time interval as an unsigned integer value.
@ -19,6 +15,9 @@ Structure:
Supported interval types:
- `NANOSECOND`
- `MICROSECOND`
- `MILLISECOND`
- `SECOND`
- `MINUTE`
- `HOUR`

View File

@ -78,6 +78,22 @@ GROUP BY
│ 1 │ Bobruisk │ Firefox │
└─────────────┴──────────┴─────────┘
```
### Important note!
Using multiple `arrayJoin` calls with the same expression may not produce the expected results due to optimizations.
In such cases, consider modifying the repeated array expression with extra operations that do not affect the join result, e.g. `arrayJoin(arraySort(arr))`, `arrayJoin(arrayConcat(arr, []))`
Example:
```sql
SELECT
arrayJoin(dice) as first_throw,
/* arrayJoin(dice) as second_throw */ -- is technically correct, but will annihilate result set
arrayJoin(arrayConcat(dice, [])) as second_throw -- intentionally changed expression to force re-evaluation
FROM (
SELECT [1, 2, 3, 4, 5, 6] as dice
);
```
Note the [ARRAY JOIN](../statements/select/array-join.md) syntax in the SELECT query, which provides broader possibilities.
`ARRAY JOIN` allows you to convert multiple arrays with the same number of elements at a time.

View File

@ -26,19 +26,27 @@ SELECT
## makeDate
Creates a [Date](../../sql-reference/data-types/date.md) from a year, month and day argument.
Creates a [Date](../../sql-reference/data-types/date.md)
- from a year, month and day argument, or
- from a year and day of year argument.
**Syntax**
``` sql
makeDate(year, month, day)
makeDate(year, month, day);
makeDate(year, day_of_year);
```
Alias:
- `MAKEDATE(year, month, day);`
- `MAKEDATE(year, day_of_year);`
**Arguments**
- `year` — Year. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
- `month` — Month. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
- `day` — Day. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
- `day_of_year` — Day of the year. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
**Returned value**
@ -48,6 +56,8 @@ Type: [Date](../../sql-reference/data-types/date.md).
**Example**
Create a Date from a year, month and day:
``` sql
SELECT makeDate(2023, 2, 28) AS Date;
```
@ -60,6 +70,19 @@ Result:
└────────────┘
```
Create a Date from a year and day of year argument:
``` sql
SELECT makeDate(2023, 42) AS Date;
```
Result:
``` text
┌───────date─┐
│ 2023-02-11 │
└────────────┘
```
## makeDate32
Like [makeDate](#makeDate) but produces a [Date32](../../sql-reference/data-types/date32.md).
@ -108,6 +131,12 @@ Result:
Like [makeDateTime](#makedatetime) but produces a [DateTime64](../../sql-reference/data-types/datetime64.md).
**Syntax**
``` sql
makeDateTime64(year, month, day, hour, minute, second[, fraction[, precision[, timezone]]])
```
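The page gives no example for `makeDateTime64`; a sketch under the assumption that `fraction` and `precision` work as for `DateTime64` literals:

```sql
SELECT makeDateTime64(2023, 5, 17, 12, 35, 36, 123, 3) AS dt64;
-- 2023-05-17 12:35:36.123
```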
## timeZone
Returns the timezone of the server.

View File

@ -1215,96 +1215,3 @@ Result:
│ A240 │
└──────────────────┘
```
## extractKeyValuePairs
Extracts key-value pairs from any string. The string does not need to be 100% structured in a key value pair format;
It can contain noise (e.g. log files). The key-value pair format to be interpreted should be specified via function arguments.
A key-value pair consists of a key followed by a `key_value_delimiter` and a value. Quoted keys and values are also supported. Key value pairs must be separated by pair delimiters.
**Syntax**
``` sql
extractKeyValuePairs(data, [key_value_delimiter], [pair_delimiter], [quoting_character])
```
**Arguments**
- `data` - String to extract key-value pairs from. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `key_value_delimiter` - Character to be used as delimiter between the key and the value. Defaults to `:`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `pair_delimiters` - Set of character to be used as delimiters between pairs. Defaults to `\space`, `,` and `;`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `quoting_character` - Character to be used as quoting character. Defaults to `"`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
**Returned values**
- The extracted key-value pairs in a Map(String, String).
**Examples**
Query:
**Simple case**
``` sql
arthur :) select extractKeyValuePairs('name:neymar, age:31 team:psg,nationality:brazil') as kv
SELECT extractKeyValuePairs('name:neymar, age:31 team:psg,nationality:brazil') as kv
Query id: f9e0ca6f-3178-4ee2-aa2c-a5517abb9cee
┌─kv──────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil'} │
└─────────────────────────────────────────────────────────────────────────┘
```
**Single quote as quoting character**
``` sql
arthur :) select extractKeyValuePairs('name:\'neymar\';\'age\':31;team:psg;nationality:brazil,last_key:last_value', ':', ';,', '\'') as kv
SELECT extractKeyValuePairs('name:\'neymar\';\'age\':31;team:psg;nationality:brazil,last_key:last_value', ':', ';,', '\'') as kv
Query id: 0e22bf6b-9844-414a-99dc-32bf647abd5e
┌─kv───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil','last_key':'last_value'} │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
**Escape sequences without escape sequences support**
``` sql
arthur :) select extractKeyValuePairs('age:a\\x0A\\n\\0') as kv
SELECT extractKeyValuePairs('age:a\\x0A\\n\\0') AS kv
Query id: e9fd26ee-b41f-4a11-b17f-25af6fd5d356
┌─kv─────────────────────┐
│ {'age':'a\\x0A\\n\\0'} │
└────────────────────────┘
```
## extractKeyValuePairsWithEscaping
Same as `extractKeyValuePairs` but with escaping support.
Escape sequences supported: `\x`, `\N`, `\a`, `\b`, `\e`, `\f`, `\n`, `\r`, `\t`, `\v` and `\0`.
Non standard escape sequences are returned as it is (including the backslash) unless they are one of the following:
`\\`, `'`, `"`, `backtick`, `/`, `=` or ASCII control characters (c <= 31).
This function will satisfy the use case where pre-escaping and post-escaping are not suitable. For instance, consider the following
input string: `a: "aaaa\"bbb"`. The expected output is: `a: aaaa\"bbbb`.
- Pre-escaping: Pre-escaping it will output: `a: "aaaa"bbb"` and `extractKeyValuePairs` will then output: `a: aaaa`
- Post-escaping: `extractKeyValuePairs` will output `a: aaaa\` and post-escaping will keep it as it is.
Leading escape sequences will be skipped in keys and will be considered invalid for values.
**Escape sequences with escape sequence support turned on**
``` sql
arthur :) select extractKeyValuePairsWithEscaping('age:a\\x0A\\n\\0') as kv
SELECT extractKeyValuePairsWithEscaping('age:a\\x0A\\n\\0') AS kv
Query id: 44c114f0-5658-4c75-ab87-4574de3a1645
┌─kv────────────────┐
│ {'age':'a\n\n\0'} │
└───────────────────┘
```

View File

@ -109,6 +109,108 @@ SELECT mapFromArrays([1, 2, 3], map('a', 1, 'b', 2, 'c', 3))
└───────────────────────────────────────────────────────┘
```
## extractKeyValuePairs
Extracts key-value pairs, i.e. a [Map(String, String)](../../sql-reference/data-types/map.md), from a string. Parsing is robust towards noise (e.g. log files).
A key-value pair consists of a key, followed by a `key_value_delimiter` and a value. Key-value pairs must be separated by `pair_delimiter`. Quoted keys and values are also supported.
**Syntax**
``` sql
extractKeyValuePairs(data[, key_value_delimiter[, pair_delimiter[, quoting_character]]])
```
Alias:
- `str_to_map`
- `mapFromString`
**Arguments**
- `data` - String to extract key-value pairs from. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `key_value_delimiter` - Character to be used as delimiter between the key and the value. Defaults to `:`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `pair_delimiters` - Set of characters to be used as delimiters between pairs. Defaults to ` `, `,` and `;`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `quoting_character` - Character to be used as quoting character. Defaults to `"`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
**Returned values**
- A [Map(String, String)](../../sql-reference/data-types/map.md) of key-value pairs.
**Examples**
Simple case:
``` sql
SELECT extractKeyValuePairs('name:neymar, age:31 team:psg,nationality:brazil') as kv
```
Result:
``` text
┌─kv──────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil'} │
└─────────────────────────────────────────────────────────────────────────┘
```
Single quote as quoting character:
``` sql
SELECT extractKeyValuePairs('name:\'neymar\';\'age\':31;team:psg;nationality:brazil,last_key:last_value', ':', ';,', '\'') as kv
```
Result:
``` text
┌─kv───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil','last_key':'last_value'} │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
Escape sequences without escape sequences support:
``` sql
SELECT extractKeyValuePairs('age:a\\x0A\\n\\0') AS kv
```
Result:
``` text
┌─kv─────────────────────┐
│ {'age':'a\\x0A\\n\\0'} │
└────────────────────────┘
```
## extractKeyValuePairsWithEscaping
Same as `extractKeyValuePairs` but with escaping support.
Supported escape sequences: `\x`, `\N`, `\a`, `\b`, `\e`, `\f`, `\n`, `\r`, `\t`, `\v` and `\0`.
Non-standard escape sequences are returned as is (including the backslash) unless they are one of the following:
`\\`, `'`, `"`, `backtick`, `/`, `=` or ASCII control characters (c <= 31).
This function will satisfy the use case where pre-escaping and post-escaping are not suitable. For instance, consider the following
input string: `a: "aaaa\"bbb"`. The expected output is: `a: aaaa\"bbbb`.
- Pre-escaping: Pre-escaping it will output: `a: "aaaa"bbb"` and `extractKeyValuePairs` will then output: `a: aaaa`
- Post-escaping: `extractKeyValuePairs` will output `a: aaaa\` and post-escaping will keep it as it is.
Leading escape sequences will be skipped in keys and will be considered invalid for values.
**Examples**
Escape sequences with escape sequence support turned on:
``` sql
SELECT extractKeyValuePairsWithEscaping('age:a\\x0A\\n\\0') AS kv
```
Result:
``` result
┌─kv────────────────┐
│ {'age':'a\n\n\0'} │
└───────────────────┘
```
## mapAdd
Collect all the keys and sum corresponding values.

View File

@ -103,7 +103,11 @@ ALTER TABLE table2 [ON CLUSTER cluster] ATTACH PARTITION partition_expr FROM tab
```
This query copies the data partition from `table1` to `table2`.
Note that data will be deleted neither from `table1` nor from `table2`.
Note that:
- Data will be deleted neither from `table1` nor from `table2`.
- `table1` may be a temporary table.
For the query to run successfully, the following conditions must be met:
@ -117,7 +121,12 @@ For the query to run successfully, the following conditions must be met:
ALTER TABLE table2 [ON CLUSTER cluster] REPLACE PARTITION partition_expr FROM table1
```
This query copies the data partition from the `table1` to `table2` and replaces existing partition in the `table2`. Note that data won't be deleted from `table1`.
This query copies the data partition from `table1` to `table2` and replaces the existing partition in `table2`.
Note that:
- Data won't be deleted from `table1`.
- `table1` may be a temporary table.
For the query to run successfully, the following conditions must be met:

View File

@ -12,7 +12,7 @@ Compressed files are supported. Compression type is detected by the extension of
**Syntax**
```sql
SELECT <expr_list> INTO OUTFILE file_name [AND STDOUT] [COMPRESSION type [LEVEL level]]
SELECT <expr_list> INTO OUTFILE file_name [AND STDOUT] [APPEND] [COMPRESSION type [LEVEL level]]
```
`file_name` and `type` are string literals. Supported compression types are: `'none'`, `'gzip'`, `'deflate'`, `'br'`, `'xz'`, `'zstd'`, `'lz4'`, `'bz2'`.
@ -25,6 +25,7 @@ SELECT <expr_list> INTO OUTFILE file_name [AND STDOUT] [COMPRESSION type [LEVEL
- The query will fail if a file with the same file name already exists.
- The default [output format](../../../interfaces/formats.md) is `TabSeparated` (like in the command-line client batch mode). Use [FORMAT](format.md) clause to change it.
- If `AND STDOUT` is mentioned in the query then the output that is written to the file is also displayed on standard output. If used with compression, the plaintext is displayed on standard output.
- If `APPEND` is mentioned in the query, then the output is appended to an existing file (see the sketch below). `APPEND` cannot be combined with compression.
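A brief sketch of the new `APPEND` modifier (the file name is a placeholder, not from the original):

```sql
SELECT 1 AS x INTO OUTFILE 'result.tsv';
-- Without APPEND the next query would fail, because result.tsv already exists.
SELECT 2 AS x INTO OUTFILE 'result.tsv' APPEND;
```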
**Example**

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/table-functions/dictionary
sidebar_position: 54
sidebar_label: dictionary function
sidebar_label: dictionary
title: dictionary
---

View File

@ -391,7 +391,7 @@ INDEX b (u64 * length(str), i32 + f64 * 100, date, str) TYPE set(100) GRANULARIT
## Projections {#projections}
Projections are similar to [materialized views](../../../sql-reference/statements/create/view.md#materialized), but are defined at the level of data parts. This provides consistency guarantees along with automatic use in queries.
Projections are an experimental feature. To enable projection support, set the [allow_experimental_projection_optimization](../../../operations/settings/settings.md#allow-experimental-projection-optimization) setting to `1`. See also the [force_optimize_projection](../../../operations/settings/settings.md#force-optimize-projection) setting.
Projections are an experimental feature. To enable projection support, set the [optimize_use_projections](../../../operations/settings/settings.md#allow-experimental-projection-optimization) setting to `1`. See also the [force_optimize_projection](../../../operations/settings/settings.md#optimize_use_projections) setting.
Projections are not supported for `SELECT` queries with the [FINAL](../../../sql-reference/statements/select/from.md#select-from-final) modifier.

View File

@ -77,15 +77,37 @@ clickhouse-client # or "clickhouse-client --password" if you set up a password.
The ClickHouse team at Yandex recommends using the official precompiled `rpm` packages for CentOS, RedHat, and all other rpm-based Linux distributions.
#### Setting up the official repository
First, you need to add the official repository:
``` bash
sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
sudo yum install -y clickhouse-server clickhouse-client
```
sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
For systems with the `zypper` package manager (openSUSE, SLES):
``` bash
sudo zypper addrepo -r https://packages.clickhouse.com/rpm/clickhouse.repo -g
sudo zypper --gpg-auto-import-keys refresh clickhouse-stable
```
Then any `yum install` command can be replaced with `zypper install`. To specify a particular version, add `-$VERSION` to the package name, e.g. `clickhouse-client-22.2.2.22`.
#### Installing the server and client
``` bash
sudo yum install -y clickhouse-server clickhouse-client
```
#### Starting the server
``` bash
sudo systemctl enable clickhouse-server
sudo systemctl start clickhouse-server
sudo systemctl status clickhouse-server
clickhouse-client # or "clickhouse-client --password" if you set up a password
```
<details markdown="1">

View File

@ -3588,7 +3588,7 @@ SETTINGS index_granularity = 8192 │
A string with the identifier of the snapshot from which the [initial dump of PostgreSQL tables](../../engines/database-engines/materialized-postgresql.md) will be performed. This setting must be used together with [materialized_postgresql_replication_slot](#materialized-postgresql-replication-slot).
## allow_experimental_projection_optimization {#allow-experimental-projection-optimization}
## optimize_use_projections {#optimize_use_projections}
Enables or disables support for [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) when processing `SELECT` queries.
@ -3601,7 +3601,7 @@ SETTINGS index_granularity = 8192 │
## force_optimize_projection {#force-optimize-projection}
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries when projection support is enabled (see the [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting).
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries when projection support is enabled (see the [optimize_use_projections](#optimize_use_projections) setting).
Possible values:

View File

@ -102,7 +102,11 @@ ALTER TABLE table2 [ON CLUSTER cluster] ATTACH PARTITION partition_expr FROM tab
```
Copies the data partition from table `table1` to table `table2`.
Note that data is not deleted from either `table1` or `table2`.
Note that:
- Data is not deleted from either `table1` or `table2`.
- `table1` may be a temporary table.
Keep in mind:
@ -118,7 +122,12 @@ ALTER TABLE table2 [ON CLUSTER cluster] ATTACH PARTITION partition_expr FROM tab
ALTER TABLE table2 [ON CLUSTER cluster] REPLACE PARTITION partition_expr FROM table1
```
Copies the partition from table `table1` to table `table2`, replacing the existing data in `table2`. Data from `table1` is not deleted.
Copies the partition from table `table1` to table `table2`, replacing the existing data in `table2`.
Note that:
- Data from `table1` is not deleted.
- `table1` may be a temporary table.
Keep in mind:

View File

@ -84,6 +84,17 @@ sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
```
For systems with `zypper` package manager (openSUSE, SLES):
``` bash
sudo zypper addrepo -r https://packages.clickhouse.com/rpm/clickhouse.repo -g
sudo zypper --gpg-auto-import-keys refresh clickhouse-stable
sudo zypper install -y clickhouse-server clickhouse-client
sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
```
<details markdown="1">
<summary>Deprecated Method for installing rpm-packages</summary>

View File

@ -1074,7 +1074,7 @@ The corresponding trace entries in the ClickHouse server log file confirm that ClickHouse is
<a href="https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/#projections" target="_blank">Projections</a> are currently an experimental feature, so we need to tell ClickHouse:
```sql
SET allow_experimental_projection_optimization = 1;
SET optimize_use_projections = 1;
```

View File

@ -862,7 +862,8 @@ bool Client::processWithFuzzing(const String & full_query)
const auto * tmp_pos = text_2.c_str();
const auto ast_3 = parseQuery(tmp_pos, tmp_pos + text_2.size(),
false /* allow_multi_statements */);
const auto text_3 = ast_3->formatForErrorMessage();
const auto text_3 = ast_3 ? ast_3->formatForErrorMessage() : "";
if (text_3 != text_2)
{
fmt::print(stderr, "Found error: The query formatting is broken.\n");
@ -877,7 +878,7 @@ bool Client::processWithFuzzing(const String & full_query)
fmt::print(stderr, "Text-1 (AST-1 formatted):\n'{}'\n", query_to_execute);
fmt::print(stderr, "AST-2 (Text-1 parsed):\n'{}'\n", ast_2->dumpTree());
fmt::print(stderr, "Text-2 (AST-2 formatted):\n'{}'\n", text_2);
fmt::print(stderr, "AST-3 (Text-2 parsed):\n'{}'\n", ast_3->dumpTree());
fmt::print(stderr, "AST-3 (Text-2 parsed):\n'{}'\n", ast_3 ? ast_3->dumpTree() : "");
fmt::print(stderr, "Text-3 (AST-3 formatted):\n'{}'\n", text_3);
fmt::print(stderr, "Text-3 must be equal to Text-2, but it is not.\n");

View File

@ -114,7 +114,7 @@ if (BUILD_STANDALONE_KEEPER)
clickhouse_add_executable(clickhouse-keeper ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES})
# Remove some redundant dependencies
target_compile_definitions (clickhouse-keeper PRIVATE -DKEEPER_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PRIVATE -DCLICKHOUSE_PROGRAM_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PUBLIC -DWITHOUT_TEXT_LOG)
target_include_directories(clickhouse-keeper PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src") # uses includes from src directory

View File

@ -57,7 +57,7 @@ int mainEntryClickHouseKeeper(int argc, char ** argv)
}
}
#ifdef KEEPER_STANDALONE_BUILD
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
// Weak symbols don't work correctly on Darwin
// so we have a stub implementation to avoid linker errors

View File

@ -27,7 +27,6 @@
#include <Common/ConcurrencyControl.h>
#include <Common/Macros.h>
#include <Common/ShellCommand.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/getMultipleKeysFromConfig.h>
@ -98,9 +97,7 @@
#include "config_version.h"
#if defined(OS_LINUX)
# include <cstddef>
# include <cstdlib>
# include <sys/socket.h>
# include <sys/un.h>
# include <sys/mman.h>
# include <sys/ptrace.h>
@ -108,7 +105,6 @@
#endif
#if USE_SSL
# include <Poco/Net/Context.h>
# include <Poco/Net/SecureServerSocket.h>
#endif
@ -134,6 +130,7 @@ namespace CurrentMetrics
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
extern const Metric MergesMutationsMemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
}
@ -1229,6 +1226,25 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
LOG_INFO(log, "Merges and mutations memory limit is set to {}",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
background_memory_tracker.setDescription("(background)");
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
@ -1242,8 +1258,13 @@ try
global_context->setMacros(std::make_unique<Macros>(*config, "macros", log));
global_context->setExternalAuthenticatorsConfig(*config);
global_context->loadOrReloadDictionaries(*config);
global_context->loadOrReloadUserDefinedExecutableFunctions(*config);
if (global_context->isServerCompletelyStarted())
{
/// It does not make sense to reload anything before the server has started.
/// Moreover, it may break initialization order.
global_context->loadOrReloadDictionaries(*config);
global_context->loadOrReloadUserDefinedExecutableFunctions(*config);
}
global_context->setRemoteHostFilter(*config);
@ -1374,8 +1395,8 @@ try
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config(), socket, listen_host, port);
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
return ProtocolServerAdapter(
listen_host,
port_name,
@ -1397,8 +1418,8 @@ try
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(config(), socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
return ProtocolServerAdapter(
listen_host,
secure_port_name,

View File

@ -185,6 +185,7 @@ enum class AccessType
M(SYSTEM_FLUSH, "", GROUP, SYSTEM) \
M(SYSTEM_THREAD_FUZZER, "SYSTEM START THREAD FUZZER, SYSTEM STOP THREAD FUZZER, START THREAD FUZZER, STOP THREAD FUZZER", GLOBAL, SYSTEM) \
M(SYSTEM_UNFREEZE, "SYSTEM UNFREEZE", GLOBAL, SYSTEM) \
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT", GLOBAL, SYSTEM) \
M(SYSTEM, "", GROUP, ALL) /* allows to execute SYSTEM {SHUTDOWN|RELOAD CONFIG|...} */ \
\
M(dictGet, "dictHas, dictGetHierarchy, dictIsIn", DICTIONARY, ALL) /* allows to execute functions dictGet(), dictHas(), dictGetHierarchy(), dictIsIn() */\
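
The new access type gates the `SYSTEM ... FAILPOINT` statements added together with libfiu in this change. Presumably they are invoked like this (the failpoint name is a placeholder, not from the diff):

```sql
-- Placeholder failpoint name; real names are defined in the server sources.
SYSTEM ENABLE FAILPOINT some_failpoint_name;
SYSTEM DISABLE FAILPOINT some_failpoint_name;
```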

View File

@ -14,11 +14,29 @@ AggregateFunctionPtr createAggregateFunctionAny(const std::string & name, const
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyData>(name, argument_types, parameters, settings));
}
template <bool RespectNulls = false>
AggregateFunctionPtr createAggregateFunctionNullableAny(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(
createAggregateFunctionSingleNullableValue<AggregateFunctionsSingleValue, AggregateFunctionAnyData, RespectNulls>(
name, argument_types, parameters, settings));
}
AggregateFunctionPtr createAggregateFunctionAnyLast(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyLastData>(name, argument_types, parameters, settings));
}
template <bool RespectNulls = false>
AggregateFunctionPtr createAggregateFunctionNullableAnyLast(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionSingleNullableValue<
AggregateFunctionsSingleValue,
AggregateFunctionAnyLastData,
RespectNulls>(name, argument_types, parameters, settings));
}
AggregateFunctionPtr createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyHeavyData>(name, argument_types, parameters, settings));
@ -38,9 +56,15 @@ void registerAggregateFunctionsAny(AggregateFunctionFactory & factory)
factory.registerFunction("first_value",
{ createAggregateFunctionAny, properties },
AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("first_value_respect_nulls",
{ createAggregateFunctionNullableAny<true>, properties },
AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("last_value",
{ createAggregateFunctionAnyLast, properties },
AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("last_value_respect_nulls",
{ createAggregateFunctionNullableAnyLast<true>, properties },
AggregateFunctionFactory::CaseInsensitive);
}
}

View File

@ -768,19 +768,23 @@ static_assert(
/// For any other value types.
template <bool IS_NULLABLE = false>
struct SingleValueDataGeneric
{
private:
using Self = SingleValueDataGeneric;
Field value;
bool has_value = false;
public:
static constexpr bool is_nullable = false;
static constexpr bool is_nullable = IS_NULLABLE;
static constexpr bool is_any = false;
bool has() const
{
if constexpr (is_nullable)
return has_value;
return !value.isNull();
}
@ -815,11 +819,15 @@ public:
void change(const IColumn & column, size_t row_num, Arena *)
{
column.get(row_num, value);
if constexpr (is_nullable)
has_value = true;
}
void change(const Self & to, Arena *)
{
value = to.value;
if constexpr (is_nullable)
has_value = true;
}
bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena)
@ -835,7 +843,7 @@ public:
bool changeFirstTime(const Self & to, Arena * arena)
{
if (!has() && to.has())
if (!has() && (is_nullable || to.has()))
{
change(to, arena);
return true;
@ -870,27 +878,61 @@ public:
}
else
{
Field new_value;
column.get(row_num, new_value);
if (new_value < value)
if constexpr (is_nullable)
{
value = new_value;
return true;
Field new_value;
column.get(row_num, new_value);
if (!value.isNull() && (new_value.isNull() || new_value < value))
{
value = new_value;
return true;
}
else
return false;
}
else
return false;
{
Field new_value;
column.get(row_num, new_value);
if (new_value < value)
{
value = new_value;
return true;
}
else
return false;
}
}
}
bool changeIfLess(const Self & to, Arena * arena)
{
if (to.has() && (!has() || to.value < value))
if (!to.has())
return false;
if constexpr (is_nullable)
{
change(to, arena);
return true;
if (!has())
{
change(to, arena);
return true;
}
if (to.value.isNull() || (!value.isNull() && to.value < value))
{
value = to.value;
return true;
}
return false;
}
else
return false;
{
if (!has() || to.value < value)
{
change(to, arena);
return true;
}
else
return false;
}
}
bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
@ -902,27 +944,55 @@ public:
}
else
{
Field new_value;
column.get(row_num, new_value);
if (new_value > value)
if constexpr (is_nullable)
{
value = new_value;
return true;
Field new_value;
column.get(row_num, new_value);
if (!value.isNull() && (new_value.isNull() || value < new_value))
{
value = new_value;
return true;
}
return false;
}
else
return false;
{
Field new_value;
column.get(row_num, new_value);
if (new_value > value)
{
value = new_value;
return true;
}
else
return false;
}
}
}
bool changeIfGreater(const Self & to, Arena * arena)
{
if (to.has() && (!has() || to.value > value))
if (!to.has())
return false;
if constexpr (is_nullable)
{
change(to, arena);
return true;
if (!value.isNull() && (to.value.isNull() || value < to.value))
{
value = to.value;
return true;
}
return false;
}
else
return false;
{
if (!has() || to.value > value)
{
change(to, arena);
return true;
}
else
return false;
}
}
bool isEqualTo(const IColumn & column, size_t row_num) const
@ -1359,6 +1429,17 @@ public:
this->data(place).insertResultInto(to);
}
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function,
const DataTypes & /*arguments*/,
const Array & /*params*/,
const AggregateFunctionProperties & /*properties*/) const override
{
if (Data::is_nullable)
return nested_function;
return nullptr;
}
#if USE_EMBEDDED_COMPILER
bool isCompilable() const override

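For orientation: in the nullable branches above, NULL behaves as an extreme value once any row has been seen. changeIfLess treats an incoming NULL as smaller than everything (and a NULL current value as final); changeIfGreater mirrors this for the maximum. A minimal standalone sketch of the changeIfLess rule, using std::optional<int> in place of Field (illustrative only, not the actual ClickHouse API):

#include <cassert>
#include <optional>

/// Sketch: state of a NULL-respecting min-style aggregate over nullable ints.
struct NullableMinState
{
    bool has = false;               /// has any row been seen?
    std::optional<int> value;       /// nullopt models SQL NULL

    bool changeIfLess(std::optional<int> incoming)
    {
        if (!has)                   /// first row always wins
        {
            has = true;
            value = incoming;
            return true;
        }
        if (!value)                 /// already NULL: nothing is smaller
            return false;
        if (!incoming || *incoming < *value)
        {
            value = incoming;
            return true;
        }
        return false;
    }
};

int main()
{
    NullableMinState st;
    st.changeIfLess(3);
    st.changeIfLess(std::nullopt);  /// NULL wins over 3
    assert(st.has && !st.value);
}
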
View File

@ -75,7 +75,7 @@ public:
[[maybe_unused]] char symbol;
readChar(symbol, buf);
if (symbol != '\0')
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect state of aggregate function 'nothing', it should contain exactly one zero byte.");
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect state of aggregate function 'nothing', it should contain exactly one zero byte, while it is {}.", static_cast<UInt32>(symbol));
}
void insertResultInto(AggregateDataPtr __restrict, IColumn & to, Arena *) const override

View File

@ -9,7 +9,6 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
namespace DB
{
struct Settings;
@ -22,7 +21,6 @@ static IAggregateFunction * createAggregateFunctionSingleValue(const String & na
assertUnary(name, argument_types);
const DataTypePtr & argument_type = argument_types[0];
WhichDataType which(argument_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<TYPE>>>(argument_type); /// NOLINT
@ -46,7 +44,28 @@ static IAggregateFunction * createAggregateFunctionSingleValue(const String & na
if (which.idx == TypeIndex::String)
return new AggregateFunctionTemplate<Data<SingleValueDataString>>(argument_type);
return new AggregateFunctionTemplate<Data<SingleValueDataGeneric>>(argument_type);
return new AggregateFunctionTemplate<Data<SingleValueDataGeneric<>>>(argument_type);
}
template <template <typename> class AggregateFunctionTemplate, template <typename> class Data, bool RespectNulls = false>
static IAggregateFunction * createAggregateFunctionSingleNullableValue(const String & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
const DataTypePtr & argument_type = argument_types[0];
WhichDataType which(argument_type);
// If the result value could be null (excluding the case where no rows match),
// use SingleValueDataGeneric.
if constexpr (!RespectNulls)
{
return createAggregateFunctionSingleValue<AggregateFunctionTemplate, Data>(name, argument_types, Array(), settings);
}
else
{
return new AggregateFunctionTemplate<Data<SingleValueDataGeneric<true>>>(argument_type);
}
UNREACHABLE();
}
@ -79,7 +98,7 @@ static IAggregateFunction * createAggregateFunctionArgMinMaxSecond(const DataTyp
if (which.idx == TypeIndex::String)
return new AggregateFunctionArgMinMax<AggregateFunctionArgMinMaxData<ResData, MinMaxData<SingleValueDataString>>>(res_type, val_type);
return new AggregateFunctionArgMinMax<AggregateFunctionArgMinMaxData<ResData, MinMaxData<SingleValueDataGeneric>>>(res_type, val_type);
return new AggregateFunctionArgMinMax<AggregateFunctionArgMinMaxData<ResData, MinMaxData<SingleValueDataGeneric<>>>>(res_type, val_type);
}
template <template <typename> class MinMaxData>
@ -115,7 +134,7 @@ static IAggregateFunction * createAggregateFunctionArgMinMax(const String & name
if (which.idx == TypeIndex::String)
return createAggregateFunctionArgMinMaxSecond<MinMaxData, SingleValueDataString>(res_type, val_type);
return createAggregateFunctionArgMinMaxSecond<MinMaxData, SingleValueDataGeneric>(res_type, val_type);
return createAggregateFunctionArgMinMaxSecond<MinMaxData, SingleValueDataGeneric<>>(res_type, val_type);
}
}

View File

@ -335,7 +335,7 @@ public:
if constexpr (std::endian::native == std::endian::little)
hash_value = hash(x);
else
hash_value = __builtin_bswap32(hash(x));
hash_value = std::byteswap(hash(x));
if (!good(hash_value))
return;

View File

@ -162,14 +162,13 @@ private:
class PushOrVisitor
{
public:
PushOrVisitor(ContextPtr context, size_t max_atoms_, size_t num_atoms_)
PushOrVisitor(ContextPtr context, size_t max_atoms_)
: max_atoms(max_atoms_)
, num_atoms(num_atoms_)
, and_resolver(FunctionFactory::instance().get("and", context))
, or_resolver(FunctionFactory::instance().get("or", context))
{}
bool visit(QueryTreeNodePtr & node)
bool visit(QueryTreeNodePtr & node, size_t num_atoms)
{
if (max_atoms && num_atoms > max_atoms)
return false;
@ -187,7 +186,10 @@ public:
{
auto & arguments = function_node->getArguments().getNodes();
for (auto & argument : arguments)
visit(argument);
{
if (!visit(argument, num_atoms))
return false;
}
}
if (name == "or")
@ -217,7 +219,7 @@ public:
auto rhs = createFunctionNode(or_resolver, std::move(other_node), std::move(and_function_arguments[1]));
node = createFunctionNode(and_resolver, std::move(lhs), std::move(rhs));
visit(node);
return visit(node, num_atoms);
}
return true;
@ -225,7 +227,6 @@ public:
private:
size_t max_atoms;
size_t num_atoms;
const FunctionOverloadResolverPtr and_resolver;
const FunctionOverloadResolverPtr or_resolver;
@ -516,8 +517,8 @@ std::optional<CNF> CNF::tryBuildCNF(const QueryTreeNodePtr & node, ContextPtr co
visitor.visit(node_cloned, false);
}
if (PushOrVisitor visitor(context, max_atoms, atom_count);
!visitor.visit(node_cloned))
if (PushOrVisitor visitor(context, max_atoms);
!visitor.visit(node_cloned, atom_count))
return std::nullopt;
CollectGroupsVisitor collect_visitor;

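For context, the rewrite this visitor performs is the distributive law (x AND y) OR z -> (x OR z) AND (y OR z). Each application duplicates z, so the atom count can grow exponentially; threading num_atoms through visit() and propagating the failure keeps the budget check from being silently dropped, which was the bug here. A rough standalone sketch of the budgeted expansion (hypothetical Node type; the real code works on QueryTreeNodePtr):

#include <memory>
#include <vector>

/// Illustrative-only node; binary AND/OR for simplicity.
struct Node
{
    enum Kind { Atom, And, Or } kind;
    std::vector<std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;

/// Push OR over AND with an atom budget: OR(AND(x, y), z) -> AND(OR(x, z), OR(y, z)).
/// Returns false once the budget is exceeded, instead of silently stopping.
bool pushOr(NodePtr & node, size_t & num_atoms, size_t max_atoms)
{
    if (max_atoms && num_atoms > max_atoms)
        return false;
    for (auto & child : node->children)
        if (!pushOr(child, num_atoms, max_atoms))
            return false;
    if (node->kind != Node::Or || node->children.size() != 2)
        return true;
    size_t and_idx = node->children[0]->kind == Node::And ? 0
                   : node->children[1]->kind == Node::And ? 1 : 2;
    if (and_idx == 2)
        return true;                        /// nothing to distribute
    NodePtr a = node->children[and_idx];    /// the AND(x, y) side
    NodePtr z = node->children[1 - and_idx];
    ++num_atoms;                            /// z is duplicated (counting simplified here)
    auto lhs = std::make_shared<Node>(Node{Node::Or, {a->children[0], z}});
    auto rhs = std::make_shared<Node>(Node{Node::Or, {a->children[1], z}});
    node = std::make_shared<Node>(Node{Node::And, {lhs, rhs}});
    return pushOr(node, num_atoms, max_atoms);   /// keep pushing in the new subtree
}
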
View File

@ -214,14 +214,14 @@ int IBridge::main(const std::vector<std::string> & /*args*/)
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, hostname, port, log);
socket.setReceiveTimeout(http_timeout);
socket.setSendTimeout(http_timeout);
socket.setReceiveTimeout(Poco::Timespan(http_timeout, 0));
socket.setSendTimeout(Poco::Timespan(http_timeout, 0));
Poco::ThreadPool server_pool(3, max_server_connections);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
http_params->setTimeout(Poco::Timespan(http_timeout, 0));
http_params->setKeepAliveTimeout(Poco::Timespan(keep_alive_timeout, 0));
auto shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());

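The Poco::Timespan wrapping above is load-bearing: these setters also accept a bare integer, which Poco's implicit Timespan(TimeDiff) constructor interprets as microseconds, while Timespan(long seconds, long microseconds) takes seconds. An illustration, assuming http_timeout holds seconds (e.g. 1800):

socket.setReceiveTimeout(http_timeout);                    /// 1800 -> 1.8 milliseconds (!)
socket.setReceiveTimeout(Poco::Timespan(http_timeout, 0)); /// 1800 -> 30 minutes
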
View File

@ -353,6 +353,10 @@ target_link_libraries(clickhouse_common_io
Poco::Foundation
)
if (TARGET ch_contrib::fiu)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::fiu)
endif()
if (TARGET ch_contrib::cpuid)
target_link_libraries(clickhouse_common_io PRIVATE ch_contrib::cpuid)
endif()
@ -544,6 +548,10 @@ if (TARGET ch_contrib::qpl)
dbms_target_link_libraries(PUBLIC ch_contrib::qpl)
endif ()
if (TARGET ch_contrib::accel-config)
dbms_target_link_libraries(PUBLIC ch_contrib::accel-config)
endif ()
target_link_libraries(clickhouse_common_io PUBLIC boost::context)
dbms_target_link_libraries(PUBLIC boost::context)

View File

@ -573,6 +573,13 @@ try
CompressionMethod compression_method = chooseCompressionMethod(out_file, compression_method_string);
UInt64 compression_level = 3;
if (query_with_output->is_outfile_append && compression_method != CompressionMethod::None)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot append to compressed file. Please use uncompressed file or remove APPEND keyword.");
}
if (query_with_output->compression_level)
{
const auto & compression_level_node = query_with_output->compression_level->as<ASTLiteral &>();
@ -587,8 +594,14 @@ try
range.second);
}
auto flags = O_WRONLY | O_EXCL;
if (query_with_output->is_outfile_append)
flags |= O_APPEND;
else
flags |= O_CREAT;
out_file_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, flags),
compression_method,
static_cast<int>(compression_level)
);

View File

@ -264,7 +264,9 @@ void ColumnFunction::appendArgument(const ColumnWithTypeAndName & column)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot capture column {} because it has incompatible type: "
"got {}, but {} is expected.", argument_types.size(), column.type->getName(), argument_types[index]->getName());
captured_columns.push_back(column);
auto captured_column = column;
captured_column.column = captured_column.column->convertToFullColumnIfSparse();
captured_columns.push_back(std::move(captured_column));
}
DataTypePtr ColumnFunction::getResultType() const

View File

@ -0,0 +1,173 @@
#include <Common/ConcurrencyControl.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ConcurrencyControl::Slot::~Slot()
{
allocation->release();
}
ConcurrencyControl::Slot::Slot(AllocationPtr && allocation_)
: allocation(std::move(allocation_))
{
}
ConcurrencyControl::Allocation::~Allocation()
{
// We have to lock parent's mutex to avoid race with grant()
// NOTE: shortcut can be added, but it requires Allocation::mutex lock even to check if shortcut is possible
parent.free(this);
}
[[nodiscard]] ConcurrencyControl::SlotPtr ConcurrencyControl::Allocation::tryAcquire()
{
SlotCount value = granted.load();
while (value)
{
if (granted.compare_exchange_strong(value, value - 1))
{
std::unique_lock lock{mutex};
return SlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}
}
return {}; // avoid unnecessary locking
}
ConcurrencyControl::SlotCount ConcurrencyControl::Allocation::grantedCount() const
{
return granted;
}
ConcurrencyControl::Allocation::Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_)
: parent(parent_)
, limit(limit_)
, allocated(granted_)
, granted(granted_)
, waiter(waiter_)
{
if (allocated < limit)
*waiter = this;
}
// Grant single slot to allocation, returns true iff more slot(s) are required
bool ConcurrencyControl::Allocation::grant()
{
std::unique_lock lock{mutex};
granted++;
allocated++;
return allocated < limit;
}
// Release one slot and grant it to other allocation if required
void ConcurrencyControl::Allocation::release()
{
parent.release(1);
std::unique_lock lock{mutex};
released++;
if (released > allocated)
abort();
}
ConcurrencyControl::ConcurrencyControl()
: cur_waiter(waiters.end())
{
}
ConcurrencyControl::~ConcurrencyControl()
{
if (!waiters.empty())
abort();
}
[[nodiscard]] ConcurrencyControl::AllocationPtr ConcurrencyControl::allocate(SlotCount min, SlotCount max)
{
if (min > max)
throw Exception(ErrorCodes::LOGICAL_ERROR, "ConcurrencyControl: invalid allocation requirements");
std::unique_lock lock{mutex};
// Acquire as many slots as we can, but not lower than `min`
SlotCount granted = std::max(min, std::min(max, available(lock)));
cur_concurrency += granted;
// Create allocation and start waiting if more slots are required
if (granted < max)
return AllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
else
return AllocationPtr(new Allocation(*this, max, granted));
}
void ConcurrencyControl::setMaxConcurrency(ConcurrencyControl::SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
schedule(lock);
}
ConcurrencyControl & ConcurrencyControl::instance()
{
static ConcurrencyControl result;
return result;
}
void ConcurrencyControl::free(Allocation * allocation)
{
// Allocation is allowed to be canceled even if there are:
// - `amount`: granted slots (acquired slots are not possible, because Slot holds AllocationPtr)
// - `waiter`: active waiting for more slots to be allocated
// Thus Allocation destruction may require the following lock, to avoid race conditions
std::unique_lock lock{mutex};
auto [amount, waiter] = allocation->cancel();
cur_concurrency -= amount;
if (waiter)
{
if (cur_waiter == *waiter)
cur_waiter = waiters.erase(*waiter);
else
waiters.erase(*waiter);
}
schedule(lock);
}
void ConcurrencyControl::release(SlotCount amount)
{
std::unique_lock lock{mutex};
cur_concurrency -= amount;
schedule(lock);
}
// Round-robin scheduling of available slots among waiting allocations
void ConcurrencyControl::schedule(std::unique_lock<std::mutex> &)
{
while (cur_concurrency < max_concurrency && !waiters.empty())
{
cur_concurrency++;
if (cur_waiter == waiters.end())
cur_waiter = waiters.begin();
Allocation * allocation = *cur_waiter;
if (allocation->grant())
++cur_waiter;
else
cur_waiter = waiters.erase(cur_waiter); // last required slot has just been granted -- stop waiting
}
}
ConcurrencyControl::SlotCount ConcurrencyControl::available(std::unique_lock<std::mutex> &) const
{
if (cur_concurrency < max_concurrency)
return max_concurrency - cur_concurrency;
else
return 0;
}
}
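
A minimal usage sketch of this interface (the call site and spawnWorker are hypothetical; the real consumers are the query pipeline executors):

/// Ask for between 1 and 8 slots; slots not granted now are granted later
/// by schedule() as other allocations release theirs.
auto allocation = ConcurrencyControl::instance().allocate(1, 8);

/// Before starting each worker thread, take an already-granted slot.
while (ConcurrencyControl::SlotPtr slot = allocation->tryAcquire())
    spawnWorker(std::move(slot));   /// hypothetical; ~Slot() returns the slot

/// Remaining granted-but-unacquired slots are returned when `allocation` dies.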

View File

@ -5,17 +5,10 @@
#include <mutex>
#include <memory>
#include <list>
#include <condition_variable>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/*
* Controls how many threads can be allocated for a query (or another activity).
@ -53,17 +46,12 @@ public:
// Scoped guard for acquired slot, see Allocation::tryAcquire()
struct Slot : boost::noncopyable
{
~Slot()
{
allocation->release();
}
~Slot();
private:
friend struct Allocation; // for ctor
explicit Slot(AllocationPtr && allocation_)
: allocation(std::move(allocation_))
{}
explicit Slot(AllocationPtr && allocation_);
AllocationPtr allocation;
};
@ -74,47 +62,18 @@ public:
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
struct Allocation : std::enable_shared_from_this<Allocation>, boost::noncopyable
{
~Allocation()
{
// We have to lock parent's mutex to avoid race with grant()
// NOTE: shortcut can be added, but it requires Allocation::mutex lock even to check if shortcut is possible
parent.free(this);
}
~Allocation();
// Take one already granted slot if available. Lock-free iff there is no granted slot.
[[nodiscard]] SlotPtr tryAcquire()
{
SlotCount value = granted.load();
while (value)
{
if (granted.compare_exchange_strong(value, value - 1))
{
std::unique_lock lock{mutex};
return SlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}
}
return {}; // avoid unnecessary locking
}
[[nodiscard]] SlotPtr tryAcquire();
SlotCount grantedCount() const
{
return granted;
}
SlotCount grantedCount() const;
private:
friend struct Slot; // for release()
friend class ConcurrencyControl; // for grant(), free() and ctor
Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_ = {})
: parent(parent_)
, limit(limit_)
, allocated(granted_)
, granted(granted_)
, waiter(waiter_)
{
if (allocated < limit)
*waiter = this;
}
Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_ = {});
auto cancel()
{
@ -126,23 +85,10 @@ public:
}
// Grant single slot to allocation, returns true iff more slot(s) are required
bool grant()
{
std::unique_lock lock{mutex};
granted++;
allocated++;
return allocated < limit;
}
bool grant();
// Release one slot and grant it to other allocation if required
void release()
{
parent.release(1);
std::unique_lock lock{mutex};
released++;
if (released > allocated)
abort();
}
void release();
ConcurrencyControl & parent;
const SlotCount limit;
@ -157,106 +103,32 @@ public:
};
public:
ConcurrencyControl()
: cur_waiter(waiters.end())
{}
ConcurrencyControl();
// WARNING: all Allocation objects MUST be destructed before ConcurrencyControl
// NOTE: Recommended way to achieve this is to use `instance()` and do graceful shutdown of queries
~ConcurrencyControl()
{
if (!waiters.empty())
abort();
}
~ConcurrencyControl();
// Allocate at least `min` and at most `max` slots.
// If not all `max` slots were successfully allocated, a subscription for later allocation is created
// Use `Allocation::tryAcquire()` to acquire allocated slot, before running a thread.
[[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max)
{
if (min > max)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "ConcurrencyControl: invalid allocation requirements");
[[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max);
std::unique_lock lock{mutex};
void setMaxConcurrency(SlotCount value);
// Acquire as many slots as we can, but not lower than `min`
SlotCount granted = std::max(min, std::min(max, available(lock)));
cur_concurrency += granted;
// Create allocation and start waiting if more slots are required
if (granted < max)
return AllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
else
return AllocationPtr(new Allocation(*this, max, granted));
}
void setMaxConcurrency(SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
schedule(lock);
}
static ConcurrencyControl & instance()
{
static ConcurrencyControl result;
return result;
}
static ConcurrencyControl & instance();
private:
friend struct Allocation; // for free() and release()
void free(Allocation * allocation)
{
// Allocation is allowed to be canceled even if there are:
// - `amount`: granted slots (acquired slots are not possible, because Slot holds AllocationPtr)
// - `waiter`: active waiting for more slots to be allocated
// Thus Allocation destruction may require the following lock, to avoid race conditions
std::unique_lock lock{mutex};
auto [amount, waiter] = allocation->cancel();
void free(Allocation * allocation);
cur_concurrency -= amount;
if (waiter)
{
if (cur_waiter == *waiter)
cur_waiter = waiters.erase(*waiter);
else
waiters.erase(*waiter);
}
schedule(lock);
}
void release(SlotCount amount)
{
std::unique_lock lock{mutex};
cur_concurrency -= amount;
schedule(lock);
}
void release(SlotCount amount);
// Round-robin scheduling of available slots among waiting allocations
void schedule(std::unique_lock<std::mutex> &)
{
while (cur_concurrency < max_concurrency && !waiters.empty())
{
cur_concurrency++;
if (cur_waiter == waiters.end())
cur_waiter = waiters.begin();
Allocation * allocation = *cur_waiter;
if (allocation->grant())
++cur_waiter;
else
cur_waiter = waiters.erase(cur_waiter); // last required slot has just been granted -- stop waiting
}
}
void schedule(std::unique_lock<std::mutex> &);
SlotCount available(std::unique_lock<std::mutex> &) const
{
if (cur_concurrency < max_concurrency)
return max_concurrency - cur_concurrency;
else
return 0;
}
SlotCount available(std::unique_lock<std::mutex> &) const;
std::mutex mutex;
Waiters waiters;
@ -264,3 +136,5 @@ private:
SlotCount max_concurrency = Unlimited;
SlotCount cur_concurrency = 0;
};
}

View File

@ -53,6 +53,7 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \

View File

@ -29,21 +29,14 @@
M(13, SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH) \
M(15, DUPLICATE_COLUMN) \
M(16, NO_SUCH_COLUMN_IN_TABLE) \
M(17, DELIMITER_IN_STRING_LITERAL_DOESNT_MATCH) \
M(18, CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN) \
M(19, SIZE_OF_FIXED_STRING_DOESNT_MATCH) \
M(20, NUMBER_OF_COLUMNS_DOESNT_MATCH) \
M(21, CANNOT_READ_ALL_DATA_FROM_TAB_SEPARATED_INPUT) \
M(22, CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT) \
M(23, CANNOT_READ_FROM_ISTREAM) \
M(24, CANNOT_WRITE_TO_OSTREAM) \
M(25, CANNOT_PARSE_ESCAPE_SEQUENCE) \
M(26, CANNOT_PARSE_QUOTED_STRING) \
M(27, CANNOT_PARSE_INPUT_ASSERTION_FAILED) \
M(28, CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER) \
M(29, CANNOT_PRINT_INTEGER) \
M(30, CANNOT_READ_SIZE_OF_COMPRESSED_CHUNK) \
M(31, CANNOT_READ_COMPRESSED_CHUNK) \
M(32, ATTEMPT_TO_READ_AFTER_EOF) \
M(33, CANNOT_READ_ALL_DATA) \
M(34, TOO_MANY_ARGUMENTS_FOR_FUNCTION) \
@ -57,7 +50,6 @@
M(42, NUMBER_OF_ARGUMENTS_DOESNT_MATCH) \
M(43, ILLEGAL_TYPE_OF_ARGUMENT) \
M(44, ILLEGAL_COLUMN) \
M(45, ILLEGAL_NUMBER_OF_RESULT_COLUMNS) \
M(46, UNKNOWN_FUNCTION) \
M(47, UNKNOWN_IDENTIFIER) \
M(48, NOT_IMPLEMENTED) \
@ -66,20 +58,14 @@
M(51, EMPTY_LIST_OF_COLUMNS_QUERIED) \
M(52, COLUMN_QUERIED_MORE_THAN_ONCE) \
M(53, TYPE_MISMATCH) \
M(54, STORAGE_DOESNT_ALLOW_PARAMETERS) \
M(55, STORAGE_REQUIRES_PARAMETER) \
M(56, UNKNOWN_STORAGE) \
M(57, TABLE_ALREADY_EXISTS) \
M(58, TABLE_METADATA_ALREADY_EXISTS) \
M(59, ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER) \
M(60, UNKNOWN_TABLE) \
M(61, ONLY_FILTER_COLUMN_IN_BLOCK) \
M(62, SYNTAX_ERROR) \
M(63, UNKNOWN_AGGREGATE_FUNCTION) \
M(64, CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT) \
M(65, CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT) \
M(66, NOT_A_COLUMN) \
M(67, ILLEGAL_KEY_OF_AGGREGATION) \
M(68, CANNOT_GET_SIZE_OF_FIELD) \
M(69, ARGUMENT_OUT_OF_BOUND) \
M(70, CANNOT_CONVERT_TYPE) \
@ -109,16 +95,11 @@
M(94, CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS) \
M(95, CANNOT_READ_FROM_SOCKET) \
M(96, CANNOT_WRITE_TO_SOCKET) \
M(97, CANNOT_READ_ALL_DATA_FROM_CHUNKED_INPUT) \
M(98, CANNOT_WRITE_TO_EMPTY_BLOCK_OUTPUT_STREAM) \
M(99, UNKNOWN_PACKET_FROM_CLIENT) \
M(100, UNKNOWN_PACKET_FROM_SERVER) \
M(101, UNEXPECTED_PACKET_FROM_CLIENT) \
M(102, UNEXPECTED_PACKET_FROM_SERVER) \
M(103, RECEIVED_DATA_FOR_WRONG_QUERY_ID) \
M(104, TOO_SMALL_BUFFER_SIZE) \
M(105, CANNOT_READ_HISTORY) \
M(106, CANNOT_APPEND_HISTORY) \
M(107, FILE_DOESNT_EXIST) \
M(108, NO_DATA_TO_INSERT) \
M(109, CANNOT_BLOCK_SIGNAL) \
@ -137,7 +118,6 @@
M(123, UNKNOWN_TYPE_OF_AST_NODE) \
M(124, INCORRECT_ELEMENT_OF_SET) \
M(125, INCORRECT_RESULT_OF_SCALAR_SUBQUERY) \
M(126, CANNOT_GET_RETURN_TYPE) \
M(127, ILLEGAL_INDEX) \
M(128, TOO_LARGE_ARRAY_SIZE) \
M(129, FUNCTION_IS_SPECIAL) \
@ -149,30 +129,17 @@
M(137, UNKNOWN_ELEMENT_IN_CONFIG) \
M(138, EXCESSIVE_ELEMENT_IN_CONFIG) \
M(139, NO_ELEMENTS_IN_CONFIG) \
M(140, ALL_REQUESTED_COLUMNS_ARE_MISSING) \
M(141, SAMPLING_NOT_SUPPORTED) \
M(142, NOT_FOUND_NODE) \
M(143, FOUND_MORE_THAN_ONE_NODE) \
M(144, FIRST_DATE_IS_BIGGER_THAN_LAST_DATE) \
M(145, UNKNOWN_OVERFLOW_MODE) \
M(146, QUERY_SECTION_DOESNT_MAKE_SENSE) \
M(147, NOT_FOUND_FUNCTION_ELEMENT_FOR_AGGREGATE) \
M(148, NOT_FOUND_RELATION_ELEMENT_FOR_CONDITION) \
M(149, NOT_FOUND_RHS_ELEMENT_FOR_CONDITION) \
M(150, EMPTY_LIST_OF_ATTRIBUTES_PASSED) \
M(151, INDEX_OF_COLUMN_IN_SORT_CLAUSE_IS_OUT_OF_RANGE) \
M(152, UNKNOWN_DIRECTION_OF_SORTING) \
M(153, ILLEGAL_DIVISION) \
M(154, AGGREGATE_FUNCTION_NOT_APPLICABLE) \
M(155, UNKNOWN_RELATION) \
M(156, DICTIONARIES_WAS_NOT_LOADED) \
M(157, ILLEGAL_OVERFLOW_MODE) \
M(158, TOO_MANY_ROWS) \
M(159, TIMEOUT_EXCEEDED) \
M(160, TOO_SLOW) \
M(161, TOO_MANY_COLUMNS) \
M(162, TOO_DEEP_SUBQUERIES) \
M(163, TOO_DEEP_PIPELINE) \
M(164, READONLY) \
M(165, TOO_MANY_TEMPORARY_COLUMNS) \
M(166, TOO_MANY_TEMPORARY_NON_CONST_COLUMNS) \
@ -183,20 +150,14 @@
M(172, CANNOT_CREATE_DIRECTORY) \
M(173, CANNOT_ALLOCATE_MEMORY) \
M(174, CYCLIC_ALIASES) \
M(176, CHUNK_NOT_FOUND) \
M(177, DUPLICATE_CHUNK_NAME) \
M(178, MULTIPLE_ALIASES_FOR_EXPRESSION) \
M(179, MULTIPLE_EXPRESSIONS_FOR_ALIAS) \
M(180, THERE_IS_NO_PROFILE) \
M(181, ILLEGAL_FINAL) \
M(182, ILLEGAL_PREWHERE) \
M(183, UNEXPECTED_EXPRESSION) \
M(184, ILLEGAL_AGGREGATION) \
M(185, UNSUPPORTED_MYISAM_BLOCK_TYPE) \
M(186, UNSUPPORTED_COLLATION_LOCALE) \
M(187, COLLATION_COMPARISON_FAILED) \
M(188, UNKNOWN_ACTION) \
M(189, TABLE_MUST_NOT_BE_CREATED_MANUALLY) \
M(190, SIZES_OF_ARRAYS_DONT_MATCH) \
M(191, SET_SIZE_LIMIT_EXCEEDED) \
M(192, UNKNOWN_USER) \
@ -204,15 +165,12 @@
M(194, REQUIRED_PASSWORD) \
M(195, IP_ADDRESS_NOT_ALLOWED) \
M(196, UNKNOWN_ADDRESS_PATTERN_TYPE) \
M(197, SERVER_REVISION_IS_TOO_OLD) \
M(198, DNS_ERROR) \
M(199, UNKNOWN_QUOTA) \
M(200, QUOTA_DOESNT_ALLOW_KEYS) \
M(201, QUOTA_EXCEEDED) \
M(202, TOO_MANY_SIMULTANEOUS_QUERIES) \
M(203, NO_FREE_CONNECTION) \
M(204, CANNOT_FSYNC) \
M(205, NESTED_TYPE_TOO_DEEP) \
M(206, ALIAS_REQUIRED) \
M(207, AMBIGUOUS_IDENTIFIER) \
M(208, EMPTY_NESTED_TABLE) \
@ -229,7 +187,6 @@
M(219, DATABASE_NOT_EMPTY) \
M(220, DUPLICATE_INTERSERVER_IO_ENDPOINT) \
M(221, NO_SUCH_INTERSERVER_IO_ENDPOINT) \
M(222, ADDING_REPLICA_TO_NON_EMPTY_TABLE) \
M(223, UNEXPECTED_AST_STRUCTURE) \
M(224, REPLICA_IS_ALREADY_ACTIVE) \
M(225, NO_ZOOKEEPER) \
@ -253,9 +210,7 @@
M(243, NOT_ENOUGH_SPACE) \
M(244, UNEXPECTED_ZOOKEEPER_ERROR) \
M(246, CORRUPTED_DATA) \
M(247, INCORRECT_MARK) \
M(248, INVALID_PARTITION_VALUE) \
M(250, NOT_ENOUGH_BLOCK_NUMBERS) \
M(251, NO_SUCH_REPLICA) \
M(252, TOO_MANY_PARTS) \
M(253, REPLICA_ALREADY_EXISTS) \
@ -271,8 +226,6 @@
M(264, INCOMPATIBLE_TYPE_OF_JOIN) \
M(265, NO_AVAILABLE_REPLICA) \
M(266, MISMATCH_REPLICAS_DATA_SOURCES) \
M(267, STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS) \
M(268, CPUID_ERROR) \
M(269, INFINITE_LOOP) \
M(270, CANNOT_COMPRESS) \
M(271, CANNOT_DECOMPRESS) \
@ -295,9 +248,7 @@
M(290, LIMIT_EXCEEDED) \
M(291, DATABASE_ACCESS_DENIED) \
M(293, MONGODB_CANNOT_AUTHENTICATE) \
M(294, INVALID_BLOCK_EXTRA_INFO) \
M(295, RECEIVED_EMPTY_DATA) \
M(296, NO_REMOTE_SHARD_FOUND) \
M(297, SHARD_HAS_NO_CONNECTIONS) \
M(298, CANNOT_PIPE) \
M(299, CANNOT_FORK) \
@ -311,13 +262,10 @@
M(307, TOO_MANY_BYTES) \
M(308, UNEXPECTED_NODE_IN_ZOOKEEPER) \
M(309, FUNCTION_CANNOT_HAVE_PARAMETERS) \
M(317, INVALID_SHARD_WEIGHT) \
M(318, INVALID_CONFIG_PARAMETER) \
M(319, UNKNOWN_STATUS_OF_INSERT) \
M(321, VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE) \
M(335, BARRIER_TIMEOUT) \
M(336, UNKNOWN_DATABASE_ENGINE) \
M(337, DDL_GUARD_IS_ACTIVE) \
M(341, UNFINISHED) \
M(342, METADATA_MISMATCH) \
M(344, SUPPORT_IS_DISABLED) \
@ -325,14 +273,10 @@
M(346, CANNOT_CONVERT_CHARSET) \
M(347, CANNOT_LOAD_CONFIG) \
M(349, CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN) \
M(350, INCOMPATIBLE_SOURCE_TABLES) \
M(351, AMBIGUOUS_TABLE_NAME) \
M(352, AMBIGUOUS_COLUMN_NAME) \
M(353, INDEX_OF_POSITIONAL_ARGUMENT_IS_OUT_OF_RANGE) \
M(354, ZLIB_INFLATE_FAILED) \
M(355, ZLIB_DEFLATE_FAILED) \
M(356, BAD_LAMBDA) \
M(357, RESERVED_IDENTIFIER_NAME) \
M(358, INTO_OUTFILE_NOT_ALLOWED) \
M(359, TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT) \
M(360, CANNOT_CREATE_CHARSET_CONVERTER) \
@ -341,7 +285,6 @@
M(363, CANNOT_CREATE_IO_BUFFER) \
M(364, RECEIVED_ERROR_TOO_MANY_REQUESTS) \
M(366, SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT) \
M(367, TOO_MANY_FETCHES) \
M(369, ALL_REPLICAS_ARE_STALE) \
M(370, DATA_TYPE_CANNOT_BE_USED_IN_TABLES) \
M(371, INCONSISTENT_CLUSTER_DEFINITION) \
@ -352,7 +295,6 @@
M(376, CANNOT_PARSE_UUID) \
M(377, ILLEGAL_SYNTAX_FOR_DATA_TYPE) \
M(378, DATA_TYPE_CANNOT_HAVE_ARGUMENTS) \
M(379, UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK) \
M(380, CANNOT_KILL) \
M(381, HTTP_LENGTH_REQUIRED) \
M(382, CANNOT_LOAD_CATBOOST_MODEL) \
@ -378,11 +320,9 @@
M(402, CANNOT_IOSETUP) \
M(403, INVALID_JOIN_ON_EXPRESSION) \
M(404, BAD_ODBC_CONNECTION_STRING) \
M(405, PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT) \
M(406, TOP_AND_LIMIT_TOGETHER) \
M(407, DECIMAL_OVERFLOW) \
M(408, BAD_REQUEST_PARAMETER) \
M(409, EXTERNAL_EXECUTABLE_NOT_FOUND) \
M(410, EXTERNAL_SERVER_IS_NOT_RESPONDING) \
M(411, PTHREAD_ERROR) \
M(412, NETLINK_ERROR) \
@ -399,7 +339,6 @@
M(424, CANNOT_LINK) \
M(425, SYSTEM_ERROR) \
M(427, CANNOT_COMPILE_REGEXP) \
M(428, UNKNOWN_LOG_LEVEL) \
M(429, FAILED_TO_GETPWUID) \
M(430, MISMATCHING_USERS_FOR_PROCESS_AND_DATA) \
M(431, ILLEGAL_SYNTAX_FOR_CODEC_TYPE) \
@ -433,7 +372,6 @@
M(459, CANNOT_SET_THREAD_PRIORITY) \
M(460, CANNOT_CREATE_TIMER) \
M(461, CANNOT_SET_TIMER_PERIOD) \
M(462, CANNOT_DELETE_TIMER) \
M(463, CANNOT_FCNTL) \
M(464, CANNOT_PARSE_ELF) \
M(465, CANNOT_PARSE_DWARF) \
@ -456,15 +394,12 @@
M(482, DICTIONARY_ACCESS_DENIED) \
M(483, TOO_MANY_REDIRECTS) \
M(484, INTERNAL_REDIS_ERROR) \
M(485, SCALAR_ALREADY_EXISTS) \
M(487, CANNOT_GET_CREATE_DICTIONARY_QUERY) \
M(488, UNKNOWN_DICTIONARY) \
M(489, INCORRECT_DICTIONARY_DEFINITION) \
M(490, CANNOT_FORMAT_DATETIME) \
M(491, UNACCEPTABLE_URL) \
M(492, ACCESS_ENTITY_NOT_FOUND) \
M(493, ACCESS_ENTITY_ALREADY_EXISTS) \
M(494, ACCESS_ENTITY_FOUND_DUPLICATES) \
M(495, ACCESS_STORAGE_READONLY) \
M(496, QUOTA_REQUIRES_CLIENT_KEY) \
M(497, ACCESS_DENIED) \
@ -475,8 +410,6 @@
M(502, CANNOT_SIGQUEUE) \
M(503, AGGREGATE_FUNCTION_THROW) \
M(504, FILE_ALREADY_EXISTS) \
M(505, CANNOT_DELETE_DIRECTORY) \
M(506, UNEXPECTED_ERROR_CODE) \
M(507, UNABLE_TO_SKIP_UNUSED_SHARDS) \
M(508, UNKNOWN_ACCESS_TYPE) \
M(509, INVALID_GRANT) \
@ -501,8 +434,6 @@
M(530, CANNOT_CONNECT_RABBITMQ) \
M(531, CANNOT_FSTAT) \
M(532, LDAP_ERROR) \
M(533, INCONSISTENT_RESERVATIONS) \
M(534, NO_RESERVATIONS_PROVIDED) \
M(535, UNKNOWN_RAID_TYPE) \
M(536, CANNOT_RESTORE_FROM_FIELD_DUMP) \
M(537, ILLEGAL_MYSQL_VARIABLE) \
@ -518,8 +449,6 @@
M(547, INVALID_RAID_TYPE) \
M(548, UNKNOWN_VOLUME) \
M(549, DATA_TYPE_CANNOT_BE_USED_IN_KEY) \
M(550, CONDITIONAL_TREE_PARENT_NOT_FOUND) \
M(551, ILLEGAL_PROJECTION_MANIPULATOR) \
M(552, UNRECOGNIZED_ARGUMENTS) \
M(553, LZMA_STREAM_ENCODER_FAILED) \
M(554, LZMA_STREAM_DECODER_FAILED) \
@ -580,8 +509,6 @@
M(609, FUNCTION_ALREADY_EXISTS) \
M(610, CANNOT_DROP_FUNCTION) \
M(611, CANNOT_CREATE_RECURSIVE_FUNCTION) \
M(612, OBJECT_ALREADY_STORED_ON_DISK) \
M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \
M(614, POSTGRESQL_CONNECTION_FAILURE) \
M(615, CANNOT_ADVISE) \
M(616, UNKNOWN_READ_METHOD) \
@ -612,9 +539,7 @@
M(641, CANNOT_APPEND_TO_FILE) \
M(642, CANNOT_PACK_ARCHIVE) \
M(643, CANNOT_UNPACK_ARCHIVE) \
M(644, REMOTE_FS_OBJECT_CACHE_ERROR) \
M(645, NUMBER_OF_DIMENSIONS_MISMATCHED) \
M(646, CANNOT_BACKUP_DATABASE) \
M(647, CANNOT_BACKUP_TABLE) \
M(648, WRONG_DDL_RENAMING_SETTINGS) \
M(649, INVALID_TRANSACTION) \

166
src/Common/FailPoint.cpp Normal file
View File

@ -0,0 +1,166 @@
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <boost/core/noncopyable.hpp>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
};
#if FIU_ENABLE
static struct InitFiu
{
InitFiu()
{
fiu_init(0);
}
} init_fiu;
#endif
/// We should define different types of failpoints here. There are four types of them:
/// - ONCE: the failpoint will only be triggered once.
/// - REGULAR: the failpoint will always be triggered until disableFailPoint is called.
/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, until disableFailPoint is called.
/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, until disableFailPoint is called.
#define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \
ONCE(replicated_merge_tree_commit_zk_fail_after_op) \
REGULAR(dummy_failpoint) \
PAUSEABLE_ONCE(dummy_pausable_failpoint_once) \
PAUSEABLE(dummy_pausable_failpoint)
namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
APPLY_FOR_FAILPOINTS(M, M, M, M)
#undef M
}
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointInjection::fail_point_wait_channels;
std::mutex FailPointInjection::mu;
class FailPointChannel : private boost::noncopyable
{
public:
explicit FailPointChannel(UInt64 timeout_)
: timeout_ms(timeout_)
{}
FailPointChannel()
: timeout_ms(0)
{}
void wait()
{
std::unique_lock lock(m);
if (timeout_ms == 0)
cv.wait(lock);
else
cv.wait_for(lock, std::chrono::milliseconds(timeout_ms));
}
void notifyAll()
{
std::unique_lock lock(m);
cv.notify_all();
}
private:
UInt64 timeout_ms;
std::mutex m;
std::condition_variable cv;
};
void FailPointInjection::enablePauseFailPoint(const String & fail_point_name, UInt64 time_ms)
{
#define SUB_M(NAME, flags) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, flags); \
std::lock_guard lock(mu); \
fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared<FailPointChannel>(time_ms)); \
return; \
}
#define ONCE(NAME)
#define REGULAR(NAME)
#define PAUSEABLE_ONCE(NAME) SUB_M(NAME, FIU_ONETIME)
#define PAUSEABLE(NAME) SUB_M(NAME, 0)
APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE)
#undef SUB_M
#undef ONCE
#undef REGULAR
#undef PAUSEABLE_ONCE
#undef PAUSEABLE
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find fail point {}", fail_point_name);
}
void FailPointInjection::pauseFailPoint(const String & fail_point_name)
{
fiu_do_on(fail_point_name.c_str(), FailPointInjection::wait(fail_point_name););
}
void FailPointInjection::enableFailPoint(const String & fail_point_name)
{
#if FIU_ENABLE
#define SUB_M(NAME, flags, pause) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, flags); \
if (pause) \
{ \
std::lock_guard lock(mu); \
fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared<FailPointChannel>()); \
} \
return; \
}
#define ONCE(NAME) SUB_M(NAME, FIU_ONETIME, 0)
#define REGULAR(NAME) SUB_M(NAME, 0, 0)
#define PAUSEABLE_ONCE(NAME) SUB_M(NAME, FIU_ONETIME, 1)
#define PAUSEABLE(NAME) SUB_M(NAME, 0, 1)
APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE)
#undef SUB_M
#undef ONCE
#undef REGULAR
#undef PAUSEABLE_ONCE
#undef PAUSEABLE
#endif
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find fail point {}", fail_point_name);
}
void FailPointInjection::disableFailPoint(const String & fail_point_name)
{
std::lock_guard lock(mu);
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter != fail_point_wait_channels.end())
{
/// We cannot rely on the destructor to do the notify_all, because
/// if someone is waiting on this, the destructor will never be called.
iter->second->notifyAll();
fail_point_wait_channels.erase(iter);
}
fiu_disable(fail_point_name.c_str());
}
void FailPointInjection::wait(const String & fail_point_name)
{
std::unique_lock lock(mu);
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter == fail_point_wait_channels.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not find channel for fail point {}", fail_point_name);
else
{
lock.unlock();
auto ptr = iter->second;
ptr->wait();
}
};
}

53
src/Common/FailPoint.h Normal file
View File

@ -0,0 +1,53 @@
#pragma once
#include "config.h"
#include <Common/Exception.h>
#include <Core/Types.h>
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdocumentation"
#pragma clang diagnostic ignored "-Wreserved-macro-identifier"
#endif
#include <fiu.h>
#include <fiu-control.h>
#ifdef __clang__
#pragma clang diagnostic pop
#endif
#include <any>
#include <unordered_map>
namespace DB
{
/// This is a simple named failpoint library inspired by https://github.com/pingcap/tiflash
/// The usage is simple:
/// 1. define failpoint with a 'failpoint_name' in FailPoint.cpp
/// 2. inject failpoint in normal code
/// 2.1 use fiu_do_on, which can inject any code block, when it is a regular-triggered / once-triggered failpoint
/// 2.2 use pauseFailPoint when it is a pausable failpoint
/// 3. in test file, we can use system failpoint enable/disable 'failpoint_name'
class FailPointChannel;
class FailPointInjection
{
public:
static void pauseFailPoint(const String & fail_point_name);
static void enableFailPoint(const String & fail_point_name);
static void enablePauseFailPoint(const String & fail_point_name, UInt64 time);
static void disableFailPoint(const String & fail_point_name);
static void wait(const String & fail_point_name);
private:
static std::mutex mu;
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
};
}
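
Putting the three steps from the comment together, a sketch of a call site for a failpoint defined in FailPoint.cpp (the surrounding function and error-code declaration are assumed):

namespace FailPoints
{
    extern const char dummy_failpoint[];   /// declared via APPLY_FOR_FAILPOINTS
}

void doSomethingFallible()
{
    /// The block runs only while the failpoint is enabled, e.g. from a test
    /// via the SYSTEM failpoint enable query mentioned above.
    fiu_do_on(FailPoints::dummy_failpoint,
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Injected failure");
    });

    /// ... normal path ...
}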

View File

@ -80,6 +80,8 @@ template <
class ClearableHashSet
: public HashTable<Key, ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>, Hash, Grower, Allocator>
{
using Cell = ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>;
public:
using Base = HashTable<Key, ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>, Hash, Grower, Allocator>;
using typename Base::LookupResult;
@ -88,6 +90,13 @@ public:
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};
@ -103,11 +112,20 @@ class ClearableHashSetWithSavedHash : public HashTable<
Grower,
Allocator>
{
using Cell = ClearableHashTableCell<Key, HashSetCellWithSavedHash<Key, Hash, ClearableHashSetState>>;
public:
void clear()
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};
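
The reason for the new branch in both clear() methods: a zero key is stored out of line in ZeroValueStorage, so bumping `version` alone never invalidates it. A regression-style sketch (assuming the usual hash table headers and default template arguments):

ClearableHashSet<UInt64> set;
set.insert(0);                        /// lands in ZeroValueStorage, not a regular cell
set.clear();
chassert(set.find(0) == nullptr);     /// failed before this fix: the zero key survived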

View File

@ -96,12 +96,17 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);
std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_)
: parent(parent_)
, log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_)
, level(level_)
{}
MemoryTracker::~MemoryTracker()
{
@ -528,3 +533,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
bool canEnqueueBackgroundTask()
{
auto limit = background_memory_tracker.getSoftLimit();
auto amount = background_memory_tracker.get();
return limit == 0 || amount < limit;
}

View File

@ -98,6 +98,7 @@ public:
explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_);
~MemoryTracker();
@ -110,6 +111,22 @@ public:
return amount.load(std::memory_order_relaxed);
}
// Merges and mutations may pass memory ownership to other threads, thus at the end of execution
// the MemoryTracker for a background task may have a non-zero counter.
// This method is intended to fix the counter inside of background_memory_tracker.
// NOTE: We can't use alloc/free methods to do it, because they also will change the value inside
// of total_memory_tracker.
void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
{
auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);
// Also fix CurrentMetrics::MergesMutationsMemoryTracking
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, background_memory_consumption);
}
Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
@ -220,3 +237,6 @@ public:
};
extern MemoryTracker total_memory_tracker;
extern MemoryTracker background_memory_tracker;
bool canEnqueueBackgroundTask();
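
A sketch of how the two new pieces cooperate in a background executor (the scheduling call and task_memory_tracker are hypothetical):

/// Gate new merges/mutations on the soft limit tracked by background_memory_tracker...
if (!canEnqueueBackgroundTask())
    return;                 /// postpone: background memory is over the soft limit

scheduleMergeTask();        /// hypothetical

/// ...and when a finished task leaves a non-zero counter because memory
/// ownership migrated to another thread, settle the accounting:
background_memory_tracker.adjustOnBackgroundTaskEnd(&task_memory_tracker);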

View File

@ -191,10 +191,8 @@
\
M(InsertedWideParts, "Number of parts inserted in Wide format.") \
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \
M(InsertedInMemoryParts, "Number of parts inserted in InMemory format.") \
M(MergedIntoWideParts, "Number of parts merged into Wide format.") \
M(MergedIntoCompactParts, "Number of parts merged into Compact format.") \
M(MergedIntoInMemoryParts, "Number of parts in merged into InMemory format.") \
\
M(MergeTreeDataProjectionWriterRows, "Number of rows INSERTed to MergeTree tables projection.") \
M(MergeTreeDataProjectionWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables projection.") \

View File

@ -19,17 +19,26 @@ namespace ErrorCodes
class RandomFaultInjection
{
public:
bool must_fail_after_op = false;
bool must_fail_before_op = false;
RandomFaultInjection(double probability, UInt64 seed_) : rndgen(seed_), distribution(probability) { }
void beforeOperation()
{
if (distribution(rndgen))
if (distribution(rndgen) || must_fail_before_op)
{
must_fail_before_op = false;
throw zkutil::KeeperException("Fault injection before operation", Coordination::Error::ZSESSIONEXPIRED);
}
}
void afterOperation()
{
if (distribution(rndgen))
if (distribution(rndgen) || must_fail_after_op)
{
must_fail_after_op = false;
throw zkutil::KeeperException("Fault injection after operation", Coordination::Error::ZOPERATIONTIMEOUT);
}
}
private:
@ -42,6 +51,9 @@ private:
///
class ZooKeeperWithFaultInjection
{
template<bool async_insert>
friend class ReplicatedMergeTreeSinkImpl;
using zk = zkutil::ZooKeeper;
zk::Ptr keeper;

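The new flags let a test force exactly one deterministic failure on top of the random distribution. A sketch with probability 0 so that only the flag fires:

RandomFaultInjection fault(/* probability = */ 0.0, /* seed = */ 42);
fault.must_fail_after_op = true;
fault.beforeOperation();                 /// passes: the flag targets afterOperation
try
{
    fault.afterOperation();              /// throws ZOPERATIONTIMEOUT exactly once
}
catch (const zkutil::KeeperException &) { /* expected */ }
fault.afterOperation();                  /// the flag was reset: no throw
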
View File

@ -57,4 +57,5 @@
#cmakedefine01 USE_SKIM
#cmakedefine01 USE_OPENSSL_INTREE
#cmakedefine01 USE_ULID
#cmakedefine01 FIU_ENABLE
#cmakedefine01 USE_BCRYPT

View File

@ -9,6 +9,8 @@
#include <Common/ConcurrencyControl.h>
#include <Common/randomSeed.h>
using namespace DB;
struct ConcurrencyControlTest
{
ConcurrencyControl cc;
@ -276,9 +278,9 @@ TEST(ConcurrencyControl, MultipleThreads)
queries.emplace_back([&, max_threads = max_threads_distribution(rng)]
{
run_query(max_threads);
finished++;
++finished;
});
started++;
++started;
}
sleepForMicroseconds(5); // wait some queries to finish
t.cc.setMaxConcurrency(cfg_max_concurrency - started % 3); // emulate configuration updates

View File

@ -378,6 +378,13 @@ void transpose(const T * src, char * dst, UInt32 num_bits, UInt32 tail = 64)
/// UInt64[N] transposed matrix -> UIntX[64]
template <typename T, bool full = false>
#if defined(__s390x__)
/* Compiler bug for s390x: https://github.com/llvm/llvm-project/issues/62572
 * Please remove this after the fix is backported.
*/
__attribute__((noinline))
#endif
void reverseTranspose(const char * src, T * buf, UInt32 num_bits, UInt32 tail = 64)
{
UInt64 matrix[64] = {};

View File

@ -172,7 +172,7 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
@ -188,7 +188,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
registerCodecDoubleDelta(*this);

View File

@ -31,7 +31,7 @@ namespace Authentication
static const size_t SCRAMBLE_LENGTH = 20;
/** Generate a random string using ASCII characters but avoid separator character,
* produce pseudo random numbers between with about 7 bit worth of entropty between 1-127.
* produce pseudo random numbers with about 7 bits worth of entropy between 1-127.
* https://github.com/mysql/mysql-server/blob/8.0/mysys/crypt_genhash_impl.cc#L427
*/
static String generateScramble()

View File

@ -42,6 +42,8 @@ namespace DB
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but as a ratio to RAM. Allows to lower the memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
\
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \

View File

@ -560,6 +560,7 @@ class IColumn;
M(Bool, asterisk_include_alias_columns, false, "Include ALIAS columns for wildcard query", 0) \
M(Bool, optimize_skip_merged_partitions, false, "Skip partitions with one part with level > 0 in optimize final", 0) \
M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
M(Bool, optimize_use_projections, true, "Automatically choose projections to perform SELECT query", 0) \
M(Bool, force_optimize_projection, false, "If projection optimization is enabled, SELECT queries need to use projection", 0) \
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
M(Bool, async_query_sending_for_remote, true, "Asynchronously create connections and send query to shards in remote query", 0) \
@ -712,28 +713,15 @@ class IColumn;
M(String, additional_result_filter, "", "Additional filter expression which would be applied to query result", 0) \
\
M(String, workload, "default", "Name of workload to be used to access resources", 0) \
M(Milliseconds, storage_system_stack_trace_pipe_read_timeout_ms, 100, "Maximum time to read from a pipe for receiving information from the threads when querying the `system.stack_trace` table. This setting is used for testing purposes and not meant to be changed by users.", 0) \
\
M(Bool, parallelize_output_from_storages, true, "Parallelize output for reading step from storage. It allows parallelizing query processing right after reading from storage if possible", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
M(Bool, allow_experimental_hash_functions, false, "Enable experimental hash functions (hashid, etc)", 0) \
M(Bool, allow_experimental_object_type, false, "Allow Object and JSON data types", 0) \
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
M(String, ann_index_select_query_params, "", "Parameters passed to ANN indexes in SELECT queries, the format is 'param1=x, param2=y, ...'", 0) \
M(UInt64, max_limit_for_ann_queries, 1000000, "Maximum limit value for using ANN indexes is used to prevent memory overflow in search queries for indexes", 0) \
M(Bool, allow_experimental_annoy_index, false, "Allows to use Annoy index. Disabled by default because this feature is experimental", 0) \
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \
M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, optimize_sorting_by_input_stream_properties, true, "Optimize sorting by sorting properties of input stream", 0) \
M(UInt64, insert_keeper_max_retries, 20, "Max retries for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_initial_backoff_ms, 100, "Initial backoff timeout for keeper operations during insert", 0) \
@ -742,10 +730,24 @@ class IColumn;
M(UInt64, insert_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function JSON_VALUE to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function JSON_VALUE to return complex type, such as: struct, array, map.", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
M(Bool, allow_experimental_hash_functions, false, "Enable experimental hash functions (hashid, etc)", 0) \
M(Bool, allow_experimental_object_type, false, "Allow Object and JSON data types", 0) \
M(Bool, allow_experimental_annoy_index, false, "Allows to use Annoy index. Disabled by default because this feature is experimental", 0) \
M(UInt64, max_limit_for_ann_queries, 1000000, "Maximum limit value for using ANN indexes is used to prevent memory overflow in search queries for indexes", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \
M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, false, "This optimization has a bug and it is disabled. Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, allow_experimental_undrop_table_query, false, "Allow to use undrop query to restore dropped table in a limited time", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function to return complex type, such as: struct, array, map.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.
@ -903,6 +905,7 @@ class IColumn;
M(UInt64, output_format_pretty_max_value_width, 10000, "Maximum width of value to display in Pretty formats. If greater - it will be cut.", 0) \
M(Bool, output_format_pretty_color, true, "Use ANSI escape sequences to paint colors in Pretty formats", 0) \
M(String, output_format_pretty_grid_charset, "UTF-8", "Charset for printing grid borders. Available charsets: ASCII, UTF-8 (default one).", 0) \
M(Milliseconds, output_format_pretty_squash_ms, 100, "Squash blocks in Pretty formats if the time passed after the previous block is not greater than the specified threshold in milliseconds. This avoids printing multiple small blocks.", 0) \
M(UInt64, output_format_parquet_row_group_size, 1000000, "Target row group size in rows.", 0) \
M(UInt64, output_format_parquet_row_group_size_bytes, 512 * 1024 * 1024, "Target row group size in bytes, before compression.", 0) \
M(Bool, output_format_parquet_string_as_string, false, "Use Parquet String type instead of Binary for String columns.", 0) \

View File

@ -338,7 +338,7 @@ void SettingFieldString::readBinary(ReadBuffer & in)
/// that. The linker does not complain only because clickhouse-keeper does not call any of below
/// functions. A cleaner alternative would be more modular libraries, e.g. one for data types, which
/// could then be linked by the server and the linker.
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {}

View File

@ -18,7 +18,7 @@
#include "config.h"
#include "config_version.h"
#if USE_SENTRY && !defined(KEEPER_STANDALONE_BUILD)
#if USE_SENTRY && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
# include <sentry.h>
# include <cstdio>

View File

@ -11,9 +11,6 @@ namespace DB
*
* Mostly the same as Int64.
* But also tagged with interval kind.
*
* Intended usage is for temporary elements in expressions,
* not for storing values in tables.
*/
class DataTypeInterval final : public DataTypeNumberBase<Int64>
{
@ -34,7 +31,6 @@ public:
bool equals(const IDataType & rhs) const override;
bool isParametric() const override { return true; }
bool cannotBeStoredInTables() const override { return true; }
bool isCategorial() const override { return false; }
bool canBeInsideNullable() const override { return true; }
};

View File

@ -246,7 +246,8 @@ void SerializationInfoByName::writeJSON(WriteBuffer & out) const
return writeString(oss.str(), out);
}
void SerializationInfoByName::readJSON(ReadBuffer & in)
SerializationInfoByName SerializationInfoByName::readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in)
{
String json_str;
readString(json_str, in);
@ -262,8 +263,13 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
"Unknown version of serialization infos ({}). Should be less or equal than {}",
object->getValue<size_t>(KEY_VERSION), SERIALIZATION_INFO_VERSION);
SerializationInfoByName infos;
if (object->has(KEY_COLUMNS))
{
std::unordered_map<std::string_view, const IDataType *> column_type_by_name;
for (const auto & [name, type] : columns)
column_type_by_name.emplace(name, type.get());
auto array = object->getArray(KEY_COLUMNS);
for (const auto & elem : *array)
{
@ -271,13 +277,22 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
if (!elem_object->has(KEY_NAME))
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Missed field '{}' in SerializationInfo of columns", KEY_NAME);
"Missed field '{}' in serialization infos", KEY_NAME);
auto name = elem_object->getValue<String>(KEY_NAME);
if (auto it = find(name); it != end())
it->second->fromJSON(*elem_object);
auto it = column_type_by_name.find(name);
if (it == column_type_by_name.end())
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Found unexpected column '{}' in serialization infos", name);
auto info = it->second->createSerializationInfo(settings);
info->fromJSON(*elem_object);
infos.emplace(name, std::move(info));
}
}
return infos;
}
}

View File

@ -96,8 +96,10 @@ using MutableSerializationInfos = std::vector<MutableSerializationInfoPtr>;
class SerializationInfoByName : public std::map<String, MutableSerializationInfoPtr>
{
public:
using Settings = SerializationInfo::Settings;
SerializationInfoByName() = default;
SerializationInfoByName(const NamesAndTypesList & columns, const SerializationInfo::Settings & settings);
SerializationInfoByName(const NamesAndTypesList & columns, const Settings & settings);
void add(const Block & block);
void add(const SerializationInfoByName & other);
@ -108,7 +110,9 @@ public:
void replaceData(const SerializationInfoByName & other);
void writeJSON(WriteBuffer & out) const;
void readJSON(ReadBuffer & in);
static SerializationInfoByName readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in);
};
}
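
With this change readJSON is no longer a mutating member: it is a static factory that takes the expected table columns plus serialization settings, and any JSON entry naming a column absent from the schema now throws CORRUPTED_DATA instead of being silently ignored. A hedged round-trip sketch against the new signature (a fragment, not a standalone program; `columns` is the table's NamesAndTypesList and `settings` a SerializationInfo::Settings):

// Serialize the infos of an existing table, then parse them back,
// validating every entry against the schema described by `columns`.
WriteBufferFromOwnString out;
infos.writeJSON(out);

ReadBufferFromString in(out.str());
auto parsed = SerializationInfoByName::readJSON(columns, settings, in);
// An entry whose name is missing from `columns` now fails with
// "Found unexpected column '...' in serialization infos".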

View File

@ -1,5 +1,6 @@
#include <Databases/DDLDependencyVisitor.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Databases/removeWhereConditionPlaceholder.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/misc.h>
@ -12,6 +13,8 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/KnownObjectNames.h>
#include <Poco/String.h>
@ -25,6 +28,7 @@ namespace
/// Used to visit ASTCreateQuery and extract the names of all tables explicitly referenced in the create query.
class DDLDependencyVisitorData
{
friend void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data);
public:
DDLDependencyVisitorData(const ContextPtr & context_, const QualifiedTableName & table_name_, const ASTPtr & ast_)
: create_query(ast_), table_name(table_name_), current_database(context_->getCurrentDatabase()), context(context_)
@ -106,9 +110,17 @@ namespace
if (!info || !info->is_local)
return;
if (info->table_name.database.empty())
info->table_name.database = current_database;
dependencies.emplace(std::move(info->table_name));
if (!info->table_name.table.empty())
{
if (info->table_name.database.empty())
info->table_name.database = current_database;
dependencies.emplace(std::move(info->table_name));
}
else
{
/// We don't have a table name; we have a select query instead.
tryVisitNestedSelect(info->query, *this);
}
}
/// ASTTableExpression represents a reference to a table in SELECT query.
@ -424,6 +436,25 @@ namespace
static bool needChildVisit(const ASTPtr &, const ASTPtr & child, const Data & data) { return data.needChildVisit(child); }
static void visit(const ASTPtr & ast, Data & data) { data.visit(ast); }
};
void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data)
{
try
{
ParserSelectWithUnionQuery parser;
String description = fmt::format("Query for ClickHouse dictionary {}", data.table_name);
String fixed_query = removeWhereConditionPlaceholder(query);
ASTPtr select = parseQuery(parser, fixed_query, description,
data.context->getSettingsRef().max_query_size, data.context->getSettingsRef().max_parser_depth);
DDLDependencyVisitor::Visitor visitor{data};
visitor.visit(select);
}
catch (...)
{
tryLogCurrentException("DDLDependencyVisitor");
}
}
}

View File

@ -103,7 +103,7 @@ void DDLLoadingDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments &
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
if (!info || !info->is_local || info->table_name.table.empty())
return;
if (info->table_name.database.empty())

View File

@ -137,7 +137,7 @@ namespace
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
if (!info || !info->is_local || info->table_name.table.empty())
return;
auto * source_list = dictionary.source->elements->as<ASTExpressionList>();

View File

@ -726,7 +726,7 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context
return create.uuid;
}
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr)
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr)
{
is_recovering = true;
SCOPE_EXIT({ is_recovering = false; });

View File

@ -102,7 +102,7 @@ private:
void checkQueryValid(const ASTPtr & query, ContextPtr query_context) const;
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr);
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr);
std::map<String, String> tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr);
ASTPtr parseQueryFromMetadataInZooKeeper(const String & node_name, const String & query);

View File

@ -0,0 +1,20 @@
#include <Databases/removeWhereConditionPlaceholder.h>
namespace DB
{
std::string removeWhereConditionPlaceholder(const std::string & query)
{
static constexpr auto true_condition = "(1 = 1)";
auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE);
if (condition_position != std::string::npos)
{
auto query_copy = query;
query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition);
return query_copy;
}
return query;
}
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <string>
namespace DB
{
static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}";
/** In case UPDATE_FIELD is specified in {condition} for a dictionary that must load all data,
 * replace {condition} with true_condition for the initial dictionary load.
 * For subsequent dictionary loads, {condition} will be updated with UPDATE_FIELD.
 */
std::string removeWhereConditionPlaceholder(const std::string & query);
}
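
For concreteness, this is the substitution the helper performs (a small sketch to be built inside the ClickHouse source tree; `dict_source` is an invented table name):

#include <cassert>
#include <string>
#include <Databases/removeWhereConditionPlaceholder.h>

int main()
{
    // Initial load: the placeholder collapses to an always-true predicate.
    std::string q = DB::removeWhereConditionPlaceholder(
        "SELECT key, value FROM dict_source WHERE {condition}");
    assert(q == "SELECT key, value FROM dict_source WHERE (1 = 1)");

    // Queries without the placeholder pass through unchanged.
    assert(DB::removeWhereConditionPlaceholder("SELECT 1") == "SELECT 1");
    return 0;
}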

View File

@ -6,7 +6,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Databases/removeWhereConditionPlaceholder.h>
namespace DB
{
@ -24,7 +24,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}";
ExternalQueryBuilder::ExternalQueryBuilder(
const DictionaryStructure & dict_struct_,
@ -82,23 +81,8 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
writeChar(';', out);
return out.str();
}
else
{
/** In case UPDATE_FIELD is specified in {condition} for dictionary that must load all data.
* Replace {condition} with true_condition for initial dictionary load.
* For next dictionary loads {condition} will be updated with UPDATE_FIELD.
*/
static constexpr auto true_condition = "(1 = 1)";
auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE);
if (condition_position != std::string::npos)
{
auto query_copy = query;
query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition);
return query_copy;
}
return query;
}
return removeWhereConditionPlaceholder(query);
}
void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const

View File

@ -649,10 +649,12 @@ getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, Context
String database = config->getString("dictionary.source.clickhouse.db", "");
String table = config->getString("dictionary.source.clickhouse.table", "");
if (table.empty())
return {};
info.query = config->getString("dictionary.source.clickhouse.query", "");
info.table_name = {database, table};
if (!table.empty())
info.table_name = {database, table};
else if (info.query.empty())
return {};
try
{

View File

@ -18,6 +18,7 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte
struct ClickHouseDictionarySourceInfo
{
QualifiedTableName table_name;
String query;
bool is_local = false;
};

View File

@ -47,14 +47,14 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek_)
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
@ -63,6 +63,8 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
@ -111,7 +113,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
@ -186,8 +188,8 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
.reader_id = current_reader_id,
};
if (auto prefetch_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog())
prefetch_log->add(elem);
if (prefetches_log)
prefetches_log->add(elem);
}
@ -335,7 +337,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + min_bytes_for_seek)
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
bytes_to_ignore = new_pos - file_offset_of_buffer_end;

View File

@ -12,6 +12,7 @@ namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
@ -34,7 +35,8 @@ public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE);
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
@ -83,8 +85,6 @@ private:
Memory<> prefetch_buffer;
size_t min_bytes_for_seek;
std::string query_id;
std::string current_reader_id;
@ -95,6 +95,9 @@ private:
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;

View File

@ -48,7 +48,8 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_)
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
#ifndef NDEBUG
, log(&Poco::Logger::get("CachedOnDiskReadBufferFromFile(" + source_file_path_ + ")"))
@ -62,12 +63,12 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
, read_until_position(read_until_position_ ? *read_until_position_ : file_size_)
, implementation_buffer_creator(implementation_buffer_creator_)
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
, allow_seeks_after_first_read(allow_seeks_after_first_read_)
, use_external_buffer(use_external_buffer_)
, query_context_holder(cache_->getQueryContextHolder(query_id, settings_))
, is_persistent(settings_.is_file_cache_persistent)
, cache_log(cache_log_)
{
}
@ -103,7 +104,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
break;
}
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
if (cache_log)
cache_log->add(elem);
}
@ -487,7 +488,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto * current_file_segment = &file_segments->front();
auto completed_range = current_file_segment->range();
if (enable_logging)
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -512,7 +513,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (enable_logging && file_segments && !file_segments->empty())
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}
@ -936,6 +937,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
if (result)
{
bool download_current_segment_succeeded = false;
if (download_current_segment)
{
chassert(file_offset_of_buffer_end + size - 1 <= file_segment.range().right);
@ -954,6 +956,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|| file_segment.getCurrentWriteOffset(false) == implementation_buffer->getFileOffsetOfBufferEnd());
LOG_TEST(log, "Successfully written {} bytes", size);
download_current_segment_succeeded = true;
// The implementation_buffer is valid and positioned correctly (at file_segment->getCurrentWriteOffset()).
// Later reads for this file segment can reuse it.
@ -962,14 +965,15 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
implementation_buffer_can_be_reused = true;
}
else
{
chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TRACE(log, "Bypassing cache because writeCache method failed");
}
}
else
{
LOG_TRACE(log, "No space left in cache to reserve {} bytes, will continue without cache download", size);
if (!success)
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
}
@ -990,6 +994,8 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
file_offset_of_buffer_end += size;
if (download_current_segment && download_current_segment_succeeded)
chassert(file_segment.getCurrentWriteOffset(false) >= file_offset_of_buffer_end);
chassert(file_offset_of_buffer_end <= read_until_position);
}

View File

@ -32,7 +32,8 @@ public:
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_ = std::nullopt);
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~CachedOnDiskReadBufferFromFile() override;
@ -137,7 +138,6 @@ private:
String last_caller_id;
String query_id;
bool enable_logging = false;
String current_buffer_id;
bool allow_seeks_after_first_read;
@ -148,6 +148,8 @@ private:
FileCache::QueryContextHolderPtr query_context_holder;
bool is_persistent;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@ -153,27 +153,27 @@ FileSegment & FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSeg
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
{
if (cache_log)
if (!cache_log)
return;
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
FilesystemCacheLogElement elem
{
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_path,
.file_segment_range = { file_segment_range.left, file_segment_right_bound },
.requested_range = {},
.cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = false,
.read_buffer_id = {},
.profile_counters = nullptr,
};
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_path,
.file_segment_range = { file_segment_range.left, file_segment_right_bound },
.requested_range = {},
.cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = false,
.read_buffer_id = {},
.profile_counters = nullptr,
};
cache_log->add(elem);
}
cache_log->add(elem);
}
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)

View File

@ -8,30 +8,29 @@
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
, settings(settings_)
, current_object(!blobs_to_read_.empty() ? blobs_to_read_.front() : throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects"))
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
@ -39,7 +38,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache && enable_cache_log)
if (current_buf != nullptr && !with_cache)
{
appendFilesystemCacheLog();
}
@ -64,7 +63,8 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt,
cache_log);
}
return current_read_buffer_creator();
@ -72,7 +72,9 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
chassert(!current_object.remote_path.empty());
if (!cache_log || current_object.remote_path.empty())
return;
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
@ -83,9 +85,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,
};
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
cache_log->add(elem);
cache_log->add(elem);
}
IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
@ -99,9 +99,7 @@ IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data,
file_offset_of_buffer_end = offset;
bytes_to_ignore = ignore;
assert(!bytes_to_ignore || initialized());
auto result = nextImpl();
const auto result = nextImpl();
if (result)
return { working_buffer.size(), BufferBase::offset(), nullptr };
@ -111,6 +109,9 @@ IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data,
void ReadBufferFromRemoteFSGather::initialize()
{
if (blobs_to_read.empty())
return;
/// One ClickHouse file can be split into multiple files in remote fs.
auto current_buf_offset = file_offset_of_buffer_end;
for (size_t i = 0; i < blobs_to_read.size(); ++i)
@ -144,21 +145,14 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
if (!current_buf)
initialize();
/// If current buffer has remaining data - use it.
if (current_buf)
{
if (readImpl())
return true;
}
else
{
if (!current_buf)
return false;
}
if (readImpl())
return true;
if (!moveToNextBuffer())
{
return false;
}
return readImpl();
}
@ -274,10 +268,8 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache && enable_cache_log)
{
if (!with_cache)
appendFilesystemCacheLog();
}
}
}

View File

@ -25,7 +25,8 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_);
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~ReadBufferFromRemoteFSGather() override;
@ -93,7 +94,7 @@ private:
size_t total_bytes_read_from_current_file = 0;
bool enable_cache_log = false;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}
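
The common thread across these buffer hunks is dependency injection: FilesystemCacheLog, FilesystemReadPrefetchesLog and AsyncReadCounters are now handed in through constructors instead of being fetched from Context::getGlobalContextInstance() at the point of use, so a null pointer simply disables the feature. A minimal illustration of the shape (the types below are stand-ins, not the real ClickHouse classes):

#include <memory>
#include <utility>

struct FakeCacheLog { void add(int /*elem*/) {} };  // stand-in for FilesystemCacheLog

class Buffer
{
public:
    // Injected once at construction; nullptr means "no logging", which is
    // exactly what the `if (cache_log)` checks in the hunks above express.
    explicit Buffer(std::shared_ptr<FakeCacheLog> cache_log_) : cache_log(std::move(cache_log_)) {}

    void onReadFinished(int elem)
    {
        if (cache_log)
            cache_log->add(elem);
    }

private:
    std::shared_ptr<FakeCacheLog> cache_log;
};

int main()
{
    Buffer with_log(std::make_shared<FakeCacheLog>());
    with_log.onReadFinished(1);

    Buffer without_log(nullptr);  // logging silently disabled
    without_log.onReadFinished(2);
    return 0;
}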

View File

@ -11,7 +11,6 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
#include <future>
@ -75,17 +74,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
std::optional<AsyncReadIncrement> increment;
if (CurrentThread::isInitialized())
{
auto query_context = CurrentThread::get().getQueryContext();
if (query_context)
increment.emplace(query_context->getAsyncReadCounters());
}
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto async_read_counters = remote_fs_fd->getReadCounters();
std::optional<AsyncReadIncrement> increment = async_read_counters ? std::optional<AsyncReadIncrement>(async_read_counters) : std::nullopt;
auto watch = std::make_unique<Stopwatch>(CLOCK_MONOTONIC);
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch->stop();

View File

@ -8,6 +8,8 @@
namespace DB
{
struct AsyncReadCounters;
class ThreadPoolRemoteFSReader : public IAsynchronousReader
{
public:
@ -24,12 +26,19 @@ private:
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
{
public:
explicit RemoteFSFileDescriptor(ReadBuffer & reader_) : reader(reader_) { }
explicit RemoteFSFileDescriptor(
ReadBuffer & reader_,
std::shared_ptr<AsyncReadCounters> async_read_counters_)
: reader(reader_)
, async_read_counters(async_read_counters_) {}
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
std::shared_ptr<AsyncReadCounters> getReadCounters() const { return async_read_counters; }
private:
ReadBuffer & reader;
std::shared_ptr<AsyncReadCounters> async_read_counters;
};
}
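
Worth noting in the reader hunks: the per-query AsyncReadCounters now travel with the read request itself, inside RemoteFSFileDescriptor, so the pool worker thread no longer inspects CurrentThread for a query context. A sketch of the hand-off, using only the calls already shown above (a fragment, not a standalone program):

// Producer side (the async buffer): attach the counters when building the request.
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);

// Consumer side (the pool worker): recover them from the descriptor.
auto * fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto counters = fd->getReadCounters();  // may be null, in which case no increment happens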

View File

@ -5,7 +5,9 @@
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <IO/AsynchronousReader.h>
#include <Common/ProfileEvents.h>
#include "config.h"
@ -27,7 +29,6 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
@ -119,11 +120,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,
@ -137,11 +134,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,

View File

@ -0,0 +1,76 @@
#include <Common/ErrorCodes.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/AsynchronousReader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <IO/SynchronousReader.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
#include <Interpreters/Context.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type)
{
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
const auto & config = Poco::Util::Application::instance().config();
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
static auto asynchronous_remote_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
static auto asynchronous_local_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
static auto synchronous_local_fs_reader = createThreadPoolReader(type, config);
return *synchronous_local_fs_reader;
}
}
#else
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(type);
#endif
}
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::make_unique<SynchronousReader>();
}
}
}
}
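
Call sites no longer need a Context at all: they ask this free function for a reader by type. In a standalone build each branch returns a lazily-initialized function-local singleton built from the application config; otherwise the global Context owns the pools. A short usage sketch mirroring the createReadBufferFromFileBase call sites above (a fragment to be placed inside a function):

#include <Disks/IO/getThreadPoolReader.h>

// Synchronous local reads, executed through the thread-pool reader interface:
auto & sync_reader = DB::getThreadPoolReader(DB::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);

// Asynchronous local reads; pool and queue sizes come from the server config
// (e.g. threadpool_local_fs_reader_pool_size, default 100):
auto & async_reader = DB::getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);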

View File

@ -0,0 +1,23 @@
#pragma once
namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
class IAsynchronousReader;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type);
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type,
const Poco::Util::AbstractConfiguration & config);
}

Some files were not shown because too many files have changed in this diff.