diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 1182481c897..f0741b5465f 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -1341,6 +1341,40 @@ jobs: docker ps --quiet | xargs --no-run-if-empty docker kill ||: docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||: sudo rm -fr "$TEMP_PATH" + FunctionalStatelessTestReleaseAnalyzer: + needs: [BuilderDebRelease] + runs-on: [self-hosted, func-tester] + steps: + - name: Set envs + run: | + cat >> "$GITHUB_ENV" << 'EOF' + TEMP_PATH=${{runner.temp}}/stateless_analyzer + REPORTS_PATH=${{runner.temp}}/reports_dir + CHECK_NAME=Stateless tests (release, analyzer) + REPO_COPY=${{runner.temp}}/stateless_analyzer/ClickHouse + KILL_TIMEOUT=10800 + EOF + - name: Download json reports + uses: actions/download-artifact@v3 + with: + path: ${{ env.REPORTS_PATH }} + - name: Check out repository code + uses: ClickHouse/checkout@v1 + with: + clear-repository: true + - name: Functional test + run: | + sudo rm -fr "$TEMP_PATH" + mkdir -p "$TEMP_PATH" + cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH" + cd "$REPO_COPY/tests/ci" + python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT" + - name: Cleanup + if: always() + run: | + docker ps --quiet | xargs --no-run-if-empty docker kill ||: + docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||: + sudo rm -fr "$TEMP_PATH" FunctionalStatelessTestAarch64: needs: [BuilderDebAarch64] runs-on: [self-hosted, func-tester-aarch64] diff --git a/base/base/wide_integer_impl.h b/base/base/wide_integer_impl.h index ed4570d5e3f..4a80c176829 100644 --- a/base/base/wide_integer_impl.h +++ b/base/base/wide_integer_impl.h @@ -314,7 +314,14 @@ struct integer::_impl const T alpha = t / static_cast(max_int); - if (alpha <= static_cast(max_int)) + /** Here we have to use strict comparison. + * The max_int is 2^64 - 1. + * When casted to floating point type, it will be rounded to the closest representable number, + * which is 2^64. + * But 2^64 is not representable in uint64_t, + * so the maximum representable number will be strictly less. + */ + if (alpha < static_cast(max_int)) self = static_cast(alpha); else // max(double) / 2^64 will surely contain less than 52 precision bits, so speed up computations. 
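As an aside on the `wide_integer_impl.h` hunk above: the sketch below is a minimal, self-contained C++ check of the rounding behaviour that the newly added comment describes. It is not part of the patch, only an illustration of why the non-strict comparison was unsafe.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

int main()
{
    // 2^64 - 1 has 64 significant bits, but double carries only 53,
    // so the conversion rounds it up to exactly 2^64.
    constexpr uint64_t max_int = std::numeric_limits<uint64_t>::max();
    const double as_double = static_cast<double>(max_int);
    assert(as_double == 0x1p64); // hex float literal for 2^64

    // Hence `alpha <= static_cast<double>(max_int)` also accepted
    // alpha == 2^64, which does not fit back into uint64_t (that cast
    // would be undefined behaviour); the strict `<` excludes that case.
    return 0;
}
```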
set_multiplier(self, static_cast(alpha)); diff --git a/base/glibc-compatibility/musl/logf.c b/base/glibc-compatibility/musl/logf.c index 7ee5d7fe623..e4c2237caa2 100644 --- a/base/glibc-compatibility/musl/logf.c +++ b/base/glibc-compatibility/musl/logf.c @@ -53,7 +53,7 @@ float logf(float x) tmp = ix - OFF; i = (tmp >> (23 - LOGF_TABLE_BITS)) % N; k = (int32_t)tmp >> 23; /* arithmetic shift */ - iz = ix - (tmp & 0x1ff << 23); + iz = ix - (tmp & 0xff800000); invc = T[i].invc; logc = T[i].logc; z = (double_t)asfloat(iz); diff --git a/cmake/linux/toolchain-riscv64.cmake b/cmake/linux/toolchain-riscv64.cmake index ea57c3b2c42..7f876f88d72 100644 --- a/cmake/linux/toolchain-riscv64.cmake +++ b/cmake/linux/toolchain-riscv64.cmake @@ -21,7 +21,7 @@ set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") -set (CMAKE_EXE_LINKER_FLAGS_INIT "-fuse-ld=bfd") +set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=bfd") # Currently, lld does not work with the error: # ld.lld: error: section size decrease is too large diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 57ce93d45f7..901df1b2432 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -177,7 +177,19 @@ endif() add_contrib (sqlite-cmake sqlite-amalgamation) add_contrib (s2geometry-cmake s2geometry) add_contrib (c-ares-cmake c-ares) -add_contrib (qpl-cmake qpl) + +if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512)) + option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES}) +elseif(ENABLE_QPL) + message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support") +endif() +if (ENABLE_QPL) + add_contrib (idxd-config-cmake idxd-config) + add_contrib (qpl-cmake qpl) # requires: idxd-config +else() + message(STATUS "Not using QPL") +endif () + add_contrib (morton-nd-cmake morton-nd) if (ARCH_S390X) add_contrib(crc32-s390x-cmake crc32-s390x) diff --git a/contrib/boringssl-cmake/CMakeLists.txt b/contrib/boringssl-cmake/CMakeLists.txt index 828919476a7..51137f6d04e 100644 --- a/contrib/boringssl-cmake/CMakeLists.txt +++ b/contrib/boringssl-cmake/CMakeLists.txt @@ -111,6 +111,8 @@ elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "mips") set(ARCH "generic") elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "ppc64le") set(ARCH "ppc64le") +elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "riscv64") + set(ARCH "riscv64") else() message(FATAL_ERROR "Unknown processor:" ${CMAKE_SYSTEM_PROCESSOR}) endif() diff --git a/contrib/idxd-config-cmake/CMakeLists.txt b/contrib/idxd-config-cmake/CMakeLists.txt new file mode 100644 index 00000000000..030252ec8e6 --- /dev/null +++ b/contrib/idxd-config-cmake/CMakeLists.txt @@ -0,0 +1,23 @@ +## accel_config is the utility library required by QPL-Deflate codec for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA). 
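Stepping back to the musl `logf` hunk above: `0x1ff << 23` and `0xff800000` denote the same mask, but the former shifts a signed `int` past `INT_MAX`, which is undefined behaviour in C; presumably that is what the rewrite avoids. A small standalone check of the equivalence (illustration only, not part of the patch):

```cpp
#include <cassert>
#include <cstdint>

int main()
{
    // Numerically, 0x1ff << 23 == 0xff800000, but 511 is a signed int and
    // 511 * 2^23 does not fit into 32-bit int, so the shift itself is
    // undefined behaviour.  Spelling the mask as the unsigned constant
    // 0xff800000 keeps the same bit pattern without the overflow.
    const uint32_t tmp = 0x3fc00000u;                 // arbitrary sample value
    const uint32_t masked = tmp & 0xff800000u;
    assert(masked == (tmp & (uint32_t{0x1ff} << 23)));
    return 0;
}
```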
+set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config") +set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake") +set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config-cmake/include") +set (SRCS + "${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c" + "${LIBACCEL_SOURCE_DIR}/util/log.c" + "${LIBACCEL_SOURCE_DIR}/util/sysfs.c" +) + +add_library(_accel-config ${SRCS}) + +target_compile_options(_accel-config PRIVATE "-D_GNU_SOURCE") + +target_include_directories(_accel-config BEFORE + PRIVATE ${UUID_DIR} + PRIVATE ${LIBACCEL_HEADER_DIR} + PRIVATE ${LIBACCEL_SOURCE_DIR}) + +target_include_directories(_accel-config SYSTEM BEFORE + PUBLIC ${LIBACCEL_SOURCE_DIR}/accfg) + +add_library(ch_contrib::accel-config ALIAS _accel-config) diff --git a/contrib/qpl-cmake/idxd-header/config.h b/contrib/idxd-config-cmake/include/config.h similarity index 100% rename from contrib/qpl-cmake/idxd-header/config.h rename to contrib/idxd-config-cmake/include/config.h diff --git a/contrib/qpl-cmake/CMakeLists.txt b/contrib/qpl-cmake/CMakeLists.txt index d2be8add3c7..034106b51b6 100644 --- a/contrib/qpl-cmake/CMakeLists.txt +++ b/contrib/qpl-cmake/CMakeLists.txt @@ -1,36 +1,5 @@ ## The Intel® QPL provides high performance implementations of data processing functions for existing hardware accelerator, and/or software path in case if hardware accelerator is not available. -if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512)) - option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES}) -elseif(ENABLE_QPL) - message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support") -endif() - -if (NOT ENABLE_QPL) - message(STATUS "Not using QPL") - return() -endif() - -## QPL has build dependency on libaccel-config. Here is to build libaccel-config which is required by QPL. -## libaccel-config is the utility library for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA). -set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config") set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake") -set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake/idxd-header") -set (SRCS - "${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c" - "${LIBACCEL_SOURCE_DIR}/util/log.c" - "${LIBACCEL_SOURCE_DIR}/util/sysfs.c" -) - -add_library(accel-config ${SRCS}) - -target_compile_options(accel-config PRIVATE "-D_GNU_SOURCE") - -target_include_directories(accel-config BEFORE - PRIVATE ${UUID_DIR} - PRIVATE ${LIBACCEL_HEADER_DIR} - PRIVATE ${LIBACCEL_SOURCE_DIR}) - -## QPL build start here. 
set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl") set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources") set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl") @@ -342,12 +311,12 @@ target_compile_definitions(_qpl PUBLIC -DENABLE_QPL_COMPRESSION) target_link_libraries(_qpl - PRIVATE accel-config + PRIVATE ch_contrib::accel-config PRIVATE ch_contrib::isal PRIVATE ${CMAKE_DL_LIBS}) -add_library (ch_contrib::qpl ALIAS _qpl) target_include_directories(_qpl SYSTEM BEFORE PUBLIC "${QPL_PROJECT_DIR}/include" - PUBLIC "${LIBACCEL_SOURCE_DIR}/accfg" PUBLIC ${UUID_DIR}) + +add_library (ch_contrib::qpl ALIAS _qpl) diff --git a/docker/test/codebrowser/Dockerfile b/docker/test/codebrowser/Dockerfile index b76b8234c81..2b788ab134a 100644 --- a/docker/test/codebrowser/Dockerfile +++ b/docker/test/codebrowser/Dockerfile @@ -36,12 +36,10 @@ RUN arch=${TARGETARCH:-amd64} \ # repo versions doesn't work correctly with C++17 # also we push reports to s3, so we add index.html to subfolder urls # https://github.com/ClickHouse-Extras/woboq_codebrowser/commit/37e15eaf377b920acb0b48dbe82471be9203f76b -RUN git clone https://github.com/ClickHouse/woboq_codebrowser \ - && cd woboq_codebrowser \ +RUN git clone --depth=1 https://github.com/ClickHouse/woboq_codebrowser /woboq_codebrowser \ + && cd /woboq_codebrowser \ && cmake . -G Ninja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang\+\+-${LLVM_VERSION} -DCMAKE_C_COMPILER=clang-${LLVM_VERSION} \ - && ninja \ - && cd .. \ - && rm -rf woboq_codebrowser + && ninja ENV CODEGEN=/woboq_codebrowser/generator/codebrowser_generator ENV CODEINDEX=/woboq_codebrowser/indexgenerator/codebrowser_indexgenerator diff --git a/docs/en/engines/table-engines/mergetree-family/replacingmergetree.md b/docs/en/engines/table-engines/mergetree-family/replacingmergetree.md index 81d8cc2d3ca..7db2f3b465a 100644 --- a/docs/en/engines/table-engines/mergetree-family/replacingmergetree.md +++ b/docs/en/engines/table-engines/mergetree-family/replacingmergetree.md @@ -90,15 +90,17 @@ SELECT * FROM mySecondReplacingMT FINAL; ### is_deleted -`is_deleted` — Name of the column with the type of row: `1` is a “deleted“ row, `0` is a “state“ row. +`is_deleted` — Name of a column used during a merge to determine whether the data in this row represents the state or is to be deleted; `1` is a “deleted“ row, `0` is a “state“ row. - Column data type — `Int8`. + Column data type — `UInt8`. - Can only be enabled when `ver` is used. - The row is deleted when use the `OPTIMIZE ... FINAL CLEANUP`, or `OPTIMIZE ... FINAL` if the engine settings `clean_deleted_rows` has been set to `Always`. - No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted one is the one kept. +:::note +`is_deleted` can only be enabled when `ver` is used. +The row is deleted when `OPTIMIZE ... FINAL CLEANUP` or `OPTIMIZE ... FINAL` is used, or if the engine setting `clean_deleted_rows` has been set to `Always`. +No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted row is the one kept. +::: ## Query clauses diff --git a/docs/en/operations/backup.md b/docs/en/operations/backup.md index a31a52f509e..6da61833c12 100644 --- a/docs/en/operations/backup.md +++ b/docs/en/operations/backup.md @@ -30,7 +30,7 @@ description: In order to effectively mitigate possible human errors, you should ``` :::note ALL -`ALL` is only applicable to the `RESTORE` command. 
+`ALL` is only applicable to the `RESTORE` command prior to version 23.4 of Clickhouse. ::: ## Background diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index 3e3cd89a9e0..e3ca04f5b9b 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -1045,7 +1045,7 @@ Default value: `0`. ## background_pool_size {#background_pool_size} -Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance. +Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance. Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation). @@ -1063,8 +1063,8 @@ Default value: 16. ## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio} -Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and -`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server. +Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals to 2 and +`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operations could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server. The same as for `background_pool_size` setting `background_merges_mutations_concurrency_ratio` could be applied from the `default` profile for backward compatibility. Possible values: @@ -1079,6 +1079,33 @@ Default value: 2. 
3 ``` +## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit} + +Sets the limit on how much RAM is allowed to use for performing merge and mutation operations. +Zero means unlimited. +If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks. + +Possible values: + +- Any positive integer. + +**Example** + +```xml +0 +``` + +## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio} + +The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`. + +Default value: `0.5`. + +**See also** + +- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage) +- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit) + ## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy} Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart. diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index f21dff9fbb7..0a4b9a6bd9d 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -1410,8 +1410,8 @@ and [enable_writes_to_query_cache](#enable-writes-to-query-cache) control in mor Possible values: -- 0 - Yes -- 1 - No +- 0 - Disabled +- 1 - Enabled Default value: `0`. diff --git a/docs/en/sql-reference/data-types/special-data-types/interval.md b/docs/en/sql-reference/data-types/special-data-types/interval.md index c89c2e78752..bedbcf0bd28 100644 --- a/docs/en/sql-reference/data-types/special-data-types/interval.md +++ b/docs/en/sql-reference/data-types/special-data-types/interval.md @@ -8,10 +8,6 @@ sidebar_label: Interval The family of data types representing time and date intervals. The resulting types of the [INTERVAL](../../../sql-reference/operators/index.md#operator-interval) operator. -:::note -`Interval` data type values can’t be stored in tables. -::: - Structure: - Time interval as an unsigned integer value. @@ -19,6 +15,9 @@ Structure: Supported interval types: +- `NANOSECOND` +- `MICROSECOND` +- `MILLISECOND` - `SECOND` - `MINUTE` - `HOUR` diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index df0abceb8c6..fb30f60a0b8 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -862,7 +862,8 @@ bool Client::processWithFuzzing(const String & full_query) const auto * tmp_pos = text_2.c_str(); const auto ast_3 = parseQuery(tmp_pos, tmp_pos + text_2.size(), false /* allow_multi_statements */); - const auto text_3 = ast_3->formatForErrorMessage(); + const auto text_3 = ast_3 ? ast_3->formatForErrorMessage() : ""; + if (text_3 != text_2) { fmt::print(stderr, "Found error: The query formatting is broken.\n"); @@ -877,7 +878,7 @@ bool Client::processWithFuzzing(const String & full_query) fmt::print(stderr, "Text-1 (AST-1 formatted):\n'{}'\n", query_to_execute); fmt::print(stderr, "AST-2 (Text-1 parsed):\n'{}'\n", ast_2->dumpTree()); fmt::print(stderr, "Text-2 (AST-2 formatted):\n'{}'\n", text_2); - fmt::print(stderr, "AST-3 (Text-2 parsed):\n'{}'\n", ast_3->dumpTree()); + fmt::print(stderr, "AST-3 (Text-2 parsed):\n'{}'\n", ast_3 ? 
ast_3->dumpTree() : ""); fmt::print(stderr, "Text-3 (AST-3 formatted):\n'{}'\n", text_3); fmt::print(stderr, "Text-3 must be equal to Text-2, but it is not.\n"); diff --git a/programs/keeper/CMakeLists.txt b/programs/keeper/CMakeLists.txt index 761335fb707..e5d56023f7b 100644 --- a/programs/keeper/CMakeLists.txt +++ b/programs/keeper/CMakeLists.txt @@ -114,7 +114,7 @@ if (BUILD_STANDALONE_KEEPER) clickhouse_add_executable(clickhouse-keeper ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES}) # Remove some redundant dependencies - target_compile_definitions (clickhouse-keeper PRIVATE -DKEEPER_STANDALONE_BUILD) + target_compile_definitions (clickhouse-keeper PRIVATE -DCLICKHOUSE_PROGRAM_STANDALONE_BUILD) target_compile_definitions (clickhouse-keeper PUBLIC -DWITHOUT_TEXT_LOG) target_include_directories(clickhouse-keeper PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src") # uses includes from src directory diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index 3853c955171..0eb3df58f30 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -57,7 +57,7 @@ int mainEntryClickHouseKeeper(int argc, char ** argv) } } -#ifdef KEEPER_STANDALONE_BUILD +#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD // Weak symbols don't work correctly on Darwin // so we have a stub implementation to avoid linker errors diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 3f415462178..48ce4d20f4e 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -130,6 +130,7 @@ namespace CurrentMetrics extern const Metric Revision; extern const Metric VersionInteger; extern const Metric MemoryTracking; + extern const Metric MergesMutationsMemoryTracking; extern const Metric MaxDDLEntryID; extern const Metric MaxPushedDDLEntryID; } @@ -1225,6 +1226,25 @@ try total_memory_tracker.setDescription("(total)"); total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); + size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit; + + size_t default_merges_mutations_server_memory_usage = static_cast(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio); + if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage) + { + merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage; + LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}" + " ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)", + formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit), + formatReadableSizeWithBinarySuffix(memory_amount), + server_settings_.merges_mutations_memory_usage_to_ram_ratio); + } + + LOG_INFO(log, "Merges and mutations memory limit is set to {}", + formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit)); + background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit); + background_memory_tracker.setDescription("(background)"); + background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking); + total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory); auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker(); @@ -1238,8 +1258,13 @@ try global_context->setMacros(std::make_unique(*config, "macros", log)); global_context->setExternalAuthenticatorsConfig(*config); - global_context->loadOrReloadDictionaries(*config); - 
global_context->loadOrReloadUserDefinedExecutableFunctions(*config); + if (global_context->isServerCompletelyStarted()) + { + /// It does not make sense to reload anything before server has started. + /// Moreover, it may break initialization order. + global_context->loadOrReloadDictionaries(*config); + global_context->loadOrReloadUserDefinedExecutableFunctions(*config); + } global_context->setRemoteHostFilter(*config); diff --git a/src/Analyzer/Passes/CNF.cpp b/src/Analyzer/Passes/CNF.cpp index f53d833c107..91e973c7573 100644 --- a/src/Analyzer/Passes/CNF.cpp +++ b/src/Analyzer/Passes/CNF.cpp @@ -162,14 +162,13 @@ private: class PushOrVisitor { public: - PushOrVisitor(ContextPtr context, size_t max_atoms_, size_t num_atoms_) + PushOrVisitor(ContextPtr context, size_t max_atoms_) : max_atoms(max_atoms_) - , num_atoms(num_atoms_) , and_resolver(FunctionFactory::instance().get("and", context)) , or_resolver(FunctionFactory::instance().get("or", context)) {} - bool visit(QueryTreeNodePtr & node) + bool visit(QueryTreeNodePtr & node, size_t num_atoms) { if (max_atoms && num_atoms > max_atoms) return false; @@ -187,7 +186,10 @@ public: { auto & arguments = function_node->getArguments().getNodes(); for (auto & argument : arguments) - visit(argument); + { + if (!visit(argument, num_atoms)) + return false; + } } if (name == "or") @@ -217,7 +219,7 @@ public: auto rhs = createFunctionNode(or_resolver, std::move(other_node), std::move(and_function_arguments[1])); node = createFunctionNode(and_resolver, std::move(lhs), std::move(rhs)); - visit(node); + return visit(node, num_atoms); } return true; @@ -225,7 +227,6 @@ public: private: size_t max_atoms; - size_t num_atoms; const FunctionOverloadResolverPtr and_resolver; const FunctionOverloadResolverPtr or_resolver; @@ -516,8 +517,8 @@ std::optional CNF::tryBuildCNF(const QueryTreeNodePtr & node, ContextPtr co visitor.visit(node_cloned, false); } - if (PushOrVisitor visitor(context, max_atoms, atom_count); - !visitor.visit(node_cloned)) + if (PushOrVisitor visitor(context, max_atoms); + !visitor.visit(node_cloned, atom_count)) return std::nullopt; CollectGroupsVisitor collect_visitor; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 76a67ade99c..b22c2546aef 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -544,6 +544,10 @@ if (TARGET ch_contrib::qpl) dbms_target_link_libraries(PUBLIC ch_contrib::qpl) endif () +if (TARGET ch_contrib::accel-config) + dbms_target_link_libraries(PUBLIC ch_contrib::accel-config) +endif () + target_link_libraries(clickhouse_common_io PUBLIC boost::context) dbms_target_link_libraries(PUBLIC boost::context) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index cfdd78fe788..82d68ca8185 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -53,6 +53,7 @@ M(QueryThread, "Number of query processing threads") \ M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \ M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \ + M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \ M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \ M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. 
Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \ M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \ diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index d8a83378e26..e9dc5649245 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -29,21 +29,14 @@ M(13, SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH) \ M(15, DUPLICATE_COLUMN) \ M(16, NO_SUCH_COLUMN_IN_TABLE) \ - M(17, DELIMITER_IN_STRING_LITERAL_DOESNT_MATCH) \ - M(18, CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN) \ M(19, SIZE_OF_FIXED_STRING_DOESNT_MATCH) \ M(20, NUMBER_OF_COLUMNS_DOESNT_MATCH) \ - M(21, CANNOT_READ_ALL_DATA_FROM_TAB_SEPARATED_INPUT) \ - M(22, CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT) \ M(23, CANNOT_READ_FROM_ISTREAM) \ M(24, CANNOT_WRITE_TO_OSTREAM) \ M(25, CANNOT_PARSE_ESCAPE_SEQUENCE) \ M(26, CANNOT_PARSE_QUOTED_STRING) \ M(27, CANNOT_PARSE_INPUT_ASSERTION_FAILED) \ M(28, CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER) \ - M(29, CANNOT_PRINT_INTEGER) \ - M(30, CANNOT_READ_SIZE_OF_COMPRESSED_CHUNK) \ - M(31, CANNOT_READ_COMPRESSED_CHUNK) \ M(32, ATTEMPT_TO_READ_AFTER_EOF) \ M(33, CANNOT_READ_ALL_DATA) \ M(34, TOO_MANY_ARGUMENTS_FOR_FUNCTION) \ @@ -57,7 +50,6 @@ M(42, NUMBER_OF_ARGUMENTS_DOESNT_MATCH) \ M(43, ILLEGAL_TYPE_OF_ARGUMENT) \ M(44, ILLEGAL_COLUMN) \ - M(45, ILLEGAL_NUMBER_OF_RESULT_COLUMNS) \ M(46, UNKNOWN_FUNCTION) \ M(47, UNKNOWN_IDENTIFIER) \ M(48, NOT_IMPLEMENTED) \ @@ -66,20 +58,14 @@ M(51, EMPTY_LIST_OF_COLUMNS_QUERIED) \ M(52, COLUMN_QUERIED_MORE_THAN_ONCE) \ M(53, TYPE_MISMATCH) \ - M(54, STORAGE_DOESNT_ALLOW_PARAMETERS) \ M(55, STORAGE_REQUIRES_PARAMETER) \ M(56, UNKNOWN_STORAGE) \ M(57, TABLE_ALREADY_EXISTS) \ M(58, TABLE_METADATA_ALREADY_EXISTS) \ M(59, ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER) \ M(60, UNKNOWN_TABLE) \ - M(61, ONLY_FILTER_COLUMN_IN_BLOCK) \ M(62, SYNTAX_ERROR) \ M(63, UNKNOWN_AGGREGATE_FUNCTION) \ - M(64, CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT) \ - M(65, CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT) \ - M(66, NOT_A_COLUMN) \ - M(67, ILLEGAL_KEY_OF_AGGREGATION) \ M(68, CANNOT_GET_SIZE_OF_FIELD) \ M(69, ARGUMENT_OUT_OF_BOUND) \ M(70, CANNOT_CONVERT_TYPE) \ @@ -109,16 +95,11 @@ M(94, CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS) \ M(95, CANNOT_READ_FROM_SOCKET) \ M(96, CANNOT_WRITE_TO_SOCKET) \ - M(97, CANNOT_READ_ALL_DATA_FROM_CHUNKED_INPUT) \ - M(98, CANNOT_WRITE_TO_EMPTY_BLOCK_OUTPUT_STREAM) \ M(99, UNKNOWN_PACKET_FROM_CLIENT) \ M(100, UNKNOWN_PACKET_FROM_SERVER) \ M(101, UNEXPECTED_PACKET_FROM_CLIENT) \ M(102, UNEXPECTED_PACKET_FROM_SERVER) \ - M(103, RECEIVED_DATA_FOR_WRONG_QUERY_ID) \ M(104, TOO_SMALL_BUFFER_SIZE) \ - M(105, CANNOT_READ_HISTORY) \ - M(106, CANNOT_APPEND_HISTORY) \ M(107, FILE_DOESNT_EXIST) \ M(108, NO_DATA_TO_INSERT) \ M(109, CANNOT_BLOCK_SIGNAL) \ @@ -137,7 +118,6 @@ M(123, UNKNOWN_TYPE_OF_AST_NODE) \ M(124, INCORRECT_ELEMENT_OF_SET) \ M(125, INCORRECT_RESULT_OF_SCALAR_SUBQUERY) \ - M(126, CANNOT_GET_RETURN_TYPE) \ M(127, ILLEGAL_INDEX) \ M(128, TOO_LARGE_ARRAY_SIZE) \ M(129, FUNCTION_IS_SPECIAL) \ @@ -149,30 +129,17 @@ M(137, UNKNOWN_ELEMENT_IN_CONFIG) \ M(138, EXCESSIVE_ELEMENT_IN_CONFIG) \ M(139, NO_ELEMENTS_IN_CONFIG) \ - M(140, ALL_REQUESTED_COLUMNS_ARE_MISSING) \ M(141, SAMPLING_NOT_SUPPORTED) \ M(142, NOT_FOUND_NODE) \ - M(143, FOUND_MORE_THAN_ONE_NODE) \ - M(144, FIRST_DATE_IS_BIGGER_THAN_LAST_DATE) \ M(145, UNKNOWN_OVERFLOW_MODE) \ - M(146, QUERY_SECTION_DOESNT_MAKE_SENSE) 
\ - M(147, NOT_FOUND_FUNCTION_ELEMENT_FOR_AGGREGATE) \ - M(148, NOT_FOUND_RELATION_ELEMENT_FOR_CONDITION) \ - M(149, NOT_FOUND_RHS_ELEMENT_FOR_CONDITION) \ - M(150, EMPTY_LIST_OF_ATTRIBUTES_PASSED) \ - M(151, INDEX_OF_COLUMN_IN_SORT_CLAUSE_IS_OUT_OF_RANGE) \ M(152, UNKNOWN_DIRECTION_OF_SORTING) \ M(153, ILLEGAL_DIVISION) \ - M(154, AGGREGATE_FUNCTION_NOT_APPLICABLE) \ - M(155, UNKNOWN_RELATION) \ M(156, DICTIONARIES_WAS_NOT_LOADED) \ - M(157, ILLEGAL_OVERFLOW_MODE) \ M(158, TOO_MANY_ROWS) \ M(159, TIMEOUT_EXCEEDED) \ M(160, TOO_SLOW) \ M(161, TOO_MANY_COLUMNS) \ M(162, TOO_DEEP_SUBQUERIES) \ - M(163, TOO_DEEP_PIPELINE) \ M(164, READONLY) \ M(165, TOO_MANY_TEMPORARY_COLUMNS) \ M(166, TOO_MANY_TEMPORARY_NON_CONST_COLUMNS) \ @@ -183,20 +150,14 @@ M(172, CANNOT_CREATE_DIRECTORY) \ M(173, CANNOT_ALLOCATE_MEMORY) \ M(174, CYCLIC_ALIASES) \ - M(176, CHUNK_NOT_FOUND) \ - M(177, DUPLICATE_CHUNK_NAME) \ - M(178, MULTIPLE_ALIASES_FOR_EXPRESSION) \ M(179, MULTIPLE_EXPRESSIONS_FOR_ALIAS) \ M(180, THERE_IS_NO_PROFILE) \ M(181, ILLEGAL_FINAL) \ M(182, ILLEGAL_PREWHERE) \ M(183, UNEXPECTED_EXPRESSION) \ M(184, ILLEGAL_AGGREGATION) \ - M(185, UNSUPPORTED_MYISAM_BLOCK_TYPE) \ M(186, UNSUPPORTED_COLLATION_LOCALE) \ M(187, COLLATION_COMPARISON_FAILED) \ - M(188, UNKNOWN_ACTION) \ - M(189, TABLE_MUST_NOT_BE_CREATED_MANUALLY) \ M(190, SIZES_OF_ARRAYS_DONT_MATCH) \ M(191, SET_SIZE_LIMIT_EXCEEDED) \ M(192, UNKNOWN_USER) \ @@ -204,15 +165,12 @@ M(194, REQUIRED_PASSWORD) \ M(195, IP_ADDRESS_NOT_ALLOWED) \ M(196, UNKNOWN_ADDRESS_PATTERN_TYPE) \ - M(197, SERVER_REVISION_IS_TOO_OLD) \ M(198, DNS_ERROR) \ M(199, UNKNOWN_QUOTA) \ - M(200, QUOTA_DOESNT_ALLOW_KEYS) \ M(201, QUOTA_EXCEEDED) \ M(202, TOO_MANY_SIMULTANEOUS_QUERIES) \ M(203, NO_FREE_CONNECTION) \ M(204, CANNOT_FSYNC) \ - M(205, NESTED_TYPE_TOO_DEEP) \ M(206, ALIAS_REQUIRED) \ M(207, AMBIGUOUS_IDENTIFIER) \ M(208, EMPTY_NESTED_TABLE) \ @@ -229,7 +187,6 @@ M(219, DATABASE_NOT_EMPTY) \ M(220, DUPLICATE_INTERSERVER_IO_ENDPOINT) \ M(221, NO_SUCH_INTERSERVER_IO_ENDPOINT) \ - M(222, ADDING_REPLICA_TO_NON_EMPTY_TABLE) \ M(223, UNEXPECTED_AST_STRUCTURE) \ M(224, REPLICA_IS_ALREADY_ACTIVE) \ M(225, NO_ZOOKEEPER) \ @@ -253,9 +210,7 @@ M(243, NOT_ENOUGH_SPACE) \ M(244, UNEXPECTED_ZOOKEEPER_ERROR) \ M(246, CORRUPTED_DATA) \ - M(247, INCORRECT_MARK) \ M(248, INVALID_PARTITION_VALUE) \ - M(250, NOT_ENOUGH_BLOCK_NUMBERS) \ M(251, NO_SUCH_REPLICA) \ M(252, TOO_MANY_PARTS) \ M(253, REPLICA_ALREADY_EXISTS) \ @@ -271,8 +226,6 @@ M(264, INCOMPATIBLE_TYPE_OF_JOIN) \ M(265, NO_AVAILABLE_REPLICA) \ M(266, MISMATCH_REPLICAS_DATA_SOURCES) \ - M(267, STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS) \ - M(268, CPUID_ERROR) \ M(269, INFINITE_LOOP) \ M(270, CANNOT_COMPRESS) \ M(271, CANNOT_DECOMPRESS) \ @@ -295,9 +248,7 @@ M(290, LIMIT_EXCEEDED) \ M(291, DATABASE_ACCESS_DENIED) \ M(293, MONGODB_CANNOT_AUTHENTICATE) \ - M(294, INVALID_BLOCK_EXTRA_INFO) \ M(295, RECEIVED_EMPTY_DATA) \ - M(296, NO_REMOTE_SHARD_FOUND) \ M(297, SHARD_HAS_NO_CONNECTIONS) \ M(298, CANNOT_PIPE) \ M(299, CANNOT_FORK) \ @@ -311,13 +262,10 @@ M(307, TOO_MANY_BYTES) \ M(308, UNEXPECTED_NODE_IN_ZOOKEEPER) \ M(309, FUNCTION_CANNOT_HAVE_PARAMETERS) \ - M(317, INVALID_SHARD_WEIGHT) \ M(318, INVALID_CONFIG_PARAMETER) \ M(319, UNKNOWN_STATUS_OF_INSERT) \ M(321, VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE) \ - M(335, BARRIER_TIMEOUT) \ M(336, UNKNOWN_DATABASE_ENGINE) \ - M(337, DDL_GUARD_IS_ACTIVE) \ M(341, UNFINISHED) \ M(342, METADATA_MISMATCH) \ M(344, SUPPORT_IS_DISABLED) \ @@ -325,14 +273,10 @@ M(346, CANNOT_CONVERT_CHARSET) \ M(347, 
CANNOT_LOAD_CONFIG) \ M(349, CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN) \ - M(350, INCOMPATIBLE_SOURCE_TABLES) \ - M(351, AMBIGUOUS_TABLE_NAME) \ M(352, AMBIGUOUS_COLUMN_NAME) \ M(353, INDEX_OF_POSITIONAL_ARGUMENT_IS_OUT_OF_RANGE) \ M(354, ZLIB_INFLATE_FAILED) \ M(355, ZLIB_DEFLATE_FAILED) \ - M(356, BAD_LAMBDA) \ - M(357, RESERVED_IDENTIFIER_NAME) \ M(358, INTO_OUTFILE_NOT_ALLOWED) \ M(359, TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT) \ M(360, CANNOT_CREATE_CHARSET_CONVERTER) \ @@ -341,7 +285,6 @@ M(363, CANNOT_CREATE_IO_BUFFER) \ M(364, RECEIVED_ERROR_TOO_MANY_REQUESTS) \ M(366, SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT) \ - M(367, TOO_MANY_FETCHES) \ M(369, ALL_REPLICAS_ARE_STALE) \ M(370, DATA_TYPE_CANNOT_BE_USED_IN_TABLES) \ M(371, INCONSISTENT_CLUSTER_DEFINITION) \ @@ -352,7 +295,6 @@ M(376, CANNOT_PARSE_UUID) \ M(377, ILLEGAL_SYNTAX_FOR_DATA_TYPE) \ M(378, DATA_TYPE_CANNOT_HAVE_ARGUMENTS) \ - M(379, UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK) \ M(380, CANNOT_KILL) \ M(381, HTTP_LENGTH_REQUIRED) \ M(382, CANNOT_LOAD_CATBOOST_MODEL) \ @@ -378,11 +320,9 @@ M(402, CANNOT_IOSETUP) \ M(403, INVALID_JOIN_ON_EXPRESSION) \ M(404, BAD_ODBC_CONNECTION_STRING) \ - M(405, PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT) \ M(406, TOP_AND_LIMIT_TOGETHER) \ M(407, DECIMAL_OVERFLOW) \ M(408, BAD_REQUEST_PARAMETER) \ - M(409, EXTERNAL_EXECUTABLE_NOT_FOUND) \ M(410, EXTERNAL_SERVER_IS_NOT_RESPONDING) \ M(411, PTHREAD_ERROR) \ M(412, NETLINK_ERROR) \ @@ -399,7 +339,6 @@ M(424, CANNOT_LINK) \ M(425, SYSTEM_ERROR) \ M(427, CANNOT_COMPILE_REGEXP) \ - M(428, UNKNOWN_LOG_LEVEL) \ M(429, FAILED_TO_GETPWUID) \ M(430, MISMATCHING_USERS_FOR_PROCESS_AND_DATA) \ M(431, ILLEGAL_SYNTAX_FOR_CODEC_TYPE) \ @@ -433,7 +372,6 @@ M(459, CANNOT_SET_THREAD_PRIORITY) \ M(460, CANNOT_CREATE_TIMER) \ M(461, CANNOT_SET_TIMER_PERIOD) \ - M(462, CANNOT_DELETE_TIMER) \ M(463, CANNOT_FCNTL) \ M(464, CANNOT_PARSE_ELF) \ M(465, CANNOT_PARSE_DWARF) \ @@ -456,15 +394,12 @@ M(482, DICTIONARY_ACCESS_DENIED) \ M(483, TOO_MANY_REDIRECTS) \ M(484, INTERNAL_REDIS_ERROR) \ - M(485, SCALAR_ALREADY_EXISTS) \ M(487, CANNOT_GET_CREATE_DICTIONARY_QUERY) \ - M(488, UNKNOWN_DICTIONARY) \ M(489, INCORRECT_DICTIONARY_DEFINITION) \ M(490, CANNOT_FORMAT_DATETIME) \ M(491, UNACCEPTABLE_URL) \ M(492, ACCESS_ENTITY_NOT_FOUND) \ M(493, ACCESS_ENTITY_ALREADY_EXISTS) \ - M(494, ACCESS_ENTITY_FOUND_DUPLICATES) \ M(495, ACCESS_STORAGE_READONLY) \ M(496, QUOTA_REQUIRES_CLIENT_KEY) \ M(497, ACCESS_DENIED) \ @@ -475,8 +410,6 @@ M(502, CANNOT_SIGQUEUE) \ M(503, AGGREGATE_FUNCTION_THROW) \ M(504, FILE_ALREADY_EXISTS) \ - M(505, CANNOT_DELETE_DIRECTORY) \ - M(506, UNEXPECTED_ERROR_CODE) \ M(507, UNABLE_TO_SKIP_UNUSED_SHARDS) \ M(508, UNKNOWN_ACCESS_TYPE) \ M(509, INVALID_GRANT) \ @@ -501,8 +434,6 @@ M(530, CANNOT_CONNECT_RABBITMQ) \ M(531, CANNOT_FSTAT) \ M(532, LDAP_ERROR) \ - M(533, INCONSISTENT_RESERVATIONS) \ - M(534, NO_RESERVATIONS_PROVIDED) \ M(535, UNKNOWN_RAID_TYPE) \ M(536, CANNOT_RESTORE_FROM_FIELD_DUMP) \ M(537, ILLEGAL_MYSQL_VARIABLE) \ @@ -518,8 +449,6 @@ M(547, INVALID_RAID_TYPE) \ M(548, UNKNOWN_VOLUME) \ M(549, DATA_TYPE_CANNOT_BE_USED_IN_KEY) \ - M(550, CONDITIONAL_TREE_PARENT_NOT_FOUND) \ - M(551, ILLEGAL_PROJECTION_MANIPULATOR) \ M(552, UNRECOGNIZED_ARGUMENTS) \ M(553, LZMA_STREAM_ENCODER_FAILED) \ M(554, LZMA_STREAM_DECODER_FAILED) \ @@ -580,8 +509,6 @@ M(609, FUNCTION_ALREADY_EXISTS) \ M(610, CANNOT_DROP_FUNCTION) \ M(611, CANNOT_CREATE_RECURSIVE_FUNCTION) \ - M(612, OBJECT_ALREADY_STORED_ON_DISK) \ - M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \ M(614, 
POSTGRESQL_CONNECTION_FAILURE) \ M(615, CANNOT_ADVISE) \ M(616, UNKNOWN_READ_METHOD) \ @@ -612,9 +539,7 @@ M(641, CANNOT_APPEND_TO_FILE) \ M(642, CANNOT_PACK_ARCHIVE) \ M(643, CANNOT_UNPACK_ARCHIVE) \ - M(644, REMOTE_FS_OBJECT_CACHE_ERROR) \ M(645, NUMBER_OF_DIMENSIONS_MISMATCHED) \ - M(646, CANNOT_BACKUP_DATABASE) \ M(647, CANNOT_BACKUP_TABLE) \ M(648, WRONG_DDL_RENAMING_SETTINGS) \ M(649, INVALID_TRANSACTION) \ diff --git a/src/Common/HashTable/ClearableHashSet.h b/src/Common/HashTable/ClearableHashSet.h index 4cbce1a5213..006d45df7cd 100644 --- a/src/Common/HashTable/ClearableHashSet.h +++ b/src/Common/HashTable/ClearableHashSet.h @@ -80,6 +80,8 @@ template < class ClearableHashSet : public HashTable>, Hash, Grower, Allocator> { + using Cell = ClearableHashTableCell>; + public: using Base = HashTable>, Hash, Grower, Allocator>; using typename Base::LookupResult; @@ -88,6 +90,13 @@ public: { ++this->version; this->m_size = 0; + + if constexpr (Cell::need_zero_value_storage) + { + /// clear ZeroValueStorage + if (this->hasZero()) + this->clearHasZero(); + } } }; @@ -103,11 +112,20 @@ class ClearableHashSetWithSavedHash : public HashTable< Grower, Allocator> { + using Cell = ClearableHashTableCell>; + public: void clear() { ++this->version; this->m_size = 0; + + if constexpr (Cell::need_zero_value_storage) + { + /// clear ZeroValueStorage + if (this->hasZero()) + this->clearHasZero(); + } } }; diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 674d8d469af..81cac2617c5 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -96,12 +96,17 @@ using namespace std::chrono_literals; static constexpr size_t log_peak_memory_usage_every = 1ULL << 30; MemoryTracker total_memory_tracker(nullptr, VariableContext::Global); +MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false); std::atomic MemoryTracker::free_memory_in_allocator_arenas; MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {} MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {} - +MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_) + : parent(parent_) + , log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_) + , level(level_) +{} MemoryTracker::~MemoryTracker() { @@ -528,3 +533,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value) while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value)) ; } + +bool canEnqueueBackgroundTask() +{ + auto limit = background_memory_tracker.getSoftLimit(); + auto amount = background_memory_tracker.get(); + return limit == 0 || amount < limit; +} diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 0d7748856bd..4e29d40c953 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -98,6 +98,7 @@ public: explicit MemoryTracker(VariableContext level_ = VariableContext::Thread); explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread); + MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_); ~MemoryTracker(); @@ -110,6 +111,22 @@ public: return amount.load(std::memory_order_relaxed); } + // Merges and mutations may pass memory ownership to other threads thus in the end of execution + // MemoryTracker for background task may have a non-zero counter. 
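The `background_memory_tracker` and `canEnqueueBackgroundTask()` introduced above implement a soft limit: once the tracked merge/mutation memory reaches the limit, new background tasks should be postponed while already-running ones continue. The actual call sites are outside this hunk; the sketch below is only a hypothetical illustration of that gating, using toy names.

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

// Toy stand-ins (hypothetical names) for background_memory_tracker and
// canEnqueueBackgroundTask(): scheduling of *new* merges stops once the
// tracked amount reaches the soft limit; running work keeps executing.
struct ToyBackgroundTracker
{
    std::atomic<int64_t> amount{0};
    std::atomic<int64_t> soft_limit{0};   // 0 means "unlimited"
};

bool canEnqueueBackgroundTask(const ToyBackgroundTracker & t)
{
    const int64_t limit = t.soft_limit.load(std::memory_order_relaxed);
    return limit == 0 || t.amount.load(std::memory_order_relaxed) < limit;
}

bool tryScheduleMerge(ToyBackgroundTracker & tracker)
{
    if (!canEnqueueBackgroundTask(tracker))
        return false;                     // postpone: soft limit reached
    // ... enqueue the merge; its allocations get charged to `tracker` ...
    return true;
}

int main()
{
    ToyBackgroundTracker tracker;
    tracker.soft_limit = 32LL << 30;      // e.g. a 32 GiB soft limit
    tracker.amount = 33LL << 30;          // merges already hold 33 GiB
    std::cout << std::boolalpha << tryScheduleMerge(tracker) << '\n';   // false
    tracker.amount = 1LL << 30;
    std::cout << tryScheduleMerge(tracker) << '\n';                     // true
}
```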
+ // This method is intended to fix the counter inside of background_memory_tracker. + // NOTE: We can't use alloc/free methods to do it, because they also will change the value inside + // of total_memory_tracker. + void adjustOnBackgroundTaskEnd(const MemoryTracker * child) + { + auto background_memory_consumption = child->amount.load(std::memory_order_relaxed); + amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed); + + // Also fix CurrentMetrics::MergesMutationsMemoryTracking + auto metric_loaded = metric.load(std::memory_order_relaxed); + if (metric_loaded != CurrentMetrics::end()) + CurrentMetrics::sub(metric_loaded, background_memory_consumption); + } + Int64 getPeak() const { return peak.load(std::memory_order_relaxed); @@ -220,3 +237,6 @@ public: }; extern MemoryTracker total_memory_tracker; +extern MemoryTracker background_memory_tracker; + +bool canEnqueueBackgroundTask(); diff --git a/src/Compression/CompressionCodecT64.cpp b/src/Compression/CompressionCodecT64.cpp index d5bace020be..3506c087b54 100644 --- a/src/Compression/CompressionCodecT64.cpp +++ b/src/Compression/CompressionCodecT64.cpp @@ -378,6 +378,13 @@ void transpose(const T * src, char * dst, UInt32 num_bits, UInt32 tail = 64) /// UInt64[N] transposed matrix -> UIntX[64] template +#if defined(__s390x__) + +/* Compiler Bug for S390x :- https://github.com/llvm/llvm-project/issues/62572 + * Please remove this after the fix is backported + */ + __attribute__((noinline)) +#endif void reverseTranspose(const char * src, T * buf, UInt32 num_bits, UInt32 tail = 64) { UInt64 matrix[64] = {}; diff --git a/src/Compression/CompressionFactory.cpp b/src/Compression/CompressionFactory.cpp index 514ae4eee3c..61f4e3a988c 100644 --- a/src/Compression/CompressionFactory.cpp +++ b/src/Compression/CompressionFactory.cpp @@ -172,7 +172,7 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory); /// Keeper use only general-purpose codecs, so we don't need these special codecs /// in standalone build -#ifndef KEEPER_STANDALONE_BUILD +#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD void registerCodecDelta(CompressionCodecFactory & factory); void registerCodecT64(CompressionCodecFactory & factory); void registerCodecDoubleDelta(CompressionCodecFactory & factory); @@ -188,7 +188,7 @@ CompressionCodecFactory::CompressionCodecFactory() registerCodecZSTD(*this); registerCodecLZ4HC(*this); registerCodecMultiple(*this); -#ifndef KEEPER_STANDALONE_BUILD +#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD registerCodecDelta(*this); registerCodecT64(*this); registerCodecDoubleDelta(*this); diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 1ddc2ad0fa7..ee3482414af 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -42,6 +42,8 @@ namespace DB M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \ M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \ M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \ + M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \ + M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. 
Allows to lower memory limit on low-memory systems.", 0) \ M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \ \ M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \ diff --git a/src/Core/SettingsFields.cpp b/src/Core/SettingsFields.cpp index 06cd53013ec..8cd4efb68c6 100644 --- a/src/Core/SettingsFields.cpp +++ b/src/Core/SettingsFields.cpp @@ -338,7 +338,7 @@ void SettingFieldString::readBinary(ReadBuffer & in) /// that. The linker does not complain only because clickhouse-keeper does not call any of below /// functions. A cleaner alternative would be more modular libraries, e.g. one for data types, which /// could then be linked by the server and the linker. -#ifndef KEEPER_STANDALONE_BUILD +#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {} diff --git a/src/Daemon/SentryWriter.cpp b/src/Daemon/SentryWriter.cpp index 3c62e54b117..041d3292841 100644 --- a/src/Daemon/SentryWriter.cpp +++ b/src/Daemon/SentryWriter.cpp @@ -18,7 +18,7 @@ #include "config.h" #include "config_version.h" -#if USE_SENTRY && !defined(KEEPER_STANDALONE_BUILD) +#if USE_SENTRY && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD) # include # include diff --git a/src/DataTypes/DataTypeInterval.h b/src/DataTypes/DataTypeInterval.h index 83d89a73460..05abe1d9b24 100644 --- a/src/DataTypes/DataTypeInterval.h +++ b/src/DataTypes/DataTypeInterval.h @@ -11,9 +11,6 @@ namespace DB * * Mostly the same as Int64. * But also tagged with interval kind. - * - * Intended usage is for temporary elements in expressions, - * not for storing values in tables. */ class DataTypeInterval final : public DataTypeNumberBase { @@ -34,7 +31,6 @@ public: bool equals(const IDataType & rhs) const override; bool isParametric() const override { return true; } - bool cannotBeStoredInTables() const override { return true; } bool isCategorial() const override { return false; } bool canBeInsideNullable() const override { return true; } }; diff --git a/src/Databases/DDLDependencyVisitor.cpp b/src/Databases/DDLDependencyVisitor.cpp index 26f912f6482..cb85119e3b0 100644 --- a/src/Databases/DDLDependencyVisitor.cpp +++ b/src/Databases/DDLDependencyVisitor.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -12,6 +13,8 @@ #include #include #include +#include +#include #include #include @@ -25,6 +28,7 @@ namespace /// Used to visits ASTCreateQuery and extracts the names of all tables explicitly referenced in the create query. class DDLDependencyVisitorData { + friend void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data); public: DDLDependencyVisitorData(const ContextPtr & context_, const QualifiedTableName & table_name_, const ASTPtr & ast_) : create_query(ast_), table_name(table_name_), current_database(context_->getCurrentDatabase()), context(context_) @@ -106,9 +110,17 @@ namespace if (!info || !info->is_local) return; - if (info->table_name.database.empty()) - info->table_name.database = current_database; - dependencies.emplace(std::move(info->table_name)); + if (!info->table_name.table.empty()) + { + if (info->table_name.database.empty()) + info->table_name.database = current_database; + dependencies.emplace(std::move(info->table_name)); + } + else + { + /// We don't have a table name, we have a select query instead + tryVisitNestedSelect(info->query, *this); + } } /// ASTTableExpression represents a reference to a table in SELECT query. 
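For dictionaries whose ClickHouse source is defined by a query rather than a table, the visitor now parses that nested SELECT (see `tryVisitNestedSelect` in the next hunk), first stripping the `{condition}` placeholder with `removeWhereConditionPlaceholder()`, a helper added later in this diff. A standalone sketch of that helper's behaviour, mirroring the new implementation (illustration only):

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Mirrors src/Databases/removeWhereConditionPlaceholder.cpp added below:
// replace the "{condition}" placeholder with a trivially true predicate so
// the query parses and runs for the initial full dictionary load.
static constexpr std::string_view kPlaceholder = "{condition}";

std::string removeWhereConditionPlaceholder(const std::string & query)
{
    const auto pos = query.find(kPlaceholder);
    if (pos == std::string::npos)
        return query;
    std::string copy = query;
    copy.replace(pos, kPlaceholder.size(), "(1 = 1)");
    return copy;
}

int main()
{
    assert(removeWhereConditionPlaceholder("SELECT key, value FROM t WHERE {condition}")
           == "SELECT key, value FROM t WHERE (1 = 1)");
    assert(removeWhereConditionPlaceholder("SELECT key, value FROM t")
           == "SELECT key, value FROM t");
    return 0;
}
```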
@@ -424,6 +436,25 @@ namespace static bool needChildVisit(const ASTPtr &, const ASTPtr & child, const Data & data) { return data.needChildVisit(child); } static void visit(const ASTPtr & ast, Data & data) { data.visit(ast); } }; + + void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data) + { + try + { + ParserSelectWithUnionQuery parser; + String description = fmt::format("Query for ClickHouse dictionary {}", data.table_name); + String fixed_query = removeWhereConditionPlaceholder(query); + ASTPtr select = parseQuery(parser, fixed_query, description, + data.context->getSettingsRef().max_query_size, data.context->getSettingsRef().max_parser_depth); + + DDLDependencyVisitor::Visitor visitor{data}; + visitor.visit(select); + } + catch (...) + { + tryLogCurrentException("DDLDependencyVisitor"); + } + } } diff --git a/src/Databases/DDLLoadingDependencyVisitor.cpp b/src/Databases/DDLLoadingDependencyVisitor.cpp index 22dc0b4af7b..99538fd801e 100644 --- a/src/Databases/DDLLoadingDependencyVisitor.cpp +++ b/src/Databases/DDLLoadingDependencyVisitor.cpp @@ -103,7 +103,7 @@ void DDLLoadingDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments & auto config = getDictionaryConfigurationFromAST(data.create_query->as(), data.global_context); auto info = getInfoIfClickHouseDictionarySource(config, data.global_context); - if (!info || !info->is_local) + if (!info || !info->is_local || info->table_name.table.empty()) return; if (info->table_name.database.empty()) diff --git a/src/Databases/DDLRenamingVisitor.cpp b/src/Databases/DDLRenamingVisitor.cpp index 9651861fd4f..6cd414635a0 100644 --- a/src/Databases/DDLRenamingVisitor.cpp +++ b/src/Databases/DDLRenamingVisitor.cpp @@ -137,7 +137,7 @@ namespace auto config = getDictionaryConfigurationFromAST(data.create_query->as(), data.global_context); auto info = getInfoIfClickHouseDictionarySource(config, data.global_context); - if (!info || !info->is_local) + if (!info || !info->is_local || info->table_name.table.empty()) return; auto * source_list = dictionary.source->elements->as(); diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 0c12b78738d..2827ec0ce77 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -726,7 +726,7 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context return create.uuid; } -void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr) +void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr) { is_recovering = true; SCOPE_EXIT({ is_recovering = false; }); diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index b3397a832f2..1da181de030 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -102,7 +102,7 @@ private: void checkQueryValid(const ASTPtr & query, ContextPtr query_context) const; - void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr); + void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr); std::map tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr); ASTPtr parseQueryFromMetadataInZooKeeper(const String & node_name, const String & query); diff --git a/src/Databases/removeWhereConditionPlaceholder.cpp 
b/src/Databases/removeWhereConditionPlaceholder.cpp new file mode 100644 index 00000000000..fb147be26d0 --- /dev/null +++ b/src/Databases/removeWhereConditionPlaceholder.cpp @@ -0,0 +1,20 @@ +#include + +namespace DB +{ + +std::string removeWhereConditionPlaceholder(const std::string & query) +{ + static constexpr auto true_condition = "(1 = 1)"; + auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE); + if (condition_position != std::string::npos) + { + auto query_copy = query; + query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition); + return query_copy; + } + + return query; +} + +} diff --git a/src/Databases/removeWhereConditionPlaceholder.h b/src/Databases/removeWhereConditionPlaceholder.h new file mode 100644 index 00000000000..7f0e23d42cf --- /dev/null +++ b/src/Databases/removeWhereConditionPlaceholder.h @@ -0,0 +1,15 @@ +#pragma once +#include + +namespace DB +{ + +static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}"; + +/** In case UPDATE_FIELD is specified in {condition} for dictionary that must load all data. + * Replace {condition} with true_condition for initial dictionary load. + * For next dictionary loads {condition} will be updated with UPDATE_FIELD. + */ +std::string removeWhereConditionPlaceholder(const std::string & query); + +} diff --git a/src/Dictionaries/ExternalQueryBuilder.cpp b/src/Dictionaries/ExternalQueryBuilder.cpp index 19dabe92d64..e21b0842e11 100644 --- a/src/Dictionaries/ExternalQueryBuilder.cpp +++ b/src/Dictionaries/ExternalQueryBuilder.cpp @@ -6,7 +6,7 @@ #include #include #include - +#include namespace DB { @@ -24,7 +24,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}"; ExternalQueryBuilder::ExternalQueryBuilder( const DictionaryStructure & dict_struct_, @@ -82,23 +81,8 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const writeChar(';', out); return out.str(); } - else - { - /** In case UPDATE_FIELD is specified in {condition} for dictionary that must load all data. - * Replace {condition} with true_condition for initial dictionary load. - * For next dictionary loads {condition} will be updated with UPDATE_FIELD. 
- */ - static constexpr auto true_condition = "(1 = 1)"; - auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE); - if (condition_position != std::string::npos) - { - auto query_copy = query; - query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition); - return query_copy; - } - return query; - } + return removeWhereConditionPlaceholder(query); } void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const diff --git a/src/Dictionaries/getDictionaryConfigurationFromAST.cpp b/src/Dictionaries/getDictionaryConfigurationFromAST.cpp index 0de8b843604..05065df5251 100644 --- a/src/Dictionaries/getDictionaryConfigurationFromAST.cpp +++ b/src/Dictionaries/getDictionaryConfigurationFromAST.cpp @@ -649,10 +649,12 @@ getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, Context String database = config->getString("dictionary.source.clickhouse.db", ""); String table = config->getString("dictionary.source.clickhouse.table", ""); - if (table.empty()) - return {}; + info.query = config->getString("dictionary.source.clickhouse.query", ""); - info.table_name = {database, table}; + if (!table.empty()) + info.table_name = {database, table}; + else if (info.query.empty()) + return {}; try { diff --git a/src/Dictionaries/getDictionaryConfigurationFromAST.h b/src/Dictionaries/getDictionaryConfigurationFromAST.h index ec44b9815ff..aa42835d33d 100644 --- a/src/Dictionaries/getDictionaryConfigurationFromAST.h +++ b/src/Dictionaries/getDictionaryConfigurationFromAST.h @@ -18,6 +18,7 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte struct ClickHouseDictionarySourceInfo { QualifiedTableName table_name; + String query; bool is_local = false; }; diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 998b9ce0959..bf9a476b785 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -47,14 +47,14 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr impl_, - size_t min_bytes_for_seek_) + std::shared_ptr async_read_counters_, + std::shared_ptr prefetches_log_) : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0) , read_settings(settings_) , reader(reader_) , base_priority(settings_.priority) , impl(impl_) , prefetch_buffer(settings_.prefetch_buffer_size) - , min_bytes_for_seek(min_bytes_for_seek_) , query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? 
CurrentThread::getQueryId() : "") , current_reader_id(getRandomASCIIString(8)) @@ -63,6 +63,8 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe #else , log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")")) #endif + , async_read_counters(async_read_counters_) + , prefetches_log(prefetches_log_) { ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); } @@ -111,7 +113,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead() std::future AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority) { IAsynchronousReader::Request request; - request.descriptor = std::make_shared(*impl); + request.descriptor = std::make_shared(*impl, async_read_counters); request.buf = data; request.size = size; request.offset = file_offset_of_buffer_end; @@ -186,8 +188,8 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP .reader_id = current_reader_id, }; - if (auto prefetch_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog()) - prefetch_log->add(elem); + if (prefetches_log) + prefetches_log->add(elem); } @@ -335,7 +337,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence) if (impl->initialized() && read_until_position && new_pos < *read_until_position && new_pos > file_offset_of_buffer_end - && new_pos < file_offset_of_buffer_end + min_bytes_for_seek) + && new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek) { ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks); bytes_to_ignore = new_pos - file_offset_of_buffer_end; diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h index 8cb0e2826b4..49b44916a46 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -12,6 +12,7 @@ namespace Poco { class Logger; } namespace DB { +struct AsyncReadCounters; class ReadBufferFromRemoteFSGather; /** @@ -34,7 +35,8 @@ public: explicit AsynchronousReadIndirectBufferFromRemoteFS( IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr impl_, - size_t min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE); + std::shared_ptr async_read_counters_, + std::shared_ptr prefetches_log_); ~AsynchronousReadIndirectBufferFromRemoteFS() override; @@ -83,8 +85,6 @@ private: Memory<> prefetch_buffer; - size_t min_bytes_for_seek; - std::string query_id; std::string current_reader_id; @@ -95,6 +95,9 @@ private: Poco::Logger * log; + std::shared_ptr async_read_counters; + std::shared_ptr prefetches_log; + struct LastPrefetchInfo { UInt64 submit_time = 0; diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index 7847cac6058..00d23183f6a 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -48,7 +48,8 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile( size_t file_size_, bool allow_seeks_after_first_read_, bool use_external_buffer_, - std::optional read_until_position_) + std::optional read_until_position_, + std::shared_ptr cache_log_) : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_) #ifndef NDEBUG , log(&Poco::Logger::get("CachedOnDiskReadBufferFromFile(" + source_file_path_ + ")")) @@ -62,12 +63,12 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile( , 
read_until_position(read_until_position_ ? *read_until_position_ : file_size_) , implementation_buffer_creator(implementation_buffer_creator_) , query_id(query_id_) - , enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log) , current_buffer_id(getRandomASCIIString(8)) , allow_seeks_after_first_read(allow_seeks_after_first_read_) , use_external_buffer(use_external_buffer_) , query_context_holder(cache_->getQueryContextHolder(query_id, settings_)) , is_persistent(settings_.is_file_cache_persistent) + , cache_log(cache_log_) { } @@ -103,7 +104,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog( break; } - if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog()) + if (cache_log) cache_log->add(elem); } @@ -487,7 +488,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext() auto * current_file_segment = &file_segments->front(); auto completed_range = current_file_segment->range(); - if (enable_logging) + if (cache_log) appendFilesystemCacheLog(completed_range, read_type); chassert(file_offset_of_buffer_end > completed_range.right); @@ -512,7 +513,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext() CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile() { - if (enable_logging && file_segments && !file_segments->empty()) + if (cache_log && file_segments && !file_segments->empty()) { appendFilesystemCacheLog(file_segments->front().range(), read_type); } diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.h b/src/Disks/IO/CachedOnDiskReadBufferFromFile.h index 9738c997d7a..8faf23ad343 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.h +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.h @@ -32,7 +32,8 @@ public: size_t file_size_, bool allow_seeks_after_first_read_, bool use_external_buffer_, - std::optional read_until_position_ = std::nullopt); + std::optional read_until_position_, + std::shared_ptr cache_log_); ~CachedOnDiskReadBufferFromFile() override; @@ -137,7 +138,6 @@ private: String last_caller_id; String query_id; - bool enable_logging = false; String current_buffer_id; bool allow_seeks_after_first_read; @@ -148,6 +148,8 @@ private: FileCache::QueryContextHolderPtr query_context_holder; bool is_persistent; + + std::shared_ptr cache_log; }; } diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp index d72dcecb484..af2226ea6ca 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp @@ -153,27 +153,27 @@ FileSegment & FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSeg void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment) { - if (cache_log) + if (!cache_log) + return; + + auto file_segment_range = file_segment.range(); + size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1; + + FilesystemCacheLogElement elem { - auto file_segment_range = file_segment.range(); - size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1; + .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()), + .query_id = query_id, + .source_file_path = source_path, + .file_segment_range = { file_segment_range.left, file_segment_right_bound }, + .requested_range = {}, + .cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE, + .file_segment_size = file_segment_range.size(), + 
.read_from_cache_attempted = false, + .read_buffer_id = {}, + .profile_counters = nullptr, + }; - FilesystemCacheLogElement elem - { - .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()), - .query_id = query_id, - .source_file_path = source_path, - .file_segment_range = { file_segment_range.left, file_segment_right_bound }, - .requested_range = {}, - .cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE, - .file_segment_size = file_segment_range.size(), - .read_from_cache_attempted = false, - .read_buffer_id = {}, - .profile_counters = nullptr, - }; - - cache_log->add(elem); - } + cache_log->add(elem); } void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment) diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 24b3840e3d5..68b5a9c9d96 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -8,7 +8,6 @@ #include #include #include -#include namespace DB @@ -17,15 +16,18 @@ namespace DB ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather( ReadBufferCreator && read_buffer_creator_, const StoredObjects & blobs_to_read_, - const ReadSettings & settings_) + const ReadSettings & settings_, + std::shared_ptr cache_log_) : ReadBuffer(nullptr, 0) , read_buffer_creator(std::move(read_buffer_creator_)) , blobs_to_read(blobs_to_read_) , settings(settings_) , query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "") , log(&Poco::Logger::get("ReadBufferFromRemoteFSGather")) - , enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log) { + if (cache_log_ && settings.enable_filesystem_cache_log) + cache_log = cache_log_; + if (!blobs_to_read.empty()) current_object = blobs_to_read.front(); @@ -36,7 +38,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather( SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object) { - if (current_buf != nullptr && !with_cache && enable_cache_log) + if (current_buf != nullptr && !with_cache) { appendFilesystemCacheLog(); } @@ -61,7 +63,8 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c object.bytes_size, /* allow_seeks */false, /* use_external_buffer */true, - read_until_position ? std::optional(read_until_position) : std::nullopt); + read_until_position ? 
std::optional(read_until_position) : std::nullopt, + cache_log); } return current_read_buffer_creator(); @@ -69,7 +72,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog() { - if (current_object.remote_path.empty()) + if (!cache_log || current_object.remote_path.empty()) return; FilesystemCacheLogElement elem @@ -82,9 +85,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog() .file_segment_size = total_bytes_read_from_current_file, .read_from_cache_attempted = false, }; - - if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog()) - cache_log->add(elem); + cache_log->add(elem); } IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) @@ -267,10 +268,8 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather() { - if (!with_cache && enable_cache_log) - { + if (!with_cache) appendFilesystemCacheLog(); - } } } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index d8e55b648fa..8c55f747e5b 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -25,7 +25,8 @@ public: ReadBufferFromRemoteFSGather( ReadBufferCreator && read_buffer_creator_, const StoredObjects & blobs_to_read_, - const ReadSettings & settings_); + const ReadSettings & settings_, + std::shared_ptr cache_log_); ~ReadBufferFromRemoteFSGather() override; @@ -93,7 +94,7 @@ private: size_t total_bytes_read_from_current_file = 0; - bool enable_cache_log = false; + std::shared_ptr cache_log; }; } diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 4d0f39357ab..b7e598e2a87 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include @@ -75,17 +74,11 @@ std::future ThreadPoolRemoteFSReader::submit(Reques return scheduleFromThreadPool([request]() -> Result { CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead}; - - std::optional increment; - if (CurrentThread::isInitialized()) - { - auto query_context = CurrentThread::get().getQueryContext(); - if (query_context) - increment.emplace(query_context->getAsyncReadCounters()); - } - auto * remote_fs_fd = assert_cast(request.descriptor.get()); + auto async_read_counters = remote_fs_fd->getReadCounters(); + std::optional increment = async_read_counters ? 
std::optional(async_read_counters) : std::nullopt; + auto watch = std::make_unique(CLOCK_MONOTONIC); Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); watch->stop(); diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.h b/src/Disks/IO/ThreadPoolRemoteFSReader.h index 3a765993292..506d77a64ef 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.h +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.h @@ -8,6 +8,8 @@ namespace DB { +struct AsyncReadCounters; + class ThreadPoolRemoteFSReader : public IAsynchronousReader { public: @@ -24,12 +26,19 @@ private: class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor { public: - explicit RemoteFSFileDescriptor(ReadBuffer & reader_) : reader(reader_) { } + explicit RemoteFSFileDescriptor( + ReadBuffer & reader_, + std::shared_ptr async_read_counters_) + : reader(reader_) + , async_read_counters(async_read_counters_) {} IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + std::shared_ptr getReadCounters() const { return async_read_counters; } + private: ReadBuffer & reader; + std::shared_ptr async_read_counters; }; } diff --git a/src/Disks/IO/createReadBufferFromFileBase.cpp b/src/Disks/IO/createReadBufferFromFileBase.cpp index 8e9a1d86628..7de26b6c333 100644 --- a/src/Disks/IO/createReadBufferFromFileBase.cpp +++ b/src/Disks/IO/createReadBufferFromFileBase.cpp @@ -5,7 +5,9 @@ #include #include #include +#include #include +#include #include #include "config.h" @@ -27,7 +29,6 @@ namespace ErrorCodes extern const int UNSUPPORTED_METHOD; } - std::unique_ptr createReadBufferFromFileBase( const std::string & filename, const ReadSettings & settings, @@ -119,11 +120,7 @@ std::unique_ptr createReadBufferFromFileBase( } else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async) { - auto context = Context::getGlobalContextInstance(); - if (!context) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized"); - - auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER); + auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER); res = std::make_unique( reader, settings.priority, @@ -137,11 +134,7 @@ std::unique_ptr createReadBufferFromFileBase( } else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool) { - auto context = Context::getGlobalContextInstance(); - if (!context) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized"); - - auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER); + auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER); res = std::make_unique( reader, settings.priority, diff --git a/src/Disks/IO/getThreadPoolReader.cpp b/src/Disks/IO/getThreadPoolReader.cpp new file mode 100644 index 00000000000..deb8f66106c --- /dev/null +++ b/src/Disks/IO/getThreadPoolReader.cpp @@ -0,0 +1,76 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD +#include +#endif + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) +{ +#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD + const auto & config = Poco::Util::Application::instance().config(); + switch (type) + { + case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER: + { + static auto 
asynchronous_remote_fs_reader = createThreadPoolReader(type, config); + return *asynchronous_remote_fs_reader; + } + case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER: + { + static auto asynchronous_local_fs_reader = createThreadPoolReader(type, config); + return *asynchronous_local_fs_reader; + } + case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER: + { + static auto synchronous_local_fs_reader = createThreadPoolReader(type, config); + return *synchronous_local_fs_reader; + } + } +#else + auto context = Context::getGlobalContextInstance(); + if (!context) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized"); + return context->getThreadPoolReader(type); +#endif +} + +std::unique_ptr createThreadPoolReader( + FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config) +{ + switch (type) + { + case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER: + { + auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 250); + auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000); + return std::make_unique(pool_size, queue_size); + } + case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER: + { + auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100); + auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000); + return std::make_unique(pool_size, queue_size); + } + case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER: + { + return std::make_unique(); + } + } +} + +} diff --git a/src/Disks/IO/getThreadPoolReader.h b/src/Disks/IO/getThreadPoolReader.h new file mode 100644 index 00000000000..243e0e5478f --- /dev/null +++ b/src/Disks/IO/getThreadPoolReader.h @@ -0,0 +1,23 @@ +#pragma once + +namespace Poco::Util { class AbstractConfiguration; } + +namespace DB +{ + +class IAsynchronousReader; + +enum class FilesystemReaderType +{ + SYNCHRONOUS_LOCAL_FS_READER, + ASYNCHRONOUS_LOCAL_FS_READER, + ASYNCHRONOUS_REMOTE_FS_READER, +}; + +IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type); + +std::unique_ptr createThreadPoolReader( + FilesystemReaderType type, + const Poco::Util::AbstractConfiguration & config); + +} diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index 62b72dc82d5..9db5d13a7f8 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -86,6 +87,7 @@ std::unique_ptr AzureObjectStorage::readObjects( /// NOL { ReadSettings disk_read_settings = patchSettings(read_settings); auto settings_ptr = settings.get(); + auto global_context = Context::getGlobalContextInstance(); auto read_buffer_creator = [this, settings_ptr, disk_read_settings] @@ -104,12 +106,16 @@ std::unique_ptr AzureObjectStorage::readObjects( /// NOL auto reader_impl = std::make_unique( std::move(read_buffer_creator), objects, - disk_read_settings); + disk_read_settings, + global_context->getFilesystemCacheLog()); if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) { - auto & reader = getThreadPoolReader(); - return std::make_unique(reader, disk_read_settings, std::move(reader_impl)); + auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); + return std::make_unique( + reader, disk_read_settings, std::move(reader_impl), + 
global_context->getAsyncReadCounters(), + global_context->getFilesystemReadPrefetchesLog()); } else { diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp index 2448d2d1101..a26969c6175 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp index ccba60153f2..e50e410823d 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp @@ -74,7 +74,7 @@ std::unique_ptr HDFSObjectStorage::readObjects( /// NOLI hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true); }; - auto hdfs_impl = std::make_unique(std::move(read_buffer_creator), objects, disk_read_settings); + auto hdfs_impl = std::make_unique(std::move(read_buffer_creator), objects, disk_read_settings, nullptr); auto buf = std::make_unique(std::move(hdfs_impl), read_settings); return std::make_unique(std::move(buf), settings->min_bytes_for_seek); } diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index f741c96006c..52e8b1a465d 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -26,15 +26,6 @@ void IObjectStorage::getDirectoryContents(const std::string &, throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getDirectoryContents() is not supported"); } -IAsynchronousReader & IObjectStorage::getThreadPoolReader() -{ - auto context = Context::getGlobalContextInstance(); - if (!context) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized"); - - return context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); -} - ThreadPool & IObjectStorage::getThreadPoolWriter() { auto context = Context::getGlobalContextInstance(); diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 5c15c5210e1..f4f1b063ade 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -157,8 +157,6 @@ public: virtual const std::string & getCacheName() const; - static IAsynchronousReader & getThreadPoolReader(); - static ThreadPool & getThreadPoolWriter(); virtual void shutdown() = 0; diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp index f2b892b96eb..8cf0b27a517 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp @@ -51,6 +51,7 @@ std::unique_ptr LocalObjectStorage::readObjects( /// NOL std::optional file_size) const { auto modified_settings = patchSettings(read_settings); + auto global_context = Context::getGlobalContextInstance(); auto read_buffer_creator = [=] (const std::string & file_path, size_t /* read_until_position */) -> std::unique_ptr @@ -59,14 +60,18 @@ std::unique_ptr LocalObjectStorage::readObjects( /// NOL }; auto impl = std::make_unique( - std::move(read_buffer_creator), objects, modified_settings); + std::move(read_buffer_creator), objects, modified_settings, + global_context->getFilesystemCacheLog()); /// We use `remove_fs_method` (not `local_fs_method`) because we are about to use /// 
AsynchronousReadIndirectBufferFromRemoteFS which works by the remote_fs_* settings. if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool) { - auto & reader = getThreadPoolReader(); - return std::make_unique(reader, modified_settings, std::move(impl)); + auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); + return std::make_unique( + reader, modified_settings, std::move(impl), + global_context->getAsyncReadCounters(), + global_context->getFilesystemReadPrefetchesLog()); } else { diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 786a2ec9ede..2eee8bf5693 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -98,6 +98,7 @@ std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT std::optional) const { ReadSettings disk_read_settings = patchSettings(read_settings); + auto global_context = Context::getGlobalContextInstance(); auto settings_ptr = s3_settings.get(); @@ -121,13 +122,16 @@ std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT auto s3_impl = std::make_unique( std::move(read_buffer_creator), objects, - disk_read_settings); + disk_read_settings, + global_context->getFilesystemCacheLog()); if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) { - auto & reader = getThreadPoolReader(); + auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); return std::make_unique( - reader, disk_read_settings, std::move(s3_impl), disk_read_settings.remote_read_min_bytes_for_seek); + reader, disk_read_settings, std::move(s3_impl), + global_context->getAsyncReadCounters(), + global_context->getFilesystemReadPrefetchesLog()); } else { diff --git a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp index 2e85ee2c7d2..c57b75f4038 100644 --- a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include @@ -179,12 +180,20 @@ std::unique_ptr WebObjectStorage::readObject( /// NOLINT read_until_position); }; - auto web_impl = std::make_unique(std::move(read_buffer_creator), StoredObjects{object}, read_settings); + auto global_context = Context::getGlobalContextInstance(); + auto web_impl = std::make_unique( + std::move(read_buffer_creator), + StoredObjects{object}, + read_settings, + global_context->getFilesystemCacheLog()); if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) { - auto & reader = IObjectStorage::getThreadPoolReader(); - return std::make_unique(reader, read_settings, std::move(web_impl), min_bytes_for_seek); + auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); + return std::make_unique( + reader, read_settings, std::move(web_impl), + global_context->getAsyncReadCounters(), + global_context->getFilesystemReadPrefetchesLog()); } else { diff --git a/src/Functions/FunctionsCodingIP.cpp b/src/Functions/FunctionsCodingIP.cpp index 8279aa490db..2671418fc7b 100644 --- a/src/Functions/FunctionsCodingIP.cpp +++ b/src/Functions/FunctionsCodingIP.cpp @@ -580,7 +580,7 @@ private: #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ unalignedStoreLittleEndian(buf + 8, 0x00000000FFFF0000ull | (static_cast(ntohl(in)) << 32)); #else - unalignedStoreLittleEndian(buf + 8, 0x00000000FFFF0000ull | (static_cast(in)) << 32)); + 
unalignedStoreLittleEndian(buf + 8, 0x00000000FFFF0000ull | (static_cast(__builtin_bswap32(in))) << 32)); #endif } }; diff --git a/src/Interpreters/Cache/FileCacheSettings.cpp b/src/Interpreters/Cache/FileCacheSettings.cpp index 1737defd316..b6bf77cb306 100644 --- a/src/Interpreters/Cache/FileCacheSettings.cpp +++ b/src/Interpreters/Cache/FileCacheSettings.cpp @@ -47,7 +47,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & else bypass_cache_threashold = FILECACHE_BYPASS_THRESHOLD; - do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", false); + do_not_evict_index_and_mark_files = config.getUInt64(config_prefix + ".do_not_evict_index_and_mark_files", true); delayed_cleanup_interval_ms = config.getUInt64(config_prefix + ".delayed_cleanup_interval_ms", FILECACHE_DELAYED_CLEANUP_INTERVAL_MS); } diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index 60883631177..163a15fcfda 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -61,7 +61,7 @@ struct CreateFileSegmentSettings : kind(kind_), unbounded(unbounded_) {} }; -class FileSegment : private boost::noncopyable, public std::enable_shared_from_this +class FileSegment : private boost::noncopyable { friend struct LockedKey; friend class FileCache; /// Because of reserved_size in tryReserve(). diff --git a/src/Interpreters/Cache/Guards.h b/src/Interpreters/Cache/Guards.h index 0e06495bd82..09586b55c61 100644 --- a/src/Interpreters/Cache/Guards.h +++ b/src/Interpreters/Cache/Guards.h @@ -1,8 +1,6 @@ #pragma once #include -#include #include -#include namespace DB { @@ -63,6 +61,8 @@ namespace DB */ struct CacheGuard : private boost::noncopyable { + /// struct is used (not keyword `using`) to make CacheGuard::Lock non-interchangable with other guards locks + /// so, we wouldn't be able to pass CacheGuard::Lock to a function which accepts KeyGuard::Lock, for example struct Lock : public std::unique_lock { explicit Lock(std::mutex & mutex_) : std::unique_lock(mutex_) {} diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index 02ec7fdbefa..1aa13d4e98c 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -208,10 +208,9 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata( chassert(key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY); } - /// Not we are at a case: - /// key_state == KeyMetadata::KeyState::REMOVED - /// and KeyNotFoundPolicy == CREATE_EMPTY - /// Retry. + /// Now we are at the case when the key was removed (key_state == KeyMetadata::KeyState::REMOVED) + /// but we need to return empty key (key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY) + /// Retry return lockKeyMetadata(key, key_not_found_policy); } @@ -241,13 +240,6 @@ void CacheMetadata::doCleanup() { auto lock = guard.lock(); - /// Let's mention this case. - /// This metadata cleanup is delayed so what is we marked key as deleted and - /// put it to deletion queue, but then the same key was added to cache before - /// we actually performed this delayed removal? - /// In this case it will work fine because on each attempt to add any key to cache - /// we perform this delayed removal. 
- FileCacheKey cleanup_key; while (cleanup_queue->tryPop(cleanup_key)) { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 31c0f2fc87a..e222b8655aa 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -669,13 +669,15 @@ SharedContextHolder Context::createShared() ContextMutablePtr Context::createCopy(const ContextPtr & other) { + auto lock = other->getLock(); return std::shared_ptr(new Context(*other)); } ContextMutablePtr Context::createCopy(const ContextWeakPtr & other) { auto ptr = other.lock(); - if (!ptr) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't copy an expired context"); + if (!ptr) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't copy an expired context"); return createCopy(ptr); } @@ -4169,35 +4171,8 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const return shared->common_executor; } -static size_t getThreadPoolReaderSizeFromConfig(Context::FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config) -{ - switch (type) - { - case Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER: - { - return config.getUInt(".threadpool_remote_fs_reader_pool_size", 250); - } - case Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER: - { - return config.getUInt(".threadpool_local_fs_reader_pool_size", 100); - } - case Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER: - { - return std::numeric_limits::max(); - } - } -} - -size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const -{ - const auto & config = getConfigRef(); - return getThreadPoolReaderSizeFromConfig(type, config); -} - IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const { - const auto & config = getConfigRef(); - auto lock = getLock(); switch (type) @@ -4205,31 +4180,20 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER: { if (!shared->asynchronous_remote_fs_reader) - { - auto pool_size = getThreadPoolReaderSizeFromConfig(type, config); - auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000); - shared->asynchronous_remote_fs_reader = std::make_unique(pool_size, queue_size); - } - + shared->asynchronous_remote_fs_reader = createThreadPoolReader(type, getConfigRef()); return *shared->asynchronous_remote_fs_reader; } case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER: { if (!shared->asynchronous_local_fs_reader) - { - auto pool_size = getThreadPoolReaderSizeFromConfig(type, config); - auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000); - shared->asynchronous_local_fs_reader = std::make_unique(pool_size, queue_size); - } + shared->asynchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef()); return *shared->asynchronous_local_fs_reader; } case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER: { if (!shared->synchronous_local_fs_reader) - { - shared->synchronous_local_fs_reader = std::make_unique(); - } + shared->synchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef()); return *shared->synchronous_local_fs_reader; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index f0be7fb3f7f..87843a458e8 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -1096,17 +1097,8 @@ public: OrdinaryBackgroundExecutorPtr getFetchesExecutor() const; 
OrdinaryBackgroundExecutorPtr getCommonExecutor() const; - enum class FilesystemReaderType - { - SYNCHRONOUS_LOCAL_FS_READER, - ASYNCHRONOUS_LOCAL_FS_READER, - ASYNCHRONOUS_REMOTE_FS_READER, - }; - IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const; - size_t getThreadPoolReaderSize(FilesystemReaderType type) const; - std::shared_ptr getAsyncReadCounters() const; ThreadPool & getThreadPoolWriter() const; diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index e78a61831a1..b5b159f37d5 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -241,13 +241,17 @@ Chain InterpreterInsertQuery::buildChain( running_group = std::make_shared(getContext()); auto sample = getSampleBlock(columns, table, metadata_snapshot); - return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, running_group, elapsed_counter_ms); + + Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms); + Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample, thread_status_holder); + + chain.appendChain(std::move(sink)); + return chain; } -Chain InterpreterInsertQuery::buildChainImpl( +Chain InterpreterInsertQuery::buildSink( const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, - const Block & query_sample_block, ThreadStatusesHolderPtr thread_status_holder, ThreadGroupPtr running_group, std::atomic_uint64_t * elapsed_counter_ms) @@ -258,14 +262,7 @@ Chain InterpreterInsertQuery::buildChainImpl( thread_status = nullptr; auto context_ptr = getContext(); - const ASTInsertQuery * query = nullptr; - if (query_ptr) - query = query_ptr->as(); - const Settings & settings = context_ptr->getSettingsRef(); - bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default; - - /// We create a pipeline of several streams, into which we will write data. Chain out; /// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyed @@ -286,16 +283,48 @@ Chain InterpreterInsertQuery::buildChainImpl( thread_status_holder, running_group, elapsed_counter_ms); } + return out; +} + +Chain InterpreterInsertQuery::buildPreSinkChain( + const Block & subsequent_header, + const StoragePtr & table, + const StorageMetadataPtr & metadata_snapshot, + const Block & query_sample_block, + ThreadStatusesHolderPtr thread_status_holder) +{ + ThreadStatus * thread_status = current_thread; + + if (!thread_status_holder) + thread_status = nullptr; + + auto context_ptr = getContext(); + + const ASTInsertQuery * query = nullptr; + if (query_ptr) + query = query_ptr->as(); + + const Settings & settings = context_ptr->getSettingsRef(); + bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default; + + /// We create a pipeline of several streams, into which we will write data. + Chain out; + + auto input_header = [&]() -> const Block & + { + return out.empty() ? subsequent_header : out.getInputHeader(); + }; + /// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order. /// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns. 
if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty()) out.addSource(std::make_shared( - table->getStorageID(), out.getInputHeader(), metadata_snapshot->getConstraints(), context_ptr)); + table->getStorageID(), input_header(), metadata_snapshot->getConstraints(), context_ptr)); auto adding_missing_defaults_dag = addMissingDefaults( query_sample_block, - out.getInputHeader().getNamesAndTypesList(), + input_header().getNamesAndTypesList(), metadata_snapshot->getColumns(), context_ptr, null_as_default); @@ -316,12 +345,12 @@ Chain InterpreterInsertQuery::buildChainImpl( bool table_prefers_large_blocks = table->prefersLargeBlocks(); out.addSource(std::make_shared( - out.getInputHeader(), + input_header(), table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); } - auto counting = std::make_shared(out.getInputHeader(), thread_status, getContext()->getQuota()); + auto counting = std::make_shared(input_header(), thread_status, getContext()->getQuota()); counting->setProcessListElement(context_ptr->getProcessListElement()); counting->setProgressCallback(context_ptr->getProgressCallback()); out.addSource(std::move(counting)); @@ -362,10 +391,20 @@ BlockIO InterpreterInsertQuery::execute() // Distributed INSERT SELECT distributed_pipeline = table->distributedWrite(query, getContext()); - std::vector out_chains; + std::vector presink_chains; + std::vector sink_chains; if (!distributed_pipeline || query.watch) { - size_t out_streams_size = 1; + /// Number of streams works like this: + /// * For the SELECT, use `max_threads`, or `max_insert_threads`, or whatever + /// InterpreterSelectQuery ends up with. + /// * Use `max_insert_threads` streams for various insert-preparation steps, e.g. + /// materializing and squashing (too slow to do in one thread). That's `presink_chains`. + /// * If the table supports parallel inserts, use the same streams for writing to IStorage. + /// Otherwise ResizeProcessor them down to 1 stream. + /// * If it's not an INSERT SELECT, forget all that and use one stream. + size_t pre_streams_size = 1; + size_t sink_streams_size = 1; if (query.select) { @@ -441,10 +480,14 @@ BlockIO InterpreterInsertQuery::execute() pipeline.dropTotalsAndExtremes(); - if (table->supportsParallelInsert() && settings.max_insert_threads > 1) - out_streams_size = std::min(static_cast(settings.max_insert_threads), pipeline.getNumStreams()); + if (settings.max_insert_threads > 1) + { + pre_streams_size = std::min(static_cast(settings.max_insert_threads), pipeline.getNumStreams()); + if (table->supportsParallelInsert()) + sink_streams_size = pre_streams_size; + } - pipeline.resize(out_streams_size); + pipeline.resize(pre_streams_size); /// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values. 
if (getContext()->getSettingsRef().insert_null_as_default) @@ -476,13 +519,17 @@ BlockIO InterpreterInsertQuery::execute() running_group = current_thread->getThreadGroup(); if (!running_group) running_group = std::make_shared(getContext()); - for (size_t i = 0; i < out_streams_size; ++i) + for (size_t i = 0; i < sink_streams_size; ++i) { - auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, - /* thread_status_holder= */ nullptr, - running_group, - /* elapsed_counter_ms= */ nullptr); - out_chains.emplace_back(std::move(out)); + auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr, + running_group, /* elapsed_counter_ms= */ nullptr); + sink_chains.emplace_back(std::move(out)); + } + for (size_t i = 0; i < pre_streams_size; ++i) + { + auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot, + query_sample_block, /* thread_status_holder= */ nullptr); + presink_chains.emplace_back(std::move(out)); } } @@ -495,7 +542,7 @@ BlockIO InterpreterInsertQuery::execute() } else if (query.select || query.watch) { - const auto & header = out_chains.at(0).getInputHeader(); + const auto & header = presink_chains.at(0).getInputHeader(); auto actions_dag = ActionsDAG::makeConvertingActions( pipeline.getHeader().getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(), @@ -516,10 +563,14 @@ BlockIO InterpreterInsertQuery::execute() size_t num_select_threads = pipeline.getNumThreads(); - for (auto & chain : out_chains) + for (auto & chain : presink_chains) + resources = chain.detachResources(); + for (auto & chain : sink_chains) resources = chain.detachResources(); - pipeline.addChains(std::move(out_chains)); + pipeline.addChains(std::move(presink_chains)); + pipeline.resize(sink_chains.size()); + pipeline.addChains(std::move(sink_chains)); if (!settings.parallel_view_processing) { @@ -552,7 +603,8 @@ BlockIO InterpreterInsertQuery::execute() } else { - res.pipeline = QueryPipeline(std::move(out_chains.at(0))); + presink_chains.at(0).appendChain(std::move(sink_chains.at(0))); + res.pipeline = QueryPipeline(std::move(presink_chains[0])); res.pipeline.setNumThreads(std::min(res.pipeline.getNumThreads(), settings.max_threads)); if (query.hasInlinedData() && !async_insert) diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 5bff472270d..b9a146e5338 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -66,13 +66,19 @@ private: std::vector> owned_buffers; - Chain buildChainImpl( + Chain buildSink( const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, - const Block & query_sample_block, ThreadStatusesHolderPtr thread_status_holder, ThreadGroupPtr running_group, std::atomic_uint64_t * elapsed_counter_ms); + + Chain buildPreSinkChain( + const Block & subsequent_header, + const StoragePtr & table, + const StorageMetadataPtr & metadata_snapshot, + const Block & query_sample_block, + ThreadStatusesHolderPtr thread_status_holder); }; diff --git a/src/Interpreters/OpenTelemetrySpanLog.cpp b/src/Interpreters/OpenTelemetrySpanLog.cpp index 257c64e1df8..610330b8559 100644 --- a/src/Interpreters/OpenTelemetrySpanLog.cpp +++ b/src/Interpreters/OpenTelemetrySpanLog.cpp @@ -1,19 +1,14 @@ -#include "OpenTelemetrySpanLog.h" +#include #include #include #include -#include #include #include +#include #include #include #include -#include - -#include -#include -#include namespace DB @@ -32,11 +27,13 @@ NamesAndTypesList 
OpenTelemetrySpanLogElement::getNamesAndTypes() } ); + auto low_cardinality_string = std::make_shared(std::make_shared()); + return { {"trace_id", std::make_shared()}, {"span_id", std::make_shared()}, {"parent_span_id", std::make_shared()}, - {"operation_name", std::make_shared()}, + {"operation_name", low_cardinality_string}, {"kind", std::move(span_kind_type)}, // DateTime64 is really unwieldy -- there is no "normal" way to convert // it to an UInt64 count of microseconds, except: @@ -51,15 +48,17 @@ NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes() {"start_time_us", std::make_shared()}, {"finish_time_us", std::make_shared()}, {"finish_date", std::make_shared()}, - {"attribute", std::make_shared(std::make_shared(), std::make_shared())}, + {"attribute", std::make_shared(low_cardinality_string, std::make_shared())}, }; } NamesAndAliases OpenTelemetrySpanLogElement::getNamesAndAliases() { + auto low_cardinality_string = std::make_shared(std::make_shared()); + return { - {"attribute.names", std::make_shared(std::make_shared()), "mapKeys(attribute)"}, + {"attribute.names", std::make_shared(low_cardinality_string), "mapKeys(attribute)"}, {"attribute.values", std::make_shared(std::make_shared()), "mapValues(attribute)"} }; } @@ -83,4 +82,3 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const } } - diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 5a37d324fc3..6a4f4576eca 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex group->memory_tracker.setProfilerStep(settings.memory_profiler_step); group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability); group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator); + group->memory_tracker.setParent(&background_memory_tracker); if (settings.memory_tracker_fault_probability > 0.0) group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability); diff --git a/src/Interpreters/tests/gtest_context_race.cpp b/src/Interpreters/tests/gtest_context_race.cpp new file mode 100644 index 00000000000..60531494592 --- /dev/null +++ b/src/Interpreters/tests/gtest_context_race.cpp @@ -0,0 +1,49 @@ +#include +#include +#include + +using namespace DB; + +template +void run(Ptr context) +{ + for (size_t i = 0; i < 100; ++i) + { + std::thread t1([context] + { + if constexpr (std::is_same_v) + context.lock()->getAsyncReadCounters(); + else + context->getAsyncReadCounters(); + }); + + std::thread t2([context] + { + Context::createCopy(context); + }); + + t1.join(); + t2.join(); + } +} + +TEST(Context, MutableRace) +{ + auto context = Context::createCopy(getContext().context); + context->makeQueryContext(); + run(context); +} + +TEST(Context, ConstRace) +{ + auto context = Context::createCopy(getContext().context); + context->makeQueryContext(); + run(context); +} + +TEST(Context, WeakRace) +{ + auto context = Context::createCopy(getContext().context); + context->makeQueryContext(); + run(context); +} diff --git a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp index 5f9725c804b..a5befca7233 100644 --- a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp @@ -168,7 +168,6 @@ void 
FinishAggregatingInOrderAlgorithm::addToAggregation() accumulated_bytes += static_cast(static_cast(states[i].total_bytes) * current_rows / states[i].num_rows); accumulated_rows += current_rows; - if (!states[i].isValid()) inputs_to_update.push_back(i); } diff --git a/src/QueryPipeline/Chain.cpp b/src/QueryPipeline/Chain.cpp index 6122517432a..eaa36071542 100644 --- a/src/QueryPipeline/Chain.cpp +++ b/src/QueryPipeline/Chain.cpp @@ -99,6 +99,14 @@ void Chain::addSink(ProcessorPtr processor) processors.emplace_back(std::move(processor)); } +void Chain::appendChain(Chain chain) +{ + connect(getOutputPort(), chain.getInputPort()); + processors.splice(processors.end(), std::move(chain.processors)); + attachResources(chain.detachResources()); + num_threads += chain.num_threads; +} + IProcessor & Chain::getSource() { checkInitialized(processors); diff --git a/src/QueryPipeline/Chain.h b/src/QueryPipeline/Chain.h index d6139281990..322e49d0d49 100644 --- a/src/QueryPipeline/Chain.h +++ b/src/QueryPipeline/Chain.h @@ -7,6 +7,10 @@ namespace DB { +/// Has one unconnected input port and one unconnected output port. +/// There may be other ports on the processors, but they must all be connected. +/// The unconnected input must be on the first processor, output - on the last. +/// The processors don't necessarily form an actual chain. class Chain { public: @@ -27,6 +31,7 @@ public: void addSource(ProcessorPtr processor); void addSink(ProcessorPtr processor); + void appendChain(Chain chain); IProcessor & getSource(); IProcessor & getSink(); @@ -44,7 +49,11 @@ public: void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } void addInterpreterContext(ContextPtr context) { holder.interpreter_context.emplace_back(std::move(context)); } - void attachResources(QueryPlanResourceHolder holder_) { holder = std::move(holder_); } + void attachResources(QueryPlanResourceHolder holder_) + { + /// This operator "=" actually merges holder_ into holder, doesn't replace. 
+ holder = std::move(holder_); + } QueryPlanResourceHolder detachResources() { return std::move(holder); } void reset(); diff --git a/src/Server/ProtocolServerAdapter.cpp b/src/Server/ProtocolServerAdapter.cpp index dbc676432f5..915b6265993 100644 --- a/src/Server/ProtocolServerAdapter.cpp +++ b/src/Server/ProtocolServerAdapter.cpp @@ -1,7 +1,7 @@ #include #include -#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD) +#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD) #include #endif @@ -37,7 +37,7 @@ ProtocolServerAdapter::ProtocolServerAdapter( { } -#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD) +#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD) class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl { public: diff --git a/src/Server/ProtocolServerAdapter.h b/src/Server/ProtocolServerAdapter.h index 514354f9723..e08b12e67f2 100644 --- a/src/Server/ProtocolServerAdapter.h +++ b/src/Server/ProtocolServerAdapter.h @@ -23,7 +23,7 @@ public: ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default; ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr tcp_server_); -#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD) +#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD) ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr grpc_server_); #endif diff --git a/src/Storages/HDFS/AsynchronousReadBufferFromHDFS.cpp b/src/Storages/HDFS/AsynchronousReadBufferFromHDFS.cpp index c683d59579b..a8502b1bd65 100644 --- a/src/Storages/HDFS/AsynchronousReadBufferFromHDFS.cpp +++ b/src/Storages/HDFS/AsynchronousReadBufferFromHDFS.cpp @@ -67,7 +67,7 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead() std::future AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority) { IAsynchronousReader::Request request; - request.descriptor = std::make_shared(*impl); + request.descriptor = std::make_shared(*impl, nullptr); request.buf = data; request.size = size; request.offset = file_offset_of_buffer_end; diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 965b27b4c05..f554a14ec75 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -232,7 +233,7 @@ public: if (thread_pool_read) { return std::make_unique( - IObjectStorage::getThreadPoolReader(), read_settings, std::move(buf)); + getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(buf)); } else { diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0070791d5de..15b45f30891 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -146,6 +146,8 @@ public: virtual bool supportsReplication() const { return false; } /// Returns true if the storage supports parallel insert. + /// If false, each INSERT query will call write() only once. + /// Different INSERT queries may write in parallel regardless of this value. virtual bool supportsParallelInsert() const { return false; } /// Returns true if the storage supports deduplication of inserted data blocks. 
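The supportsParallelInsert() contract documented above is what drives the pre-sink/sink stream split added to InterpreterInsertQuery::execute() earlier in this diff: pre-sink chains scale with max_insert_threads, while sink chains only do so when the storage allows parallel inserts. A minimal standalone sketch of that selection logic, using only the quantities already named in that hunk; the helper and struct names here are hypothetical and not part of the patch:

#include <algorithm>
#include <cstddef>
#include <iostream>

/// Restates the stream sizing from the InterpreterInsertQuery::execute() hunk:
/// pre-sink chains (materializing, squashing, counting) use up to
/// max_insert_threads streams, while sink chains stay at 1 unless the
/// storage supports parallel INSERT.
struct InsertStreamSizes
{
    size_t pre_streams = 1;
    size_t sink_streams = 1;
};

InsertStreamSizes pickInsertStreams(size_t max_insert_threads, size_t select_streams, bool supports_parallel_insert)
{
    InsertStreamSizes sizes;
    if (max_insert_threads > 1)
    {
        sizes.pre_streams = std::min(max_insert_threads, select_streams);
        if (supports_parallel_insert)
            sizes.sink_streams = sizes.pre_streams;
    }
    return sizes;
}

int main()
{
    /// max_insert_threads = 4 against an 8-stream SELECT on a storage without
    /// parallel insert: 4 pre-sink chains, 1 sink chain.
    auto sizes = pickInsertStreams(4, 8, false);
    std::cout << sizes.pre_streams << " pre-sink chain(s), " << sizes.sink_streams << " sink chain(s)\n";
}

This mirrors how the pipeline in that hunk is first resized to pre_streams_size, has the presink chains appended, and is then resized down to sink_chains.size() before the sink chains are attached.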
diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 6812ef93a78..1cd2d11da50 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -80,4 +80,9 @@ MergeInfo MergeListElement::getInfo() const return res; } +MergeListElement::~MergeListElement() +{ + background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker()); +} + } diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 045b4015c8e..ffa87e75505 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -115,6 +115,8 @@ struct MergeListElement : boost::noncopyable MergeListElement * ptr() { return this; } MergeListElement & ref() { return *this; } + + ~MergeListElement(); }; /** Maintains a list of currently running merges. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 87a5449cb83..9c7488cb6a6 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -166,8 +166,8 @@ struct Settings; M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for Zero-copy table-independet info.", 0) \ M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \ /** Compress marks and primary key. */ \ - M(Bool, compress_marks, false, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \ - M(Bool, compress_primary_key, false, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \ + M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \ + M(Bool, compress_primary_key, true, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \ M(String, marks_compression_codec, "ZSTD(3)", "Compression encoding used by marks, marks are small enough and cached, so the default compression is ZSTD(3).", 0) \ M(String, primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key is small enough and cached, so the default compression is ZSTD(3).", 0) \ M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \ diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 5209be0fd3b..027cd1af7c9 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -535,7 +535,7 @@ static StoragePtr create(const StorageFactory::Arguments & args) if (!args.storage_def->order_by) throw Exception(ErrorCodes::BAD_ARGUMENTS, "You must provide an ORDER BY or PRIMARY KEY expression in the table definition. " - "If you don't want this table to be sorted, use ORDER BY/PRIMARY KEY tuple()"); + "If you don't want this table to be sorted, use ORDER BY/PRIMARY KEY ()"); /// Get sorting key from engine arguments. 
/// diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 971314e3149..17d1085d6ae 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ #include #include #include +#include namespace DB { @@ -918,7 +920,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT; - if (partition_id.empty()) + if (!canEnqueueBackgroundTask()) + { + if (out_disable_reason) + *out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})", + formatReadableSizeWithBinarySuffix(background_memory_tracker.get()), + formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit())); + } + else if (partition_id.empty()) { UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 2e9a1b81420..b5e53950a02 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -3250,7 +3251,14 @@ void StorageReplicatedMergeTree::mergeSelectingTask() auto merges_and_mutations_queued = queue.countMergesAndPartMutations(); size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations; - if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue) + if (!canEnqueueBackgroundTask()) + { + LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate." 
+ "Current background tasks memory usage: {}.", + formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()), + formatReadableSizeWithBinarySuffix(background_memory_tracker.get())); + } + else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue) { LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})" " is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.", diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index d1b168d3f7d..00e72482a17 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -646,6 +646,7 @@ StorageS3Source::ReadBufferOrFactory StorageS3Source::createS3ReadBuffer(const S std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( const String & key, const ReadSettings & read_settings, size_t object_size) { + auto context = getContext(); auto read_buffer_creator = [this, read_settings, object_size] (const std::string & path, size_t read_until_position) -> std::unique_ptr @@ -667,10 +668,17 @@ std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( auto s3_impl = std::make_unique( std::move(read_buffer_creator), StoredObjects{StoredObject{key, object_size}}, - read_settings); + read_settings, + /* cache_log */nullptr); - auto & pool_reader = getContext()->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); - auto async_reader = std::make_unique(pool_reader, read_settings, std::move(s3_impl)); + auto modified_settings{read_settings}; + /// FIXME: Changing this setting to default value breaks something around parquet reading + modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size; + + auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); + auto async_reader = std::make_unique( + pool_reader, modified_settings, std::move(s3_impl), + context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog()); async_reader->setReadUntilEnd(); if (read_settings.remote_fs_prefetch) diff --git a/src/Storages/examples/async_read_buffer_from_hdfs.cpp b/src/Storages/examples/async_read_buffer_from_hdfs.cpp index 17aa5479de5..4f6aed8ef65 100644 --- a/src/Storages/examples/async_read_buffer_from_hdfs.cpp +++ b/src/Storages/examples/async_read_buffer_from_hdfs.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -25,7 +26,7 @@ int main() String path = "/path/to/hdfs/file"; ReadSettings settings = {}; auto in = std::make_unique(hdfs_namenode_url, path, *config, settings); - auto & reader = IObjectStorage::getThreadPoolReader(); + auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); AsynchronousReadBufferFromHDFS buf(reader, {}, std::move(in)); String output; diff --git a/tests/broken_tests.txt b/tests/broken_tests.txt index 1317240b818..d04b95a3ebd 100644 --- a/tests/broken_tests.txt +++ b/tests/broken_tests.txt @@ -123,7 +123,6 @@ 02324_map_combinator_bug 02241_join_rocksdb_bs 02003_WithMergeableStateAfterAggregationAndLimit_LIMIT_BY_LIMIT_OFFSET -01626_cnf_fuzz_long 01115_join_with_dictionary 01009_global_array_join_names 00917_multiple_joins_denny_crane diff --git a/tests/clickhouse-test b/tests/clickhouse-test index e61749e5377..e279b899a93 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -2244,7 +2244,7 @@ def main(args): "\nFound hung queries in processlist:", args, "red", attrs=["bold"] ) ) - print(processlist) + 
print(processlist.decode()) print(get_transactions_list(args)) print_stacktraces() diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 8adb722e20b..950663cb429 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -17,7 +17,6 @@ import urllib.parse import shlex import urllib3 import requests -import pyspark try: # Please, add modules that required for specific tests only here. @@ -33,6 +32,7 @@ try: import nats import ssl import meilisearch + import pyspark from confluent_kafka.avro.cached_schema_registry_client import ( CachedSchemaRegistryClient, ) diff --git a/tests/integration/test_backward_compatibility/configs/no_compress_marks.xml b/tests/integration/test_backward_compatibility/configs/no_compress_marks.xml new file mode 100644 index 00000000000..cc968525bbb --- /dev/null +++ b/tests/integration/test_backward_compatibility/configs/no_compress_marks.xml @@ -0,0 +1,6 @@ + + + 0 + 0 + + diff --git a/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml b/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml index c823dd02d5a..e9cf053f1c5 100644 --- a/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml +++ b/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml @@ -1,7 +1,5 @@ 0 - 0 - 0 - + diff --git a/tests/integration/test_backward_compatibility/test.py b/tests/integration/test_backward_compatibility/test.py index 01ed02720f8..ea1d3ab9c07 100644 --- a/tests/integration/test_backward_compatibility/test.py +++ b/tests/integration/test_backward_compatibility/test.py @@ -12,7 +12,9 @@ node1 = cluster.add_instance( with_installed_binary=True, ) node2 = cluster.add_instance( - "node2", main_configs=["configs/wide_parts_only.xml"], with_zookeeper=True + "node2", + main_configs=["configs/wide_parts_only.xml", "configs/no_compress_marks.xml"], + with_zookeeper=True, ) diff --git a/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py b/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py index 1781ed7c976..3d006caad0d 100644 --- a/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py +++ b/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py @@ -14,6 +14,7 @@ node_old = cluster.add_instance( ) node_new = cluster.add_instance( "node2", + main_configs=["configs/no_compress_marks.xml"], with_zookeeper=True, stay_alive=True, ) @@ -29,7 +30,7 @@ def start_cluster(): cluster.shutdown() -def test_vertical_merges_from_comapact_parts(start_cluster): +def test_vertical_merges_from_compact_parts(start_cluster): for i, node in enumerate([node_old, node_new]): node.query( """ @@ -41,7 +42,7 @@ def test_vertical_merges_from_comapact_parts(start_cluster): vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1, min_bytes_for_wide_part = 0, - min_rows_for_wide_part = 100; + min_rows_for_wide_part = 100 """.format( i ) @@ -104,8 +105,16 @@ def test_vertical_merges_from_comapact_parts(start_cluster): node_old.query("SYSTEM FLUSH LOGS") assert not ( - node_old.contains_in_log("CHECKSUM_DOESNT_MATCH") - or node_new.contains_in_log("CHECKSUM_DOESNT_MATCH") + # Now the old node is restarted as a new, and its config allows compressed indices, and it merged the data into compressed indices, + # that's why the error about different number of compressed files is 
expected and ok. + ( + node_old.contains_in_log("CHECKSUM_DOESNT_MATCH") + and not node_old.contains_in_log("Different number of files") + ) + or ( + node_new.contains_in_log("CHECKSUM_DOESNT_MATCH") + and not node_new.contains_in_log("Different number of files") + ) ) assert node_new.query(check_query.format("all_0_3_3")) == "Vertical\tWide\n" diff --git a/tests/integration/test_dictionaries_dependency/test.py b/tests/integration/test_dictionaries_dependency/test.py index 05d6afd35c7..d262738af60 100644 --- a/tests/integration/test_dictionaries_dependency/test.py +++ b/tests/integration/test_dictionaries_dependency/test.py @@ -191,3 +191,27 @@ def test_dependent_dict_table_distr(node): node.restart_clickhouse() query("DROP DATABASE IF EXISTS test_db;") + + +def test_no_lazy_load(): + node2.query("create database no_lazy") + node2.query( + "create table no_lazy.src (n int, m int) engine=MergeTree order by n partition by n % 100" + ) + node2.query("insert into no_lazy.src select number, number from numbers(0, 99)") + node2.query("insert into no_lazy.src select number, number from numbers(100, 99)") + node2.query( + "create dictionary no_lazy.dict (n int, mm int) primary key n " + "source(clickhouse(query 'select n, m + sleepEachRow(0.1) as mm from no_lazy.src')) " + "lifetime(min 0 max 0) layout(complex_key_hashed_array(shards 10))" + ) + + node2.restart_clickhouse() + + assert "42\n" == node2.query("select dictGet('no_lazy.dict', 'mm', 42)") + + assert "some tables depend on it" in node2.query_and_get_error( + "drop table no_lazy.src", settings={"check_referential_table_dependencies": 1} + ) + + node2.query("drop database no_lazy") diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py index 049dd516647..118c31ea864 100644 --- a/tests/integration/test_merge_tree_load_parts/test.py +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -227,7 +227,7 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster): # It can be a filesystem exception triggered at initialization of part storage but it hard # to trigger it because it should be an exception on stat/listDirectory. # The most easy way to trigger such exception is to use chmod but clickhouse server - # is run with root user in integration test and this won't work. So let's do some + # is run with root user in integration test and this won't work. So let's do # some stupid things: create a table without adaptive granularity and change mark # extensions of data files in part to make clickhouse think that it's a compact part which # cannot be created in such table. This will trigger a LOGICAL_ERROR on part creation. 
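
Note on the backward-compatibility change above: the new assertion in test_vertical_merges_from_compact_parts only tolerates a CHECKSUM_DOESNT_MATCH log entry when it is accompanied by a "Different number of files" message. A minimal restatement of that predicate, assuming only the contains_in_log helper already used in the test (the function name below is illustrative):

def unexpected_checksum_mismatch(node):
    # A checksum mismatch is acceptable here only when it is explained by a
    # differing number of (compressed) files after the old node is upgraded.
    return node.contains_in_log("CHECKSUM_DOESNT_MATCH") and not node.contains_in_log(
        "Different number of files"
    )

# Equivalent to the assertion in the diff:
# assert not (unexpected_checksum_mismatch(node_old) or unexpected_checksum_mismatch(node_new))
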
@@ -240,7 +240,8 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster): ).strip() node3.exec_in_container( - ["bash", "-c", f"mv {part_path}id.mrk {part_path}id.mrk3"], privileged=True + ["bash", "-c", f"mv {part_path}id.cmrk {part_path}id.cmrk3"], + privileged=True, ) corrupt_part("mt_load_parts", "all_1_1_0") diff --git a/tests/integration/test_merges_memory_limit/__init__.py b/tests/integration/test_merges_memory_limit/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_merges_memory_limit/test.py b/tests/integration/test_merges_memory_limit/test.py new file mode 100644 index 00000000000..e663f3280cc --- /dev/null +++ b/tests/integration/test_merges_memory_limit/test.py @@ -0,0 +1,39 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node = cluster.add_instance("node") + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_memory_limit_success(): + node.query( + "CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)" + ) + node.query("SYSTEM STOP MERGES test_merge_oom") + node.query( + "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)" + ) + node.query( + "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)" + ) + node.query( + "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)" + ) + + _, error = node.query_and_get_answer_with_error( + "SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL" + ) + + assert not error + node.query("DROP TABLE test_merge_oom") diff --git a/tests/integration/test_ssl_cert_authentication/test.py b/tests/integration/test_ssl_cert_authentication/test.py index 514897ad345..ec7111099c9 100644 --- a/tests/integration/test_ssl_cert_authentication/test.py +++ b/tests/integration/test_ssl_cert_authentication/test.py @@ -164,19 +164,24 @@ def get_ssl_context(cert_name): def execute_query_https( query, user, enable_ssl_auth=True, cert_name=None, password=None ): - url = ( - f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}" - ) - request = urllib.request.Request(url) - request.add_header("X-ClickHouse-User", user) - if enable_ssl_auth: - request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on") - if password: - request.add_header("X-ClickHouse-Key", password) - response = urllib.request.urlopen( - request, context=get_ssl_context(cert_name) - ).read() - return response.decode("utf-8") + retries = 10 + while True: + try: + url = f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}" + request = urllib.request.Request(url) + request.add_header("X-ClickHouse-User", user) + if enable_ssl_auth: + request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on") + if password: + request.add_header("X-ClickHouse-Key", password) + response = urllib.request.urlopen( + request, context=get_ssl_context(cert_name) + ).read() + return 
response.decode("utf-8") + except BrokenPipeError: + retries -= 1 + if retries == 0: + raise def test_https(): diff --git a/tests/queries/0_stateless/00341_squashing_insert_select2.sql b/tests/queries/0_stateless/00341_squashing_insert_select2.sql index 3eb5a2682e0..e973a146d5b 100644 --- a/tests/queries/0_stateless/00341_squashing_insert_select2.sql +++ b/tests/queries/0_stateless/00341_squashing_insert_select2.sql @@ -3,6 +3,7 @@ CREATE TABLE numbers_squashed (number UInt8) ENGINE = StripeLog; SET min_insert_block_size_rows = 100; SET min_insert_block_size_bytes = 0; +SET max_insert_threads = 1; SET max_threads = 1; INSERT INTO numbers_squashed diff --git a/tests/queries/0_stateless/01592_long_window_functions1.sql b/tests/queries/0_stateless/01592_long_window_functions1.sql index 022d8071ffa..4911b7aa792 100644 --- a/tests/queries/0_stateless/01592_long_window_functions1.sql +++ b/tests/queries/0_stateless/01592_long_window_functions1.sql @@ -4,10 +4,6 @@ drop table if exists stack; set max_insert_threads = 4; --- Temporary disable aggregation in order, --- because it may fail with UBSan. -set optimize_aggregation_in_order = 0; - create table stack(item_id Int64, brand_id Int64, rack_id Int64, dt DateTime, expiration_dt DateTime, quantity UInt64) Engine = MergeTree partition by toYYYYMM(dt) diff --git a/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql b/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql index c7ed91eb19b..f9de1275dbd 100644 --- a/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql +++ b/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql @@ -1,3 +1,5 @@ +-- Tags: disabled +-- FIXME https://github.com/ClickHouse/ClickHouse/issues/49552 -- Test that check the correctness of the result for optimize_aggregation_in_order and projections, -- not that this optimization will take place. 
diff --git a/tests/queries/0_stateless/02516_projections_with_rollup.sql b/tests/queries/0_stateless/02516_projections_with_rollup.sql index e670fbb7827..038caf59264 100644 --- a/tests/queries/0_stateless/02516_projections_with_rollup.sql +++ b/tests/queries/0_stateless/02516_projections_with_rollup.sql @@ -1,3 +1,6 @@ +-- Tags: disabled +-- FIXME https://github.com/ClickHouse/ClickHouse/issues/49552 + DROP TABLE IF EXISTS video_log; DROP TABLE IF EXISTS video_log_result__fuzz_0; DROP TABLE IF EXISTS rng; diff --git a/tests/queries/0_stateless/02684_bson.sql b/tests/queries/0_stateless/02684_bson.sql index 577bd4ffd27..cab5600eff0 100644 Binary files a/tests/queries/0_stateless/02684_bson.sql and b/tests/queries/0_stateless/02684_bson.sql differ diff --git a/tests/queries/0_stateless/02706_kolmogorov_smirnov_test_scipy.python b/tests/queries/0_stateless/02706_kolmogorov_smirnov_test_scipy.python index a2a34b57c90..01f245e0cf0 100644 --- a/tests/queries/0_stateless/02706_kolmogorov_smirnov_test_scipy.python +++ b/tests/queries/0_stateless/02706_kolmogorov_smirnov_test_scipy.python @@ -62,8 +62,8 @@ def test_ks_all_alternatives(rvs1, rvs2): def test_kolmogorov_smirnov(): - rvs1 = np.round(stats.norm.rvs(loc=1, scale=5, size=10), 2) - rvs2 = np.round(stats.norm.rvs(loc=1.5, scale=5, size=20), 2) + rvs1 = np.round(stats.norm.rvs(loc=1, scale=5, size=100), 2) + rvs2 = np.round(stats.norm.rvs(loc=1.5, scale=5, size=200), 2) test_ks_all_alternatives(rvs1, rvs2) rvs1 = np.round(stats.norm.rvs(loc=13, scale=1, size=100), 2) diff --git a/tests/queries/0_stateless/02724_persist_interval_type.reference b/tests/queries/0_stateless/02724_persist_interval_type.reference new file mode 100644 index 00000000000..964604605d9 --- /dev/null +++ b/tests/queries/0_stateless/02724_persist_interval_type.reference @@ -0,0 +1,6 @@ +2023-01-01 00:00:01.000000001 2023-01-01 02:00:00.000000001 2023-01-01 00:00:00.000000004 1 2 0 +2023-01-01 00:00:02.000000001 2023-01-01 03:00:00.000000001 2023-01-01 00:00:00.000000005 2 3 0 +2023-01-01 00:00:01.000000001 2023-01-01 02:00:00.000000001 2023-01-01 00:00:00.000000004 1 2 0 +2023-01-01 00:00:02.000000001 2023-01-01 03:00:00.000000001 2023-01-01 00:00:00.000000005 2 3 0 +0 +1 diff --git a/tests/queries/0_stateless/02724_persist_interval_type.sql b/tests/queries/0_stateless/02724_persist_interval_type.sql new file mode 100644 index 00000000000..3acce003c9a --- /dev/null +++ b/tests/queries/0_stateless/02724_persist_interval_type.sql @@ -0,0 +1,15 @@ +DROP TABLE IF EXISTS saved_intervals_tmp; +create table saved_intervals_tmp Engine=Memory as SELECT number as EventID, toIntervalSecond(number+1) as v1, toIntervalHour(number+2) as v2, toIntervalNanosecond(number+3) as v3 from numbers(2); +with toDateTime64('2023-01-01 00:00:00.000000001', 9, 'US/Eastern') as c select c+v1 as c_v1, c+v2 as c_v2, c+v3 as c_v3, date_diff(second, c, c_v1), date_diff(hour, c, c_v2), date_diff(second, c, c_v3) from saved_intervals_tmp; +DROP TABLE IF EXISTS saved_intervals_tmp; + +DROP TABLE IF EXISTS saved_intervals_mgt; +create table saved_intervals_mgt Engine=MergeTree() ORDER BY EventID as SELECT number as EventID, toIntervalSecond(number+1) as v1, toIntervalHour(number+2) as v2, toIntervalNanosecond(number+3) as v3 from numbers(2); +with toDateTime64('2023-01-01 00:00:00.000000001', 9, 'US/Eastern') as c select c+v1 as c_v1, c+v2 as c_v2, c+v3 as c_v3, date_diff(second, c, c_v1), date_diff(hour, c, c_v2), date_diff(second, c, c_v3) from saved_intervals_mgt; +DROP TABLE IF EXISTS 
saved_intervals_mgt; + +DROP TABLE IF EXISTS t1; +CREATE table t1 (v1 IntervalMinute) ENGINE = Memory; +INSERT INTO t1 with toDateTime64('2023-01-01 00:00:00.000000001', 9, 'US/Eastern') as c SELECT EXTRACT(MINUTE FROM c+toIntervalSecond(number * 60)) from numbers(2); +select * from t1; +DROP TABLE IF EXISTS t1; \ No newline at end of file diff --git a/tests/queries/0_stateless/02725_cnf_large_check.reference b/tests/queries/0_stateless/02725_cnf_large_check.reference new file mode 100644 index 00000000000..1c915801174 --- /dev/null +++ b/tests/queries/0_stateless/02725_cnf_large_check.reference @@ -0,0 +1,4 @@ +8 +8 +2 +2 diff --git a/tests/queries/0_stateless/02725_cnf_large_check.sql b/tests/queries/0_stateless/02725_cnf_large_check.sql new file mode 100644 index 00000000000..0780e6bcdd3 --- /dev/null +++ b/tests/queries/0_stateless/02725_cnf_large_check.sql @@ -0,0 +1,27 @@ +DROP TABLE IF EXISTS 02725_cnf; + +CREATE TABLE 02725_cnf (c0 UInt8, c1 UInt8, c2 UInt8, c3 UInt8, c4 UInt8, c5 UInt8, c6 UInt8, c7 UInt8, c8 UInt8, c9 UInt8) ENGINE = Memory; + +INSERT INTO 02725_cnf VALUES (0, 0, 0, 0, 0, 0, 0, 0, 0, 0), (0, 0, 0, 0, 0, 0, 0, 0, 0, 1), (0, 0, 0, 0, 0, 0, 0, 0, 1, 0), (0, 0, 0, 0, 0, 0, 0, 0, 1, 1), (0, 0, 0, 0, 0, 0, 0, 1, 0, 0), (0, 0, 0, 0, 0, 0, 0, 1, 0, 1), (0, 0, 0, 0, 0, 0, 0, 1, 1, 0), (0, 0, 0, 0, 0, 0, 0, 1, 1, 1); + +SELECT count() +FROM 02725_cnf +WHERE (c5 AND (NOT c0)) OR ((NOT c3) AND (NOT c6) AND (NOT c1) AND (NOT c6)) OR (c7 AND (NOT c3) AND (NOT c5) AND (NOT c7)) OR ((NOT c8) AND c5) OR ((NOT c0)) OR ((NOT c8) AND (NOT c5) AND c1 AND c6 AND c3) OR (c7 AND (NOT c0) AND c6 AND c1 AND (NOT c2)) OR (c3 AND (NOT c9) AND c1) +SETTINGS convert_query_to_cnf = 1, allow_experimental_analyzer = 1; + +SELECT count() +FROM 02725_cnf +WHERE (c5 AND (NOT c0)) OR ((NOT c3) AND (NOT c6) AND (NOT c1) AND (NOT c6)) OR (c7 AND (NOT c3) AND (NOT c5) AND (NOT c7)) OR ((NOT c8) AND c5) OR ((NOT c0)) OR ((NOT c8) AND (NOT c5) AND c1 AND c6 AND c3) OR (c7 AND (NOT c0) AND c6 AND c1 AND (NOT c2)) OR (c3 AND (NOT c9) AND c1) +SETTINGS convert_query_to_cnf = 1, allow_experimental_analyzer = 0; + +SELECT count() +FROM 02725_cnf +WHERE ((NOT c2) AND c2 AND (NOT c1)) OR ((NOT c2) AND c3 AND (NOT c5)) OR ((NOT c7) AND (NOT c8)) OR (c9 AND c6 AND c8 AND (NOT c8) AND (NOT c7)) +SETTINGS convert_query_to_cnf = 1, allow_experimental_analyzer = 1; + +SELECT count() +FROM 02725_cnf +WHERE ((NOT c2) AND c2 AND (NOT c1)) OR ((NOT c2) AND c3 AND (NOT c5)) OR ((NOT c7) AND (NOT c8)) OR (c9 AND c6 AND c8 AND (NOT c8) AND (NOT c7)) +SETTINGS convert_query_to_cnf = 1, allow_experimental_analyzer = 0; + +DROP TABLE 02725_cnf; diff --git a/tests/queries/0_stateless/02733_fix_distinct_in_order_bug_49622.reference b/tests/queries/0_stateless/02733_fix_distinct_in_order_bug_49622.reference new file mode 100644 index 00000000000..19f58f6ca91 --- /dev/null +++ b/tests/queries/0_stateless/02733_fix_distinct_in_order_bug_49622.reference @@ -0,0 +1,2 @@ + 1 + 2 diff --git a/tests/queries/0_stateless/02733_fix_distinct_in_order_bug_49622.sql b/tests/queries/0_stateless/02733_fix_distinct_in_order_bug_49622.sql new file mode 100644 index 00000000000..9501a2c0761 --- /dev/null +++ b/tests/queries/0_stateless/02733_fix_distinct_in_order_bug_49622.sql @@ -0,0 +1,15 @@ +set optimize_distinct_in_order=1; + +DROP TABLE IF EXISTS test_string; + +CREATE TABLE test_string +( + `c1` String, + `c2` String +) +ENGINE = MergeTree +ORDER BY c1; + +INSERT INTO test_string(c1, c2) VALUES ('1', ''), ('2', ''); + +SELECT DISTINCT c2, c1 
FROM test_string; diff --git a/tests/queries/0_stateless/02734_big_int_from_float_ubsan.reference b/tests/queries/0_stateless/02734_big_int_from_float_ubsan.reference new file mode 100644 index 00000000000..9972842f982 --- /dev/null +++ b/tests/queries/0_stateless/02734_big_int_from_float_ubsan.reference @@ -0,0 +1 @@ +1 1 diff --git a/tests/queries/0_stateless/02734_big_int_from_float_ubsan.sql b/tests/queries/0_stateless/02734_big_int_from_float_ubsan.sql new file mode 100644 index 00000000000..9fbf54c1a4d --- /dev/null +++ b/tests/queries/0_stateless/02734_big_int_from_float_ubsan.sql @@ -0,0 +1,9 @@ +WITH + 18 AS precision, + toUInt256(-1) AS int, + toUInt256(toFloat64(int)) AS converted, + toString(int) AS int_str, + toString(converted) AS converted_str +SELECT + length(int_str) = length(converted_str) AS have_same_length, + substring(int_str, 1, precision) = substring(converted_str, 1, precision) AS have_same_prefix
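
As a closing note on 02734_big_int_from_float_ubsan.sql: the test asserts that round-tripping toUInt256(-1) through Float64 keeps the digit count and the first 18 digits. A quick hedged check of that property, with Python's float standing in for Float64 (it does not model the final conversion back into UInt256):

# toUInt256(-1) is 2^256 - 1; the nearest IEEE-754 double is exactly 2^256,
# which differs from the original only in the last decimal digit, so both
# checks below print 1, matching the reference output "1 1".
exact = 2**256 - 1
converted = int(float(exact))

have_same_length = len(str(exact)) == len(str(converted))
have_same_prefix = str(exact)[:18] == str(converted)[:18]
print(int(have_same_length), int(have_same_prefix))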