diff --git a/.gitmodules b/.gitmodules index f372a309cad..26889ddbb56 100644 --- a/.gitmodules +++ b/.gitmodules @@ -30,9 +30,6 @@ [submodule "contrib/re2"] path = contrib/re2 url = https://github.com/google/re2.git -[submodule "contrib/llvm"] - path = contrib/llvm - url = https://github.com/ClickHouse/llvm [submodule "contrib/mariadb-connector-c"] path = contrib/mariadb-connector-c url = https://github.com/ClickHouse/mariadb-connector-c.git @@ -284,3 +281,6 @@ [submodule "contrib/c-ares"] path = contrib/c-ares url = https://github.com/ClickHouse/c-ares +[submodule "contrib/llvm-project"] + path = contrib/llvm-project + url = https://github.com/ClickHouse/llvm-project.git diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 486fca60912..bda040fbf81 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -107,7 +107,7 @@ if (ENABLE_TESTS) add_contrib (googletest-cmake googletest) endif() -add_contrib (llvm-cmake llvm) +add_contrib (llvm-project-cmake llvm-project) add_contrib (libxml2-cmake libxml2) add_contrib (aws-s3-cmake aws diff --git a/contrib/llvm b/contrib/llvm deleted file mode 160000 index 0db5bf5bd24..00000000000 --- a/contrib/llvm +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 0db5bf5bd2452cd8f1283a1fcdc04845af705bfc diff --git a/contrib/llvm-cmake/CMakeLists.txt b/contrib/llvm-cmake/CMakeLists.txt deleted file mode 100644 index 4a4a5cef62e..00000000000 --- a/contrib/llvm-cmake/CMakeLists.txt +++ /dev/null @@ -1,112 +0,0 @@ -if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined") - set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF) -else() - set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON) -endif() - -option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT}) - -if (NOT ENABLE_EMBEDDED_COMPILER) - message(STATUS "Not using LLVM") - return() -endif() - -set (LLVM_FOUND 1) -set (LLVM_VERSION "12.0.0bundled") -set (LLVM_INCLUDE_DIRS - "${ClickHouse_SOURCE_DIR}/contrib/llvm/llvm/include" - "${ClickHouse_BINARY_DIR}/contrib/llvm/llvm/include" -) -set (LLVM_LIBRARY_DIRS "${ClickHouse_BINARY_DIR}/contrib/llvm/llvm") - -# This list was generated by listing all LLVM libraries, compiling the binary and removing all libraries while it still compiles. 
-set (REQUIRED_LLVM_LIBRARIES - LLVMExecutionEngine - LLVMRuntimeDyld - LLVMAsmPrinter - LLVMDebugInfoDWARF - LLVMGlobalISel - LLVMSelectionDAG - LLVMMCDisassembler - LLVMPasses - LLVMCodeGen - LLVMipo - LLVMBitWriter - LLVMInstrumentation - LLVMScalarOpts - LLVMAggressiveInstCombine - LLVMInstCombine - LLVMVectorize - LLVMTransformUtils - LLVMTarget - LLVMAnalysis - LLVMProfileData - LLVMObject - LLVMBitReader - LLVMCore - LLVMRemarks - LLVMBitstreamReader - LLVMMCParser - LLVMMC - LLVMBinaryFormat - LLVMDebugInfoCodeView - LLVMSupport - LLVMDemangle -) - -if (ARCH_AMD64) - list(APPEND REQUIRED_LLVM_LIBRARIES LLVMX86Info LLVMX86Desc LLVMX86CodeGen) -elseif (ARCH_AARCH64) - list(APPEND REQUIRED_LLVM_LIBRARIES LLVMAArch64Info LLVMAArch64Desc LLVMAArch64CodeGen) -endif () - -#function(llvm_libs_all REQUIRED_LLVM_LIBRARIES) -# llvm_map_components_to_libnames (result all) -# if (USE_STATIC_LIBRARIES OR NOT "LLVM" IN_LIST result) -# list (REMOVE_ITEM result "LTO" "LLVM") -# else() -# set (result "LLVM") -# endif () -# list (APPEND result ${CMAKE_DL_LIBS} ch_contrib::zlib) -# set (${REQUIRED_LLVM_LIBRARIES} ${result} PARENT_SCOPE) -#endfunction() - -message (STATUS "LLVM include Directory: ${LLVM_INCLUDE_DIRS}") -message (STATUS "LLVM library Directory: ${LLVM_LIBRARY_DIRS}") -message (STATUS "LLVM C++ compiler flags: ${LLVM_CXXFLAGS}") - -# ld: unknown option: --color-diagnostics -set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "") - -# Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind -set (CMAKE_INSTALL_RPATH "ON") -set (LLVM_COMPILER_CHECKED 1 CACHE INTERNAL "") -set (LLVM_ENABLE_EH 1 CACHE INTERNAL "") -set (LLVM_ENABLE_RTTI 1 CACHE INTERNAL "") -set (LLVM_ENABLE_PIC 0 CACHE INTERNAL "") -set (LLVM_TARGETS_TO_BUILD "X86;AArch64" CACHE STRING "") - -# Need to use C++17 since the compilation is not possible with C++20 currently, due to ambiguous operator != etc. -# LLVM project will set its default value for the -std=... but our global setting from CMake will override it. 
-set (CMAKE_CXX_STANDARD 17) - -set (LLVM_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm/llvm") -set (LLVM_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/llvm/llvm") -add_subdirectory ("${LLVM_SOURCE_DIR}" "${LLVM_BINARY_DIR}") -set_directory_properties (PROPERTIES - # due to llvm crosscompile cmake does not know how to clean it, and on clean - # will lead to the following error: - # - # ninja: error: remove(contrib/llvm/llvm/NATIVE): Directory not empty - # - ADDITIONAL_CLEAN_FILES "${LLVM_BINARY_DIR}" - # llvm's cmake configuring this file only when cmake runs, - # and after clean cmake will not know that it should re-run, - # add explicitly depends from llvm-config.h - CMAKE_CONFIGURE_DEPENDS "${LLVM_BINARY_DIR}/include/llvm/Config/llvm-config.h" -) - -add_library (_llvm INTERFACE) -target_link_libraries (_llvm INTERFACE ${REQUIRED_LLVM_LIBRARIES}) -target_include_directories (_llvm SYSTEM BEFORE INTERFACE ${LLVM_INCLUDE_DIRS}) -add_library(ch_contrib::llvm ALIAS _llvm) diff --git a/contrib/llvm-project b/contrib/llvm-project new file mode 160000 index 00000000000..6ca2b5b3927 --- /dev/null +++ b/contrib/llvm-project @@ -0,0 +1 @@ +Subproject commit 6ca2b5b3927226f6bcf6c656f502ff5d012ad9b6 diff --git a/contrib/llvm-project-cmake/CMakeLists.txt b/contrib/llvm-project-cmake/CMakeLists.txt new file mode 100644 index 00000000000..510436951ec --- /dev/null +++ b/contrib/llvm-project-cmake/CMakeLists.txt @@ -0,0 +1,122 @@ +if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined" OR NOT USE_STATIC_LIBRARIES) + set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF) +else() + set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON) +endif() + +option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT}) + +if (NOT ENABLE_EMBEDDED_COMPILER) + message(STATUS "Not using LLVM") + return() +endif() + +# TODO: Enable shared library build +# TODO: Enable compilation on AArch64 + +set (LLVM_VERSION "13.0.0bundled") +set (LLVM_INCLUDE_DIRS + "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm/include" + "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm/include" +) +set (LLVM_LIBRARY_DIRS "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm") + +# This list was generated by listing all LLVM libraries, compiling the binary and removing all libraries while it still compiles. 
+set (REQUIRED_LLVM_LIBRARIES + LLVMExecutionEngine + LLVMRuntimeDyld + LLVMAsmPrinter + LLVMDebugInfoDWARF + LLVMGlobalISel + LLVMSelectionDAG + LLVMMCDisassembler + LLVMPasses + LLVMCodeGen + LLVMipo + LLVMBitWriter + LLVMInstrumentation + LLVMScalarOpts + LLVMAggressiveInstCombine + LLVMInstCombine + LLVMVectorize + LLVMTransformUtils + LLVMTarget + LLVMAnalysis + LLVMProfileData + LLVMObject + LLVMBitReader + LLVMCore + LLVMRemarks + LLVMBitstreamReader + LLVMMCParser + LLVMMC + LLVMBinaryFormat + LLVMDebugInfoCodeView + LLVMSupport + LLVMDemangle +) + +# if (ARCH_AMD64) + list(APPEND REQUIRED_LLVM_LIBRARIES LLVMX86Info LLVMX86Desc LLVMX86CodeGen) +# elseif (ARCH_AARCH64) +# list(APPEND REQUIRED_LLVM_LIBRARIES LLVMAArch64Info LLVMAArch64Desc LLVMAArch64CodeGen) +# endif () + +# ld: unknown option: --color-diagnostics +# set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "") + +set (CMAKE_INSTALL_RPATH "ON") # Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind +set (LLVM_COMPILER_CHECKED 1 CACHE INTERNAL "") # Skip internal compiler selection +set (LLVM_ENABLE_EH 1 CACHE INTERNAL "") # With exception handling +set (LLVM_ENABLE_RTTI 1 CACHE INTERNAL "") +set (LLVM_ENABLE_PIC 0 CACHE INTERNAL "") +set (LLVM_TARGETS_TO_BUILD "X86" CACHE STRING "") # for x86 + ARM: "X86;AArch64" + +# Omit unnecessary stuff (just the options which are ON by default) +set(LLVM_ENABLE_BACKTRACES 0 CACHE INTERNAL "") +set(LLVM_ENABLE_CRASH_OVERRIDES 0 CACHE INTERNAL "") +set(LLVM_ENABLE_TERMINFO 0 CACHE INTERNAL "") +set(LLVM_ENABLE_LIBXML2 0 CACHE INTERNAL "") +set(LLVM_ENABLE_LIBEDIT 0 CACHE INTERNAL "") +set(LLVM_ENABLE_LIBPFM 0 CACHE INTERNAL "") +set(LLVM_ENABLE_ZLIB 0 CACHE INTERNAL "") +set(LLVM_ENABLE_Z3_SOLVER 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_TOOLS 0 CACHE INTERNAL "") +set(LLVM_BUILD_TOOLS 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_UTILS 0 CACHE INTERNAL "") +set(LLVM_BUILD_UTILS 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_RUNTIMES 0 CACHE INTERNAL "") +set(LLVM_BUILD_RUNTIMES 0 CACHE INTERNAL "") +set(LLVM_BUILD_RUNTIME 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_EXAMPLES 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_TESTS 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_GO_TESTS 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_BENCHMARKS 0 CACHE INTERNAL "") +set(LLVM_INCLUDE_DOCS 0 CACHE INTERNAL "") +set(LLVM_ENABLE_OCAMLDOC 0 CACHE INTERNAL "") +set(LLVM_ENABLE_BINDINGS 0 CACHE INTERNAL "") + +# C++20 is currently not supported due to ambiguous operator != etc. 
+set (CMAKE_CXX_STANDARD 17) + +set (LLVM_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm") +set (LLVM_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm") +add_subdirectory ("${LLVM_SOURCE_DIR}" "${LLVM_BINARY_DIR}") + +set_directory_properties (PROPERTIES + # due to llvm crosscompile cmake does not know how to clean it, and on clean + # will lead to the following error: + # + # ninja: error: remove(contrib/llvm/llvm/NATIVE): Directory not empty + # + ADDITIONAL_CLEAN_FILES "${LLVM_BINARY_DIR}" + # llvm's cmake configuring this file only when cmake runs, + # and after clean cmake will not know that it should re-run, + # add explicitly depends from llvm-config.h + CMAKE_CONFIGURE_DEPENDS "${LLVM_BINARY_DIR}/include/llvm/Config/llvm-config.h" +) + +add_library (_llvm INTERFACE) +target_link_libraries (_llvm INTERFACE ${REQUIRED_LLVM_LIBRARIES}) +target_include_directories (_llvm SYSTEM BEFORE INTERFACE ${LLVM_INCLUDE_DIRS}) +add_library(ch_contrib::llvm ALIAS _llvm) diff --git a/docker/test/stress/run.sh b/docker/test/stress/run.sh index bf76fb20928..6b9954c2431 100755 --- a/docker/test/stress/run.sh +++ b/docker/test/stress/run.sh @@ -13,25 +13,28 @@ sysctl kernel.core_pattern='core.%e.%p-%P' # Thread Fuzzer allows to check more permutations of possible thread scheduling # and find more potential issues. +# Temporarily disable ThreadFuzzer with tsan because of https://github.com/google/sanitizers/issues/1540 +is_tsan_build=$(clickhouse local -q "select value like '% -fsanitize=thread %' from system.build_options where name='CXX_FLAGS'") +if [ "$is_tsan_build" -eq "0" ]; then + export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000 + export THREAD_FUZZER_SLEEP_PROBABILITY=0.1 + export THREAD_FUZZER_SLEEP_TIME_US=100000 -export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000 -export THREAD_FUZZER_SLEEP_PROBABILITY=0.1 -export THREAD_FUZZER_SLEEP_TIME_US=100000 + export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1 + export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1 + export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1 + export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1 -export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1 -export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1 -export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1 -export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1 + export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001 + export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001 + export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001 + export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001 + export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US=10000 -export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001 -export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001 -export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001 -export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001 -export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US=10000 - -export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000 -export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US=10000 -export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US=10000 + export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000 + export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US=10000 + export 
THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US=10000 +fi function install_packages() diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index b7fe7d49b7b..05c42974b8d 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -1498,7 +1498,7 @@ If not set, [tmp_path](#tmp-path) is used, otherwise it is ignored. - `move_factor` is ignored. - `keep_free_space_bytes` is ignored. - `max_data_part_size_bytes` is ignored. -- Уou must have exactly one volume in that policy. +- Policy should have exactly one volume with local disks. ::: ## uncompressed_cache_size {#server-settings-uncompressed_cache_size} diff --git a/docs/ru/operations/server-configuration-parameters/settings.md b/docs/ru/operations/server-configuration-parameters/settings.md index 832b19f9fe6..e1fb30ced0d 100644 --- a/docs/ru/operations/server-configuration-parameters/settings.md +++ b/docs/ru/operations/server-configuration-parameters/settings.md @@ -1342,12 +1342,13 @@ TCP порт для защищённого обмена данными с кли Если политика не задана, используется [tmp_path](#tmp-path). В противном случае `tmp_path` игнорируется. - :::note "Примечание" - - `move_factor` игнорируется. - - `keep_free_space_bytes` игнорируется. - - `max_data_part_size_bytes` игнорируется. - - В данной политике у вас должен быть ровно один том. - ::: +:::note "Примечание" +- `move_factor` игнорируется. +- `keep_free_space_bytes` игнорируется. +- `max_data_part_size_bytes` игнорируется. +- В данной политике должен быть ровно один том, содержащий только локальный диски. +::: + ## uncompressed_cache_size {#server-settings-uncompressed_cache_size} Размер кеша (в байтах) для несжатых данных, используемых движками таблиц семейства [MergeTree](../../operations/server-configuration-parameters/settings.md). diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 2b9d819f5eb..ffec435239e 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -203,7 +203,7 @@ void LocalServer::tryInitPath() global_context->setPath(path); - global_context->setTemporaryStorage(path + "tmp"); + global_context->setTemporaryStorage(path + "tmp", "", 0); global_context->setFlagsPath(path + "flags"); global_context->setUserFilesPath(""); // user's files are everywhere diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 14f97923ce3..242a86ba725 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -209,7 +209,7 @@ try fs::remove(it->path()); } else - LOG_DEBUG(log, "Skipped file in temporary path {}", it->path().string()); + LOG_DEBUG(log, "Found unknown file in temporary path {}", it->path().string()); } } catch (...) 
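Reviewer note: the next hunk threads a new `tmp_max_size` config value through `Context::setTemporaryStorage`, which forwards it to the `TemporaryDataOnDiskScope` created in Context.cpp later in this diff. The scope implementation itself is not part of the diff; below is only a minimal sketch of the accounting such a scope could perform, with all names (`TempDataScopeSketch`, `deltaAlloc`, `deltaFree`) invented for illustration.

```cpp
#include <atomic>
#include <cstddef>
#include <stdexcept>

/// Sketch only: caps the total size of temporary files written under one scope.
/// Not the actual TemporaryDataOnDiskScope from this PR.
class TempDataScopeSketch
{
public:
    explicit TempDataScopeSketch(size_t limit_) : limit(limit_) {}

    /// Streams report growth of their on-disk footprint here.
    void deltaAlloc(size_t delta)
    {
        size_t new_size = size.fetch_add(delta) + delta;
        if (limit && new_size > limit)
        {
            size.fetch_sub(delta); /// roll back the failed allocation
            throw std::runtime_error("Limit for temporary files size exceeded");
        }
    }

    /// Called when a temporary file is removed.
    void deltaFree(size_t delta) { size.fetch_sub(delta); }

private:
    std::atomic<size_t> size{0};
    const size_t limit; /// 0 means unlimited, matching tmp_max_size's default
};
```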
@@ -971,7 +971,8 @@ int Server::main(const std::vector & /*args*/) { std::string tmp_path = config().getString("tmp_path", path / "tmp/"); std::string tmp_policy = config().getString("tmp_policy", ""); - const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy); + size_t tmp_max_size = config().getUInt64("tmp_max_size", 0); + const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy, tmp_max_size); for (const DiskPtr & disk : volume->getDisks()) setupTmpPath(log, disk->getPath()); } diff --git a/src/AggregateFunctions/AggregateFunctionAvg.h b/src/AggregateFunctions/AggregateFunctionAvg.h index c41a51997df..71bd9e7f6c5 100644 --- a/src/AggregateFunctions/AggregateFunctionAvg.h +++ b/src/AggregateFunctions/AggregateFunctionAvg.h @@ -164,8 +164,10 @@ public: auto * denominator_type = toNativeType(b); static constexpr size_t denominator_offset = offsetof(Fraction, denominator); - auto * denominator_dst_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_dst_ptr, denominator_offset), denominator_type->getPointerTo()); - auto * denominator_src_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_src_ptr, denominator_offset), denominator_type->getPointerTo()); + auto * ty_aggregate_data_dst_ptr = llvm::cast(aggregate_data_dst_ptr->getType()->getScalarType())->getElementType(); + auto * denominator_dst_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(ty_aggregate_data_dst_ptr, aggregate_data_dst_ptr, denominator_offset), denominator_type->getPointerTo()); + auto * ty_aggregate_data_src_ptr = llvm::cast(aggregate_data_src_ptr->getType()->getScalarType())->getElementType(); + auto * denominator_src_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(ty_aggregate_data_src_ptr, aggregate_data_src_ptr, denominator_offset), denominator_type->getPointerTo()); auto * denominator_dst_value = b.CreateLoad(denominator_type, denominator_dst_ptr); auto * denominator_src_value = b.CreateLoad(denominator_type, denominator_src_ptr); @@ -184,7 +186,8 @@ public: auto * denominator_type = toNativeType(b); static constexpr size_t denominator_offset = offsetof(Fraction, denominator); - auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(nullptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo()); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(ty_aggregate_data_ptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo()); auto * denominator_value = b.CreateLoad(denominator_type, denominator_ptr); auto * double_numerator = nativeCast(b, numerator_value, b.getDoubleTy()); @@ -311,7 +314,8 @@ public: auto * denominator_type = toNativeType(b); static constexpr size_t denominator_offset = offsetof(Fraction, denominator); - auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(nullptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo()); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(ty_aggregate_data_ptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo()); auto * denominator_value_updated = b.CreateAdd(b.CreateLoad(denominator_type, denominator_ptr), llvm::ConstantInt::get(denominator_type, 1)); b.CreateStore(denominator_value_updated, 
denominator_ptr); } diff --git a/src/AggregateFunctions/AggregateFunctionAvgWeighted.h b/src/AggregateFunctions/AggregateFunctionAvgWeighted.h index 6189ae92ce4..bd9e12b97b6 100644 --- a/src/AggregateFunctions/AggregateFunctionAvgWeighted.h +++ b/src/AggregateFunctions/AggregateFunctionAvgWeighted.h @@ -74,7 +74,8 @@ public: auto * denominator_type = toNativeType(b); static constexpr size_t denominator_offset = offsetof(Fraction, denominator); - auto * denominator_offset_ptr = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, denominator_offset); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * denominator_offset_ptr = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, denominator_offset); auto * denominator_ptr = b.CreatePointerCast(denominator_offset_ptr, denominator_type->getPointerTo()); auto * weight_cast_to_denominator = nativeCast(b, arguments_types[1], argument_values[1], denominator_type); diff --git a/src/AggregateFunctions/AggregateFunctionIf.cpp b/src/AggregateFunctions/AggregateFunctionIf.cpp index c7b4d382565..9b548e1b3f3 100644 --- a/src/AggregateFunctions/AggregateFunctionIf.cpp +++ b/src/AggregateFunctions/AggregateFunctionIf.cpp @@ -207,7 +207,8 @@ public: if constexpr (result_is_nullable) b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); - auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, { removeNullable(nullable_type) }, { wrapped_value }); b.CreateBr(join_block); @@ -419,7 +420,8 @@ public: if constexpr (result_is_nullable) b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); - auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, non_nullable_types, wrapped_values); b.CreateBr(join_block); diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h index ad633418ec3..b96ed5bc6fe 100644 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -201,7 +201,8 @@ public: static constexpr size_t value_offset_from_structure = offsetof(SingleValueDataFixed, value); auto * type = toNativeType(builder); - auto * value_ptr_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, value_offset_from_structure); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * value_ptr_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, value_offset_from_structure); auto * value_ptr = b.CreatePointerCast(value_ptr_with_offset, type->getPointerTo()); return value_ptr; diff --git 
a/src/AggregateFunctions/AggregateFunctionNull.h b/src/AggregateFunctions/AggregateFunctionNull.h index c26f4b03b14..aa7e15823cb 100644 --- a/src/AggregateFunctions/AggregateFunctionNull.h +++ b/src/AggregateFunctions/AggregateFunctionNull.h @@ -225,7 +225,8 @@ public: if constexpr (result_is_nullable) b.CreateMemSet(aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), this->prefix_size, llvm::assumeAligned(this->alignOfData())); - auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size); this->nested_function->compileCreate(b, aggregate_data_ptr_with_prefix_size_offset); } @@ -235,16 +236,25 @@ public: if constexpr (result_is_nullable) { +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wdeprecated-declarations" +#endif auto * aggregate_data_is_null_dst_value = b.CreateLoad(aggregate_data_dst_ptr); auto * aggregate_data_is_null_src_value = b.CreateLoad(aggregate_data_src_ptr); +#ifdef __clang__ +#pragma clang diagnostic pop +#endif auto * is_src_null = nativeBoolCast(b, std::make_shared(), aggregate_data_is_null_src_value); auto * is_null_result_value = b.CreateSelect(is_src_null, llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_is_null_dst_value); b.CreateStore(is_null_result_value, aggregate_data_dst_ptr); } - auto * aggregate_data_dst_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_dst_ptr, this->prefix_size); - auto * aggregate_data_src_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_src_ptr, this->prefix_size); + auto * ty_aggregate_data_dst_ptr = llvm::cast(aggregate_data_dst_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_dst_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_dst_ptr, aggregate_data_dst_ptr, this->prefix_size); + auto * ty_aggregate_data_src_ptr = llvm::cast(aggregate_data_src_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_src_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_src_ptr, aggregate_data_src_ptr, this->prefix_size); this->nested_function->compileMerge(b, aggregate_data_dst_ptr_with_prefix_size_offset, aggregate_data_src_ptr_with_prefix_size_offset); } @@ -278,7 +288,8 @@ public: b.CreateBr(join_block); b.SetInsertPoint(if_not_null); - auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size); auto * nested_result = this->nested_function->compileGetResult(builder, aggregate_data_ptr_with_prefix_size_offset); b.CreateStore(b.CreateInsertValue(nullable_value, nested_result, {0}), nullable_value_ptr); b.CreateBr(join_block); @@ -374,7 +385,8 @@ public: if constexpr (result_is_nullable) b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); - auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, 
this->prefix_size); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, { removeNullable(nullable_type) }, { wrapped_value }); b.CreateBr(join_block); @@ -598,7 +610,8 @@ public: if constexpr (result_is_nullable) b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); - auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); + auto * ty_aggregate_data_ptr = llvm::cast(aggregate_data_ptr->getType()->getScalarType())->getElementType(); + auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, arguments_types, wrapped_values); b.CreateBr(join_block); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 5dedc6117aa..0df030d9d2f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -399,6 +399,9 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\ M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \ \ + M(UInt64, max_temp_data_on_disk_size_for_user, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running user queries. Zero means unlimited.", 0)\ + M(UInt64, max_temp_data_on_disk_size_for_query, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running queries. 
Zero means unlimited.", 0)\ + \ M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \ M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \ \ diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index c51864740f5..db27b4fa975 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -30,6 +30,8 @@ #include #include +#include + namespace DB { namespace ErrorCodes @@ -862,7 +864,19 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep for (const auto & id : dropped_tables) DatabaseCatalog::instance().waitTableFinallyDropped(id); - for (const auto & name_and_meta : table_name_to_metadata) + /// FIXME: Use proper dependency calculation instead of just moving MV to the end + using NameToMetadata = std::pair<String, String>; + std::vector<NameToMetadata> table_name_to_metadata_sorted; + table_name_to_metadata_sorted.reserve(table_name_to_metadata.size()); + std::move(table_name_to_metadata.begin(), table_name_to_metadata.end(), std::back_inserter(table_name_to_metadata_sorted)); + std::sort(table_name_to_metadata_sorted.begin(), table_name_to_metadata_sorted.end(), [](const NameToMetadata & lhs, const NameToMetadata & rhs) -> bool + { + const bool is_materialized_view_lhs = lhs.second.find("MATERIALIZED VIEW") != std::string::npos; + const bool is_materialized_view_rhs = rhs.second.find("MATERIALIZED VIEW") != std::string::npos; + return is_materialized_view_lhs < is_materialized_view_rhs; + }); + + for (const auto & name_and_meta : table_name_to_metadata_sorted) { if (isTableExist(name_and_meta.first, getContext())) { diff --git a/src/Disks/DiskDecorator.cpp b/src/Disks/DiskDecorator.cpp index f61496ab078..73540aaa0ab 100644 --- a/src/Disks/DiskDecorator.cpp +++ b/src/Disks/DiskDecorator.cpp @@ -241,4 +241,11 @@ DiskObjectStoragePtr DiskDecorator::createDiskObjectStorage() return delegate->createDiskObjectStorage(); } +DiskPtr DiskDecorator::getNestedDisk() const +{ + if (const auto * decorator = dynamic_cast<const DiskDecorator *>(delegate.get())) + return decorator->getNestedDisk(); + return delegate; +} + } diff --git a/src/Disks/DiskDecorator.h b/src/Disks/DiskDecorator.h index 6bb8b541767..dcd12ab4bbf 100644 --- a/src/Disks/DiskDecorator.h +++ b/src/Disks/DiskDecorator.h @@ -107,6 +107,8 @@ public: bool supportsChmod() const override { return delegate->supportsChmod(); } void chmod(const String & path, mode_t mode) override { delegate->chmod(path, mode); } + virtual DiskPtr getNestedDisk() const; + protected: Executor & getExecutor() override; diff --git a/src/Disks/DiskRestartProxy.cpp b/src/Disks/DiskRestartProxy.cpp index bd4705f3c2d..2d923d71622 100644 --- a/src/Disks/DiskRestartProxy.cpp +++ b/src/Disks/DiskRestartProxy.cpp @@ -331,6 +331,20 @@ void DiskRestartProxy::getRemotePathsRecursive( return DiskDecorator::getRemotePathsRecursive(path, paths_map); } +DiskPtr DiskRestartProxy::getNestedDisk() const +{ + DiskPtr delegate_copy; + + { + ReadLock lock (mutex); + delegate_copy = delegate; + } + + if (const auto * decorator = dynamic_cast<const DiskDecorator *>(delegate_copy.get())) + return decorator->getNestedDisk(); + return delegate_copy; +} + void DiskRestartProxy::restart(ContextPtr context) { /// Speed up processing unhealthy requests.
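Reviewer note: `getNestedDisk` lets callers see through a stack of decorators (caches, encryption, the restart proxy above) to the underlying disk; `DiskRestartProxy` overrides it only to copy `delegate` under its read lock before recursing. Below is a hedged sketch of the idea against simplified stand-in types (the real code recurses through `dynamic_cast<const DiskDecorator *>` exactly as in the hunks above).

```cpp
#include <memory>

struct IDiskLike { virtual ~IDiskLike() = default; };
struct DiskLocalLike : IDiskLike {};

/// Simplified decorator: wraps another disk and forwards to it.
struct DiskDecoratorLike : IDiskLike
{
    std::shared_ptr<IDiskLike> delegate;
    explicit DiskDecoratorLike(std::shared_ptr<IDiskLike> d) : delegate(std::move(d)) {}
};

/// Unwrap decorators until a non-decorator disk is reached.
std::shared_ptr<IDiskLike> getNestedDisk(std::shared_ptr<IDiskLike> disk)
{
    while (auto * decorator = dynamic_cast<DiskDecoratorLike *>(disk.get()))
        disk = decorator->delegate;
    return disk;
}

/// Context.cpp later in this diff uses the same unwrapping to reject
/// non-local disks for temporary data:
bool isLocalDisk(const std::shared_ptr<IDiskLike> & disk)
{
    return dynamic_cast<DiskLocalLike *>(getNestedDisk(disk).get()) != nullptr;
}
```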
diff --git a/src/Disks/DiskRestartProxy.h b/src/Disks/DiskRestartProxy.h index 7ab6b4d74af..fb4dde3bfa3 100644 --- a/src/Disks/DiskRestartProxy.h +++ b/src/Disks/DiskRestartProxy.h @@ -71,6 +71,8 @@ public: void restart(ContextPtr context); + DiskPtr getNestedDisk() const override; + private: friend class RestartAwareReadBuffer; friend class RestartAwareWriteBuffer; diff --git a/src/Formats/NativeReader.cpp b/src/Formats/NativeReader.cpp index 3ad0ce5cfc4..2500158374d 100644 --- a/src/Formats/NativeReader.cpp +++ b/src/Formats/NativeReader.cpp @@ -237,6 +237,7 @@ Block NativeReader::read() else tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name}); } + tmp_res.info = res.info; res.swap(tmp_res); } diff --git a/src/Formats/TemporaryFileStream.cpp b/src/Formats/TemporaryFileStreamLegacy.cpp similarity index 75% rename from src/Formats/TemporaryFileStream.cpp rename to src/Formats/TemporaryFileStreamLegacy.cpp index f4c66b67a45..e6651f0e83b 100644 --- a/src/Formats/TemporaryFileStream.cpp +++ b/src/Formats/TemporaryFileStreamLegacy.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -12,20 +12,20 @@ namespace DB { /// To read the data that was flushed into the temporary data file. -TemporaryFileStream::TemporaryFileStream(const std::string & path) +TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path) : file_in(path) , compressed_in(file_in) , block_in(std::make_unique(compressed_in, DBMS_TCP_PROTOCOL_VERSION)) {} -TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block & header_) +TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path, const Block & header_) : file_in(path) , compressed_in(file_in) , block_in(std::make_unique(compressed_in, header_, 0)) {} /// Flush data from input stream into file for future reading -TemporaryFileStream::Stat TemporaryFileStream::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec) +TemporaryFileStreamLegacy::Stat TemporaryFileStreamLegacy::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec) { WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {})); diff --git a/src/Formats/TemporaryFileStream.h b/src/Formats/TemporaryFileStreamLegacy.h similarity index 74% rename from src/Formats/TemporaryFileStream.h rename to src/Formats/TemporaryFileStreamLegacy.h index e858da1dc33..90e318c970a 100644 --- a/src/Formats/TemporaryFileStream.h +++ b/src/Formats/TemporaryFileStreamLegacy.h @@ -9,8 +9,10 @@ namespace DB { +/// Used only in MergeJoin +/// TODO: use `TemporaryDataOnDisk` instead /// To read the data that was flushed into the temporary data file. 
-struct TemporaryFileStream +struct TemporaryFileStreamLegacy { struct Stat { @@ -22,8 +24,8 @@ struct TemporaryFileStream CompressedReadBuffer compressed_in; std::unique_ptr block_in; - explicit TemporaryFileStream(const std::string & path); - TemporaryFileStream(const std::string & path, const Block & header_); + explicit TemporaryFileStreamLegacy(const std::string & path); + TemporaryFileStreamLegacy(const std::string & path, const Block & header_); /// Flush data from input stream into file for future reading static Stat write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec); diff --git a/src/Functions/FunctionsComparison.h b/src/Functions/FunctionsComparison.h index 7bbb1c1096c..2129201f2eb 100644 --- a/src/Functions/FunctionsComparison.h +++ b/src/Functions/FunctionsComparison.h @@ -387,15 +387,38 @@ struct StringEqualsImpl size_t size = a_offsets.size(); ColumnString::Offset prev_a_offset = 0; - for (size_t i = 0; i < size; ++i) + if (b_size == 0) { - auto a_size = a_offsets[i] - prev_a_offset - 1; + /* + * Add the fast path of string comparison if the string constant is empty + * and b_size is 0. If a_size is also 0, both of string a and b are empty + * string. There is no need to call memequalSmallAllowOverflow15() for + * string comparison. + */ + for (size_t i = 0; i < size; ++i) + { + auto a_size = a_offsets[i] - prev_a_offset - 1; - c[i] = positive == memequalSmallAllowOverflow15( - a_data.data() + prev_a_offset, a_size, - b_data.data(), b_size); + if (a_size == 0) + c[i] = positive; + else + c[i] = !positive; - prev_a_offset = a_offsets[i]; + prev_a_offset = a_offsets[i]; + } + } + else + { + for (size_t i = 0; i < size; ++i) + { + auto a_size = a_offsets[i] - prev_a_offset - 1; + + c[i] = positive == memequalSmallAllowOverflow15( + a_data.data() + prev_a_offset, a_size, + b_data.data(), b_size); + + prev_a_offset = a_offsets[i]; + } } } diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index f740dff7c42..4399faac5d2 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include @@ -59,6 +60,20 @@ namespace CurrentMetrics extern const Metric TemporaryFilesForAggregation; } +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_AGGREGATED_DATA_VARIANT; + extern const int TOO_MANY_ROWS; + extern const int EMPTY_DATA_PASSED; + extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS; + extern const int LOGICAL_ERROR; +} + +} + namespace { /** Collects observed HashMap-s sizes to avoid redundant intermediate resizes. @@ -311,17 +326,6 @@ size_t getMinBytesForPrefetch() namespace DB { -namespace ErrorCodes -{ - extern const int UNKNOWN_AGGREGATED_DATA_VARIANT; - extern const int NOT_ENOUGH_SPACE; - extern const int TOO_MANY_ROWS; - extern const int EMPTY_DATA_PASSED; - extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS; - extern const int LOGICAL_ERROR; -} - - AggregatedDataVariants::~AggregatedDataVariants() { if (aggregator && !aggregator->all_aggregates_has_trivial_destructor) @@ -566,6 +570,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_) : header(header_) , keys_positions(calculateKeysPositions(header, params_)) , params(params_) + , tmp_data(params.tmp_data_scope ? 
std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope) : nullptr) , min_bytes_for_prefetch(getMinBytesForPrefetch()) { /// Use query-level memory tracker @@ -1562,30 +1567,28 @@ bool Aggregator::executeOnBlock(Columns columns, void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size) const { + if (!tmp_data) + throw Exception("Cannot write to temporary file because temporary file is not initialized", ErrorCodes::LOGICAL_ERROR); + Stopwatch watch; size_t rows = data_variants.size(); - auto file = createTempFile(max_temp_file_size); - - const auto & path = file->path(); - WriteBufferFromFile file_buf(path); - CompressedWriteBuffer compressed_buf(file_buf); - NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false)); - - LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", path); + auto & out_stream = tmp_data->createStream(getHeader(false), CurrentMetrics::TemporaryFilesForAggregation, max_temp_file_size); ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart); + LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.path()); + /// Flush only two-level data and possibly overflow data. #define M(NAME) \ else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ - writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out); + writeToTemporaryFileImpl(data_variants, *data_variants.NAME, out_stream); if (false) {} // NOLINT APPLY_FOR_VARIANTS_TWO_LEVEL(M) #undef M else - throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); + throw Exception("Unknown aggregated data variant", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); /// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones.
data_variants.init(data_variants.type); @@ -1598,62 +1601,32 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si data_variants.without_key = place; } - block_out.flush(); - compressed_buf.next(); - file_buf.next(); + auto stat = out_stream.finishWriting(); + + ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, stat.compressed_size); + ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, stat.uncompressed_size); + ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size); + ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size); double elapsed_seconds = watch.elapsedSeconds(); - size_t compressed_bytes = file_buf.count(); - size_t uncompressed_bytes = compressed_buf.count(); - - { - std::lock_guard lock(temporary_files.mutex); - temporary_files.files.emplace_back(std::move(file)); - temporary_files.sum_size_uncompressed += uncompressed_bytes; - temporary_files.sum_size_compressed += compressed_bytes; - } - - ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes); - ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes); - ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, compressed_bytes); - ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, uncompressed_bytes); - + double compressed_size = stat.compressed_size; + double uncompressed_size = stat.uncompressed_size; LOG_DEBUG(log, "Written part in {:.3f} sec., {} rows, {} uncompressed, {} compressed," " {:.3f} uncompressed bytes per row, {:.3f} compressed bytes per row, compression rate: {:.3f}" " ({:.3f} rows/sec., {}/sec. uncompressed, {}/sec. compressed)", elapsed_seconds, rows, - ReadableSize(uncompressed_bytes), - ReadableSize(compressed_bytes), - static_cast(uncompressed_bytes) / rows, - static_cast(compressed_bytes) / rows, - static_cast(uncompressed_bytes) / compressed_bytes, + ReadableSize(uncompressed_size), + ReadableSize(compressed_size), + static_cast(uncompressed_size) / rows, + static_cast(compressed_size) / rows, + static_cast(uncompressed_size) / compressed_size, static_cast(rows) / elapsed_seconds, - ReadableSize(static_cast(uncompressed_bytes) / elapsed_seconds), - ReadableSize(static_cast(compressed_bytes) / elapsed_seconds)); + ReadableSize(static_cast(uncompressed_size) / elapsed_seconds), + ReadableSize(static_cast(compressed_size) / elapsed_seconds)); } - -TemporaryFileOnDiskHolder Aggregator::createTempFile(size_t max_temp_file_size) const -{ - auto file = std::make_unique(params.tmp_volume->getDisk(), CurrentMetrics::TemporaryFilesForAggregation); - - // enoughSpaceInDirectory() is not enough to make it right, since - // another process (or another thread of aggregator) can consume all - // space. - // - // But true reservation (IVolume::reserve()) cannot be used here since - // current_memory_usage does not takes compression into account and - // will reserve way more that actually will be used. - // - // Hence let's do a simple check. 
- if (max_temp_file_size > 0 && !enoughSpaceInDirectory(file->getPath(), max_temp_file_size)) - throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", file->path()); - return file; -} - - template Block Aggregator::convertOneBucketToBlock( AggregatedDataVariants & data_variants, @@ -1703,7 +1676,7 @@ template void Aggregator::writeToTemporaryFileImpl( AggregatedDataVariants & data_variants, Method & method, - NativeWriter & out) const + TemporaryFileStream & out) const { size_t max_temporary_block_size_rows = 0; size_t max_temporary_block_size_bytes = 0; diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index fedb9ad0651..0697d67af78 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -925,7 +926,7 @@ public: /// Return empty result when aggregating without keys on empty set. bool empty_result_for_aggregation_by_empty_set; - VolumePtr tmp_volume; + TemporaryDataOnDiskScopePtr tmp_data_scope; /// Settings is used to determine cache size. No threads are created. size_t max_threads; @@ -970,7 +971,7 @@ public: size_t group_by_two_level_threshold_bytes_, size_t max_bytes_before_external_group_by_, bool empty_result_for_aggregation_by_empty_set_, - VolumePtr tmp_volume_, + TemporaryDataOnDiskScopePtr tmp_data_scope_, size_t max_threads_, size_t min_free_disk_space_, bool compile_aggregate_expressions_, @@ -990,7 +991,7 @@ public: , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_) , max_bytes_before_external_group_by(max_bytes_before_external_group_by_) , empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_) - , tmp_volume(tmp_volume_) + , tmp_data_scope(std::move(tmp_data_scope_)) , max_threads(max_threads_) , min_free_disk_space(min_free_disk_space_) , compile_aggregate_expressions(compile_aggregate_expressions_) @@ -1071,25 +1072,9 @@ public: /// For external aggregation. void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const; - TemporaryFileOnDiskHolder createTempFile(size_t max_temp_file_size) const; + bool hasTemporaryData() const { return tmp_data && !tmp_data->empty(); } - bool hasTemporaryFiles() const { return !temporary_files.empty(); } - - struct TemporaryFiles - { - std::vector files; - size_t sum_size_uncompressed = 0; - size_t sum_size_compressed = 0; - mutable std::mutex mutex; - - bool empty() const - { - std::lock_guard lock(mutex); - return files.empty(); - } - }; - - const TemporaryFiles & getTemporaryFiles() const { return temporary_files; } + const TemporaryDataOnDisk & getTemporaryData() const { return *tmp_data; } /// Get data structure of the result. Block getHeader(bool final) const; @@ -1148,7 +1133,7 @@ private: Poco::Logger * log = &Poco::Logger::get("Aggregator"); /// For external aggregation. - mutable TemporaryFiles temporary_files; + TemporaryDataOnDiskPtr tmp_data; size_t min_bytes_for_prefetch = 0; @@ -1251,7 +1236,7 @@ private: void writeToTemporaryFileImpl( AggregatedDataVariants & data_variants, Method & method, - NativeWriter & out) const; + TemporaryFileStream & out) const; /// Merge NULL key data from hash table `src` into `dst`. 
template diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 321a46baff6..9604d1796e9 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -188,7 +190,7 @@ struct ContextSharedPart : boost::noncopyable ConfigurationPtr config; /// Global configuration settings. String tmp_path; /// Path to the temporary files that occur when processing the request. - mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request. + TemporaryDataOnDiskScopePtr temp_data_on_disk; /// Temporary files that occur when processing the request accounted here. mutable std::unique_ptr embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization. mutable std::unique_ptr external_dictionaries_loader; @@ -681,10 +683,27 @@ Strings Context::getWarnings() const return common_warnings; } +/// TODO: remove, use `getTempDataOnDisk` VolumePtr Context::getTemporaryVolume() const { auto lock = getLock(); - return shared->tmp_volume; + if (shared->temp_data_on_disk) + return shared->temp_data_on_disk->getVolume(); + return nullptr; +} + +TemporaryDataOnDiskScopePtr Context::getTempDataOnDisk() const +{ + auto lock = getLock(); + if (this->temp_data_on_disk) + return this->temp_data_on_disk; + return shared->temp_data_on_disk; +} + +void Context::setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_) +{ + auto lock = getLock(); + this->temp_data_on_disk = std::move(temp_data_on_disk_); } void Context::setPath(const String & path) @@ -693,7 +712,7 @@ void Context::setPath(const String & path) shared->path = path; - if (shared->tmp_path.empty() && !shared->tmp_volume) + if (shared->tmp_path.empty() && !shared->temp_data_on_disk) shared->tmp_path = shared->path + "tmp/"; if (shared->flags_path.empty()) @@ -712,9 +731,10 @@ void Context::setPath(const String & path) shared->user_defined_path = shared->path + "user_defined/"; } -VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name) +VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name, size_t max_size) { std::lock_guard lock(shared->storage_policies_mutex); + VolumePtr volume; if (policy_name.empty()) { @@ -723,21 +743,41 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic shared->tmp_path += '/'; auto disk = std::make_shared("_tmp_default", shared->tmp_path, 0); - shared->tmp_volume = std::make_shared("_tmp_default", disk, 0); + volume = std::make_shared("_tmp_default", disk, 0); } else { StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name); if (tmp_policy->getVolumes().size() != 1) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, - "Policy '{} is used temporary files, such policy should have exactly one volume", policy_name); - shared->tmp_volume = tmp_policy->getVolume(0); + "Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name); + volume = tmp_policy->getVolume(0); } - if (shared->tmp_volume->getDisks().empty()) + if (volume->getDisks().empty()) throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG); - return shared->tmp_volume; + for (const auto & disk : volume->getDisks()) + { + if (!disk) + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Temporary disk is null"); + + /// Check that 
underlying disk is local (can be wrapped in decorator) + DiskPtr disk_ptr = disk; + if (const auto * disk_decorator = dynamic_cast(disk_ptr.get())) + disk_ptr = disk_decorator->getNestedDisk(); + + if (dynamic_cast(disk_ptr.get()) == nullptr) + { + const auto * disk_raw_ptr = disk_ptr.get(); + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, + "Disk '{}' ({}) is not local and can't be used for temporary files", + disk_ptr->getName(), typeid(*disk_raw_ptr).name()); + } + } + + shared->temp_data_on_disk = std::make_shared(volume, max_size); + return volume; } void Context::setFlagsPath(const String & path) @@ -2897,14 +2937,13 @@ void Context::shutdown() } } - // Special volumes might also use disks that require shutdown. - if (shared->tmp_volume) + /// Special volumes might also use disks that require shutdown. + auto & tmp_data = shared->temp_data_on_disk; + if (tmp_data && tmp_data->getVolume()) { - auto & disks = shared->tmp_volume->getDisks(); + auto & disks = tmp_data->getVolume()->getDisks(); for (auto & disk : disks) - { disk->shutdown(); - } } shared->shutdown(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index a9984e32c1b..635c571b173 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -161,6 +161,8 @@ using ReadTaskCallback = std::function; using MergeTreeReadTaskCallback = std::function(PartitionReadRequest)>; +class TemporaryDataOnDiskScope; +using TemporaryDataOnDiskScopePtr = std::shared_ptr; #if USE_ROCKSDB class MergeTreeMetadataCache; @@ -362,6 +364,8 @@ private: /// A flag, used to mark if reader needs to apply deleted rows mask. bool apply_deleted_mask = true; + /// Temporary data for query execution accounting. + TemporaryDataOnDiskScopePtr temp_data_on_disk; public: /// Some counters for current query execution. /// Most of them are workarounds and should be removed in the future. @@ -435,7 +439,10 @@ public: /// A list of warnings about server configuration to place in `system.warnings` table. Strings getWarnings() const; - VolumePtr getTemporaryVolume() const; + VolumePtr getTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk` + + TemporaryDataOnDiskScopePtr getTempDataOnDisk() const; + void setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_); void setPath(const String & path); void setFlagsPath(const String & path); @@ -446,7 +453,7 @@ public: void addWarningMessage(const String & msg) const; - VolumePtr setTemporaryStorage(const String & path, const String & policy_name = ""); + VolumePtr setTemporaryStorage(const String & path, const String & policy_name, size_t max_size); using ConfigurationPtr = Poco::AutoPtr; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index a77882c85d2..79deb38317c 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1453,7 +1453,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

context->getTemporaryVolume(), + this->context->getTempDataOnDisk(), settings.min_free_disk_space_for_temporary_data, settings.optimize_sorting_by_input_stream_properties); sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_pos)); @@ -2354,7 +2354,7 @@ static Aggregator::Params getAggregatorParams( settings.empty_result_for_aggregation_by_empty_set || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty() && query_analyzer.hasConstAggregationKeys()), - context.getTemporaryVolume(), + context.getTempDataOnDisk(), settings.max_threads, settings.min_free_disk_space_for_temporary_data, settings.compile_aggregate_expressions, @@ -2616,7 +2616,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan) settings.max_bytes_before_remerge_sort, settings.remerge_sort_lowered_memory_bytes_ratio, settings.max_bytes_before_external_sort, - context->getTemporaryVolume(), + context->getTempDataOnDisk(), settings.min_free_disk_space_for_temporary_data, settings.optimize_sorting_by_input_stream_properties); sorting_step->setStepDescription("Sorting for window '" + window.window_name + "'"); @@ -2675,7 +2675,7 @@ void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfo settings.max_bytes_before_remerge_sort, settings.remerge_sort_lowered_memory_bytes_ratio, settings.max_bytes_before_external_sort, - context->getTemporaryVolume(), + context->getTempDataOnDisk(), settings.min_free_disk_space_for_temporary_data, settings.optimize_sorting_by_input_stream_properties); diff --git a/src/Interpreters/JIT/compileFunction.cpp b/src/Interpreters/JIT/compileFunction.cpp index 99646084e5a..9a2b3934f64 100644 --- a/src/Interpreters/JIT/compileFunction.cpp +++ b/src/Interpreters/JIT/compileFunction.cpp @@ -234,9 +234,13 @@ static void compileFunction(llvm::Module & module, const IFunctionBase & functio auto * cur_block = b.GetInsertBlock(); for (auto & col : columns) { - col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); + auto * ty_data = llvm::cast(col.data->getType()->getScalarType())->getElementType(); + col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_data, col.data, 1), cur_block); if (col.null) - col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); + { + auto * ty_null = llvm::cast(col.null->getType()->getScalarType())->getElementType(); + col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_null, col.null, 1), cur_block); + } } auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); @@ -293,7 +297,8 @@ static void compileCreateAggregateStatesFunctions(llvm::Module & module, const s { size_t aggregate_function_offset = function_to_compile.aggregate_data_offset; const auto * aggregate_function = function_to_compile.function; - auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_arg, aggregate_function_offset); + auto * ty_aggregate_data_place_arg = llvm::cast(aggregate_data_place_arg->getType()->getScalarType())->getElementType(); + auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_arg, aggregate_data_place_arg, aggregate_function_offset); aggregate_function->compileCreate(b, aggregation_place_with_offset); } @@ -324,7 +329,8 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const b.SetInsertPoint(entry); llvm::IRBuilder<> entry_builder(entry); - auto * places_start_arg = entry_builder.CreateInBoundsGEP(nullptr, 
places_arg, row_start_arg); std::vector<ColumnDataPlaceholder> columns; size_t previous_columns_size = 0; @@ -342,11 +348,13 @@ const auto & argument_type = argument_types[column_argument_index]; auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index)); data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo()); - data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg); + auto * ty_data_init = llvm::cast<llvm::PointerType>(data_placeholder.data_init->getType()->getScalarType())->getElementType(); + data_placeholder.data_init = entry_builder.CreateInBoundsGEP(ty_data_init, data_placeholder.data_init, row_start_arg); if (argument_type->isNullable()) { data_placeholder.null_init = b.CreateExtractValue(data, {1}); - data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg); + auto * ty_null_init = llvm::cast<llvm::PointerType>(data_placeholder.null_init->getType()->getScalarType())->getElementType(); + data_placeholder.null_init = entry_builder.CreateInBoundsGEP(ty_null_init, data_placeholder.null_init, row_start_arg); } else { @@ -419,7 +427,8 @@ arguments_values[column_argument_index] = nullable_value; } - auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregation_place, aggregate_function_offset); + auto * ty_aggregation_place = llvm::cast<llvm::PointerType>(aggregation_place->getType()->getScalarType())->getElementType(); + auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregation_place, aggregation_place, aggregate_function_offset); aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values); previous_columns_size += function_arguments_size; @@ -430,13 +439,18 @@ auto * cur_block = b.GetInsertBlock(); for (auto & col : columns) { - col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); + auto * ty_data = llvm::cast<llvm::PointerType>(col.data->getType()->getScalarType())->getElementType(); + col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_data, col.data, 1), cur_block); if (col.null) - col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); + { + auto * ty_null = llvm::cast<llvm::PointerType>(col.null->getType()->getScalarType())->getElementType(); + col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_null, col.null, 1), cur_block); + } } - places_phi->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, places_phi, 1), cur_block); + auto * ty_places_phi = llvm::cast<llvm::PointerType>(places_phi->getType()->getScalarType())->getElementType(); + places_phi->addIncoming(b.CreateConstInBoundsGEP1_64(ty_places_phi, places_phi, 1), cur_block); auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); counter_phi->addIncoming(value, cur_block); @@ -488,11 +502,13 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod const auto & argument_type = argument_types[column_argument_index]; auto * data =
b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index)); data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo()); - data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg); + auto * ty_data_init = llvm::cast(data_placeholder.data_init->getType()->getScalarType())->getElementType(); + data_placeholder.data_init = entry_builder.CreateInBoundsGEP(ty_data_init, data_placeholder.data_init, row_start_arg); if (argument_type->isNullable()) { data_placeholder.null_init = b.CreateExtractValue(data, {1}); - data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg); + auto * ty_null_init = llvm::cast(data_placeholder.null_init->getType()->getScalarType())->getElementType(); + data_placeholder.null_init = entry_builder.CreateInBoundsGEP(ty_null_init, data_placeholder.null_init, row_start_arg); } else { @@ -560,7 +576,8 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod arguments_values[column_argument_index] = nullable_value; } - auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, place_arg, aggregate_function_offset); + auto * ty_place_arg = llvm::cast(place_arg->getType()->getScalarType())->getElementType(); + auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_place_arg, place_arg, aggregate_function_offset); aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values); previous_columns_size += function_arguments_size; @@ -571,10 +588,14 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod auto * cur_block = b.GetInsertBlock(); for (auto & col : columns) { - col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); + auto * ty_data = llvm::cast(col.data->getType()->getScalarType())->getElementType(); + col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_data, col.data, 1), cur_block); if (col.null) - col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); + { + auto * ty_null = llvm::cast(col.null->getType()->getScalarType())->getElementType(); + col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_null, col.null, 1), cur_block); + } } auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); @@ -607,8 +628,10 @@ static void compileMergeAggregatesStates(llvm::Module & module, const std::vecto size_t aggregate_function_offset = function_to_compile.aggregate_data_offset; const auto * aggregate_function_ptr = function_to_compile.function; - auto * aggregate_data_place_merge_dst_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_dst_arg, aggregate_function_offset); - auto * aggregate_data_place_merge_src_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_src_arg, aggregate_function_offset); + auto * ty_aggregate_data_place_dst_arg = llvm::cast(aggregate_data_place_dst_arg->getType()->getScalarType())->getElementType(); + auto * aggregate_data_place_merge_dst_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_dst_arg, aggregate_data_place_dst_arg, aggregate_function_offset); + auto * ty_aggregate_data_place_src_arg = llvm::cast(aggregate_data_place_src_arg->getType()->getScalarType())->getElementType(); + auto * 
aggregate_data_place_merge_src_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_src_arg, aggregate_data_place_src_arg, aggregate_function_offset); aggregate_function_ptr->compileMerge(b, aggregate_data_place_merge_dst_with_offset, aggregate_data_place_merge_src_with_offset); } @@ -645,11 +668,13 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons auto return_type = functions[i].function->getReturnType(); auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, i)); columns[i].data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(return_type))->getPointerTo()); - columns[i].data_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].data_init, row_start_arg); + auto * ty_data_init = llvm::cast(columns[i].data_init->getType()->getScalarType())->getElementType(); + columns[i].data_init = entry_builder.CreateInBoundsGEP(ty_data_init, columns[i].data_init, row_start_arg); if (return_type->isNullable()) { columns[i].null_init = b.CreateExtractValue(data, {1}); - columns[i].null_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].null_init, row_start_arg); + auto * ty_null_init = llvm::cast(columns[i].null_init->getType()->getScalarType())->getElementType(); + columns[i].null_init = entry_builder.CreateInBoundsGEP(ty_null_init, columns[i].null_init, row_start_arg); } else { @@ -688,7 +713,8 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons const auto * aggregate_function_ptr = functions[i].function; auto * aggregate_data_place = b.CreateLoad(b.getInt8Ty()->getPointerTo(), aggregate_data_place_phi); - auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place, aggregate_function_offset); + auto * ty_aggregate_data_place = llvm::cast(aggregate_data_place->getType()->getScalarType())->getElementType(); + auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place, aggregate_data_place, aggregate_function_offset); auto * final_value = aggregate_function_ptr->compileGetResult(b, aggregation_place_with_offset); @@ -708,16 +734,21 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons auto * cur_block = b.GetInsertBlock(); for (auto & col : columns) { - col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); + auto * ty_col_data = llvm::cast(col.data->getType()->getScalarType())->getElementType(); + col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_col_data, col.data, 1), cur_block); if (col.null) - col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); + { + auto * ty_col_null = llvm::cast(col.null->getType()->getScalarType())->getElementType(); + col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_col_null, col.null, 1), cur_block); + } } auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1), "", true, true); counter_phi->addIncoming(value, cur_block); - aggregate_data_place_phi->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_phi, 1), cur_block); + auto * ty_aggregate_data_place_phi = llvm::cast(aggregate_data_place_phi->getType()->getScalarType())->getElementType(); + aggregate_data_place_phi->addIncoming(b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_phi, aggregate_data_place_phi, 1), cur_block); b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop); @@ -842,11 +873,20 @@ 
CompiledSortDescriptionFunction compileSortDescription( auto * lhs_column_data = b.CreatePointerCast(b.CreateExtractValue(lhs_column, {0}), column_native_type_pointer); auto * lhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(lhs_column, {1}) : nullptr; - llvm::Value * lhs_value = b.CreateLoad(b.CreateInBoundsGEP(nullptr, lhs_column_data, lhs_index_arg)); +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wdeprecated-declarations" +#endif + auto * ty_lhs_column_data = llvm::cast(lhs_column_data->getType()->getScalarType())->getElementType(); + llvm::Value * lhs_value = b.CreateLoad(b.CreateInBoundsGEP(ty_lhs_column_data, lhs_column_data, lhs_index_arg)); +#ifdef __clang__ +#pragma clang diagnostic pop +#endif if (lhs_column_null_data) { - auto * is_null_value_pointer = b.CreateInBoundsGEP(nullptr, lhs_column_null_data, lhs_index_arg); + auto * ty_lhs_column_null_data = llvm::cast(lhs_column_null_data->getType()->getScalarType())->getElementType(); + auto * is_null_value_pointer = b.CreateInBoundsGEP(ty_lhs_column_null_data, lhs_column_null_data, lhs_index_arg); auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), is_null_value_pointer), b.getInt8(0)); auto * lhs_nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, lhs_value, {0}), is_null, {1}); lhs_value = lhs_nullable_value; @@ -856,10 +896,19 @@ CompiledSortDescriptionFunction compileSortDescription( auto * rhs_column_data = b.CreatePointerCast(b.CreateExtractValue(rhs_column, {0}), column_native_type_pointer); auto * rhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(rhs_column, {1}) : nullptr; - llvm::Value * rhs_value = b.CreateLoad(b.CreateInBoundsGEP(nullptr, rhs_column_data, rhs_index_arg)); +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wdeprecated-declarations" +#endif + auto * ty_rhs_column_data = llvm::cast(rhs_column_data->getType()->getScalarType())->getElementType(); + llvm::Value * rhs_value = b.CreateLoad(b.CreateInBoundsGEP(ty_rhs_column_data, rhs_column_data, rhs_index_arg)); +#ifdef __clang__ +#pragma clang diagnostic pop +#endif if (rhs_column_null_data) { - auto * is_null_value_pointer = b.CreateInBoundsGEP(nullptr, rhs_column_null_data, rhs_index_arg); + auto * ty_rhs_column_null_data = llvm::cast(rhs_column_null_data->getType()->getScalarType())->getElementType(); + auto * is_null_value_pointer = b.CreateInBoundsGEP(ty_rhs_column_null_data, rhs_column_null_data, rhs_index_arg); auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), is_null_value_pointer), b.getInt8(0)); auto * rhs_nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, rhs_value, {0}), is_null, {1}); rhs_value = rhs_nullable_value; diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 5a5a057aedc..8957cb9c694 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -4,12 +4,13 @@ #include #include -#include +#include #include #include #include #include #include +#include #include #include #include @@ -1032,7 +1033,7 @@ std::shared_ptr MergeJoin::loadRightBlock(size_t pos) const { auto load_func = [&]() -> std::shared_ptr { - TemporaryFileStream input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block)); + TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block)); return std::make_shared(input.block_in->read()); }; diff --git a/src/Interpreters/ProcessList.cpp 
b/src/Interpreters/ProcessList.cpp index e7cf5e14143..d4465a69890 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -69,7 +69,7 @@ static bool isUnlimitedQuery(const IAST * ast) } -ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, ContextPtr query_context) +ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr query_context) { EntryPtr res; @@ -198,7 +198,11 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as auto user_process_list_it = user_to_queries.find(client_info.current_user); if (user_process_list_it == user_to_queries.end()) - user_process_list_it = user_to_queries.emplace(client_info.current_user, this).first; + { + user_process_list_it = user_to_queries.emplace(std::piecewise_construct, + std::forward_as_tuple(client_info.current_user), + std::forward_as_tuple(query_context->getGlobalContext(), this)).first; + } ProcessListForUser & user_process_list = user_process_list_it->second; /// Actualize thread group info @@ -208,6 +212,11 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as std::lock_guard lock_thread_group(thread_group->mutex); thread_group->performance_counters.setParent(&user_process_list.user_performance_counters); thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker); + if (user_process_list.user_temp_data_on_disk) + { + query_context->setTempDataOnDisk(std::make_shared( + user_process_list.user_temp_data_on_disk, settings.max_temp_data_on_disk_size_for_query)); + } thread_group->query = query_; thread_group->one_line_query = toOneLineQuery(query_); thread_group->normalized_query_hash = normalizedQueryHash(query_); @@ -556,9 +565,19 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev ProcessListForUser::ProcessListForUser(ProcessList * global_process_list) + : ProcessListForUser(nullptr, global_process_list) +{} + +ProcessListForUser::ProcessListForUser(ContextPtr global_context, ProcessList * global_process_list) : user_overcommit_tracker(global_process_list, this) { user_memory_tracker.setOvercommitTracker(&user_overcommit_tracker); + + if (global_context) + { + size_t size_limit = global_context->getSettingsRef().max_temp_data_on_disk_size_for_user; + user_temp_data_on_disk = std::make_shared(global_context->getTempDataOnDisk(), size_limit); + } } diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index e7ad4e70712..6943c7cfcd8 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -236,6 +238,8 @@ struct ProcessListForUser { explicit ProcessListForUser(ProcessList * global_process_list); + ProcessListForUser(ContextPtr global_context, ProcessList * global_process_list); + /// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled. using QueryToElement = std::unordered_map; QueryToElement queries; @@ -244,6 +248,8 @@ struct ProcessListForUser /// Limit and counter for memory of all simultaneously running queries of single user. MemoryTracker user_memory_tracker{VariableContext::User}; + TemporaryDataOnDiskScopePtr user_temp_data_on_disk; + UserOvercommitTracker user_overcommit_tracker; /// Count network usage for all simultaneously running queries of single user. 
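Note on the change above: the per-user scope created in ProcessListForUser nests between the global scope (owned by the Context) and the per-query scope installed in ProcessList::insert, so every byte a query spills is charged to the query, its user, and the server as a whole, and whichever scope exceeds its limit first rejects the write. A minimal standalone sketch of that accounting chain follows; the names (Scope, alloc) are invented for illustration and are not the ClickHouse interfaces:

    #include <atomic>
    #include <cstddef>
    #include <memory>
    #include <stdexcept>

    // Illustration only: a tree of scopes where every allocation is charged to
    // the scope and all of its ancestors, and any scope with a non-zero limit
    // can reject the allocation for its whole subtree.
    struct Scope
    {
        explicit Scope(std::shared_ptr<Scope> parent_ = nullptr, size_t limit_ = 0)
            : parent(std::move(parent_)), limit(limit_) {}

        // Charge `bytes` here and in all ancestors. Rollback on failure is
        // omitted for brevity; the real code applies signed deltas, so it can
        // also release space when temporary files are removed.
        void alloc(size_t bytes)
        {
            if (parent)
                parent->alloc(bytes);
            size_t new_size = size.fetch_add(bytes) + bytes;
            if (limit && new_size > limit)
                throw std::runtime_error("temporary data limit exceeded");
        }

        std::shared_ptr<Scope> parent;
        size_t limit;                 // 0 means unlimited
        std::atomic<size_t> size{0};
    };

    int main()
    {
        auto global = std::make_shared<Scope>();
        auto per_user = std::make_shared<Scope>(global, 1 << 20);    // like max_temp_data_on_disk_size_for_user
        auto per_query = std::make_shared<Scope>(per_user, 1 << 16); // like max_temp_data_on_disk_size_for_query
        per_query->alloc(4096); // accounted in the query, user and global scopes
    }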
@@ -257,6 +263,7 @@ struct ProcessListForUser
     /// Clears network bandwidth Throttler, so it will not count periods of inactivity.
     void resetTrackers()
     {
+        /// TODO: should we drop user_temp_data_on_disk here?
         user_memory_tracker.reset();
         if (user_throttler)
             user_throttler.reset();
@@ -374,7 +381,7 @@ public:
       * If timeout is passed - throw an exception.
       * Don't count KILL QUERY queries.
       */
-    EntryPtr insert(const String & query_, const IAST * ast, ContextPtr query_context);
+    EntryPtr insert(const String & query_, const IAST * ast, ContextMutablePtr query_context);
 
     /// Number of currently executing queries.
     size_t size() const { return processes.size(); }
 
diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp
index 8f598f3dd3f..16c0e6c2c2b 100644
--- a/src/Interpreters/SortedBlocksWriter.cpp
+++ b/src/Interpreters/SortedBlocksWriter.cpp
@@ -5,7 +5,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 
@@ -39,7 +39,7 @@ namespace
     TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
     {
         auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin);
-        auto write_stat = TemporaryFileStream::write(tmp_file->getPath(), header, std::move(pipeline), codec);
+        auto write_stat = TemporaryFileStreamLegacy::write(tmp_file->getPath(), header, std::move(pipeline), codec);
 
         ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes);
         ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes);
 
diff --git a/src/Interpreters/TemporaryDataOnDisk.cpp b/src/Interpreters/TemporaryDataOnDisk.cpp
new file mode 100644
index 00000000000..d603877a9e0
--- /dev/null
+++ b/src/Interpreters/TemporaryDataOnDisk.cpp
@@ -0,0 +1,270 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int TOO_MANY_ROWS_OR_BYTES;
+    extern const int LOGICAL_ERROR;
+    extern const int NOT_ENOUGH_SPACE;
+}
+
+void TemporaryDataOnDiskScope::deltaAllocAndCheck(int compressed_delta, int uncompressed_delta)
+{
+    if (parent)
+        parent->deltaAllocAndCheck(compressed_delta, uncompressed_delta);
+
+
+    /// check that we don't go negative
+    if ((compressed_delta < 0 && stat.compressed_size < static_cast<size_t>(-compressed_delta)) ||
+        (uncompressed_delta < 0 && stat.uncompressed_size < static_cast<size_t>(-uncompressed_delta)))
+    {
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Negative temporary data size");
+    }
+
+    size_t new_consumption = stat.compressed_size + compressed_delta;
+    if (compressed_delta > 0 && limit && new_consumption > limit)
+        throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES, "Limit for temporary files size exceeded");
+
+    stat.compressed_size += compressed_delta;
+    stat.uncompressed_size += uncompressed_delta;
+}
+
+TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size)
+{
+    DiskPtr disk;
+    if (max_file_size > 0)
+    {
+        auto reservation = volume->reserve(max_file_size);
+        if (!reservation)
+            throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE);
+        disk = reservation->getDisk();
+    }
+    else
+    {
+        disk = volume->getDisk();
+    }
+
+    auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, metric_scope);
+
+    std::lock_guard lock(mutex);
+    TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
+    return *tmp_stream;
+}
+
+
+std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
+{
+    std::vector<TemporaryFileStream *> res;
+    std::lock_guard lock(mutex);
+    res.reserve(streams.size());
+    for (const auto & stream : streams)
+        res.push_back(stream.get());
+    return res;
+}
+
+bool TemporaryDataOnDisk::empty() const
+{
+    std::lock_guard lock(mutex);
+    return streams.empty();
+}
+
+struct TemporaryFileStream::OutputWriter
+{
+    OutputWriter(const String & path, const Block & header_)
+        : out_file_buf(path)
+        , out_compressed_buf(out_file_buf)
+        , out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
+    {
+    }
+
+    void write(const Block & block)
+    {
+        if (finalized)
+            throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);
+        out_writer.write(block);
+    }
+
+
+    void finalize()
+    {
+        if (finalized)
+            return;
+
+        /// if we called finalize() explicitly, and got an exception,
+        /// we don't want to get it again in the destructor, so set finalized flag first
+        finalized = true;
+
+        out_writer.flush();
+        out_compressed_buf.finalize();
+        out_file_buf.finalize();
+    }
+
+    ~OutputWriter()
+    {
+        try
+        {
+            finalize();
+        }
+        catch (...)
+        {
+            tryLogCurrentException(__PRETTY_FUNCTION__);
+        }
+    }
+
+    WriteBufferFromFile out_file_buf;
+    CompressedWriteBuffer out_compressed_buf;
+    NativeWriter out_writer;
+
+    bool finalized = false;
+};
+
+struct TemporaryFileStream::InputReader
+{
+    InputReader(const String & path, const Block & header_)
+        : in_file_buf(path)
+        , in_compressed_buf(in_file_buf)
+        , in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
+    {
+    }
+
+    explicit InputReader(const String & path)
+        : in_file_buf(path)
+        , in_compressed_buf(in_file_buf)
+        , in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
+    {
+    }
+
+    Block read() { return in_reader.read(); }
+
+    ReadBufferFromFile in_file_buf;
+    CompressedReadBuffer in_compressed_buf;
+    NativeReader in_reader;
+};
+
+TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
+    : parent(parent_)
+    , header(header_)
+    , file(std::move(file_))
+    , out_writer(std::make_unique<OutputWriter>(file->path(), header))
+{
+}
+
+void TemporaryFileStream::write(const Block & block)
+{
+    if (!out_writer)
+        throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR);
+
+    updateAllocAndCheck();
+    out_writer->write(block);
+}
+
+TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
+{
+    if (out_writer)
+    {
+        out_writer->finalize();
+        /// The amount of written data can be changed after finalization, some buffers can be flushed
+        /// Need to update the stat
+        updateAllocAndCheck();
+        out_writer.reset();
+
+        /// reader will be created at the first read call, not to consume memory before it is needed
+    }
+    return stat;
+}
+
+bool TemporaryFileStream::isWriteFinished() const
+{
+    assert(in_reader == nullptr || out_writer == nullptr);
+    return out_writer == nullptr;
+}
+
+Block TemporaryFileStream::read()
+{
+    if (!isWriteFinished())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has not been finished");
+
+    if (isFinalized())
+        return {};
+
+    if (!in_reader)
+    {
+        in_reader = std::make_unique<InputReader>(file->path(), header);
+    }
+
+    Block block = in_reader->read();
+    if (!block)
+    {
+        /// finalize earlier to release resources, do not wait for the destructor
+        this->finalize();
+    }
+    return block;
+}
+
+void TemporaryFileStream::updateAllocAndCheck()
+{
+    assert(out_writer);
+    size_t new_compressed_size = out_writer->out_compressed_buf.getCompressedBytes();
+    size_t new_uncompressed_size = out_writer->out_compressed_buf.getUncompressedBytes();
+
+    if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
+    {
+        throw Exception(ErrorCodes::LOGICAL_ERROR,
+            "Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
+            file->path(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
+    }
+
+    parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
+    stat.compressed_size = new_compressed_size;
+    stat.uncompressed_size = new_uncompressed_size;
+}
+
+bool TemporaryFileStream::isFinalized() const
+{
+    return file == nullptr;
+}
+
+void TemporaryFileStream::finalize()
+{
+    if (file)
+    {
+        file.reset();
+        parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size);
+    }
+
+    if (in_reader)
+        in_reader.reset();
+
+    if (out_writer)
+    {
+        out_writer->finalize();
+        out_writer.reset();
+    }
+}
+
+TemporaryFileStream::~TemporaryFileStream()
+{
+    try
+    {
+        finalize();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+        assert(false); /// deltaAllocAndCheck with negative can't throw exception
+    }
+}
+
+}
diff --git a/src/Interpreters/TemporaryDataOnDisk.h b/src/Interpreters/TemporaryDataOnDisk.h
new file mode 100644
index 00000000000..44ff20935af
--- /dev/null
+++ b/src/Interpreters/TemporaryDataOnDisk.h
@@ -0,0 +1,139 @@
+#pragma once
+
+#include
+
+#include
+#include
+#include
+
+namespace DB
+{
+
+class TemporaryDataOnDiskScope;
+using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
+
+class TemporaryDataOnDisk;
+using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;
+
+class TemporaryFileStream;
+using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
+
+
+/*
+ * Used to account for the amount of temporary data written to disk.
+ * If a limit is set, an exception is thrown when it is exceeded.
+ * Scopes can be nested, so a parent scope accounts for all data written by its children.
+ * Scopes are: global -> per-user -> per-query -> per-purpose (sorting, aggregation, etc).
+ */
+class TemporaryDataOnDiskScope : boost::noncopyable
+{
+public:
+    struct StatAtomic
+    {
+        std::atomic<size_t> compressed_size;
+        std::atomic<size_t> uncompressed_size;
+    };
+
+    explicit TemporaryDataOnDiskScope(VolumePtr volume_, size_t limit_)
+        : volume(std::move(volume_)), limit(limit_)
+    {}
+
+    explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_)
+        : parent(std::move(parent_)), volume(parent->volume), limit(limit_)
+    {}
+
+    /// TODO: remove
+    /// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
+    VolumePtr getVolume() const { return volume; }
+
+protected:
+    void deltaAllocAndCheck(int compressed_delta, int uncompressed_delta);
+
+    TemporaryDataOnDiskScopePtr parent = nullptr;
+    VolumePtr volume;
+
+    StatAtomic stat;
+    size_t limit = 0;
+};
+
+/*
+ * Holds a set of temporary files.
+ * A new file stream is created with `createStream`.
+ * Streams are owned by this object and will be deleted when it is deleted.
+ * It's a leaf node in the temporary data scope tree.
+ */
+class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
+{
+    friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
+public:
+    using TemporaryDataOnDiskScope::StatAtomic;
+
+    explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
+        : TemporaryDataOnDiskScope(std::move(parent_), 0)
+    {}
+
+    /// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
+    TemporaryFileStream & createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size = 0);
+
+    std::vector<TemporaryFileStream *> getStreams() const;
+    bool empty() const;
+
+    const StatAtomic & getStat() const { return stat; }
+
+private:
+    mutable std::mutex mutex;
+    std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
+};
+
+/*
+ * Data can be written into this stream and then read.
+ * After writing is finished, call `finishWriting`, then `read` to read the data back.
+ * The amount of data written to disk is accounted for in the parent scope.
+ */
+class TemporaryFileStream : boost::noncopyable
+{
+public:
+    struct Stat
+    {
+        /// Statistics for the file
+        /// Non-atomic because we don't allow reading from or writing to a single file from multiple threads
+        size_t compressed_size = 0;
+        size_t uncompressed_size = 0;
+    };
+
+    TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
+
+    void write(const Block & block);
+    Stat finishWriting();
+    bool isWriteFinished() const;
+
+    Block read();
+
+    const String & path() const { return file->getPath(); }
+    Block getHeader() const { return header; }
+
+    ~TemporaryFileStream();
+
+private:
+    void updateAllocAndCheck();
+
+    /// Finalize everything, close reader and writer, delete file
+    void finalize();
+    bool isFinalized() const;
+
+    TemporaryDataOnDisk * parent;
+
+    Block header;
+
+    TemporaryFileOnDiskHolder file;
+
+    Stat stat;
+
+    struct OutputWriter;
+    std::unique_ptr<OutputWriter> out_writer;
+
+    struct InputReader;
+    std::unique_ptr<InputReader> in_reader;
+};
+
+}
diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp
index 86f5c02df83..e89392d2e1f 100644
--- a/src/Processors/QueryPlan/AggregatingStep.cpp
+++ b/src/Processors/QueryPlan/AggregatingStep.cpp
@@ -177,7 +177,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
         transform_params->params.group_by_two_level_threshold_bytes,
         transform_params->params.max_bytes_before_external_group_by,
         transform_params->params.empty_result_for_aggregation_by_empty_set,
-        transform_params->params.tmp_volume,
+        transform_params->params.tmp_data_scope,
         transform_params->params.max_threads,
         transform_params->params.min_free_disk_space,
         transform_params->params.compile_aggregate_expressions,
diff --git a/src/Processors/QueryPlan/SortingStep.cpp b/src/Processors/QueryPlan/SortingStep.cpp
index 9bad6a02d53..d5066f5987c 100644
--- a/src/Processors/QueryPlan/SortingStep.cpp
+++ b/src/Processors/QueryPlan/SortingStep.cpp
@@ -12,6 +12,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 static ITransformingStep::Traits getTraits(size_t limit)
 {
     return ITransformingStep::Traits
@@ -37,7 +42,7 @@ SortingStep::SortingStep(
     size_t max_bytes_before_remerge_,
     double remerge_lowered_memory_bytes_ratio_,
     size_t max_bytes_before_external_sort_,
-    VolumePtr tmp_volume_,
+    TemporaryDataOnDiskScopePtr tmp_data_,
     size_t min_free_disk_space_,
     bool optimize_sorting_by_input_stream_properties_)
    : ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
@@ -49,10 +54,13 @@ SortingStep::SortingStep(
     , max_bytes_before_remerge(max_bytes_before_remerge_)
     , remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
     , max_bytes_before_external_sort(max_bytes_before_external_sort_)
-    , tmp_volume(tmp_volume_)
+    , tmp_data(tmp_data_)
     , min_free_disk_space(min_free_disk_space_)
     , optimize_sorting_by_input_stream_properties(optimize_sorting_by_input_stream_properties_)
 {
+    if (max_bytes_before_external_sort && tmp_data == nullptr)
+        throw Exception("Temporary data storage for external sorting is not provided", ErrorCodes::LOGICAL_ERROR);
+
     /// TODO: check input_stream is partially sorted by the same description.
     output_stream->sort_description = result_description;
     output_stream->sort_scope = DataStream::SortScope::Global;
@@ -189,7 +197,7 @@ void SortingStep::mergeSorting(QueryPipelineBuilder & pipeline, const SortDescri
                 max_bytes_before_remerge / pipeline.getNumStreams(),
                 remerge_lowered_memory_bytes_ratio,
                 max_bytes_before_external_sort,
-                tmp_volume,
+                std::make_unique<TemporaryDataOnDisk>(tmp_data),
                 min_free_disk_space);
         });
 }
diff --git a/src/Processors/QueryPlan/SortingStep.h b/src/Processors/QueryPlan/SortingStep.h
index 5ba0ba7ec63..d8d86b8cf1d 100644
--- a/src/Processors/QueryPlan/SortingStep.h
+++ b/src/Processors/QueryPlan/SortingStep.h
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include
+#include
 
 namespace DB
 {
@@ -21,7 +21,7 @@ public:
         size_t max_bytes_before_remerge_,
         double remerge_lowered_memory_bytes_ratio_,
         size_t max_bytes_before_external_sort_,
-        VolumePtr tmp_volume_,
+        TemporaryDataOnDiskScopePtr tmp_data_,
         size_t min_free_disk_space_,
         bool optimize_sorting_by_input_stream_properties_);
 
@@ -85,7 +85,8 @@ private:
     size_t max_bytes_before_remerge = 0;
     double remerge_lowered_memory_bytes_ratio = 0;
     size_t max_bytes_before_external_sort = 0;
-    VolumePtr tmp_volume;
+    TemporaryDataOnDiskScopePtr tmp_data = nullptr;
+
     size_t min_free_disk_space = 0;
     const bool optimize_sorting_by_input_stream_properties = false;
 };
diff --git a/src/Processors/Sources/TemporaryFileLazySource.cpp b/src/Processors/Sources/TemporaryFileLazySource.cpp
index 477c7567ec7..4df5b573826 100644
--- a/src/Processors/Sources/TemporaryFileLazySource.cpp
+++ b/src/Processors/Sources/TemporaryFileLazySource.cpp
@@ -1,5 +1,5 @@
 #include
-#include
+#include
 
 namespace DB
 {
@@ -18,7 +18,7 @@ Chunk TemporaryFileLazySource::generate()
         return {};
 
     if (!stream)
-        stream = std::make_unique<TemporaryFileStream>(path, header);
+        stream = std::make_unique<TemporaryFileStreamLegacy>(path, header);
 
     auto block = stream->block_in->read();
     if (!block)
diff --git a/src/Processors/Sources/TemporaryFileLazySource.h b/src/Processors/Sources/TemporaryFileLazySource.h
index b2e9d5d5500..9bd3b5f6c3f 100644
--- a/src/Processors/Sources/TemporaryFileLazySource.h
+++ b/src/Processors/Sources/TemporaryFileLazySource.h
@@ -5,7 +5,7 @@
 namespace DB
 {
 
-struct TemporaryFileStream;
+struct TemporaryFileStreamLegacy;
 
 class TemporaryFileLazySource : public ISource
 {
@@ -22,7 +22,7 @@ private:
     Block header;
     bool done;
 
-    std::unique_ptr<TemporaryFileStream> stream;
+    std::unique_ptr<TemporaryFileStreamLegacy> stream;
 };
 
 }
diff --git a/src/Processors/TTL/TTLAggregationAlgorithm.cpp b/src/Processors/TTL/TTLAggregationAlgorithm.cpp
index 187d68216cd..cdd6e3e917f 100644
--- a/src/Processors/TTL/TTLAggregationAlgorithm.cpp
+++ b/src/Processors/TTL/TTLAggregationAlgorithm.cpp
@@ -34,7 +34,7 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
         0,
         settings.max_bytes_before_external_group_by,
         settings.empty_result_for_aggregation_by_empty_set,
-        storage_.getContext()->getTemporaryVolume(),
+        storage_.getContext()->getTempDataOnDisk(),
         settings.max_threads,
         settings.min_free_disk_space_for_temporary_data,
         settings.compile_aggregate_expressions,
diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp
index ca8e9c0c85b..653f1b20eb3 100644
--- a/src/Processors/Transforms/AggregatingTransform.cpp
+++ b/src/Processors/Transforms/AggregatingTransform.cpp
@@ -53,33 +53,29 @@ namespace
 
     class SourceFromNativeStream : public ISource
     {
     public:
-        SourceFromNativeStream(const Block & header, const std::string & path)
-            : ISource(header), file_in(path), compressed_in(file_in),
-            block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
-        {
-        }
+        explicit SourceFromNativeStream(TemporaryFileStream * tmp_stream_)
+            : ISource(tmp_stream_->getHeader())
+            , tmp_stream(tmp_stream_)
+        {}
 
         String getName() const override { return "SourceFromNativeStream"; }
 
         Chunk generate() override
         {
-            if (!block_in)
+            if (!tmp_stream)
                 return {};
 
-            auto block = block_in->read();
+            auto block = tmp_stream->read();
             if (!block)
             {
-                block_in.reset();
+                tmp_stream = nullptr;
                 return {};
             }
-
             return convertToChunk(block);
         }
 
     private:
-        ReadBufferFromFile file_in;
-        CompressedReadBuffer compressed_in;
-        std::unique_ptr<NativeReader> block_in;
+        TemporaryFileStream * tmp_stream;
     };
 }
 
@@ -564,7 +560,7 @@ void AggregatingTransform::initGenerate()
         elapsed_seconds,
         src_rows / elapsed_seconds,
         ReadableSize(src_bytes / elapsed_seconds));
 
-    if (params->aggregator.hasTemporaryFiles())
+    if (params->aggregator.hasTemporaryData())
     {
         if (variants.isConvertibleToTwoLevel())
             variants.convertToTwoLevel();
@@ -577,7 +573,7 @@ void AggregatingTransform::initGenerate()
     if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size())
         return;
 
-    if (!params->aggregator.hasTemporaryFiles())
+    if (!params->aggregator.hasTemporaryData())
     {
         auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
         auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
@@ -604,25 +600,27 @@ void AggregatingTransform::initGenerate()
             }
         }
 
-        const auto & files = params->aggregator.getTemporaryFiles();
-        Pipe pipe;
+        const auto & tmp_data = params->aggregator.getTemporaryData();
 
+        Pipe pipe;
         {
-            auto header = params->aggregator.getHeader(false);
             Pipes pipes;
 
-            for (const auto & file : files.files)
-                pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(header, file->path())));
+            for (auto * tmp_stream : tmp_data.getStreams())
+                pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
 
             pipe = Pipe::unitePipes(std::move(pipes));
         }
 
+        size_t num_streams = tmp_data.getStreams().size();
+        size_t compressed_size = tmp_data.getStat().compressed_size;
+        size_t uncompressed_size = tmp_data.getStat().uncompressed_size;
         LOG_DEBUG(
             log,
             "Will merge {} temporary files of size {} compressed, {} uncompressed.",
-            files.files.size(),
-            ReadableSize(files.sum_size_compressed),
-            ReadableSize(files.sum_size_uncompressed));
+            num_streams,
+            ReadableSize(compressed_size),
+            ReadableSize(uncompressed_size));
 
         addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);
 
diff --git a/src/Processors/Transforms/MergeSortingTransform.cpp b/src/Processors/Transforms/MergeSortingTransform.cpp
index a94c3da5ec1..c0717f6810e 100644
--- a/src/Processors/Transforms/MergeSortingTransform.cpp
+++ b/src/Processors/Transforms/MergeSortingTransform.cpp
@@ -30,21 +30,15 @@ namespace CurrentMetrics
 namespace DB
 {
 
-namespace ErrorCodes
-{
-    extern const int NOT_ENOUGH_SPACE;
-}
-
-
 class BufferingToFileTransform : public IAccumulatingTransform
 {
 public:
-    BufferingToFileTransform(const Block & header, Poco::Logger * log_, std::string path_)
-        : IAccumulatingTransform(header, header), log(log_)
-        , path(std::move(path_)), file_buf_out(path), compressed_buf_out(file_buf_out)
-        , out_stream(std::make_unique<NativeWriter>(compressed_buf_out, 0, header))
+    BufferingToFileTransform(const Block & header, TemporaryFileStream & tmp_stream_, Poco::Logger * log_)
+        : IAccumulatingTransform(header, header)
+        , tmp_stream(tmp_stream_)
+        , log(log_)
     {
-        LOG_INFO(log, "Sorting and writing part of data into temporary file {}", path);
+        LOG_INFO(log, "Sorting and writing part of data into temporary file {}", tmp_stream.path());
         ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
     }
 
@@ -52,71 +46,37 @@ public:
 
     void consume(Chunk chunk) override
     {
-        out_stream->write(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
+        Block block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
+        tmp_stream.write(block);
     }
 
     Chunk generate() override
     {
-        if (out_stream)
+        if (!tmp_stream.isWriteFinished())
         {
-            out_stream->flush();
-            compressed_buf_out.next();
-            file_buf_out.next();
+            auto stat = tmp_stream.finishWriting();
 
-            auto stat = updateWriteStat();
+            ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size);
+            ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size);
+            ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, stat.compressed_size);
+            ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, stat.uncompressed_size);
 
             LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ",
-                path, ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
-
-            out_stream.reset();
-
-            file_in = std::make_unique<ReadBufferFromFile>(path);
-            compressed_in = std::make_unique<CompressedReadBuffer>(*file_in);
-            block_in = std::make_unique<NativeReader>(*compressed_in, getOutputPort().getHeader(), 0);
+                tmp_stream.path(), ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
         }
 
-        if (!block_in)
-            return {};
-
-        auto block = block_in->read();
+        Block block = tmp_stream.read();
         if (!block)
-        {
-            block_in.reset();
             return {};
-        }
 
         UInt64 num_rows = block.rows();
         return Chunk(block.getColumns(), num_rows);
     }
 
 private:
-    struct Stat
-    {
-        size_t compressed_size = 0;
-        size_t uncompressed_size = 0;
-    };
-
-    Stat updateWriteStat()
-    {
-        Stat res{compressed_buf_out.getCompressedBytes(), compressed_buf_out.getUncompressedBytes()};
-
-        ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, res.compressed_size);
-        ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, res.uncompressed_size);
-
-        ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, res.compressed_size);
-        ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, res.uncompressed_size);
-        return res;
-    }
+    TemporaryFileStream & tmp_stream;
 
     Poco::Logger * log;
-    std::string path;
-    WriteBufferFromFile file_buf_out;
-    CompressedWriteBuffer compressed_buf_out;
-    std::unique_ptr<NativeWriter> out_stream;
-
-    std::unique_ptr<ReadBufferFromFile> file_in;
-    std::unique_ptr<CompressedReadBuffer> compressed_in;
-    std::unique_ptr<NativeReader> block_in;
 };
 
 MergeSortingTransform::MergeSortingTransform(
@@ -128,13 +88,13 @@ MergeSortingTransform::MergeSortingTransform(
     size_t max_bytes_before_remerge_,
     double remerge_lowered_memory_bytes_ratio_,
     size_t max_bytes_before_external_sort_,
-    VolumePtr tmp_volume_,
+    TemporaryDataOnDiskPtr tmp_data_,
     size_t min_free_disk_space_)
     : SortingTransform(header, description_, max_merged_block_size_, limit_, increase_sort_description_compile_attempts)
     , max_bytes_before_remerge(max_bytes_before_remerge_)
     , remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
     , max_bytes_before_external_sort(max_bytes_before_external_sort_)
-    , tmp_volume(tmp_volume_)
+    , tmp_data(std::move(tmp_data_))
     , min_free_disk_space(min_free_disk_space_)
 {
 }
@@ -209,17 +169,12 @@ void MergeSortingTransform::consume(Chunk chunk)
      */
     if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
     {
-        size_t size = sum_bytes_in_blocks + min_free_disk_space;
-        auto reservation = tmp_volume->reserve(size);
-        if (!reservation)
-            throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
+        /// If there's less free disk space than reserve_size, an exception will be thrown
+        size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
+        auto & tmp_stream = tmp_data->createStream(header_without_constants, CurrentMetrics::TemporaryFilesForSort, reserve_size);
 
-        temporary_files.emplace_back(std::make_unique<TemporaryFileOnDisk>(reservation->getDisk(), CurrentMetrics::TemporaryFilesForSort));
-
-        const std::string & path = temporary_files.back()->path();
-        merge_sorter
-            = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
-        auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
+        merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
+        auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, tmp_stream, log);
 
         processors.emplace_back(current_processor);
 
@@ -261,13 +216,14 @@ void MergeSortingTransform::generate()
 {
     if (!generated_prefix)
     {
-        if (temporary_files.empty())
+        size_t num_tmp_files = tmp_data ? tmp_data->getStreams().size() : 0;
+        if (num_tmp_files == 0)
             merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
         else
         {
             ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
 
-            LOG_INFO(log, "There are {} temporary sorted parts to merge", temporary_files.size());
+            LOG_INFO(log, "There are {} temporary sorted parts to merge", num_tmp_files);
 
             processors.emplace_back(std::make_shared<MergeSorterSource>(
                 header_without_constants, std::move(chunks), description, max_merged_block_size, limit));
diff --git a/src/Processors/Transforms/MergeSortingTransform.h b/src/Processors/Transforms/MergeSortingTransform.h
index 465193548e7..c64c93393ce 100644
--- a/src/Processors/Transforms/MergeSortingTransform.h
+++ b/src/Processors/Transforms/MergeSortingTransform.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include
 #include
 
@@ -28,7 +29,7 @@ public:
         size_t max_bytes_before_remerge_,
         double remerge_lowered_memory_bytes_ratio_,
         size_t max_bytes_before_external_sort_,
-        VolumePtr tmp_volume_,
+        TemporaryDataOnDiskPtr tmp_data_,
         size_t min_free_disk_space_);
 
     String getName() const override { return "MergeSortingTransform"; }
@@ -44,7 +45,7 @@ private:
     size_t max_bytes_before_remerge;
     double remerge_lowered_memory_bytes_ratio;
     size_t max_bytes_before_external_sort;
-    VolumePtr tmp_volume;
+    TemporaryDataOnDiskPtr tmp_data;
     size_t min_free_disk_space;
 
     size_t sum_rows_in_blocks = 0;
@@ -55,9 +56,6 @@ private:
     /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
     bool remerge_is_useful = true;
 
-    /// Everything below is for external sorting.
-    std::vector<TemporaryFileOnDiskHolder> temporary_files;
-
     /// Merge all accumulated blocks to keep no more than limit rows.
     void remerge();
 
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 286739781c5..91ecb3a37a0 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -310,7 +310,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
             settings.group_by_two_level_threshold_bytes,
             settings.max_bytes_before_external_group_by,
             settings.empty_result_for_aggregation_by_empty_set,
-            context->getTemporaryVolume(),
+            context->getTempDataOnDisk(),
             settings.max_threads,
             settings.min_free_disk_space_for_temporary_data,
             settings.compile_aggregate_expressions,
diff --git a/tests/ci/compatibility_check.py b/tests/ci/compatibility_check.py
index 39d027ad3c0..2b61501a0dd 100644
--- a/tests/ci/compatibility_check.py
+++ b/tests/ci/compatibility_check.py
@@ -99,6 +99,7 @@ def get_run_commands(
     return [
         f"readelf -s {build_path}/usr/bin/clickhouse | grep '@GLIBC_' > {result_folder}/glibc.log",
         f"readelf -s {build_path}/usr/bin/clickhouse-odbc-bridge | grep '@GLIBC_' >> {result_folder}/glibc.log",
+        f"readelf -s {build_path}/usr/bin/clickhouse-library-bridge | grep '@GLIBC_' >> {result_folder}/glibc.log",
         f"docker run --network=host --volume={build_path}/usr/bin/clickhouse:/clickhouse "
         f"--volume={build_path}/etc/clickhouse-server:/config "
         f"--volume={server_log_folder}:/var/log/clickhouse-server {image_ubuntu} > {result_folder}/ubuntu:12.04",
diff --git a/tests/integration/test_recovery_replica/test.py b/tests/integration/test_recovery_replica/test.py
index 4a1298162da..0a63da4db22 100644
--- a/tests/integration/test_recovery_replica/test.py
+++ b/tests/integration/test_recovery_replica/test.py
@@ -196,6 +196,7 @@ def test_update_metadata(start_cluster):
node1.query("ALTER TABLE update_metadata MODIFY COLUMN col1 String") node1.query("ALTER TABLE update_metadata ADD COLUMN col2 INT") + node3.query("SYSTEM SYNC REPLICA update_metadata") for i in range(1, 11): node3.query( "INSERT INTO update_metadata VALUES ({}, '{}', {})".format( diff --git a/tests/queries/0_stateless/00107_totals_after_having.sql b/tests/queries/0_stateless/00107_totals_after_having.sql index 40a598a194d..d0a9a3a318c 100644 --- a/tests/queries/0_stateless/00107_totals_after_having.sql +++ b/tests/queries/0_stateless/00107_totals_after_having.sql @@ -30,7 +30,7 @@ SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM (S SELECT '*** External aggregation.'; -SET max_bytes_before_external_group_by=1000000; +SET max_bytes_before_external_group_by = 1000000; SET group_by_two_level_threshold = 100000; SELECT '**** totals_mode = after_having_auto';