Merge branch 'master' into update-minio-statless

Antonio Andelic 2024-08-09 11:50:17 +02:00
commit 34fb169aa7
244 changed files with 13021 additions and 3946 deletions

View File

@ -67,7 +67,7 @@ jobs:
if: ${{ !cancelled() }}
run: |
export WORKFLOW_RESULT_FILE="/tmp/workflow_results.json"
cat >> "$WORKFLOW_RESULT_FILE" << 'EOF'
cat > "$WORKFLOW_RESULT_FILE" << 'EOF'
${{ toJson(needs) }}
EOF
python3 ./tests/ci/ci_buddy.py --check-wf-status

View File

@ -428,12 +428,17 @@ if (NOT SANITIZE)
set (CMAKE_POSITION_INDEPENDENT_CODE OFF)
endif()
if (OS_LINUX AND NOT (ARCH_AARCH64 OR ARCH_S390X) AND NOT SANITIZE)
# Slightly more efficient code can be generated
# It's disabled for ARM because otherwise ClickHouse cannot run on Android.
if (NOT OS_ANDROID AND OS_LINUX AND NOT ARCH_S390X AND NOT SANITIZE)
# Using '-no-pie' builds executables with fixed addresses, resulting in slightly more efficient code
# and keeping binary addresses constant even with ASLR enabled.
# Disabled on Android as it requires PIE: https://source.android.com/docs/security/enhancements#android-5
# Disabled on IBM S390X due to build issues with 'no-pie'
# Disabled with sanitizers to avoid issues with maximum relocation size: https://github.com/ClickHouse/ClickHouse/pull/49145
set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -fno-pie")
set (CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO} -fno-pie")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -no-pie -Wl,-no-pie")
else ()
message (WARNING "ClickHouse is built as PIE, system.trace_log will contain invalid addresses after server restart.")
endif ()
if (ENABLE_TESTS)

View File

@ -71,7 +71,6 @@ add_contrib (zlib-ng-cmake zlib-ng)
add_contrib (bzip2-cmake bzip2)
add_contrib (minizip-ng-cmake minizip-ng)
add_contrib (snappy-cmake snappy)
add_contrib (rocksdb-cmake rocksdb)
add_contrib (thrift-cmake thrift)
# parquet/arrow/orc
add_contrib (arrow-cmake arrow) # requires: snappy, thrift, double-conversion
@ -148,6 +147,7 @@ add_contrib (hive-metastore-cmake hive-metastore) # requires: thrift, avro, arro
add_contrib (cppkafka-cmake cppkafka)
add_contrib (libpqxx-cmake libpqxx)
add_contrib (libpq-cmake libpq)
add_contrib (rocksdb-cmake rocksdb) # requires: jemalloc, snappy, zlib, lz4, zstd, liburing
add_contrib (nuraft-cmake NuRaft)
add_contrib (fast_float-cmake fast_float)
add_contrib (idna-cmake idna)

contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit 2d2aab6f5b79db1cfca15d7bf0dee75d00d82082
Subproject commit 39d4ed49ccf3406e2bf825d5d7b0903b5a290782

contrib/rocksdb vendored

@ -1 +1 @@
Subproject commit 49ce8a1064dd1ad89117899839bf136365e49e79
Subproject commit 5f003e4a22d2e48e37c98d9620241237cd30dd24

View File

@ -5,36 +5,38 @@ if (NOT ENABLE_ROCKSDB OR NO_SSE3_OR_HIGHER) # assumes SSE4.2 and PCLMUL
return()
endif()
# not in original build system, otherwise xxHash.cc fails to compile with ClickHouse C++23 default
set (CMAKE_CXX_STANDARD 20)
# Always disable jemalloc for rocksdb by default because it introduces non-standard jemalloc APIs
option(WITH_JEMALLOC "build with JeMalloc" OFF)
option(WITH_LIBURING "build with liburing" OFF) # TODO could try to enable this conditionally, depending on ClickHouse's ENABLE_LIBURING
# ClickHouse cannot be compiled without snappy, lz4, zlib, zstd
option(WITH_SNAPPY "build with SNAPPY" ON)
option(WITH_LZ4 "build with lz4" ON)
option(WITH_ZLIB "build with zlib" ON)
option(WITH_ZSTD "build with zstd" ON)
if(WITH_SNAPPY)
if (ENABLE_JEMALLOC AND OS_LINUX) # gives compile errors with jemalloc enabled for rocksdb on non-Linux
add_definitions(-DROCKSDB_JEMALLOC -DJEMALLOC_NO_DEMANGLE)
list (APPEND THIRDPARTY_LIBS ch_contrib::jemalloc)
endif ()
if (ENABLE_LIBURING)
add_definitions(-DROCKSDB_IOURING_PRESENT)
list (APPEND THIRDPARTY_LIBS ch_contrib::liburing)
endif ()
if (WITH_SNAPPY)
add_definitions(-DSNAPPY)
list(APPEND THIRDPARTY_LIBS ch_contrib::snappy)
endif()
if(WITH_ZLIB)
if (WITH_ZLIB)
add_definitions(-DZLIB)
list(APPEND THIRDPARTY_LIBS ch_contrib::zlib)
endif()
if(WITH_LZ4)
if (WITH_LZ4)
add_definitions(-DLZ4)
list(APPEND THIRDPARTY_LIBS ch_contrib::lz4)
endif()
if(WITH_ZSTD)
if (WITH_ZSTD)
add_definitions(-DZSTD)
list(APPEND THIRDPARTY_LIBS ch_contrib::zstd)
endif()
@ -88,6 +90,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/cache/sharded_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/tiered_secondary_cache.cc
${ROCKSDB_SOURCE_DIR}/db/arena_wrapped_db_iter.cc
${ROCKSDB_SOURCE_DIR}/db/attribute_group_iterator_impl.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_contents.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_fetcher.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_addition.cc
@ -104,6 +107,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/blob/prefetch_buffer_collection.cc
${ROCKSDB_SOURCE_DIR}/db/builder.cc
${ROCKSDB_SOURCE_DIR}/db/c.cc
${ROCKSDB_SOURCE_DIR}/db/coalescing_iterator.cc
${ROCKSDB_SOURCE_DIR}/db/column_family.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_iterator.cc
@ -124,6 +128,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_write.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_compaction_flush.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_files.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_follower.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_open.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_debug.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_experimental.cc
@ -181,6 +186,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/env/env_encryption.cc
${ROCKSDB_SOURCE_DIR}/env/file_system.cc
${ROCKSDB_SOURCE_DIR}/env/file_system_tracer.cc
${ROCKSDB_SOURCE_DIR}/env/fs_on_demand.cc
${ROCKSDB_SOURCE_DIR}/env/fs_remap.cc
${ROCKSDB_SOURCE_DIR}/env/mock_env.cc
${ROCKSDB_SOURCE_DIR}/env/unique_id_gen.cc
@ -368,6 +374,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/utilities/persistent_cache/volatile_tier_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/simulator_cache/cache_simulator.cc
${ROCKSDB_SOURCE_DIR}/utilities/simulator_cache/sim_cache.cc
${ROCKSDB_SOURCE_DIR}/utilities/table_properties_collectors/compact_for_tiering_collector.cc
${ROCKSDB_SOURCE_DIR}/utilities/table_properties_collectors/compact_on_deletion_collector.cc
${ROCKSDB_SOURCE_DIR}/utilities/trace/file_trace_reader_writer.cc
${ROCKSDB_SOURCE_DIR}/utilities/trace/replayer_impl.cc
@ -388,6 +395,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_prepared_txn_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_unprepared_txn.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_unprepared_txn_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/types_util.cc
${ROCKSDB_SOURCE_DIR}/utilities/ttl/db_ttl_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/wal_filter.cc
${ROCKSDB_SOURCE_DIR}/utilities/write_batch_with_index/write_batch_with_index.cc
@ -418,14 +426,18 @@ if(HAS_ARMV8_CRC)
endif(HAS_ARMV8_CRC)
list(APPEND SOURCES
"${ROCKSDB_SOURCE_DIR}/port/port_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/env_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/fs_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/io_posix.cc")
${ROCKSDB_SOURCE_DIR}/port/port_posix.cc
${ROCKSDB_SOURCE_DIR}/env/env_posix.cc
${ROCKSDB_SOURCE_DIR}/env/fs_posix.cc
${ROCKSDB_SOURCE_DIR}/env/io_posix.cc)
add_library(_rocksdb ${SOURCES})
add_library(ch_contrib::rocksdb ALIAS _rocksdb)
target_link_libraries(_rocksdb PRIVATE ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
# Not in the native build system but useful anyways:
# Make all functions in xxHash.h inline. Beneficial for performance: https://github.com/Cyan4973/xxHash/tree/v0.8.2#build-modifiers
target_compile_definitions (_rocksdb PRIVATE XXH_INLINE_ALL)
# SYSTEM is required to overcome some issues
target_include_directories(_rocksdb SYSTEM BEFORE INTERFACE "${ROCKSDB_SOURCE_DIR}/include")

View File

@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.7.2.13"
ARG VERSION="24.7.3.42"
ARG PACKAGES="clickhouse-keeper"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.7.2.13"
ARG VERSION="24.7.3.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -28,7 +28,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="24.7.2.13"
ARG VERSION="24.7.3.42"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
#docker-official-library:off

View File

@ -232,15 +232,26 @@ function run_tests()
set +e
TEST_ARGS=(
-j 2
--testname
--shard
--zookeeper
--check-zookeeper-session
--no-stateless
--hung-check
--print-time
--capture-client-stacktrace
"${ADDITIONAL_OPTIONS[@]}"
"$SKIP_TESTS_OPTION"
)
if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
clickhouse-test --client="clickhouse-client --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 \
--max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'" \
-j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --no-parallel-replicas --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
else
clickhouse-test -j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
TEST_ARGS+=(
--client="clickhouse-client --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 --max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'"
--no-parallel-replicas
)
fi
clickhouse-test "${TEST_ARGS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
set -e
}

View File

@ -264,11 +264,22 @@ function run_tests()
TIMEOUT=$((MAX_RUN_TIME - 800 > 8400 ? 8400 : MAX_RUN_TIME - 800))
START_TIME=${SECONDS}
set +e
timeout --preserve-status --signal TERM --kill-after 60m ${TIMEOUT}s \
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--no-drop-if-fail --test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt
TEST_ARGS=(
--testname
--shard
--zookeeper
--check-zookeeper-session
--hung-check
--print-time
--no-drop-if-fail
--capture-client-stacktrace
--test-runs "$NUM_TRIES"
"${ADDITIONAL_OPTIONS[@]}"
)
timeout --preserve-status --signal TERM --kill-after 60m ${TIMEOUT}s clickhouse-test "${TEST_ARGS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt
set -e
DURATION=$((SECONDS - START_TIME))

View File

@ -0,0 +1,37 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.7.3.42-stable (63730bc4293) FIXME as compared to v24.7.2.13-stable (6e41f601b2f)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#67969](https://github.com/ClickHouse/ClickHouse/issues/67969): Fixed reading of subcolumns after `ALTER ADD COLUMN` query. [#66243](https://github.com/ClickHouse/ClickHouse/pull/66243) ([Anton Popov](https://github.com/CurtizJ)).
* Backported in [#67637](https://github.com/ClickHouse/ClickHouse/issues/67637): Fix for occasional deadlock in Context::getDDLWorker. [#66843](https://github.com/ClickHouse/ClickHouse/pull/66843) ([Alexander Gololobov](https://github.com/davenger)).
* Backported in [#67820](https://github.com/ClickHouse/ClickHouse/issues/67820): Fix possible deadlock on query cancel with parallel replicas. [#66905](https://github.com/ClickHouse/ClickHouse/pull/66905) ([Nikita Taranov](https://github.com/nickitat)).
* Backported in [#67881](https://github.com/ClickHouse/ClickHouse/issues/67881): Correctly parse file name/URI containing `::` if it's not an archive. [#67433](https://github.com/ClickHouse/ClickHouse/pull/67433) ([Antonio Andelic](https://github.com/antonio2368)).
* Backported in [#67713](https://github.com/ClickHouse/ClickHouse/issues/67713): Fix reloading SQL UDFs with UNION. Previously, restarting the server could make UDF invalid. [#67665](https://github.com/ClickHouse/ClickHouse/pull/67665) ([Antonio Andelic](https://github.com/antonio2368)).
* Backported in [#67995](https://github.com/ClickHouse/ClickHouse/issues/67995): Validate experimental/suspicious data types in ALTER ADD/MODIFY COLUMN. [#67911](https://github.com/ClickHouse/ClickHouse/pull/67911) ([Kruglov Pavel](https://github.com/Avogar)).
#### Critical Bug Fix (crash, LOGICAL_ERROR, data loss, RBAC)
* Backported in [#67818](https://github.com/ClickHouse/ClickHouse/issues/67818): Only relevant to the experimental Variant data type. Fix crash with Variant + AggregateFunction type. [#67122](https://github.com/ClickHouse/ClickHouse/pull/67122) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#67766](https://github.com/ClickHouse/ClickHouse/issues/67766): Fix crash of `uniq` and `uniqTheta ` with `tuple()` argument. Closes [#67303](https://github.com/ClickHouse/ClickHouse/issues/67303). [#67306](https://github.com/ClickHouse/ClickHouse/pull/67306) ([flynn](https://github.com/ucasfl)).
* Backported in [#67854](https://github.com/ClickHouse/ClickHouse/issues/67854): Fixes [#66026](https://github.com/ClickHouse/ClickHouse/issues/66026). Avoid unresolved table function arguments traversal in `ReplaceTableNodeToDummyVisitor`. [#67522](https://github.com/ClickHouse/ClickHouse/pull/67522) ([Dmitry Novik](https://github.com/novikd)).
* Backported in [#67840](https://github.com/ClickHouse/ClickHouse/issues/67840): Fix potential stack overflow in `JSONMergePatch` function. Renamed this function from `jsonMergePatch` to `JSONMergePatch` because the previous name was wrong. The previous name is still kept for compatibility. Improved diagnostic of errors in the function. This closes [#67304](https://github.com/ClickHouse/ClickHouse/issues/67304). [#67756](https://github.com/ClickHouse/ClickHouse/pull/67756) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#67518](https://github.com/ClickHouse/ClickHouse/issues/67518): Split slow test 03036_dynamic_read_subcolumns. [#66954](https://github.com/ClickHouse/ClickHouse/pull/66954) ([Nikita Taranov](https://github.com/nickitat)).
* Backported in [#67516](https://github.com/ClickHouse/ClickHouse/issues/67516): Split 01508_partition_pruning_long. [#66983](https://github.com/ClickHouse/ClickHouse/pull/66983) ([Nikita Taranov](https://github.com/nickitat)).
* Backported in [#67529](https://github.com/ClickHouse/ClickHouse/issues/67529): Reduce max time of 00763_long_lock_buffer_alter_destination_table. [#67185](https://github.com/ClickHouse/ClickHouse/pull/67185) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#67643](https://github.com/ClickHouse/ClickHouse/issues/67643): [Green CI] Fix potentially flaky test_mask_sensitive_info integration test. [#67506](https://github.com/ClickHouse/ClickHouse/pull/67506) ([Alexey Katsman](https://github.com/alexkats)).
* Backported in [#67609](https://github.com/ClickHouse/ClickHouse/issues/67609): Fix test_zookeeper_config_load_balancing after adding the xdist worker name to the instance. [#67590](https://github.com/ClickHouse/ClickHouse/pull/67590) ([Pablo Marcos](https://github.com/pamarcos)).
* Backported in [#67871](https://github.com/ClickHouse/ClickHouse/issues/67871): Fix 02434_cancel_insert_when_client_dies. [#67600](https://github.com/ClickHouse/ClickHouse/pull/67600) ([vdimir](https://github.com/vdimir)).
* Backported in [#67704](https://github.com/ClickHouse/ClickHouse/issues/67704): Fix 02910_bad_logs_level_in_local in fast tests. [#67603](https://github.com/ClickHouse/ClickHouse/pull/67603) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#67689](https://github.com/ClickHouse/ClickHouse/issues/67689): Fix 01605_adaptive_granularity_block_borders. [#67605](https://github.com/ClickHouse/ClickHouse/pull/67605) ([Nikita Taranov](https://github.com/nickitat)).
* Backported in [#67827](https://github.com/ClickHouse/ClickHouse/issues/67827): Try fix 03143_asof_join_ddb_long. [#67620](https://github.com/ClickHouse/ClickHouse/pull/67620) ([Nikita Taranov](https://github.com/nickitat)).
* Backported in [#67892](https://github.com/ClickHouse/ClickHouse/issues/67892): Revert "Merge pull request [#66510](https://github.com/ClickHouse/ClickHouse/issues/66510) from canhld94/fix_trivial_count_non_deterministic_func". [#67800](https://github.com/ClickHouse/ClickHouse/pull/67800) ([János Benjamin Antal](https://github.com/antaljanosbenjamin)).

View File

@ -27,23 +27,23 @@ Avoid dumping copies of external code into the library directory.
Instead create a Git submodule to pull third-party code from an external upstream repository.
All submodules used by ClickHouse are listed in the `.gitmodule` file.
If the library can be used as-is (the default case), you can reference the upstream repository directly.
If the library needs patching, create a fork of the upstream repository in the [ClickHouse organization on GitHub](https://github.com/ClickHouse).
- If the library can be used as-is (the default case), you can reference the upstream repository directly.
- If the library needs patching, create a fork of the upstream repository in the [ClickHouse organization on GitHub](https://github.com/ClickHouse).
In the latter case, we aim to isolate custom patches as much as possible from upstream commits.
To that end, create a branch with prefix `clickhouse/` from the branch or tag you want to integrate, e.g. `clickhouse/master` (for branch `master`) or `clickhouse/release/vX.Y.Z` (for tag `release/vX.Y.Z`).
This ensures that pulls from the upstream repository into the fork will leave custom `clickhouse/` branches unaffected.
Submodules in `contrib/` must only track `clickhouse/` branches of forked third-party repositories.
To that end, create a branch with prefix `ClickHouse/` from the branch or tag you want to integrate, e.g. `ClickHouse/2024_2` (for branch `2024_2`) or `ClickHouse/release/vX.Y.Z` (for tag `release/vX.Y.Z`).
Avoid following upstream development branches `master`/ `main` / `dev` (i.e., prefix branches `ClickHouse/master` / `ClickHouse/main` / `ClickHouse/dev` in the fork repository).
Such branches are moving targets which make proper versioning harder.
"Prefix branches" ensure that pulls from the upstream repository into the fork will leave custom `ClickHouse/` branches unaffected.
Submodules in `contrib/` must only track `ClickHouse/` branches of forked third-party repositories.
Patches are only applied against `clickhouse/` branches of external libraries.
For that, push the patch as a branch with `clickhouse/`, e.g. `clickhouse/fix-some-desaster`.
Then create a PR from the new branch against the custom tracking branch with `clickhouse/` prefix, (e.g. `clickhouse/master` or `clickhouse/release/vX.Y.Z`) and merge the patch.
Patches are only applied against `ClickHouse/` branches of external libraries.
There are two ways to do that:
- You want to make a new fix against a `ClickHouse/`-prefix branch in the forked repository, e.g. a sanitizer fix. In that case, push the fix as a branch with the `ClickHouse/` prefix, e.g. `ClickHouse/fix-sanitizer-disaster`. Then create a PR from the new branch against the custom tracking branch, e.g. `ClickHouse/2024_2 <-- ClickHouse/fix-sanitizer-disaster`, and merge the PR.
- You update the submodule and need to re-apply earlier patches. In this case, re-creating old PRs is overkill. Instead, simply cherry-pick the older commits into the new `ClickHouse/` branch (corresponding to the new version). Feel free to squash commits of PRs that had multiple commits. In the best case, we contributed the custom patches back to upstream and can omit them in the new version.
Once the submodule has been updated, bump the submodule in ClickHouse to point to the new hash in the fork.
Create patches of third-party libraries with the official repository in mind and consider contributing the patch back to the upstream repository.
This makes sure that others will also benefit from the patch and it will not be a maintenance burden for the ClickHouse team.
To pull upstream changes into the submodule, you can use two methods:
- (less work but less clean): merge upstream `master` into the corresponding `clickhouse/` tracking branch in the forked repository. You will need to resolve merge conflicts with previous custom patches. This method can be used when the `clickhouse/` branch tracks an upstream development branch like `master`, `main`, `dev`, etc.
- (more work but cleaner): create a new branch with `clickhouse/` prefix from the upstream commit or tag you like to integrate. Then re-apply all existing patches using new PRs (or squash them into a single PR). This method can be used when the `clickhouse/` branch tracks a specific upstream version branch or tag. It is cleaner in the sense that custom patches and upstream changes are better isolated from each other.
Once the submodule has been updated, bump the submodule in ClickHouse to point to the new hash in the fork.

View File

@ -61,6 +61,7 @@ Engines in the family:
- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md)
- [PostgreSQL](../../engines/table-engines/integrations/postgresql.md)
- [S3Queue](../../engines/table-engines/integrations/s3queue.md)
- [TimeSeries](../../engines/table-engines/integrations/time-series.md)
### Special Engines {#special-engines}

View File

@ -251,6 +251,44 @@ The number of rows in one Kafka message depends on whether the format is row-bas
- For row-based formats the number of rows in one Kafka message can be controlled by setting `kafka_max_rows_per_message`.
- For block-based formats we cannot divide block into smaller parts, but the number of rows in one block can be controlled by general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
## Experimental engine to store committed offsets in ClickHouse Keeper
If `allow_experimental_kafka_offsets_storage_in_keeper` is enabled, then two more settings can be specified for the Kafka table engine:
- `kafka_keeper_path` specifies the path to the table in ClickHouse Keeper
- `kafka_replica_name` specifies the replica name in ClickHouse Keeper
Either both of the settings must be specified or neither of them. When both are specified, a new, experimental Kafka engine will be used. The new engine doesn't depend on storing the committed offsets in Kafka, but stores them in ClickHouse Keeper. It still tries to commit the offsets to Kafka, but it only depends on those offsets when the table is created. In any other circumstance (the table is restarted, or recovered after some error), the offsets stored in ClickHouse Keeper will be used to continue consuming messages from. Apart from the committed offset, it also stores how many messages were consumed in the last batch, so if the insert fails, the same number of messages will be consumed, thus enabling deduplication if necessary.
Example:
``` sql
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/experimental_kafka',
kafka_replica_name = 'r1'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;
```
Or to utilize the `uuid` and `replica` macros similarly to ReplicatedMergeTree:
``` sql
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/{uuid}',
kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;
```
### Known limitations
As the new engine is experimental, it is not production-ready yet. There are a few known limitations of the implementation:
- The biggest limitation is that the engine doesn't support direct reading. Reading from the engine through materialized views and writing to the engine work, but direct reading doesn't. As a result, all direct `SELECT` queries will fail (see the sketch after this list).
- Rapidly dropping and recreating the table or specifying the same ClickHouse Keeper path to different engines might cause issues. As best practice you can use the `{uuid}` in `kafka_keeper_path` to avoid clashing paths.
- To make repeatable reads, messages cannot be consumed from multiple partitions on a single thread. On the other hand, the Kafka consumers have to be polled regularly to keep them alive. As a result of these two objectives, we decided to only allow creating multiple consumers if `kafka_thread_per_consumer` is enabled, otherwise it is too complicated to avoid issues regarding polling consumers regularly.
- Consumers created by the new storage engine do not show up in [`system.kafka_consumers`](../../../operations/system-tables/kafka_consumers.md) table.
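As a minimal sketch of working around the direct-reading limitation above, reads can go through a materialized view into a regular table. The Kafka table is the `experimental_kafka` table from the examples above; the names of the target table and the view are made up:
``` sql
-- Target table that will hold the consumed rows (name is illustrative).
CREATE TABLE experimental_kafka_data (key UInt64, value UInt64)
ENGINE = MergeTree ORDER BY key;

-- Materialized view that moves rows from the Kafka engine table into the target table.
CREATE MATERIALIZED VIEW experimental_kafka_mv TO experimental_kafka_data AS
SELECT key, value FROM experimental_kafka;

-- Direct SELECTs from experimental_kafka fail, but the target table can be queried.
SELECT count() FROM experimental_kafka_data;
```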
**See Also**
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)

View File

@ -0,0 +1,295 @@
---
slug: /en/engines/table-engines/special/time_series
sidebar_position: 60
sidebar_label: TimeSeries
---
# TimeSeries Engine [Experimental]
A table engine storing time series, i.e. a set of values associated with timestamps and tags (or labels):
```
metric_name1[tag1=value1, tag2=value2, ...] = {timestamp1: value1, timestamp2: value2, ...}
metric_name2[...] = ...
```
:::info
This is an experimental feature that may change in backward-incompatible ways in future releases.
Enable usage of the TimeSeries table engine
with the [allow_experimental_time_series_table](../../../operations/settings/settings.md#allow-experimental-time-series-table) setting,
e.g. by running `SET allow_experimental_time_series_table = 1`.
:::
## Syntax {#syntax}
``` sql
CREATE TABLE name [(columns)] ENGINE=TimeSeries
[SETTINGS var1=value1, ...]
[DATA db.data_table_name | DATA ENGINE data_table_engine(arguments)]
[TAGS db.tags_table_name | TAGS ENGINE tags_table_engine(arguments)]
[METRICS db.metrics_table_name | METRICS ENGINE metrics_table_engine(arguments)]
```
## Usage {#usage}
It's easiest to start with everything set by default (a `TimeSeries` table can be created without specifying a list of columns):
``` sql
CREATE TABLE my_table ENGINE=TimeSeries
```
Then this table can be used with the following protocols (a port must be assigned in the server configuration):
- [prometheus remote-write](../../../interfaces/prometheus.md#remote-write)
- [prometheus remote-read](../../../interfaces/prometheus.md#remote-read)
## Target tables {#target-tables}
A `TimeSeries` table doesn't have its own data; everything is stored in its target tables.
This is similar to how a [materialized view](../../../sql-reference/statements/create/view#materialized-view) works,
with the difference that a materialized view has one target table
whereas a `TimeSeries` table has three target tables named [data]{#data-table}, [tags]{#tags-table}, and [metrics]{#metrics-table}.
The target tables can be either specified explicitly in the `CREATE TABLE` query
or the `TimeSeries` table engine can generate inner target tables automatically.
The target tables are the following:
1. The _data_ table {#data-table} contains time series associated with some identifier.
The _data_ table must have columns:
| Name | Mandatory? | Default type | Possible types | Description |
|---|---|---|---|---|
| `id` | [x] | `UUID` | any | Identifies a combination of a metric name and tags |
| `timestamp` | [x] | `DateTime64(3)` | `DateTime64(X)` | A time point |
| `value` | [x] | `Float64` | `Float32` or `Float64` | A value associated with the `timestamp` |
2. The _tags_ table {#tags-table} contains identifiers calculated for each combination of a metric name and tags.
The _tags_ table must have columns:
| Name | Mandatory? | Default type | Possible types | Description |
|---|---|---|---|---|
| `id` | [x] | `UUID` | any (must match the type of `id` in the [data]{#data-table} table) | An `id` identifies a combination of a metric name and tags. The DEFAULT expression specifies how to calculate such an identifier |
| `metric_name` | [x] | `LowCardinality(String)` | `String` or `LowCardinality(String)` | The name of a metric |
| `<tag_value_column>` | [ ] | `String` | `String` or `LowCardinality(String)` or `LowCardinality(Nullable(String))` | The value of a specific tag, the tag's name and the name of a corresponding column are specified in the [tags_to_columns](#settings) setting |
| `tags` | [x] | `Map(LowCardinality(String), String)` | `Map(String, String)` or `Map(LowCardinality(String), String)` or `Map(LowCardinality(String), LowCardinality(String))` | Map of tags excluding the tag `__name__` containing the name of a metric and excluding tags with names enumerated in the [tags_to_columns](#settings) setting |
| `all_tags` | [ ] | `Map(String, String)` | `Map(String, String)` or `Map(LowCardinality(String), String)` or `Map(LowCardinality(String), LowCardinality(String))` | Ephemeral column, each row is a map of all the tags excluding only the tag `__name__` containing the name of a metric. The only purpose of that column is to be used while calculating `id` |
| `min_time` | [ ] | `Nullable(DateTime64(3))` | `DateTime64(X)` or `Nullable(DateTime64(X))` | Minimum timestamp of time series with that `id`. The column is created if [store_min_time_and_max_time](#settings) is `true` |
| `max_time` | [ ] | `Nullable(DateTime64(3))` | `DateTime64(X)` or `Nullable(DateTime64(X))` | Maximum timestamp of time series with that `id`. The column is created if [store_min_time_and_max_time](#settings) is `true` |
3. The _metrics_ table {#metrics-table} contains some information about the metrics being collected, the types of those metrics, and their descriptions.
The _metrics_ table must have columns:
| Name | Mandatory? | Default type | Possible types | Description |
|---|---|---|---|---|
| `metric_family_name` | [x] | `String` | `String` or `LowCardinality(String)` | The name of a metric family |
| `type` | [x] | `String` | `String` or `LowCardinality(String)` | The type of a metric family, one of "counter", "gauge", "summary", "stateset", "histogram", "gaugehistogram" |
| `unit` | [x] | `String` | `String` or `LowCardinality(String)` | The unit used in a metric |
| `help` | [x] | `String` | `String` or `LowCardinality(String)` | The description of a metric |
Any row inserted into a `TimeSeries` table will be in fact stored in those three target tables.
A `TimeSeries` table contains all those columns from the [data]{#data-table}, [tags]{#tags-table}, [metrics]{#metrics-table} tables.
## Creation {#creation}
There are multiple ways to create a table with the `TimeSeries` table engine.
The simplest statement
``` sql
CREATE TABLE my_table ENGINE=TimeSeries
```
will actually create the following table (you can see that by executing `SHOW CREATE TABLE my_table`):
``` sql
CREATE TABLE my_table
(
`id` UUID DEFAULT reinterpretAsUUID(sipHash128(metric_name, all_tags)),
`timestamp` DateTime64(3),
`value` Float64,
`metric_name` LowCardinality(String),
`tags` Map(LowCardinality(String), String),
`all_tags` Map(String, String),
`min_time` Nullable(DateTime64(3)),
`max_time` Nullable(DateTime64(3)),
`metric_family_name` String,
`type` String,
`unit` String,
`help` String
)
ENGINE = TimeSeries
DATA ENGINE = MergeTree ORDER BY (id, timestamp)
DATA INNER UUID '01234567-89ab-cdef-0123-456789abcdef'
TAGS ENGINE = AggregatingMergeTree PRIMARY KEY metric_name ORDER BY (metric_name, id)
TAGS INNER UUID '01234567-89ab-cdef-0123-456789abcdef'
METRICS ENGINE = ReplacingMergeTree ORDER BY metric_family_name
METRICS INNER UUID '01234567-89ab-cdef-0123-456789abcdef'
```
So the columns were generated automatically, and there are also three inner UUIDs in this statement -
one for each inner target table that was created.
(Inner UUIDs are normally not shown until the setting
[show_table_uuid_in_table_create_query_if_not_nil](../../../operations/settings/settings#show_table_uuid_in_table_create_query_if_not_nil)
is set.)
Inner target tables have names like `.inner_id.data.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`,
`.inner_id.tags.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`, `.inner_id.metrics.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`
and each target table has columns which are a subset of the columns of the main `TimeSeries` table:
``` sql
CREATE TABLE default.`.inner_id.data.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`
(
`id` UUID,
`timestamp` DateTime64(3),
`value` Float64
)
ENGINE = MergeTree
ORDER BY (id, timestamp)
```
``` sql
CREATE TABLE default.`.inner_id.tags.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`
(
`id` UUID DEFAULT reinterpretAsUUID(sipHash128(metric_name, all_tags)),
`metric_name` LowCardinality(String),
`tags` Map(LowCardinality(String), String),
`all_tags` Map(String, String) EPHEMERAL,
`min_time` SimpleAggregateFunction(min, Nullable(DateTime64(3))),
`max_time` SimpleAggregateFunction(max, Nullable(DateTime64(3)))
)
ENGINE = AggregatingMergeTree
PRIMARY KEY metric_name
ORDER BY (metric_name, id)
```
``` sql
CREATE TABLE default.`.inner_id.metrics.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`
(
`metric_family_name` String,
`type` String,
`unit` String,
`help` String
)
ENGINE = ReplacingMergeTree
ORDER BY metric_family_name
```
## Adjusting types of columns {#adjusting-column-types}
You can adjust the types of almost any column of the inner target tables by specifying them explicitly
while defining the main table. For example,
``` sql
CREATE TABLE my_table
(
timestamp DateTime64(6)
) ENGINE=TimeSeries
```
will make the inner [data]{#data-table} table store timestamps in microseconds instead of milliseconds:
``` sql
CREATE TABLE default.`.inner_id.data.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`
(
`id` UUID,
`timestamp` DateTime64(6),
`value` Float64
)
ENGINE = MergeTree
ORDER BY (id, timestamp)
```
## The `id` column {#id-column}
The `id` column contains identifiers; every identifier is calculated for a combination of a metric name and tags.
The DEFAULT expression for the `id` column is the expression which will be used to calculate such identifiers.
Both the type of the `id` column and that expression can be adjusted by specifying them explicitly:
``` sql
CREATE TABLE my_table
(
id UInt64 DEFAULT sipHash64(metric_name, all_tags)
) ENGINE=TimeSeries
```
## The `tags` and `all_tags` columns {#tags-and-all-tags}
There are two columns containing maps of tags - `tags` and `all_tags`. In this example they contain the same data, however they can be different
if the setting `tags_to_columns` is used. This setting allows specifying that a specific tag should be stored in a separate column instead of being stored
in a map inside the `tags` column:
``` sql
CREATE TABLE my_table ENGINE=TimeSeries SETTINGS tags_to_columns = {'instance': 'instance', 'job': 'job'}
```
This statement will add columns
```
`instance` String,
`job` String
```
to the definition of both `my_table` and its inner [tags]{#tags-table} target table. In this case the `tags` column will not contain the tags `instance` and `job`,
but the `all_tags` column will contain them. The `all_tags` column is ephemeral and its only purpose is to be used in the DEFAULT expression
for the `id` column.
The types of columns can be adjusted by specifying them explicitly:
``` sql
CREATE TABLE my_table (instance LowCardinality(String), job LowCardinality(Nullable(String)))
ENGINE=TimeSeries SETTINGS tags_to_columns = {'instance': 'instance', 'job': 'job'}
```
## Table engines of inner target tables {#inner-table-engines}
By default inner target tables use the following table engines:
- the [data]{#data-table} table uses [MergeTree](../mergetree-family/mergetree);
- the [tags]{#tags-table} table uses [AggregatingMergeTree](../mergetree-family/aggregatingmergetree) because the same data is often inserted multiple times into this table, so a way
to remove duplicates is needed, and also because aggregation is required for the `min_time` and `max_time` columns;
- the [metrics]{#metrics-table} table uses [ReplacingMergeTree](../mergetree-family/replacingmergetree) because the same data is often inserted multiple times into this table, so a way
to remove duplicates is needed.
Other table engines can also be used for inner target tables if specified explicitly:
``` sql
CREATE TABLE my_table ENGINE=TimeSeries
DATA ENGINE=ReplicatedMergeTree
TAGS ENGINE=ReplicatedAggregatingMergeTree
METRICS ENGINE=ReplicatedReplacingMergeTree
```
## External target tables {#external-target-tables}
It's possible to make a `TimeSeries` table use manually created tables:
``` sql
CREATE TABLE data_for_my_table
(
`id` UUID,
`timestamp` DateTime64(3),
`value` Float64
)
ENGINE = MergeTree
ORDER BY (id, timestamp);
CREATE TABLE tags_for_my_table ...
CREATE TABLE metrics_for_my_table ...
CREATE TABLE my_table ENGINE=TimeSeries DATA data_for_my_table TAGS tags_for_my_table METRICS metrics_for_my_table;
```
## Settings {#settings}
Here is a list of settings which can be specified while defining a `TimeSeries` table:
| Name | Type | Default | Description |
|---|---|---|---|
| `tags_to_columns` | Map | {} | Map specifying which tags should be put to separate columns in the [tags]{#tags-table} table. Syntax: `{'tag1': 'column1', 'tag2' : column2, ...}` |
| `use_all_tags_column_to_generate_id` | Bool | true | When generating an expression to calculate an identifier of a time series, this flag enables using the `all_tags` column in that calculation |
| `store_min_time_and_max_time` | Bool | true | If set to true then the table will store `min_time` and `max_time` for each time series |
| `aggregate_min_time_and_max_time` | Bool | true | When creating an inner target `tags` table, this flag enables using `SimpleAggregateFunction(min, Nullable(DateTime64(3)))` instead of just `Nullable(DateTime64(3))` as the type of the `min_time` column, and the same for the `max_time` column |
| `filter_by_min_time_and_max_time` | Bool | true | If set to true then the table will use the `min_time` and `max_time` columns for filtering time series |
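For example, a sketch combining a few of the settings above (the tag and column names passed to `tags_to_columns` are illustrative):
``` sql
CREATE TABLE my_table ENGINE=TimeSeries
SETTINGS tags_to_columns = {'instance': 'instance', 'job': 'job'},
         store_min_time_and_max_time = true,
         filter_by_min_time_and_max_time = true
```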
## Functions {#functions}
Here is a list of functions supporting a `TimeSeries` table as an argument:
- [timeSeriesData](../../../sql-reference/table-functions/timeSeriesData.md)
- [timeSeriesTags](../../../sql-reference/table-functions/timeSeriesTags.md)
- [timeSeriesMetrics](../../../sql-reference/table-functions/timeSeriesMetrics.md)
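As a quick sketch, the inner tables of the `my_table` example above can be queried through these functions (assuming `my_table` lives in the `default` database):
``` sql
SELECT count() FROM timeSeriesData(default.my_table);
SELECT metric_name, tags FROM timeSeriesTags(default.my_table) LIMIT 10;
```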

View File

@ -1005,7 +1005,7 @@ They can be used for prewhere optimization only if we enable `set allow_statisti
## Column-level Settings {#column-level-settings}
Certain MergeTree settings can be override at column level:
Certain MergeTree settings can be overridden at column level:
- `max_compress_block_size` — Maximum size of blocks of uncompressed data before compressing for writing to a table.
- `min_compress_block_size` — Minimum size of blocks of uncompressed data required for compression when writing the next mark.
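For example, a minimal sketch of overriding these settings for a single column (the table and column names are made up):
``` sql
CREATE TABLE tab
(
    `id` UInt64,
    `document` String SETTINGS (min_compress_block_size = 16777216, max_compress_block_size = 16777216)
)
ENGINE = MergeTree
ORDER BY id
```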

View File

@ -0,0 +1,160 @@
---
slug: /en/interfaces/prometheus
sidebar_position: 19
sidebar_label: Prometheus protocols
---
# Prometheus protocols
## Exposing metrics {#expose}
:::note
ClickHouse Cloud does not currently support connecting to Prometheus. To be notified when this feature is supported, please contact support@clickhouse.com.
:::
ClickHouse can expose its own metrics for scraping from Prometheus:
```xml
<prometheus>
<port>9363</port>
<endpoint>/metrics</endpoint>
<metrics>true</metrics>
<asynchronous_metrics>true</asynchronous_metrics>
<events>true</events>
<errors>true</errors>
</prometheus>
```
The `<prometheus.handlers>` section can be used to define more extended handlers.
This section is similar to [<http_handlers>](/en/interfaces/http) but works for the Prometheus protocols:
```xml
<prometheus>
<port>9363</port>
<handlers>
<my_rule_1>
<url>/metrics</url>
<handler>
<type>expose_metrics</type>
<metrics>true</metrics>
<asynchronous_metrics>true</asynchronous_metrics>
<events>true</events>
<errors>true</errors>
</handler>
</my_rule_1>
</handlers>
</prometheus>
```
Settings:
| Name | Default | Description |
|---|---|---|
| `port` | none | Port for serving the exposing metrics protocol. |
| `endpoint` | `/metrics` | HTTP endpoint for scraping metrics by prometheus server. Starts with `/`. Should not be used with the `<handlers>` section. |
| `url` / `headers` / `method` | none | Filters used to find a matching handler for a request. Similar to the fields with the same names in the [<http_handlers>](/en/interfaces/http) section. |
| `metrics` | true | Expose metrics from the [system.metrics](/en/operations/system-tables/metrics) table. |
| `asynchronous_metrics` | true | Expose current metrics values from the [system.asynchronous_metrics](/en/operations/system-tables/asynchronous_metrics) table. |
| `events` | true | Expose metrics from the [system.events](/en/operations/system-tables/events) table. |
| `errors` | true | Expose the number of errors by error code that have occurred since the last server restart. This information can also be obtained from the [system.errors](/en/operations/system-tables/errors) table. |
Check (replace `127.0.0.1` with the IP address or hostname of your ClickHouse server):
```bash
curl 127.0.0.1:9363/metrics
```
## Remote-write protocol {#remote-write}
ClickHouse supports the [remote-write](https://prometheus.io/docs/specs/remote_write_spec/) protocol.
Data are received by this protocol and written to a [TimeSeries](/en/engines/table-engines/special/time_series) table
(which should be created beforehand).
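As a minimal sketch, the target table can be created beforehand like this (assuming the `db_name` database already exists; the names mirror the config example below):
```sql
SET allow_experimental_time_series_table = 1;
CREATE TABLE db_name.time_series_table ENGINE = TimeSeries;
```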
```xml
<prometheus>
<port>9363</port>
<handlers>
<my_rule_1>
<url>/write</url>
<handler>
<type>remote_write</type>
<database>db_name</database>
<table>time_series_table</table>
</handler>
</my_rule_1>
</handlers>
</prometheus>
```
Settings:
| Name | Default | Description |
|---|---|---|
| `port` | none | Port for serving the `remote-write` protocol. |
| `url` / `headers` / `method` | none | Filters used to find a matching handler for a request. Similar to the fields with the same names in the [<http_handlers>](/en/interfaces/http) section. |
| `table` | none | The name of a [TimeSeries](/en/engines/table-engines/special/time_series) table to write data received by the `remote-write` protocol. This name can optionally contain the name of a database too. |
| `database` | none | The name of the database where the table specified in the `table` setting is located, if the database is not specified in the `table` setting itself. |
## Remote-read protocol {#remote-read}
ClickHouse supports the [remote-read](https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/) protocol.
Data are read from a [TimeSeries](/en/engines/table-engines/special/time_series) table and sent via this protocol.
```xml
<prometheus>
<port>9363</port>
<handlers>
<my_rule_1>
<url>/read</url>
<handler>
<type>remote_read</type>
<database>db_name</database>
<table>time_series_table</table>
</handler>
</my_rule_1>
</handlers>
</prometheus>
```
Settings:
| Name | Default | Description |
|---|---|---|
| `port` | none | Port for serving the `remote-read` protocol. |
| `url` / `headers` / `method` | none | Filters used to find a matching handler for a request. Similar to the fields with the same names in the [<http_handlers>](/en/interfaces/http) section. |
| `table` | none | The name of a [TimeSeries](/en/engines/table-engines/special/time_series) table to read data from, to be sent via the `remote-read` protocol. This name can optionally contain the name of a database too. |
| `database` | none | The name of the database where the table specified in the `table` setting is located, if the database is not specified in the `table` setting itself. |
## Configuration for multiple protocols {#multiple-protocols}
Multiple protocols can be specified together in one place:
```xml
<prometheus>
<port>9363</port>
<handlers>
<my_rule_1>
<url>/metrics</url>
<handler>
<type>expose_metrics</type>
<metrics>true</metrics>
<asynchronous_metrics>true</asynchronous_metrics>
<events>true</events>
<errors>true</errors>
</handler>
</my_rule_1>
<my_rule_2>
<url>/write</url>
<handler>
<type>remote_write</type>
<table>db_name.time_series_table</table>
</handler>
</my_rule_2>
<my_rule_3>
<url>/read</url>
<handler>
<type>remote_read</type>
<table>db_name.time_series_table</table>
</handler>
</my_rule_3>
</handlers>
</prometheus>
```

View File

@ -2112,48 +2112,6 @@ The trailing slash is mandatory.
<path>/var/lib/clickhouse/</path>
```
## Prometheus {#prometheus}
:::note
ClickHouse Cloud does not currently support connecting to Prometheus. To be notified when this feature is supported, please contact support@clickhouse.com.
:::
Exposing metrics data for scraping from [Prometheus](https://prometheus.io).
Settings:
- `endpoint` HTTP endpoint for scraping metrics by prometheus server. Start from /.
- `port` Port for `endpoint`.
- `metrics` Expose metrics from the [system.metrics](../../operations/system-tables/metrics.md#system_tables-metrics) table.
- `events` Expose metrics from the [system.events](../../operations/system-tables/events.md#system_tables-events) table.
- `asynchronous_metrics` Expose current metrics values from the [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) table.
- `errors` - Expose the number of errors by error codes occurred since the last server restart. This information could be obtained from the [system.errors](../../operations/system-tables/asynchronous_metrics.md#system_tables-errors) as well.
**Example**
``` xml
<clickhouse>
<listen_host>0.0.0.0</listen_host>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<!-- highlight-start -->
<prometheus>
<endpoint>/metrics</endpoint>
<port>9363</port>
<metrics>true</metrics>
<events>true</events>
<asynchronous_metrics>true</asynchronous_metrics>
<errors>true</errors>
</prometheus>
<!-- highlight-end -->
</clickhouse>
```
Check (replace `127.0.0.1` with the IP addr or hostname of your ClickHouse server):
```bash
curl 127.0.0.1:9363/metrics
```
## query_log {#query-log}
Setting for logging queries received with the [log_queries=1](../../operations/settings/settings.md) setting.

View File

@ -5626,3 +5626,14 @@ Default value: `False`
Disable all insert and mutations (alter table update / alter table delete / alter table drop partition). Set to true, can make this node focus on reading queries.
Default value: `false`.
## allow_experimental_time_series_table {#allow-experimental-time-series-table}
Allows creation of tables with the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine.
Possible values:
- 0 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is disabled.
- 1 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is enabled.
Default value: `0`.
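A minimal sketch of enabling the setting for a session and creating a table with the engine (the table name is made up):
``` sql
SET allow_experimental_time_series_table = 1;
CREATE TABLE my_time_series ENGINE = TimeSeries;
```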

View File

@ -3,7 +3,7 @@ slug: /en/operations/system-tables/trace_log
---
# trace_log
Contains stack traces collected by the sampling query profiler.
Contains stack traces collected by the [sampling query profiler](../../operations/optimizing-performance/sampling-query-profiler.md).
ClickHouse creates this table when the [trace_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-trace_log) server configuration section is set. Also see settings: [query_profiler_real_time_period_ns](../../operations/settings/settings.md#query_profiler_real_time_period_ns), [query_profiler_cpu_time_period_ns](../../operations/settings/settings.md#query_profiler_cpu_time_period_ns), [memory_profiler_step](../../operations/settings/settings.md#memory_profiler_step),
[memory_profiler_sample_probability](../../operations/settings/settings.md#memory_profiler_sample_probability), [trace_profile_events](../../operations/settings/settings.md#trace_profile_events).

View File

@ -9,6 +9,7 @@ The following operations with [partitions](/docs/en/engines/table-engines/merget
- [DETACH PARTITION\|PART](#detach-partitionpart) — Moves a partition or part to the `detached` directory and forget it.
- [DROP PARTITION\|PART](#drop-partitionpart) — Deletes a partition or part.
- [DROP DETACHED PARTITION\|PART](#drop-detached-partitionpart) - Delete a part or all parts of a partition from `detached`.
- [FORGET PARTITION](#forget-partition) — Deletes a partition's metadata from ZooKeeper if the partition is empty.
- [ATTACH PARTITION\|PART](#attach-partitionpart) — Adds a partition or part from the `detached` directory to the table.
- [ATTACH PARTITION FROM](#attach-partition-from) — Copies the data partition from one table to another and adds.
@ -68,7 +69,7 @@ ALTER TABLE mt DROP PART 'all_4_4_0';
## DROP DETACHED PARTITION\|PART
``` sql
ALTER TABLE table_name [ON CLUSTER cluster] DROP DETACHED PARTITION|PART partition_expr
ALTER TABLE table_name [ON CLUSTER cluster] DROP DETACHED PARTITION|PART ALL|partition_expr
```
Removes the specified part or all parts of the specified partition from `detached`.
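For example, a sketch using the table `mt` from the examples above (dropping detached parts requires the `allow_drop_detached` setting):
``` sql
SET allow_drop_detached = 1;
ALTER TABLE mt DROP DETACHED PART 'all_4_4_0';
ALTER TABLE mt DROP DETACHED PARTITION ALL;
```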

View File

@ -0,0 +1,36 @@
---
slug: /en/sql-reference/table-functions/fuzzQuery
sidebar_position: 75
sidebar_label: fuzzQuery
---
# fuzzQuery
Perturbs the given query string with random variations.
``` sql
fuzzQuery(query[, max_query_length[, random_seed]])
```
**Arguments**
- `query` (String) - The source query to perform the fuzzing on.
- `max_query_length` (UInt64) - The maximum length the query can reach during the fuzzing process.
- `random_seed` (UInt64) - A random seed for producing stable results.
**Returned Value**
A table object with a single column containing perturbed query strings.
## Usage Example
``` sql
SELECT * FROM fuzzQuery('SELECT materialize(\'a\' AS key) GROUP BY key') LIMIT 2;
```
```
┌─query──────────────────────────────────────────────────────────┐
1. │ SELECT 'a' AS key GROUP BY key │
2. │ EXPLAIN PIPELINE compact = true SELECT 'a' AS key GROUP BY key │
└────────────────────────────────────────────────────────────────┘
```
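A sketch passing the optional arguments described above (the length limit and seed values are arbitrary):
``` sql
SELECT * FROM fuzzQuery('SELECT materialize(\'a\' AS key) GROUP BY key', 500, 8956) LIMIT 2;
```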

View File

@ -44,6 +44,7 @@ LIMIT 2
Paths may use globbing. Files must match the whole path pattern, not only the suffix or prefix.
- `*` — Represents arbitrarily many characters except `/` but including the empty string.
- `**` — Represents all files inside a folder recursively.
- `?` — Represents an arbitrary single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`. The strings can contain the `/` symbol.
- `{N..M}` — Represents any number `>= N` and `<= M`.
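Combining the patterns above, a hypothetical query might look like this (the file layout is made up):
``` sql
SELECT count()
FROM file('data/{2022..2024}/log_?_*.csv', 'CSV', 'c1 UInt32, c2 String')
```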

View File

@ -0,0 +1,28 @@
---
slug: /en/sql-reference/table-functions/timeSeriesData
sidebar_position: 145
sidebar_label: timeSeriesData
---
# timeSeriesData
`timeSeriesData(db_name.time_series_table)` - Returns the [data](../../engines/table-engines/integrations/time-series.md#data-table) table
used by the table `db_name.time_series_table` whose table engine is [TimeSeries](../../engines/table-engines/integrations/time-series.md):
``` sql
CREATE TABLE db_name.time_series_table ENGINE=TimeSeries DATA data_table
```
The function also works if the _data_ table is inner:
``` sql
CREATE TABLE db_name.time_series_table ENGINE=TimeSeries DATA INNER UUID '01234567-89ab-cdef-0123-456789abcdef'
```
The following queries are equivalent:
``` sql
SELECT * FROM timeSeriesData(db_name.time_series_table);
SELECT * FROM timeSeriesData('db_name.time_series_table');
SELECT * FROM timeSeriesData('db_name', 'time_series_table');
```

View File

@ -0,0 +1,28 @@
---
slug: /en/sql-reference/table-functions/timeSeriesMetrics
sidebar_position: 145
sidebar_label: timeSeriesMetrics
---
# timeSeriesMetrics
`timeSeriesMetrics(db_name.time_series_table)` - Returns the [metrics](../../engines/table-engines/integrations/time-series.md#metrics-table) table
used by the table `db_name.time_series_table` whose table engine is [TimeSeries](../../engines/table-engines/integrations/time-series.md):
``` sql
CREATE TABLE db_name.time_series_table ENGINE=TimeSeries METRICS metrics_table
```
The function also works if the _metrics_ table is inner:
``` sql
CREATE TABLE db_name.time_series_table ENGINE=TimeSeries METRICS INNER UUID '01234567-89ab-cdef-0123-456789abcdef'
```
The following queries are equivalent:
``` sql
SELECT * FROM timeSeriesMetrics(db_name.time_series_table);
SELECT * FROM timeSeriesMetrics('db_name.time_series_table');
SELECT * FROM timeSeriesMetrics('db_name', 'time_series_table');
```

View File

@ -0,0 +1,28 @@
---
slug: /en/sql-reference/table-functions/timeSeriesTags
sidebar_position: 145
sidebar_label: timeSeriesTags
---
# timeSeriesTags
`timeSeriesTags(db_name.time_series_table)` - Returns the [tags](../../engines/table-engines/integrations/time-series.md#tags-table) table
used by the table `db_name.time_series_table` whose table engine is [TimeSeries](../../engines/table-engines/integrations/time-series.md):
``` sql
CREATE TABLE db_name.time_series_table ENGINE=TimeSeries TAGS tags_table
```
The function also works if the _tags_ table is inner:
``` sql
CREATE TABLE db_name.time_series_table ENGINE=TimeSeries TAGS INNER UUID '01234567-89ab-cdef-0123-456789abcdef'
```
The following queries are equivalent:
``` sql
SELECT * FROM timeSeriesTags(db_name.time_series_table);
SELECT * FROM timeSeriesTags('db_name.time_series_table');
SELECT * FROM timeSeriesTags('db_name', 'time_series_table');
```

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/window-functions/lagInFrame
sidebar_label: lagInFrame
sidebar_position: 8
sidebar_position: 9
---
# lagInFrame

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/window-functions/leadInFrame
sidebar_label: leadInFrame
sidebar_position: 9
sidebar_position: 10
---
# leadInFrame

View File

@ -0,0 +1,72 @@
---
slug: /en/sql-reference/window-functions/percent_rank
sidebar_label: percent_rank
sidebar_position: 8
---
# percent_rank
Returns the relative rank (i.e. percentile) of rows within a window partition.
**Syntax**
Alias: `percentRank` (case-sensitive)
```sql
percent_rank (column_name)
OVER ([[PARTITION BY grouping_column] [ORDER BY sorting_column]
[RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] | [window_name])
FROM table_name
WINDOW window_name as ([PARTITION BY grouping_column] [ORDER BY sorting_column] RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
```
The default and required window frame definition is `RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`.
For more detail on window function syntax see: [Window Functions - Syntax](./index.md/#syntax).
**Example**
Query:
```sql
CREATE TABLE salaries
(
`team` String,
`player` String,
`salary` UInt32,
`position` String
)
Engine = Memory;
INSERT INTO salaries FORMAT Values
('Port Elizabeth Barbarians', 'Gary Chen', 195000, 'F'),
('New Coreystad Archdukes', 'Charles Juarez', 190000, 'F'),
('Port Elizabeth Barbarians', 'Michael Stanley', 150000, 'D'),
('New Coreystad Archdukes', 'Scott Harrison', 150000, 'D'),
('Port Elizabeth Barbarians', 'Robert George', 195000, 'M'),
('South Hampton Seagulls', 'Douglas Benson', 150000, 'M'),
('South Hampton Seagulls', 'James Henderson', 140000, 'M');
```
```sql
SELECT player, salary,
percent_rank() OVER (ORDER BY salary DESC) AS percent_rank
FROM salaries;
```
Result:
```response
┌─player──────────┬─salary─┬───────percent_rank─┐
1. │ Gary Chen │ 195000 │ 0 │
2. │ Robert George │ 195000 │ 0 │
3. │ Charles Juarez │ 190000 │ 0.3333333333333333 │
4. │ Michael Stanley │ 150000 │ 0.5 │
5. │ Scott Harrison │ 150000 │ 0.5 │
6. │ Douglas Benson │ 150000 │ 0.5 │
7. │ James Henderson │ 140000 │ 1 │
└─────────────────┴────────┴────────────────────┘
```
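The same ranking can also be written with a named window, matching the syntax block above (a sketch):
```sql
SELECT player, salary,
       percent_rank() OVER w AS percent_rank
FROM salaries
WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING);
```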

View File

@ -81,6 +81,7 @@ SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 U
Only those files are processed that exist in the file system and match the whole path pattern.
- `*` — Substitutes any number of any characters except `/`, including the empty string.
- `**` — Substitutes any number of any characters including `/`, i.e. performs a recursive search through nested directories.
- `?` — Substitutes exactly one arbitrary character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of the strings `'some_string', 'another_string', 'yet_another_one'`. These strings can also contain the `/` symbol.
- `{N..M}` — Substitutes any number in the range from `N` to `M` inclusive (may contain leading zeros).

View File

@ -47,6 +47,7 @@ LIMIT 2
- `*` — Substitutes any number of any characters except `/`, including the empty string.
- `**` — Substitutes any number of any characters including `/`, i.e. performs a recursive search through nested directories.
- `?` — Substitutes exactly one arbitrary character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of the strings `'some_string', 'another_string', 'yet_another_one'`. These strings can also contain the `/` symbol.
- `{N..M}` — Substitutes any number in the range from `N` to `M` inclusive (may contain leading zeros).

View File

@ -11,7 +11,10 @@ class Client : public ClientApplicationBase
public:
using Arguments = ClientApplicationBase::Arguments;
Client() = default;
Client()
{
fuzzer = QueryFuzzer(randomSeed(), &std::cout, &std::cerr);
}
void initialize(Poco::Util::Application & self) override;

View File

@ -38,7 +38,7 @@
#include <Server/HTTP/HTTPServer.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/KeeperReadinessHandler.h>
#include <Server/PrometheusMetricsWriter.h>
#include <Server/PrometheusRequestHandlerFactory.h>
#include <Server/TCPServer.h>
#include "Core/Defines.h"
@ -509,14 +509,13 @@ try
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
socket.setSendTimeout(my_http_context->getSendTimeout());
auto metrics_writer = std::make_shared<KeeperPrometheusMetricsWriter>(config, "prometheus", async_metrics);
servers->emplace_back(
listen_host,
port_name,
"Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(my_http_context),
createPrometheusMainHandlerFactory(*this, config_getter(), metrics_writer, "PrometheusHandler-factory"),
createKeeperPrometheusHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"),
server_pool,
socket,
http_params));

View File

@ -14,7 +14,6 @@
#include <Databases/registerDatabases.h>
#include <Databases/DatabaseFilesystem.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabasesOverlay.h>
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachInformationSchemaTables.h>
@ -51,6 +50,7 @@
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Formats/registerFormats.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/program_options/options_description.hpp>
#include <base/argsToConfig.h>
#include <filesystem>
@ -216,12 +216,12 @@ static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const Str
return system_database;
}
static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context)
static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context_)
{
auto overlay = std::make_shared<DatabasesOverlay>(name_, context);
overlay->registerNextDatabase(std::make_shared<DatabaseAtomic>(name_, fs::weakly_canonical(context->getPath()), UUIDHelpers::generateV4(), context));
overlay->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context));
return overlay;
auto databaseCombiner = std::make_shared<DatabasesOverlay>(name_, context_);
databaseCombiner->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context_));
databaseCombiner->registerNextDatabase(std::make_shared<DatabaseMemory>(name_, context_));
return databaseCombiner;
}
/// If path is specified and not empty, will try to setup server environment and load existing metadata
@ -367,7 +367,7 @@ std::string LocalServer::getInitialCreateTableQuery()
else
table_structure = "(" + table_structure + ")";
return fmt::format("CREATE TEMPORARY TABLE {} {} ENGINE = File({}, {});",
return fmt::format("CREATE TABLE {} {} ENGINE = File({}, {});",
table_name, table_structure, data_format, table_file);
}
@ -761,12 +761,7 @@ void LocalServer::processConfig()
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
std::string default_database = server_settings.default_database;
{
DatabasePtr database = createClickHouseLocalDatabaseOverlay(default_database, global_context);
if (UUID uuid = database->getUUID(); uuid != UUIDHelpers::Nil)
DatabaseCatalog::instance().addUUIDMapping(uuid);
DatabaseCatalog::instance().attachDatabase(default_database, database);
}
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
global_context->setCurrentDatabase(default_database);
if (getClientConfiguration().has("path"))

View File

@ -814,10 +814,11 @@ try
const size_t physical_server_memory = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",
LOG_INFO(log, "Available RAM: {}; logical cores: {}; used cores: {}.",
formatReadableSizeWithBinarySuffix(physical_server_memory),
getNumberOfPhysicalCPUCores(), // on ARM processors it can show only enabled at current moment cores
std::thread::hardware_concurrency());
std::thread::hardware_concurrency(),
getNumberOfPhysicalCPUCores() // on ARM processors it can show only enabled at current moment cores
);
#if defined(__x86_64__)
String cpu_info;
@ -1623,7 +1624,7 @@ try
concurrent_threads_soft_limit = new_server_settings.concurrent_threads_soft_limit_num;
if (new_server_settings.concurrent_threads_soft_limit_ratio_to_cores > 0)
{
auto value = new_server_settings.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency();
auto value = new_server_settings.concurrent_threads_soft_limit_ratio_to_cores * getNumberOfPhysicalCPUCores();
if (value > 0 && value < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = value;
}

View File

@ -0,0 +1,117 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Interpreters/WindowDescription.h>
#include <Common/AlignedBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class WindowTransform;
// Interface for true window functions. It's not much of an interface, they just
// accept the guts of WindowTransform and do 'something'. Given a small number of
// true window functions, and the fact that the WindowTransform internals are
// pretty much well-defined in domain terms (e.g. frame boundaries), this is
// somewhat acceptable.
class IWindowFunction
{
public:
virtual ~IWindowFunction() = default;
// Must insert the result for current_row.
virtual void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const = 0;
virtual std::optional<WindowFrame> getDefaultFrame() const { return {}; }
virtual ColumnPtr castColumn(const Columns &, const std::vector<size_t> &) { return nullptr; }
/// Is the frame type supported by this function.
virtual bool checkWindowFrameType(const WindowTransform * /*transform*/) const { return true; }
};
// Runtime data for computing one window function.
struct WindowFunctionWorkspace
{
AggregateFunctionPtr aggregate_function;
// Cached value of aggregate function isState virtual method
bool is_aggregate_function_state = false;
// This field is set for pure window functions. When set, we ignore the
// window_function.aggregate_function, and work through this interface
// instead.
IWindowFunction * window_function_impl = nullptr;
std::vector<size_t> argument_column_indices;
// Will not be initialized for a pure window function.
mutable AlignedBuffer aggregate_function_state;
// Argument columns. Be careful, this is a per-block cache.
std::vector<const IColumn *> argument_columns;
UInt64 cached_block_number = std::numeric_limits<UInt64>::max();
};
// A basic implementation for a true window function. It pretends to be an
// aggregate function, but refuses to work as such.
struct WindowFunction : public IAggregateFunctionHelper<WindowFunction>, public IWindowFunction
{
std::string name;
WindowFunction(
const std::string & name_, const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
: IAggregateFunctionHelper<WindowFunction>(argument_types_, parameters_, result_type_), name(name_)
{
}
bool isOnlyWindowFunction() const override { return true; }
[[noreturn]] void fail() const
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "The function '{}' can only be used as a window function, not as an aggregate function", getName());
}
String getName() const override { return name; }
void create(AggregateDataPtr __restrict) const override { }
void destroy(AggregateDataPtr __restrict) const noexcept override { }
bool hasTrivialDestructor() const override { return true; }
size_t sizeOfData() const override { return 0; }
size_t alignOfData() const override { return 1; }
void add(AggregateDataPtr __restrict, const IColumn **, size_t, Arena *) const override { fail(); }
void merge(AggregateDataPtr __restrict, ConstAggregateDataPtr, Arena *) const override { fail(); }
void serialize(ConstAggregateDataPtr __restrict, WriteBuffer &, std::optional<size_t>) const override { fail(); }
void deserialize(AggregateDataPtr __restrict, ReadBuffer &, std::optional<size_t>, Arena *) const override { fail(); }
void insertResultInto(AggregateDataPtr __restrict, IColumn &, Arena *) const override { fail(); }
};
template <typename State>
struct StatefulWindowFunction : public WindowFunction
{
StatefulWindowFunction(
const std::string & name_, const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
: WindowFunction(name_, argument_types_, parameters_, result_type_)
{
}
size_t sizeOfData() const override { return sizeof(State); }
size_t alignOfData() const override { return 1; }
void create(AggregateDataPtr __restrict place) const override { new (place) State(); }
void destroy(AggregateDataPtr __restrict place) const noexcept override { reinterpret_cast<State *>(place)->~State(); }
bool hasTrivialDestructor() const override { return std::is_trivially_destructible_v<State>; }
State & getState(const WindowFunctionWorkspace & workspace) const
{
return *reinterpret_cast<State *>(workspace.aggregate_function_state.data());
}
};
}

View File

@ -78,6 +78,7 @@ add_headers_and_sources(clickhouse_common_io Common/Scheduler)
add_headers_and_sources(clickhouse_common_io Common/Scheduler/Nodes)
add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/Archives)
add_headers_and_sources(clickhouse_common_io IO/Protobuf)
add_headers_and_sources(clickhouse_common_io IO/S3)
add_headers_and_sources(clickhouse_common_io IO/AzureBlobStorage)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
@ -225,6 +226,7 @@ add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/ObjectStorageQueue)
add_object_library(clickhouse_storages_materializedview Storages/MaterializedView)
add_object_library(clickhouse_storages_time_series Storages/TimeSeries)
add_object_library(clickhouse_client Client)
# Always compile this file with the highest possible level of optimizations, even in Debug builds.
# https://github.com/ClickHouse/ClickHouse/issues/65745
@ -469,6 +471,7 @@ dbms_target_link_libraries (PUBLIC ch_contrib::sparsehash)
if (TARGET ch_contrib::protobuf)
dbms_target_link_libraries (PRIVATE ch_contrib::protobuf)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::protobuf)
endif ()
if (TARGET clickhouse_grpc_protos)

View File

@ -2,7 +2,7 @@
#include <Client/Suggest.h>
#include <Client/QueryFuzzer.h>
#include <Common/QueryFuzzer.h>
#include <Common/DNSResolver.h>
#include <Common/InterruptListener.h>
#include <Common/ProgressIndication.h>

View File

@ -365,7 +365,7 @@ bool LocalConnection::poll(size_t)
{
while (pollImpl())
{
LOG_DEBUG(&Poco::Logger::get("LocalConnection"), "Executor timeout encountered, will retry");
LOG_TEST(&Poco::Logger::get("LocalConnection"), "Executor timeout encountered, will retry");
if (needSendProgressOrMetrics())
return true;

View File

@ -604,6 +604,10 @@
M(723, PARQUET_EXCEPTION) \
M(724, TOO_MANY_TABLES) \
M(725, TOO_MANY_DATABASES) \
M(726, UNEXPECTED_HTTP_HEADERS) \
M(727, UNEXPECTED_TABLE_ENGINE) \
M(728, UNEXPECTED_DATA_TYPE) \
M(729, ILLEGAL_TIME_SERIES_TAGS) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -60,6 +60,7 @@ static struct InitFiu
ONCE(receive_timeout_on_table_status_response) \
REGULAR(keepermap_fail_drop_data) \
REGULAR(lazy_pipe_fds_fail_close) \
PAUSEABLE(infinite_sleep) \
namespace FailPoints

View File

@ -68,22 +68,21 @@ Field QueryFuzzer::getRandomField(int type)
{
case 0:
{
return bad_int64_values[fuzz_rand() % (sizeof(bad_int64_values)
/ sizeof(*bad_int64_values))];
return bad_int64_values[fuzz_rand() % std::size(bad_int64_values)];
}
case 1:
{
static constexpr double values[]
= {NAN, INFINITY, -INFINITY, 0., -0., 0.0001, 0.5, 0.9999,
1., 1.0001, 2., 10.0001, 100.0001, 1000.0001, 1e10, 1e20,
FLT_MIN, FLT_MIN + FLT_EPSILON, FLT_MAX, FLT_MAX + FLT_EPSILON}; return values[fuzz_rand() % (sizeof(values) / sizeof(*values))];
FLT_MIN, FLT_MIN + FLT_EPSILON, FLT_MAX, FLT_MAX + FLT_EPSILON}; return values[fuzz_rand() % std::size(values)];
}
case 2:
{
static constexpr UInt64 scales[] = {0, 1, 2, 10};
return DecimalField<Decimal64>(
bad_int64_values[fuzz_rand() % (sizeof(bad_int64_values) / sizeof(*bad_int64_values))],
static_cast<UInt32>(scales[fuzz_rand() % (sizeof(scales) / sizeof(*scales))])
bad_int64_values[fuzz_rand() % std::size(bad_int64_values)],
static_cast<UInt32>(scales[fuzz_rand() % std::size(scales)])
);
}
default:
@ -165,7 +164,8 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.erase(arr.begin() + pos);
std::cerr << "erased\n";
if (debug_stream)
*debug_stream << "erased\n";
}
if (fuzz_rand() % 5 == 0)
@ -174,12 +174,14 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.insert(arr.begin() + pos, fuzzField(arr[pos]));
std::cerr << fmt::format("inserted (pos {})\n", pos);
if (debug_stream)
*debug_stream << fmt::format("inserted (pos {})\n", pos);
}
else
{
arr.insert(arr.begin(), getRandomField(0));
std::cerr << "inserted (0)\n";
if (debug_stream)
*debug_stream << "inserted (0)\n";
}
}
@ -197,7 +199,9 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.erase(arr.begin() + pos);
std::cerr << "erased\n";
if (debug_stream)
*debug_stream << "erased\n";
}
if (fuzz_rand() % 5 == 0)
@ -206,12 +210,16 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.insert(arr.begin() + pos, fuzzField(arr[pos]));
std::cerr << fmt::format("inserted (pos {})\n", pos);
if (debug_stream)
*debug_stream << fmt::format("inserted (pos {})\n", pos);
}
else
{
arr.insert(arr.begin(), getRandomField(0));
std::cerr << "inserted (0)\n";
if (debug_stream)
*debug_stream << "inserted (0)\n";
}
}
@ -344,7 +352,8 @@ void QueryFuzzer::fuzzOrderByList(IAST * ast)
}
else
{
std::cerr << "No random column.\n";
if (debug_stream)
*debug_stream << "No random column.\n";
}
}
@ -378,7 +387,8 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
if (col)
impl->children.insert(pos, col);
else
std::cerr << "No random column.\n";
if (debug_stream)
*debug_stream << "No random column.\n";
}
// We don't have to recurse here to fuzz the children, this is handled by
@ -1361,11 +1371,15 @@ void QueryFuzzer::fuzzMain(ASTPtr & ast)
collectFuzzInfoMain(ast);
fuzz(ast);
std::cout << std::endl;
WriteBufferFromOStream ast_buf(std::cout, 4096);
formatAST(*ast, ast_buf, false /*highlight*/);
ast_buf.finalize();
std::cout << std::endl << std::endl;
if (out_stream)
{
*out_stream << std::endl;
WriteBufferFromOStream ast_buf(*out_stream, 4096);
formatAST(*ast, ast_buf, false /*highlight*/);
ast_buf.finalize();
*out_stream << std::endl << std::endl;
}
}
}

View File

@ -35,9 +35,31 @@ struct ASTWindowDefinition;
* queries, so you want to feed it a lot of queries to get some interesting mix
* of them. Normally we feed SQL regression tests to it.
*/
struct QueryFuzzer
class QueryFuzzer
{
pcg64 fuzz_rand{randomSeed()};
public:
explicit QueryFuzzer(pcg64 fuzz_rand_ = randomSeed(), std::ostream * out_stream_ = nullptr, std::ostream * debug_stream_ = nullptr)
: fuzz_rand(fuzz_rand_)
, out_stream(out_stream_)
, debug_stream(debug_stream_)
{
}
// This is the only function you have to call -- it will modify the passed
// ASTPtr to point to new AST with some random changes.
void fuzzMain(ASTPtr & ast);
ASTs getInsertQueriesForFuzzedTables(const String & full_query);
ASTs getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query);
void notifyQueryFailed(ASTPtr ast);
static bool isSuitableForFuzzing(const ASTCreateQuery & create);
private:
pcg64 fuzz_rand;
std::ostream * out_stream = nullptr;
std::ostream * debug_stream = nullptr;
// We add elements to expression lists with fixed probability. Some elements
// are so large, that the expected number of elements we add to them is
@ -66,10 +88,6 @@ struct QueryFuzzer
std::unordered_map<std::string, size_t> index_of_fuzzed_table;
std::set<IAST::Hash> created_tables_hashes;
// This is the only function you have to call -- it will modify the passed
// ASTPtr to point to new AST with some random changes.
void fuzzMain(ASTPtr & ast);
// Various helper functions follow, normally you shouldn't have to call them.
Field getRandomField(int type);
Field fuzzField(Field field);
@ -77,9 +95,6 @@ struct QueryFuzzer
ASTPtr getRandomExpressionList();
DataTypePtr fuzzDataType(DataTypePtr type);
DataTypePtr getRandomType();
ASTs getInsertQueriesForFuzzedTables(const String & full_query);
ASTs getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query);
void notifyQueryFailed(ASTPtr ast);
void replaceWithColumnLike(ASTPtr & ast);
void replaceWithTableLike(ASTPtr & ast);
void fuzzOrderByElement(ASTOrderByElement * elem);
@ -102,8 +117,6 @@ struct QueryFuzzer
void addTableLike(ASTPtr ast);
void addColumnLike(ASTPtr ast);
void collectFuzzInfoRecurse(ASTPtr ast);
static bool isSuitableForFuzzing(const ASTCreateQuery & create);
};
}

View File

@ -629,6 +629,7 @@ void HandledSignals::setupTerminateHandler()
void HandledSignals::setupCommonDeadlySignalHandlers()
{
/// SIGTSTP is added for debugging purposes. To output a stack trace of any running thread at anytime.
/// NOTE: it is also used by the clickhouse-test wrapper
addSignalHandler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP, SIGTRAP}, signalHandler, true);
#if defined(SANITIZER)

View File

@ -1,4 +1,5 @@
#pragma once
/// Get number of CPU cores without hyper-threading.
/// The calculation respects possible cgroups limits.
unsigned getNumberOfPhysicalCPUCores();

View File

@ -690,6 +690,7 @@ class IColumn;
M(UInt64, max_size_to_preallocate_for_joins, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before join", 0) \
\
M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \
M(Bool, allow_experimental_kafka_offsets_storage_in_keeper, false, "Allow experimental feature to store Kafka related offsets in ClickHouse Keeper. When enabled a ClickHouse Keeper path and replica name can be specified to the Kafka table engine. As a result instead of the regular Kafka engine, a new type of storage engine will be used that stores the committed offsets primarily in ClickHouse Keeper", 0) \
M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \
M(Bool, allow_aggregate_partitions_independently, false, "Enable independent aggregation of partitions on separate threads when partition key suits group by key. Beneficial when number of partitions close to number of cores and partitions have roughly the same size", 0) \
M(Bool, force_aggregate_partitions_independently, false, "Force the use of optimization when it is applicable, but heuristics decided not to use it", 0) \
@ -905,6 +906,7 @@ class IColumn;
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
M(Bool, allow_experimental_hash_functions, false, "Enable experimental hash functions", 0) \
M(Bool, allow_experimental_object_type, false, "Allow Object and JSON data types", 0) \
M(Bool, allow_experimental_time_series_table, false, "Allows experimental TimeSeries table engine", 0) \
M(Bool, allow_experimental_variant_type, false, "Allow Variant data type", 0) \
M(Bool, allow_experimental_dynamic_type, false, "Allow Dynamic data type", 0) \
M(Bool, allow_experimental_annoy_index, false, "Allows to use Annoy index. Disabled by default because this feature is experimental", 0) \

View File

@ -79,7 +79,9 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"restore_replace_external_engines_to_null", false, false, "New setting."},
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"allow_experimental_kafka_offsets_storage_in_keeper", false, false, "Allow the usage of experimental Kafka storage engine that stores the committed offsets in ClickHouse Keeper"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},
}
},

View File

@ -53,6 +53,9 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, c
, db_uuid(uuid)
{
assert(db_uuid != UUIDHelpers::Nil);
fs::create_directories(fs::path(getContext()->getPath()) / "metadata");
fs::create_directories(path_to_table_symlinks);
tryCreateMetadataSymlink();
}
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, ContextPtr context_)
@ -60,16 +63,6 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, C
{
}
void DatabaseAtomic::createDirectories()
{
if (database_atomic_directories_created.test_and_set())
return;
DatabaseOnDisk::createDirectories();
fs::create_directories(fs::path(getContext()->getPath()) / "metadata");
fs::create_directories(path_to_table_symlinks);
tryCreateMetadataSymlink();
}
String DatabaseAtomic::getTableDataPath(const String & table_name) const
{
std::lock_guard lock(mutex);
@ -106,7 +99,6 @@ void DatabaseAtomic::drop(ContextPtr)
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
{
assert(relative_table_path != data_path && !relative_table_path.empty());
createDirectories();
DetachedTables not_in_use;
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
@ -208,15 +200,11 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
if (exchange && !supportsAtomicRename())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RENAME EXCHANGE is not supported");
createDirectories();
waitDatabaseStarted();
auto & other_db = dynamic_cast<DatabaseAtomic &>(to_database);
bool inside_database = this == &other_db;
if (!inside_database)
other_db.createDirectories();
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
@ -337,7 +325,6 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
const String & table_metadata_tmp_path, const String & table_metadata_path,
ContextPtr query_context)
{
createDirectories();
DetachedTables not_in_use;
auto table_data_path = getTableDataPath(query);
try
@ -474,9 +461,6 @@ void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, Loadin
if (mode < LoadingStrictnessLevel::FORCE_RESTORE)
return;
if (!fs::exists(path_to_table_symlinks))
return;
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
{
@ -604,7 +588,6 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
{
/// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard
createDirectories();
waitDatabaseStarted();
bool check_ref_deps = query_context->getSettingsRef().check_referential_table_dependencies;
@ -696,5 +679,4 @@ void registerDatabaseAtomic(DatabaseFactory & factory)
};
factory.registerDatabase("Atomic", create_fn);
}
}

View File

@ -76,9 +76,6 @@ protected:
using DetachedTables = std::unordered_map<UUID, StoragePtr>;
[[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex);
std::atomic_flag database_atomic_directories_created = ATOMIC_FLAG_INIT;
void createDirectories();
void tryCreateMetadataSymlink();
virtual bool allowMoveTableToOtherDatabaseEngine(IDatabase & /*to_database*/) const { return false; }

View File

@ -47,13 +47,12 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
: DatabaseOnDisk(name_, metadata_path_, std::filesystem::path("data") / escapeForFileName(name_) / "", "DatabaseLazy (" + name_ + ")", context_)
, expiration_time(expiration_time_)
{
createDirectories();
}
void DatabaseLazy::loadStoredObjects(ContextMutablePtr local_context, LoadingStrictnessLevel /*mode*/)
{
iterateMetadataFiles([this, &local_context](const String & file_name)
iterateMetadataFiles(local_context, [this, &local_context](const String & file_name)
{
const std::string table_name = unescapeForFileName(file_name.substr(0, file_name.size() - 4));

View File

@ -12,7 +12,7 @@ class DatabaseLazyIterator;
class Context;
/** Lazy engine of databases.
* Works like DatabaseOrdinary, but stores only recently accessed tables in memory.
* Works like DatabaseOrdinary, but stores in memory only the cache.
* Can be used only with *Log engines.
*/
class DatabaseLazy final : public DatabaseOnDisk

View File

@ -172,14 +172,7 @@ DatabaseOnDisk::DatabaseOnDisk(
, metadata_path(metadata_path_)
, data_path(data_path_)
{
}
void DatabaseOnDisk::createDirectories()
{
if (directories_created.test_and_set())
return;
fs::create_directories(std::filesystem::path(getContext()->getPath()) / data_path);
fs::create_directories(local_context->getPath() + data_path);
fs::create_directories(metadata_path);
}
@ -197,8 +190,6 @@ void DatabaseOnDisk::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
createDirectories();
const auto & settings = local_context->getSettingsRef();
const auto & create = query->as<ASTCreateQuery &>();
assert(table_name == create.getTable());
@ -266,6 +257,7 @@ void DatabaseOnDisk::createTable(
}
commitCreateTable(create, table, table_metadata_tmp_path, table_metadata_path, local_context);
removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, false);
}
@ -293,8 +285,6 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora
{
try
{
createDirectories();
/// Add a table to the map of known tables.
attachTable(query_context, query.getTable(), table, getTableDataPath(query));
@ -430,7 +420,6 @@ void DatabaseOnDisk::renameTable(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases of different engines is not supported");
}
createDirectories();
waitDatabaseStarted();
auto table_data_relative_path = getTableDataPath(table_name);
@ -579,14 +568,14 @@ void DatabaseOnDisk::drop(ContextPtr local_context)
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
if (local_context->getSettingsRef().force_remove_data_recursively_on_drop)
{
(void)fs::remove_all(std::filesystem::path(getContext()->getPath()) / data_path);
(void)fs::remove_all(local_context->getPath() + getDataPath());
(void)fs::remove_all(getMetadataPath());
}
else
{
try
{
(void)fs::remove(std::filesystem::path(getContext()->getPath()) / data_path);
(void)fs::remove(local_context->getPath() + getDataPath());
(void)fs::remove(getMetadataPath());
}
catch (const fs::filesystem_error & e)
@ -624,18 +613,15 @@ time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & object_n
}
}
void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_metadata_file) const
void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const IteratingFunction & process_metadata_file) const
{
if (!fs::exists(metadata_path))
return;
auto process_tmp_drop_metadata_file = [&](const String & file_name)
{
assert(getUUID() == UUIDHelpers::Nil);
static const char * tmp_drop_ext = ".sql.tmp_drop";
const std::string object_name = file_name.substr(0, file_name.size() - strlen(tmp_drop_ext));
if (fs::exists(std::filesystem::path(getContext()->getPath()) / data_path / object_name))
if (fs::exists(local_context->getPath() + getDataPath() + '/' + object_name))
{
fs::rename(getMetadataPath() + file_name, getMetadataPath() + object_name + ".sql");
LOG_WARNING(log, "Object {} was not dropped previously and will be restored", backQuote(object_name));
@ -652,7 +638,7 @@ void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_meta
std::vector<std::pair<String, bool>> metadata_files;
fs::directory_iterator dir_end;
for (fs::directory_iterator dir_it(metadata_path); dir_it != dir_end; ++dir_it)
for (fs::directory_iterator dir_it(getMetadataPath()); dir_it != dir_end; ++dir_it)
{
String file_name = dir_it->path().filename();
/// For '.svn', '.gitignore' directory and similar.

View File

@ -64,7 +64,7 @@ public:
time_t getObjectMetadataModificationTime(const String & object_name) const override;
String getDataPath() const override { return data_path; }
String getTableDataPath(const String & table_name) const override { return std::filesystem::path(data_path) / escapeForFileName(table_name) / ""; }
String getTableDataPath(const String & table_name) const override { return data_path + escapeForFileName(table_name) + "/"; }
String getTableDataPath(const ASTCreateQuery & query) const override { return getTableDataPath(query.getTable()); }
String getMetadataPath() const override { return metadata_path; }
@ -83,7 +83,7 @@ protected:
using IteratingFunction = std::function<void(const String &)>;
void iterateMetadataFiles(const IteratingFunction & process_metadata_file) const;
void iterateMetadataFiles(ContextPtr context, const IteratingFunction & process_metadata_file) const;
ASTPtr getCreateTableQueryImpl(
const String & table_name,
@ -99,9 +99,6 @@ protected:
virtual void removeDetachedPermanentlyFlag(ContextPtr context, const String & table_name, const String & table_metadata_path, bool attach);
virtual void setDetachedTableNotInUseForce(const UUID & /*uuid*/) {}
std::atomic_flag directories_created = ATOMIC_FLAG_INIT;
void createDirectories();
const String metadata_path;
const String data_path;
};

View File

@ -55,7 +55,7 @@ static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
static constexpr const char * const CONVERT_TO_REPLICATED_FLAG_NAME = "convert_to_replicated";
DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata_path_, ContextPtr context_)
: DatabaseOrdinary(name_, metadata_path_, std::filesystem::path("data") / escapeForFileName(name_) / "", "DatabaseOrdinary (" + name_ + ")", context_)
: DatabaseOrdinary(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseOrdinary (" + name_ + ")", context_)
{
}
@ -265,7 +265,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
}
};
iterateMetadataFiles(process_metadata);
iterateMetadataFiles(local_context, process_metadata);
size_t objects_in_database = metadata.parsed_tables.size() - prev_tables_count;
size_t dictionaries_in_database = metadata.total_dictionaries - prev_total_dictionaries;

View File

@ -12,6 +12,7 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/PoolId.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
@ -338,9 +339,12 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, params);
}
std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr & cluster_) const
ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const
{
Strings paths;
paths.emplace_back(fs::path(zookeeper_path) / "max_log_ptr");
const auto & addresses_with_failover = cluster_->getShardsAddresses();
const auto & shards_info = cluster_->getShardsInfo();
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
@ -349,22 +353,50 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr");
}
}
try
{
auto current_zookeeper = getZooKeeper();
auto res = current_zookeeper->exists(paths);
auto zk_res = current_zookeeper->tryGet(paths);
std::vector<UInt8> statuses;
statuses.resize(paths.size());
auto max_log_ptr_zk = zk_res[0];
if (max_log_ptr_zk.error != Coordination::Error::ZOK)
throw Coordination::Exception(max_log_ptr_zk.error);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZOK)
statuses[i] = 1;
UInt32 max_log_ptr = parse<UInt32>(max_log_ptr_zk.data);
return statuses;
ReplicasInfo replicas_info;
replicas_info.resize((zk_res.size() - 1) / 2);
size_t global_replica_index = 0;
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
for (const auto & replica : addresses_with_failover[shard_index])
{
auto replica_active = zk_res[2 * global_replica_index + 1];
auto replica_log_ptr = zk_res[2 * global_replica_index + 2];
UInt64 recovery_time = 0;
{
std::lock_guard lock(ddl_worker_mutex);
if (replica.is_local && ddl_worker)
recovery_time = ddl_worker->getCurrentInitializationDurationMs();
}
replicas_info[global_replica_index] = ReplicaInfo{
.is_active = replica_active.error == Coordination::Error::ZOK,
.replication_lag = replica_log_ptr.error != Coordination::Error::ZNONODE ? std::optional(max_log_ptr - parse<UInt32>(replica_log_ptr.data)) : std::nullopt,
.recovery_time = recovery_time,
};
++global_replica_index;
}
}
return replicas_info;
}
catch (...)
{
@ -373,7 +405,6 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
}
}
void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref)
{
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);

View File

@ -1,5 +1,7 @@
#pragma once
#include <optional>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseReplicatedSettings.h>
#include <Common/ZooKeeper/ZooKeeper.h>
@ -17,6 +19,14 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct ReplicaInfo
{
bool is_active;
std::optional<UInt32> replication_lag;
UInt64 recovery_time;
};
using ReplicasInfo = std::vector<ReplicaInfo>;
class DatabaseReplicated : public DatabaseAtomic
{
public:
@ -84,7 +94,7 @@ public:
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
ReplicasInfo tryGetReplicasInfo(const ClusterPtr & cluster_) const;
void renameDatabase(ContextPtr query_context, const String & new_name) override;

View File

@ -32,6 +32,12 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
bool DatabaseReplicatedDDLWorker::initializeMainThread()
{
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.emplace();
initialization_duration_timer->start();
}
while (!stop_flag)
{
try
@ -69,6 +75,10 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
initializeReplication();
initialized = true;
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return true;
}
catch (...)
@ -78,6 +88,11 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
}
}
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return false;
}
@ -459,4 +474,10 @@ UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
return max_id.load();
}
UInt64 DatabaseReplicatedDDLWorker::getCurrentInitializationDurationMs() const
{
std::lock_guard lock(initialization_duration_timer_mutex);
return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0;
}
}

View File

@ -36,6 +36,8 @@ public:
DatabaseReplicated * const database, bool committed = false); /// NOLINT
UInt32 getLogPointer() const;
UInt64 getCurrentInitializationDurationMs() const;
private:
bool initializeMainThread() override;
void initializeReplication();
@ -56,6 +58,9 @@ private:
ZooKeeperPtr active_node_holder_zookeeper;
/// It will remove "active" node when database is detached
zkutil::EphemeralNodeHolderPtr active_node_holder;
std::optional<Stopwatch> initialization_duration_timer;
mutable std::mutex initialization_duration_timer_mutex;
};
}

View File

@ -14,8 +14,6 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
}
DatabasesOverlay::DatabasesOverlay(const String & name_, ContextPtr context_)
@ -126,39 +124,6 @@ StoragePtr DatabasesOverlay::detachTable(ContextPtr context_, const String & tab
getEngineName());
}
void DatabasesOverlay::renameTable(
ContextPtr current_context,
const String & name,
IDatabase & to_database,
const String & to_name,
bool exchange,
bool dictionary)
{
for (auto & db : databases)
{
if (db->isTableExist(name, current_context))
{
if (DatabasesOverlay * to_overlay_database = typeid_cast<DatabasesOverlay *>(&to_database))
{
/// Renaming from Overlay database inside itself or into another Overlay database.
/// Just use the first database in the overlay as a destination.
if (to_overlay_database->databases.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The destination Overlay database {} does not have any members", to_database.getDatabaseName());
db->renameTable(current_context, name, *to_overlay_database->databases[0], to_name, exchange, dictionary);
}
else
{
/// Renaming into a different type of database. E.g. from Overlay on top of Atomic database into just Atomic database.
db->renameTable(current_context, name, to_database, to_name, exchange, dictionary);
}
return;
}
}
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuote(getDatabaseName()), backQuote(name));
}
ASTPtr DatabasesOverlay::getCreateTableQueryImpl(const String & name, ContextPtr context_, bool throw_on_error) const
{
ASTPtr result = nullptr;
@ -213,18 +178,6 @@ String DatabasesOverlay::getTableDataPath(const ASTCreateQuery & query) const
return result;
}
UUID DatabasesOverlay::getUUID() const
{
UUID result = UUIDHelpers::Nil;
for (const auto & db : databases)
{
result = db->getUUID();
if (result != UUIDHelpers::Nil)
break;
}
return result;
}
UUID DatabasesOverlay::tryGetTableUUID(const String & table_name) const
{
UUID result = UUIDHelpers::Nil;

View File

@ -35,21 +35,12 @@ public:
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void renameTable(
ContextPtr current_context,
const String & name,
IDatabase & to_database,
const String & to_name,
bool exchange,
bool dictionary) override;
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;
ASTPtr getCreateDatabaseQuery() const override;
String getTableDataPath(const String & table_name) const override;
String getTableDataPath(const ASTCreateQuery & query) const override;
UUID getUUID() const override;
UUID tryGetTableUUID(const String & table_name) const override;
void drop(ContextPtr context) override;

View File

@ -416,7 +416,6 @@ public:
std::lock_guard lock{mutex};
return database_name;
}
/// Get UUID of database.
virtual UUID getUUID() const { return UUIDHelpers::Nil; }

View File

@ -46,7 +46,6 @@ DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
, settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), binlog_client_, settings.get())
{
createDirectories();
}
void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const

View File

@ -153,12 +153,8 @@ public:
return true;
}
std::vector<double> xfreq(spec_len);
double step = 0.5 / (spec_len - 1);
for (size_t i = 0; i < spec_len; ++i)
xfreq[i] = i * step;
auto freq = xfreq[idx];
auto freq = idx * step;
period = std::round(1 / freq);
return true;

View File

@ -7,6 +7,7 @@
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/ProfileEvents.h>
#include <Common/assert_cast.h>
#include <Common/FailPoint.h>
#include <Core/Settings.h>
#include <base/sleep.h>
#include <IO/WriteHelpers.h>
@ -32,6 +33,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace FailPoints
{
extern const char infinite_sleep[];
}
/** sleep(seconds) - the specified number of seconds sleeps each columns.
*/
@ -107,6 +113,8 @@ public:
{
/// When sleeping, the query cannot be cancelled. For ability to cancel query, we limit sleep time.
UInt64 microseconds = static_cast<UInt64>(seconds * 1e6);
FailPointInjection::pauseFailPoint(FailPoints::infinite_sleep);
if (max_microseconds && microseconds > max_microseconds)
throw Exception(ErrorCodes::TOO_SLOW, "The maximum sleep time is {} microseconds. Requested: {} microseconds",
max_microseconds, microseconds);

View File

@ -0,0 +1,56 @@
#include "config.h"
#if USE_PROTOBUF
#include <IO/Protobuf/ProtobufZeroCopyInputStreamFromReadBuffer.h>
#include <IO/ReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ProtobufZeroCopyInputStreamFromReadBuffer::ProtobufZeroCopyInputStreamFromReadBuffer(std::unique_ptr<ReadBuffer> in_) : in(std::move(in_))
{
}
ProtobufZeroCopyInputStreamFromReadBuffer::~ProtobufZeroCopyInputStreamFromReadBuffer() = default;
bool ProtobufZeroCopyInputStreamFromReadBuffer::Next(const void ** data, int * size)
{
if (in->eof())
return false;
*data = in->position();
*size = static_cast<int>(in->available());
in->position() += *size;
return true;
}
void ProtobufZeroCopyInputStreamFromReadBuffer::BackUp(int count)
{
if (static_cast<Int64>(in->offset()) < count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"ProtobufZeroCopyInputStreamFromReadBuffer::BackUp() cannot back up {} bytes (max = {} bytes)",
count,
in->offset());
in->position() -= count;
}
bool ProtobufZeroCopyInputStreamFromReadBuffer::Skip(int count)
{
return static_cast<Int64>(in->tryIgnore(count)) == count;
}
int64_t ProtobufZeroCopyInputStreamFromReadBuffer::ByteCount() const
{
return in->count();
}
}
#endif

View File

@ -0,0 +1,38 @@
#pragma once
#include "config.h"
#if USE_PROTOBUF
#include <google/protobuf/io/zero_copy_stream.h>
namespace DB
{
class ReadBuffer;
class ProtobufZeroCopyInputStreamFromReadBuffer : public google::protobuf::io::ZeroCopyInputStream
{
public:
explicit ProtobufZeroCopyInputStreamFromReadBuffer(std::unique_ptr<ReadBuffer> in_);
~ProtobufZeroCopyInputStreamFromReadBuffer() override;
// Obtains a chunk of data from the stream.
bool Next(const void ** data, int * size) override;
// Backs up a number of bytes, so that the next call to Next() returns
// data again that was already returned by the last call to Next().
void BackUp(int count) override;
// Skips a number of bytes.
bool Skip(int count) override;
// Returns the total number of bytes read since this object was created.
int64_t ByteCount() const override;
private:
std::unique_ptr<ReadBuffer> in;
};
}
#endif

View File

@ -0,0 +1,60 @@
#include "config.h"
#if USE_PROTOBUF
#include <IO/Protobuf/ProtobufZeroCopyOutputStreamFromWriteBuffer.h>
#include <IO/WriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ProtobufZeroCopyOutputStreamFromWriteBuffer::ProtobufZeroCopyOutputStreamFromWriteBuffer(WriteBuffer & out_) : out(&out_)
{
}
ProtobufZeroCopyOutputStreamFromWriteBuffer::ProtobufZeroCopyOutputStreamFromWriteBuffer(std::unique_ptr<WriteBuffer> out_)
: ProtobufZeroCopyOutputStreamFromWriteBuffer(*out_)
{
out_holder = std::move(out_);
}
ProtobufZeroCopyOutputStreamFromWriteBuffer::~ProtobufZeroCopyOutputStreamFromWriteBuffer() = default;
bool ProtobufZeroCopyOutputStreamFromWriteBuffer::Next(void ** data, int * size)
{
*data = out->position();
*size = static_cast<int>(out->available());
out->position() += *size;
return true;
}
void ProtobufZeroCopyOutputStreamFromWriteBuffer::BackUp(int count)
{
if (static_cast<Int64>(out->offset()) < count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"ProtobufZeroCopyOutputStreamFromWriteBuffer::BackUp() cannot back up {} bytes (max = {} bytes)",
count,
out->offset());
out->position() -= count;
}
int64_t ProtobufZeroCopyOutputStreamFromWriteBuffer::ByteCount() const
{
return out->count();
}
void ProtobufZeroCopyOutputStreamFromWriteBuffer::finalize()
{
out->finalize();
}
}
#endif

View File

@ -0,0 +1,40 @@
#pragma once
#include "config.h"
#if USE_PROTOBUF
#include <google/protobuf/io/zero_copy_stream.h>
namespace DB
{
class WriteBuffer;
class ProtobufZeroCopyOutputStreamFromWriteBuffer : public google::protobuf::io::ZeroCopyOutputStream
{
public:
explicit ProtobufZeroCopyOutputStreamFromWriteBuffer(WriteBuffer & out_);
explicit ProtobufZeroCopyOutputStreamFromWriteBuffer(std::unique_ptr<WriteBuffer> out_);
~ProtobufZeroCopyOutputStreamFromWriteBuffer() override;
// Obtains a buffer into which data can be written.
bool Next(void ** data, int * size) override;
// Backs up a number of bytes, so that the end of the last buffer returned
// by Next() is not actually written.
void BackUp(int count) override;
// Returns the total number of bytes written since this object was created.
int64_t ByteCount() const override;
void finalize();
private:
WriteBuffer * out;
std::unique_ptr<WriteBuffer> out_holder;
};
}
#endif

View File

@ -16,7 +16,13 @@ namespace ErrorCodes
}
SnappyWriteBuffer::SnappyWriteBuffer(std::unique_ptr<WriteBuffer> out_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
: SnappyWriteBuffer(*out_, buf_size, existing_memory, alignment)
{
out_holder = std::move(out_);
}
SnappyWriteBuffer::SnappyWriteBuffer(WriteBuffer & out_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(&out_)
{
}

View File

@ -18,6 +18,12 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
explicit SnappyWriteBuffer(
WriteBuffer & out_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
~SnappyWriteBuffer() override;
void finalizeImpl() override { finish(); }
@ -28,7 +34,9 @@ private:
void finishImpl();
void finish();
std::unique_ptr<WriteBuffer> out;
WriteBuffer * out;
std::unique_ptr<WriteBuffer> out_holder;
bool finished = false;
String uncompress_buffer;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/typeid_cast.h>
#include <Parsers/ASTWithElement.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTRenameQuery.h>
@ -100,6 +101,7 @@ private:
const String database_name;
std::set<String> external_tables;
mutable std::unordered_set<String> with_aliases;
bool only_replace_current_database_function = false;
bool only_replace_in_join = false;
@ -117,6 +119,10 @@ private:
void visit(ASTSelectQuery & select, ASTPtr &) const
{
if (select.recursive_with)
for (const auto & child : select.with()->children)
with_aliases.insert(child->as<ASTWithElement>()->name);
if (select.tables())
tryVisit<ASTTablesInSelectQuery>(select.refTables());
@ -165,6 +171,9 @@ private:
/// There is temporary table with such name, should not be rewritten.
if (external_tables.contains(identifier.shortName()))
return;
/// This is WITH RECURSIVE alias.
if (with_aliases.contains(identifier.name()))
return;
auto qualified_identifier = std::make_shared<ASTTableIdentifier>(database_name, identifier.name());
if (!identifier.alias.empty())
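The new `with_aliases` check prevents the default database from being prepended to identifiers that actually refer to a recursive CTE rather than a table. A hedged sketch of the kind of query this protects (names are illustrative; recursive CTEs assume the analyzer is enabled):
```sql
-- Here `t` inside the CTE body and in the outer SELECT must stay unqualified;
-- rewriting it to `db.t` would break the recursive reference.
WITH RECURSIVE t AS
(
    SELECT 1 AS n
    UNION ALL
    SELECT n + 1 FROM t WHERE n < 10
)
SELECT sum(n) FROM t;
```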

View File

@ -254,6 +254,8 @@ String toString(ClientInfo::Interface interface)
return "LOCAL";
case ClientInfo::Interface::TCP_INTERSERVER:
return "TCP_INTERSERVER";
case ClientInfo::Interface::PROMETHEUS:
return "PROMETHEUS";
}
return std::format("Unknown server interface ({}).", static_cast<int>(interface));

View File

@ -38,6 +38,7 @@ public:
POSTGRESQL = 5,
LOCAL = 6,
TCP_INTERSERVER = 7,
PROMETHEUS = 8,
};
enum class HTTPMethod : uint8_t

View File

@ -38,6 +38,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <AggregateFunctions/WindowFunction.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDictionary.h>
@ -590,6 +591,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAG & actions, Aggrega
void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
const WindowDescriptions & existing_descriptions,
AggregateFunctionPtr aggregate_function,
WindowDescription & desc, const IAST * ast)
{
const auto & definition = ast->as<const ASTWindowDefinition &>();
@ -698,7 +700,21 @@ void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
ast->formatForErrorMessage());
}
const auto * window_function = aggregate_function ? dynamic_cast<const IWindowFunction *>(aggregate_function.get()) : nullptr;
desc.frame.is_default = definition.frame_is_default;
if (desc.frame.is_default && window_function)
{
auto default_window_frame_opt = window_function->getDefaultFrame();
if (default_window_frame_opt)
{
desc.frame = *default_window_frame_opt;
/// Append the default frame description to window_name, make sure it will be put into
/// a proper window description.
desc.window_name += " " + desc.frame.toString();
return;
}
}
desc.frame.type = definition.frame_type;
desc.frame.begin_type = definition.frame_begin_type;
desc.frame.begin_preceding = definition.frame_begin_preceding;
@ -734,7 +750,7 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAG & actions)
WindowDescription desc;
desc.window_name = elem.name;
makeWindowDescriptionFromAST(*current_context, window_descriptions,
desc, elem.definition.get());
nullptr, desc, elem.definition.get());
auto [it, inserted] = window_descriptions.insert(
{elem.name, std::move(desc)});
@ -821,12 +837,12 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAG & actions)
WindowDescription desc;
desc.window_name = default_window_name;
makeWindowDescriptionFromAST(*current_context, window_descriptions,
desc, &definition);
window_function.aggregate_function, desc, &definition);
auto full_sort_description = desc.full_sort_description;
auto [it, inserted] = window_descriptions.insert(
{default_window_name, std::move(desc)});
{desc.window_name, std::move(desc)});
if (!inserted)
{

View File

@ -135,7 +135,12 @@ public:
/// A list of windows for window functions.
const WindowDescriptions & windowDescriptions() const { return window_descriptions; }
void makeWindowDescriptionFromAST(const Context & context, const WindowDescriptions & existing_descriptions, WindowDescription & desc, const IAST * ast);
void makeWindowDescriptionFromAST(
const Context & context,
const WindowDescriptions & existing_descriptions,
AggregateFunctionPtr aggregate_function,
WindowDescription & desc,
const IAST * ast);
void makeWindowDescriptions(ActionsDAG & actions);
/** Checks if subquery is not a plain StorageSet.

View File

@ -37,6 +37,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageTimeSeries.h>
#include <Storages/WindowView/StorageWindowView.h>
#include <Interpreters/Context.h>
@ -751,6 +752,10 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
if (create.storage && create.storage->engine)
getContext()->checkAccess(AccessType::TABLE_ENGINE, create.storage->engine->name);
/// If this is a TimeSeries table then we need to normalize list of columns (add missing columns and reorder), and also set inner table engines.
if (create.is_time_series_table && (mode < LoadingStrictnessLevel::ATTACH))
StorageTimeSeries::normalizeTableDefinition(create, getContext());
TableProperties properties;
TableLockHolder as_storage_lock;
@ -1093,6 +1098,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
else if (as_create.storage)
{
storage_def = typeid_cast<std::shared_ptr<ASTStorage>>(as_create.storage->ptr());
create.is_time_series_table = as_create.is_time_series_table;
}
else
{

View File

@ -105,9 +105,10 @@ ColumnsDescription SessionLogElement::getColumnsDescription()
{"MySQL", static_cast<Int8>(Interface::MYSQL)},
{"PostgreSQL", static_cast<Int8>(Interface::POSTGRESQL)},
{"Local", static_cast<Int8>(Interface::LOCAL)},
{"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)}
{"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)},
{"Prometheus", static_cast<Int8>(Interface::PROMETHEUS)},
});
static_assert(magic_enum::enum_count<Interface>() == 7);
static_assert(magic_enum::enum_count<Interface>() == 8);
auto lc_string_datatype = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());

View File

@ -27,6 +27,7 @@ class ASTQueryWithTableAndOutput;
class ASTTableIdentifier;
class Context;
// TODO(ilezhankin): refactor and merge |ASTTableIdentifier|
struct StorageID
{
String database_name;

View File

@ -214,6 +214,10 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
}
else if (type.isValueRepresentedByNumber() && src.getType() != Field::Types::String)
{
/// Bool is not represented in which_type, so we need to type it separately
if (isInt64OrUInt64orBoolFieldType(src.getType()) && type.getName() == "Bool")
return bool(src.safeGet<bool>());
if (which_type.isUInt8()) return convertNumericType<UInt8>(src, type);
if (which_type.isUInt16()) return convertNumericType<UInt16>(src, type);
if (which_type.isUInt32()) return convertNumericType<UInt32>(src, type);

View File

@ -483,6 +483,13 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (auto to_storage = getTargetInnerEngine(ViewTarget::To))
to_storage->formatImpl(settings, state, frame);
if (targets)
{
targets->formatTarget(ViewTarget::Data, settings, state, frame);
targets->formatTarget(ViewTarget::Tags, settings, state, frame);
targets->formatTarget(ViewTarget::Metrics, settings, state, frame);
}
if (dictionary)
dictionary->formatImpl(settings, state, frame);

View File

@ -97,6 +97,7 @@ public:
bool is_materialized_view{false};
bool is_live_view{false};
bool is_window_view{false};
bool is_time_series_table{false}; /// CREATE TABLE ... ENGINE=TimeSeries() ...
bool is_populate{false};
bool is_create_empty{false}; /// CREATE TABLE ... EMPTY AS SELECT ...
bool replace_view{false}; /// CREATE OR REPLACE VIEW

View File

@ -21,6 +21,9 @@ std::string_view toString(ViewTarget::Kind kind)
{
case ViewTarget::To: return "to";
case ViewTarget::Inner: return "inner";
case ViewTarget::Data: return "data";
case ViewTarget::Tags: return "tags";
case ViewTarget::Metrics: return "metrics";
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} doesn't support kind {}", __FUNCTION__, kind);
}
@ -254,6 +257,9 @@ std::optional<Keyword> ASTViewTargets::getKeywordForTableID(ViewTarget::Kind kin
{
case ViewTarget::To: return Keyword::TO; /// TO mydb.mydata
case ViewTarget::Inner: return std::nullopt;
case ViewTarget::Data: return Keyword::DATA; /// DATA mydb.mydata
case ViewTarget::Tags: return Keyword::TAGS; /// TAGS mydb.mytags
case ViewTarget::Metrics: return Keyword::METRICS; /// METRICS mydb.mymetrics
}
UNREACHABLE();
}
@ -264,6 +270,9 @@ std::optional<Keyword> ASTViewTargets::getKeywordForInnerStorage(ViewTarget::Kin
{
case ViewTarget::To: return std::nullopt; /// ENGINE = MergeTree()
case ViewTarget::Inner: return Keyword::INNER; /// INNER ENGINE = MergeTree()
case ViewTarget::Data: return Keyword::DATA; /// DATA ENGINE = MergeTree()
case ViewTarget::Tags: return Keyword::TAGS; /// TAGS ENGINE = MergeTree()
case ViewTarget::Metrics: return Keyword::METRICS; /// METRICS ENGINE = MergeTree()
}
UNREACHABLE();
}
@ -274,6 +283,9 @@ std::optional<Keyword> ASTViewTargets::getKeywordForInnerUUID(ViewTarget::Kind k
{
case ViewTarget::To: return Keyword::TO_INNER_UUID; /// TO INNER UUID 'XXX'
case ViewTarget::Inner: return std::nullopt;
case ViewTarget::Data: return Keyword::DATA_INNER_UUID; /// DATA INNER UUID 'XXX'
case ViewTarget::Tags: return Keyword::TAGS_INNER_UUID; /// TAGS INNER UUID 'XXX'
case ViewTarget::Metrics: return Keyword::METRICS_INNER_UUID; /// METRICS INNER UUID 'XXX'
}
UNREACHABLE();
}

View File

@ -9,7 +9,7 @@ namespace DB
class ASTStorage;
enum class Keyword : size_t;
/// Information about target tables (external or inner) of a materialized view or a window view.
/// Information about target tables (external or inner) of a materialized view, a window view, or a TimeSeries table.
/// See ASTViewTargets for more details.
struct ViewTarget
{
@ -24,6 +24,15 @@ struct ViewTarget
/// If `kind == ViewTarget::Inner` then `ViewTarget` contains information about the "INNER" table of a window view:
/// CREATE WINDOW VIEW db.wv_name {INNER ENGINE inner_engine} AS SELECT ...
Inner,
/// The "data" table for a TimeSeries table, contains time series.
Data,
/// The "tags" table for a TimeSeries table, contains identifiers for each combination of a metric name and tags (labels).
Tags,
/// The "metrics" table for a TimeSeries table, contains general information (metadata) about metrics.
Metrics,
};
Kind kind = To;
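The comments above describe the three inner tables a TimeSeries table fans out into. The standalone sketch below (not ClickHouse's ASTViewTargets) shows how such targets can be rendered into a "DATA ... TAGS ... METRICS ..." clause; the table names are made up.

#include <iostream>
#include <string>
#include <vector>

enum class Kind { Data, Tags, Metrics };

struct Target { Kind kind; std::string table; };

std::string formatTargets(const std::vector<Target> & targets)
{
    std::string result;
    for (const auto & t : targets)
    {
        if (!result.empty())
            result += ' ';
        switch (t.kind)
        {
            case Kind::Data:    result += "DATA ";    break;
            case Kind::Tags:    result += "TAGS ";    break;
            case Kind::Metrics: result += "METRICS "; break;
        }
        result += t.table;
    }
    return result;
}

int main()
{
    // Prints: DATA mydb.mydata TAGS mydb.mytags METRICS mydb.mymetrics
    std::cout << formatTargets({{Kind::Data, "mydb.mydata"}, {Kind::Tags, "mydb.mytags"}, {Kind::Metrics, "mydb.mymetrics"}}) << '\n';
}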

View File

@ -116,6 +116,8 @@ namespace DB
MR_MACROS(CURRENT_TRANSACTION, "CURRENT TRANSACTION") \
MR_MACROS(CURRENTUSER, "CURRENTUSER") \
MR_MACROS(D, "D") \
MR_MACROS(DATA, "DATA") \
MR_MACROS(DATA_INNER_UUID, "DATA INNER UUID") \
MR_MACROS(DATABASE, "DATABASE") \
MR_MACROS(DATABASES, "DATABASES") \
MR_MACROS(DATE, "DATE") \
@ -288,6 +290,8 @@ namespace DB
MR_MACROS(MCS, "MCS") \
MR_MACROS(MEMORY, "MEMORY") \
MR_MACROS(MERGES, "MERGES") \
MR_MACROS(METRICS, "METRICS") \
MR_MACROS(METRICS_INNER_UUID, "METRICS INNER UUID") \
MR_MACROS(MI, "MI") \
MR_MACROS(MICROSECOND, "MICROSECOND") \
MR_MACROS(MICROSECONDS, "MICROSECONDS") \
@ -464,6 +468,8 @@ namespace DB
MR_MACROS(TABLE_OVERRIDE, "TABLE OVERRIDE") \
MR_MACROS(TABLE, "TABLE") \
MR_MACROS(TABLES, "TABLES") \
MR_MACROS(TAGS, "TAGS") \
MR_MACROS(TAGS_INNER_UUID, "TAGS INNER UUID") \
MR_MACROS(TEMPORARY_TABLE, "TEMPORARY TABLE") \
MR_MACROS(TEMPORARY, "TEMPORARY") \
MR_MACROS(TEST, "TEST") \

View File

@ -45,6 +45,13 @@ CreateQueryUUIDs::CreateQueryUUIDs(const ASTCreateQuery & query, bool generate_r
/// then MV will create inner table. We should generate UUID of inner table here.
if (query.is_materialized_view)
generate_target_uuid(ViewTarget::To);
if (query.is_time_series_table)
{
generate_target_uuid(ViewTarget::Data);
generate_target_uuid(ViewTarget::Tags);
generate_target_uuid(ViewTarget::Metrics);
}
}
}
}
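A minimal sketch of the idea above, assuming UUIDs are pre-generated up front: a materialized view gets one UUID for its inner (TO) table, while a TimeSeries table gets one per inner target. The UUID representation and generator here are simplified stand-ins, not ClickHouse's UUID type.

#include <cstdint>
#include <map>
#include <random>
#include <utility>

enum class Kind { To, Data, Tags, Metrics };
using UUID = std::pair<uint64_t, uint64_t>;

UUID generateRandomUuid()
{
    static std::mt19937_64 rng{std::random_device{}()};
    return {rng(), rng()};
}

std::map<Kind, UUID> generateTargetUuids(bool is_materialized_view, bool is_time_series_table)
{
    std::map<Kind, UUID> uuids;
    if (is_materialized_view)                     /// one inner (TO) table
        uuids[Kind::To] = generateRandomUuid();
    if (is_time_series_table)                     /// three inner tables
        for (Kind kind : {Kind::Data, Kind::Tags, Kind::Metrics})
            uuids[kind] = generateRandomUuid();
    return uuids;
}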

View File

@ -696,6 +696,7 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
ASTPtr table;
ASTPtr columns_list;
std::shared_ptr<ASTStorage> storage;
bool is_time_series_table = false;
ASTPtr targets;
ASTPtr as_database;
ASTPtr as_table;
@ -784,6 +785,13 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
return false;
storage = typeid_cast<std::shared_ptr<ASTStorage>>(ast);
if (storage && storage->engine && (storage->engine->name == "TimeSeries"))
{
is_time_series_table = true;
ParserViewTargets({ViewTarget::Data, ViewTarget::Tags, ViewTarget::Metrics}).parse(pos, targets, expected);
}
return true;
};
@ -873,6 +881,7 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
query->create_or_replace = or_replace;
query->if_not_exists = if_not_exists;
query->temporary = is_temporary;
query->is_time_series_table = is_time_series_table;
query->database = table_id->getDatabase();
query->table = table_id->getTable();
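A simplified sketch of what this parser branch enables (not the real ParserCreateTableQuery): once the storage clause names the TimeSeries engine, an extra pass accepts optional DATA/TAGS/METRICS targets. The DDL below is an assumption about the resulting syntax, with made-up table names.

#include <string>
#include <string_view>

constexpr std::string_view example_ddl = R"sql(
    CREATE TABLE mydb.my_time_series
    ENGINE = TimeSeries()
    DATA mydb.my_data            -- time series values
    TAGS mydb.my_tags            -- metric name + label combinations
    METRICS mydb.my_metrics      -- metadata about metrics
)sql";

struct Storage { std::string engine_name; };

bool shouldParseTimeSeriesTargets(const Storage * storage)
{
    /// Mirrors the check in the diff: only ENGINE = TimeSeries gets DATA/TAGS/METRICS targets.
    return storage && storage->engine_name == "TimeSeries";
}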

View File

@ -1,5 +1,6 @@
#include <Planner/PlannerActionsVisitor.h>
#include <AggregateFunctions/WindowFunction.h>
#include <Analyzer/Utils.h>
#include <Analyzer/SetUtils.h>
#include <Analyzer/ConstantNode.h>
@ -237,7 +238,7 @@ public:
if (function_node.isWindowFunction())
{
buffer << " OVER (";
buffer << calculateWindowNodeActionName(function_node.getWindowNode());
buffer << calculateWindowNodeActionName(node, function_node.getWindowNode());
buffer << ')';
}
@ -298,21 +299,22 @@ public:
return calculateConstantActionNodeName(constant_literal, applyVisitor(FieldToDataType(), constant_literal));
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node)
String calculateWindowNodeActionName(const QueryTreeNodePtr & function_node_, const QueryTreeNodePtr & window_node_)
{
auto & window_node = node->as<WindowNode &>();
const auto & function_node = function_node_->as<const FunctionNode &>();
const auto & window_node = window_node_->as<const WindowNode &>();
WriteBufferFromOwnString buffer;
if (window_node.hasPartitionBy())
{
buffer << "PARTITION BY ";
auto & partition_by_nodes = window_node.getPartitionBy().getNodes();
const auto & partition_by_nodes = window_node.getPartitionBy().getNodes();
size_t partition_by_nodes_size = partition_by_nodes.size();
for (size_t i = 0; i < partition_by_nodes_size; ++i)
{
auto & partition_by_node = partition_by_nodes[i];
const auto & partition_by_node = partition_by_nodes[i];
buffer << calculateActionNodeName(partition_by_node);
if (i + 1 != partition_by_nodes_size)
buffer << ", ";
@ -326,7 +328,7 @@ public:
buffer << "ORDER BY ";
auto & order_by_nodes = window_node.getOrderBy().getNodes();
const auto & order_by_nodes = window_node.getOrderBy().getNodes();
size_t order_by_nodes_size = order_by_nodes.size();
for (size_t i = 0; i < order_by_nodes_size; ++i)
@ -364,44 +366,14 @@ public:
}
}
auto & window_frame = window_node.getWindowFrame();
if (!window_frame.is_default)
auto window_frame_opt = extractWindowFrame(function_node);
if (window_frame_opt)
{
auto & window_frame = *window_frame_opt;
if (window_node.hasPartitionBy() || window_node.hasOrderBy())
buffer << ' ';
buffer << window_frame.type << " BETWEEN ";
if (window_frame.begin_type == WindowFrame::BoundaryType::Current)
{
buffer << "CURRENT ROW";
}
else if (window_frame.begin_type == WindowFrame::BoundaryType::Unbounded)
{
buffer << "UNBOUNDED";
buffer << " " << (window_frame.begin_preceding ? "PRECEDING" : "FOLLOWING");
}
else
{
buffer << calculateActionNodeName(window_node.getFrameBeginOffsetNode());
buffer << " " << (window_frame.begin_preceding ? "PRECEDING" : "FOLLOWING");
}
buffer << " AND ";
if (window_frame.end_type == WindowFrame::BoundaryType::Current)
{
buffer << "CURRENT ROW";
}
else if (window_frame.end_type == WindowFrame::BoundaryType::Unbounded)
{
buffer << "UNBOUNDED";
buffer << " " << (window_frame.end_preceding ? "PRECEDING" : "FOLLOWING");
}
else
{
buffer << calculateActionNodeName(window_node.getFrameEndOffsetNode());
buffer << " " << (window_frame.end_preceding ? "PRECEDING" : "FOLLOWING");
}
window_frame.toString(buffer);
}
return buffer.str();
@ -1056,20 +1028,11 @@ String calculateConstantActionNodeName(const Field & constant_literal)
return ActionNodeNameHelper::calculateConstantActionNodeName(constant_literal);
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
QueryTreeNodeToName & node_to_name,
bool use_column_identifier_as_action_node_name)
{
ActionNodeNameHelper helper(node_to_name, planner_context, use_column_identifier_as_action_node_name);
return helper.calculateWindowNodeActionName(node);
}
String calculateWindowNodeActionName(const QueryTreeNodePtr & node, const PlannerContext & planner_context, bool use_column_identifier_as_action_node_name)
String calculateWindowNodeActionName(const QueryTreeNodePtr & function_node, const QueryTreeNodePtr & window_node, const PlannerContext & planner_context, bool use_column_identifier_as_action_node_name)
{
QueryTreeNodeToName empty_map;
ActionNodeNameHelper helper(empty_map, planner_context, use_column_identifier_as_action_node_name);
return helper.calculateWindowNodeActionName(node);
return helper.calculateWindowNodeActionName(function_node, window_node);
}
}
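The refactoring above assembles a canonical name for each window so that functions with identical windows share one description. A standalone sketch of that naming scheme (helper names and the string-based frame description are assumptions, not ClickHouse's classes):

#include <optional>
#include <sstream>
#include <string>
#include <vector>

std::string calculateWindowName(const std::vector<std::string> & partition_by,
                                const std::vector<std::string> & order_by,
                                const std::optional<std::string> & frame_description)
{
    std::ostringstream buffer;
    if (!partition_by.empty())
    {
        buffer << "PARTITION BY ";
        for (size_t i = 0; i < partition_by.size(); ++i)
            buffer << (i ? ", " : "") << partition_by[i];
    }
    if (!order_by.empty())
    {
        buffer << (partition_by.empty() ? "" : " ") << "ORDER BY ";
        for (size_t i = 0; i < order_by.size(); ++i)
            buffer << (i ? ", " : "") << order_by[i];
    }
    /// Appended only for non-default frames, e.g. "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
    if (frame_description)
        buffer << ((partition_by.empty() && order_by.empty()) ? "" : " ") << *frame_description;
    return buffer.str();
}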

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
@ -8,6 +9,7 @@
#include <Analyzer/IQueryTreeNode.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/WindowDescription.h>
namespace DB
{
@ -73,16 +75,8 @@ String calculateConstantActionNodeName(const Field & constant_literal);
* Window node action name can only be part of window function action name.
* For column node column node identifier from planner context is used, if use_column_identifier_as_action_node_name = true.
*/
String calculateWindowNodeActionName(const QueryTreeNodePtr & node,
const PlannerContext & planner_context,
QueryTreeNodeToName & node_to_name,
bool use_column_identifier_as_action_node_name = true);
/** Calculate action node name for window node.
* Window node action name can only be part of window function action name.
* For column node column node identifier from planner context is used, if use_column_identifier_as_action_node_name = true.
*/
String calculateWindowNodeActionName(const QueryTreeNodePtr & node,
String calculateWindowNodeActionName(const QueryTreeNodePtr & function_node,
const QueryTreeNodePtr & window_node,
const PlannerContext & planner_context,
bool use_column_identifier_as_action_node_name = true);

View File

@ -1,5 +1,7 @@
#include <optional>
#include <Planner/PlannerWindowFunctions.h>
#include <AggregateFunctions/WindowFunction.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/WindowNode.h>
@ -8,8 +10,9 @@
#include <Interpreters/Context.h>
#include <Planner/PlannerSorting.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Planner/PlannerSorting.h>
#include <Planner/Utils.h>
namespace DB
{
@ -22,27 +25,33 @@ namespace ErrorCodes
namespace
{
WindowDescription extractWindowDescriptionFromWindowNode(const QueryTreeNodePtr & node, const PlannerContext & planner_context)
WindowDescription extractWindowDescriptionFromWindowNode(const QueryTreeNodePtr & func_node_, const PlannerContext & planner_context)
{
const auto & func_node = func_node_->as<FunctionNode &>();
auto node = func_node.getWindowNode();
auto & window_node = node->as<WindowNode &>();
WindowDescription window_description;
window_description.window_name = calculateWindowNodeActionName(node, planner_context);
window_description.window_name = calculateWindowNodeActionName(func_node_, node, planner_context);
for (const auto & partition_by_node : window_node.getPartitionBy().getNodes())
{
auto partition_by_node_action_name = calculateActionNodeName(partition_by_node, planner_context);
auto partition_by_sort_column_description = SortColumnDescription(partition_by_node_action_name, 1 /* direction */, 1 /* nulls_direction */);
auto partition_by_sort_column_description
= SortColumnDescription(partition_by_node_action_name, 1 /* direction */, 1 /* nulls_direction */);
window_description.partition_by.push_back(std::move(partition_by_sort_column_description));
}
window_description.order_by = extractSortDescription(window_node.getOrderByNode(), planner_context);
window_description.full_sort_description = window_description.partition_by;
window_description.full_sort_description.insert(window_description.full_sort_description.end(), window_description.order_by.begin(), window_description.order_by.end());
window_description.full_sort_description.insert(
window_description.full_sort_description.end(), window_description.order_by.begin(), window_description.order_by.end());
/// WINDOW frame is validated during query analysis stage
window_description.frame = window_node.getWindowFrame();
auto window_frame = extractWindowFrame(func_node);
window_description.frame = window_frame ? *window_frame : window_node.getWindowFrame();
auto node_frame = window_node.getWindowFrame();
const auto & query_context = planner_context.getQueryContext();
const auto & query_context_settings = query_context->getSettingsRef();
@ -64,7 +73,8 @@ WindowDescription extractWindowDescriptionFromWindowNode(const QueryTreeNodePtr
}
std::vector<WindowDescription> extractWindowDescriptions(const QueryTreeNodes & window_function_nodes, const PlannerContext & planner_context)
std::vector<WindowDescription>
extractWindowDescriptions(const QueryTreeNodes & window_function_nodes, const PlannerContext & planner_context)
{
std::unordered_map<std::string, WindowDescription> window_name_to_description;
@ -72,7 +82,7 @@ std::vector<WindowDescription> extractWindowDescriptions(const QueryTreeNodes &
{
auto & window_function_node_typed = window_function_node->as<FunctionNode &>();
auto function_window_description = extractWindowDescriptionFromWindowNode(window_function_node_typed.getWindowNode(), planner_context);
auto function_window_description = extractWindowDescriptionFromWindowNode(window_function_node, planner_context);
auto frame_type = function_window_description.frame.type;
if (frame_type != WindowFrame::FrameType::ROWS && frame_type != WindowFrame::FrameType::RANGE)

View File

@ -22,6 +22,8 @@
#include <Interpreters/Context.h>
#include <AggregateFunctions/WindowFunction.h>
#include <Analyzer/Utils.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/ColumnNode.h>
@ -34,6 +36,7 @@
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/WindowNode.h>
#include <Core/Settings.h>
@ -507,4 +510,20 @@ void appendSetsFromActionsDAG(const ActionsDAG & dag, UsefulSets & useful_sets)
}
}
std::optional<WindowFrame> extractWindowFrame(const FunctionNode & node)
{
if (!node.isWindowFunction())
return {};
auto & window_node = node.getWindowNode()->as<WindowNode &>();
const auto & window_frame = window_node.getWindowFrame();
if (!window_frame.is_default)
return window_frame;
auto aggregate_function = node.getAggregateFunction();
if (const auto * win_func = dynamic_cast<const IWindowFunction *>(aggregate_function.get()))
{
return win_func->getDefaultFrame();
}
return {};
}
}

View File

@ -19,6 +19,8 @@
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/WindowDescription.h>
namespace DB
{
@ -91,4 +93,9 @@ ASTPtr parseAdditionalResultFilter(const Settings & settings);
using UsefulSets = std::unordered_set<FutureSetPtr>;
void appendSetsFromActionsDAG(const ActionsDAG & dag, UsefulSets & useful_sets);
/// If the window frame is not set in SQL, try to use the default frame of the window function,
/// if it has one. Otherwise return empty.
/// If the window frame is set explicitly in SQL, use it as is.
std::optional<WindowFrame> extractWindowFrame(const FunctionNode & node);
}
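A minimal sketch of the precedence described in that comment, using a simplified frame type instead of WindowFrame: an explicit frame from the query wins, otherwise the function's default frame (if any) is used, otherwise nothing is returned and the caller falls back to the window's own frame.

#include <optional>
#include <string>

struct FrameSketch { std::string description; bool is_default = true; };

std::optional<FrameSketch> extractFrameSketch(const std::optional<FrameSketch> & frame_from_query,
                                              const std::optional<FrameSketch> & function_default_frame)
{
    if (frame_from_query && !frame_from_query->is_default)
        return frame_from_query;           /// frame written explicitly in SQL
    if (function_default_frame)
        return function_default_frame;     /// e.g. a whole-partition RANGE frame for percent_rank
    return std::nullopt;                   /// caller falls back to the window node's frame
}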

View File

@ -65,28 +65,6 @@ namespace ErrorCodes
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
// Interface for true window functions. It's not much of an interface, they just
// accept the guts of WindowTransform and do 'something'. Given a small number of
// true window functions, and the fact that the WindowTransform internals are
// pretty much well-defined in domain terms (e.g. frame boundaries), this is
// somewhat acceptable.
class IWindowFunction
{
public:
virtual ~IWindowFunction() = default;
// Must insert the result for current_row.
virtual void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) const = 0;
virtual std::optional<WindowFrame> getDefaultFrame() const { return {}; }
virtual ColumnPtr castColumn(const Columns &, const std::vector<size_t> &) { return nullptr; }
/// Is the frame type supported by this function.
virtual bool checkWindowFrameType(const WindowTransform * /*transform*/) const { return true; }
};
// Compares ORDER BY column values at given rows to find the boundaries of frame:
// [compared] with [reference] +/- offset. Return value is -1/0/+1, like in
// sorting predicates -- -1 means [compared] is less than [reference] +/- offset.
@ -1523,41 +1501,6 @@ void WindowTransform::work()
}
}
// A basic implementation for a true window function. It pretends to be an
// aggregate function, but refuses to work as such.
struct WindowFunction
: public IAggregateFunctionHelper<WindowFunction>
, public IWindowFunction
{
std::string name;
WindowFunction(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
: IAggregateFunctionHelper<WindowFunction>(argument_types_, parameters_, result_type_)
, name(name_)
{}
bool isOnlyWindowFunction() const override { return true; }
[[noreturn]] void fail() const
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The function '{}' can only be used as a window function, not as an aggregate function",
getName());
}
String getName() const override { return name; }
void create(AggregateDataPtr __restrict) const override {}
void destroy(AggregateDataPtr __restrict) const noexcept override {}
bool hasTrivialDestructor() const override { return true; }
size_t sizeOfData() const override { return 0; }
size_t alignOfData() const override { return 1; }
void add(AggregateDataPtr __restrict, const IColumn **, size_t, Arena *) const override { fail(); }
void merge(AggregateDataPtr __restrict, ConstAggregateDataPtr, Arena *) const override { fail(); }
void serialize(ConstAggregateDataPtr __restrict, WriteBuffer &, std::optional<size_t>) const override { fail(); }
void deserialize(AggregateDataPtr __restrict, ReadBuffer &, std::optional<size_t>, Arena *) const override { fail(); }
void insertResultInto(AggregateDataPtr __restrict, IColumn &, Arena *) const override { fail(); }
};
struct WindowFunctionRank final : public WindowFunction
{
WindowFunctionRank(const std::string & name_,
@ -1669,36 +1612,6 @@ struct WindowFunctionHelpers
}
};
template<typename State>
struct StatefulWindowFunction : public WindowFunction
{
StatefulWindowFunction(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
: WindowFunction(name_, argument_types_, parameters_, result_type_)
{
}
size_t sizeOfData() const override { return sizeof(State); }
size_t alignOfData() const override { return 1; }
void create(AggregateDataPtr __restrict place) const override
{
new (place) State();
}
void destroy(AggregateDataPtr __restrict place) const noexcept override
{
reinterpret_cast<State *>(place)->~State();
}
bool hasTrivialDestructor() const override { return std::is_trivially_destructible_v<State>; }
State & getState(const WindowFunctionWorkspace & workspace) const
{
return *reinterpret_cast<State *>(workspace.aggregate_function_state.data());
}
};
struct ExponentialTimeDecayedSumState
{
Float64 previous_time;
@ -2278,14 +2191,13 @@ public:
bool checkWindowFrameType(const WindowTransform * transform) const override
{
if (transform->window_description.frame.type != WindowFrame::FrameType::RANGE
|| transform->window_description.frame.begin_type != WindowFrame::BoundaryType::Unbounded
|| transform->window_description.frame.end_type != WindowFrame::BoundaryType::Current)
{
LOG_ERROR(
getLogger("WindowFunctionPercentRank"),
"Window frame for function 'percent_rank' should be 'RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT'");
return false;
auto default_window_frame = getDefaultFrame();
if (transform->window_description.frame != default_window_frame)
{
LOG_ERROR(
getLogger("WindowFunctionPercentRank"),
"Window frame for function 'percent_rank' should be '{}'", default_window_frame->toString());
return false;
}
return true;
}
@ -2295,7 +2207,7 @@ public:
WindowFrame frame;
frame.type = WindowFrame::FrameType::RANGE;
frame.begin_type = WindowFrame::BoundaryType::Unbounded;
frame.end_type = WindowFrame::BoundaryType::Current;
frame.end_type = WindowFrame::BoundaryType::Unbounded;
return frame;
}
@ -2860,5 +2772,4 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
name, argument_types, parameters);
}, properties});
}
}
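The default frame for percent_rank now spans the whole partition (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING). A standalone illustration of why: each row's result is (rank - 1) / (partition_size - 1), so the partition size must be visible before any result can be produced. This is a simplified computation over a sorted vector, not the WindowTransform implementation.

#include <vector>

std::vector<double> percentRank(const std::vector<int> & sorted_partition)
{
    std::vector<double> result(sorted_partition.size(), 0.0);
    if (sorted_partition.size() < 2)
        return result;
    size_t rank = 1;
    for (size_t i = 0; i < sorted_partition.size(); ++i)
    {
        if (i > 0 && sorted_partition[i] != sorted_partition[i - 1])
            rank = i + 1;    /// tied values share the rank of their first row
        result[i] = double(rank - 1) / double(sorted_partition.size() - 1);
    }
    return result;
}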

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/WindowDescription.h>
#include <AggregateFunctions/WindowFunction.h>
#include <Processors/IProcessor.h>
@ -21,30 +22,6 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Arena;
class IWindowFunction;
// Runtime data for computing one window function.
struct WindowFunctionWorkspace
{
AggregateFunctionPtr aggregate_function;
// Cached value of aggregate function isState virtual method
bool is_aggregate_function_state = false;
// This field is set for pure window functions. When set, we ignore the
// window_function.aggregate_function, and work through this interface
// instead.
IWindowFunction * window_function_impl = nullptr;
std::vector<size_t> argument_column_indices;
// Will not be initialized for a pure window function.
mutable AlignedBuffer aggregate_function_state;
// Argument columns. Be careful, this is a per-block cache.
std::vector<const IColumn *> argument_columns;
UInt64 cached_block_number = std::numeric_limits<UInt64>::max();
};
struct WindowTransformBlock
{

View File

@ -2,6 +2,7 @@
#include <Server/TCPServer.h>
#include <Poco/Net/NetException.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -97,6 +98,21 @@ void HTTPServerConnection::run()
{
sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_BAD_REQUEST);
}
catch (const Poco::Net::NetException & e)
{
/// Do not spam logs with messages related to connection reset by peer.
if (e.code() == POCO_ENOTCONN)
{
LOG_DEBUG(LogFrequencyLimiter(getLogger("HTTPServerConnection"), 10), "Connection reset by peer while processing HTTP request: {}", e.message());
break;
}
if (session.networkException())
session.networkException()->rethrow();
else
throw;
}
catch (const Poco::Exception &)
{
if (session.networkException())

View File

@ -0,0 +1,22 @@
#include <Server/HTTP/checkHTTPHeader.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_HTTP_HEADERS;
}
void checkHTTPHeader(const HTTPRequest & request, const String & header_name, const String & expected_value)
{
if (!request.has(header_name))
throw Exception(ErrorCodes::UNEXPECTED_HTTP_HEADERS, "No HTTP header {}", header_name);
if (request.get(header_name) != expected_value)
throw Exception(ErrorCodes::UNEXPECTED_HTTP_HEADERS, "HTTP header {} has unexpected value '{}' (instead of '{}')", header_name, request.get(header_name), expected_value);
}
}
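A simplified standalone version of the same check, using a plain map instead of Poco's HTTPRequest; the real function throws UNEXPECTED_HTTP_HEADERS rather than a generic exception. The example call at the end is hypothetical.

#include <map>
#include <stdexcept>
#include <string>

void checkHeaderSketch(const std::map<std::string, std::string> & headers,
                       const std::string & name, const std::string & expected_value)
{
    auto it = headers.find(name);
    if (it == headers.end())
        throw std::runtime_error("No HTTP header " + name);
    if (it->second != expected_value)
        throw std::runtime_error("HTTP header " + name + " has unexpected value '" + it->second
                                 + "' (instead of '" + expected_value + "')");
}

/// For example, a protobuf-based endpoint might require:
/// checkHeaderSketch(headers, "Content-Type", "application/x-protobuf");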

View File

@ -0,0 +1,13 @@
#pragma once
#include <Server/HTTP/HTTPRequest.h>
#include <base/types.h>
namespace DB
{
/// Checks that the HTTP request has a specified header with a specified value.
void checkHTTPHeader(const HTTPRequest & request, const String & header_name, const String & expected_value);
}

View File

@ -43,7 +43,6 @@ void sendExceptionToHTTPClient(
out->position() = out->buffer().begin();
out->writeln(exception_message);
out->finalize();
}
}

View File

@ -1,18 +1,16 @@
#include <memory>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Server/PrometheusMetricsWriter.h>
#include <Server/PrometheusRequestHandlerFactory.h>
#include <Server/IServer.h>
#include <Access/Credentials.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "HTTPHandler.h"
#include "Server/PrometheusMetricsWriter.h"
#include "StaticRequestHandler.h"
#include "ReplicasStatusHandler.h"
#include "InterserverIOHTTPHandler.h"
#include "PrometheusRequestHandler.h"
#include "WebUIRequestHandler.h"
@ -124,7 +122,7 @@ static inline auto createHandlersFactoryFromConfig(
}
else if (handler_type == "prometheus")
{
main_handler_factory->addHandler(createPrometheusHandlerFactory(server, config, async_metrics, prefix + "." + key));
main_handler_factory->addHandler(createPrometheusHandlerFactoryForHTTPRule(server, config, prefix + "." + key, async_metrics));
}
else if (handler_type == "replicas_status")
{
@ -201,10 +199,7 @@ HTTPRequestHandlerFactoryPtr createHandlerFactory(IServer & server, const Poco::
else if (name == "InterserverIOHTTPHandler-factory" || name == "InterserverIOHTTPSHandler-factory")
return createInterserverHTTPHandlerFactory(server, name);
else if (name == "PrometheusHandler-factory")
{
auto metrics_writer = std::make_shared<PrometheusMetricsWriter>(config, "prometheus", async_metrics);
return createPrometheusMainHandlerFactory(server, config, metrics_writer, name);
}
return createPrometheusHandlerFactory(server, config, async_metrics, name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown HTTP handler factory name.");
}
@ -291,20 +286,9 @@ void addDefaultHandlersFactory(
);
factory.addHandler(query_handler);
/// We check that prometheus handler will be served on current (default) port.
/// Otherwise it will be created separately, see createHandlerFactory(...).
if (config.has("prometheus") && config.getInt("prometheus.port", 0) == 0)
{
auto writer = std::make_shared<PrometheusMetricsWriter>(config, "prometheus", async_metrics);
auto creator = [&server, writer] () -> std::unique_ptr<PrometheusRequestHandler>
{
return std::make_unique<PrometheusRequestHandler>(server, writer);
};
auto prometheus_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(std::move(creator));
prometheus_handler->attachStrictPath(config.getString("prometheus.endpoint", "/metrics"));
prometheus_handler->allowGetAndHeadRequest();
/// createPrometheusHandlerFactoryForHTTPRuleDefaults() can return nullptr if prometheus protocols must not be served on http port.
if (auto prometheus_handler = createPrometheusHandlerFactoryForHTTPRuleDefaults(server, config, async_metrics))
factory.addHandler(prometheus_handler);
}
}
}

View File

@ -1,15 +1,12 @@
#pragma once
#include <Common/AsynchronousMetrics.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Server/HTTPRequestHandlerFactoryMain.h>
#include <Common/StringUtils.h>
#include <Server/PrometheusMetricsWriter.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
@ -19,6 +16,7 @@ namespace ErrorCodes
}
class IServer;
class AsynchronousMetrics;
template <typename TEndpoint>
class HandlingRuleHTTPHandlerFactory : public HTTPRequestHandlerFactory
@ -126,18 +124,6 @@ HTTPRequestHandlerFactoryPtr createReplicasStatusHandlerFactory(IServer & server
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix);
HTTPRequestHandlerFactoryPtr
createPrometheusHandlerFactory(IServer & server,
const Poco::Util::AbstractConfiguration & config,
AsynchronousMetrics & async_metrics,
const std::string & config_prefix);
HTTPRequestHandlerFactoryPtr createPrometheusMainHandlerFactory(
IServer & server,
const Poco::Util::AbstractConfiguration & config,
PrometheusMetricsWriterPtr metrics_writer,
const std::string & name);
/// @param server - used in handlers to check IServer::isCancelled()
/// @param config - not the same as server.config(), since it can be newer
/// @param async_metrics - used for prometheus (in case of prometheus.asynchronous_metrics=true)

Some files were not shown because too many files have changed in this diff