Merge branch 'master' into enable_more_s3_tests

This commit is contained in:
alesapin 2022-09-30 14:53:21 +02:00
commit a84f90e0e5
51 changed files with 982 additions and 426 deletions

6
.gitmodules vendored
View File

@ -30,9 +30,6 @@
[submodule "contrib/re2"] [submodule "contrib/re2"]
path = contrib/re2 path = contrib/re2
url = https://github.com/google/re2.git url = https://github.com/google/re2.git
[submodule "contrib/llvm"]
path = contrib/llvm
url = https://github.com/ClickHouse/llvm
[submodule "contrib/mariadb-connector-c"] [submodule "contrib/mariadb-connector-c"]
path = contrib/mariadb-connector-c path = contrib/mariadb-connector-c
url = https://github.com/ClickHouse/mariadb-connector-c.git url = https://github.com/ClickHouse/mariadb-connector-c.git
@ -284,3 +281,6 @@
[submodule "contrib/c-ares"] [submodule "contrib/c-ares"]
path = contrib/c-ares path = contrib/c-ares
url = https://github.com/ClickHouse/c-ares url = https://github.com/ClickHouse/c-ares
[submodule "contrib/llvm-project"]
path = contrib/llvm-project
url = https://github.com/ClickHouse/llvm-project.git

View File

@ -107,7 +107,7 @@ if (ENABLE_TESTS)
add_contrib (googletest-cmake googletest) add_contrib (googletest-cmake googletest)
endif() endif()
add_contrib (llvm-cmake llvm) add_contrib (llvm-project-cmake llvm-project)
add_contrib (libxml2-cmake libxml2) add_contrib (libxml2-cmake libxml2)
add_contrib (aws-s3-cmake add_contrib (aws-s3-cmake
aws aws

1
contrib/llvm vendored

@ -1 +0,0 @@
Subproject commit 0db5bf5bd2452cd8f1283a1fcdc04845af705bfc

View File

@ -1,112 +0,0 @@
# Build configuration for the bundled LLVM (contrib/llvm).
# Exposes the interface target `ch_contrib::llvm` when the embedded compiler is enabled.

# The embedded compiler defaults to ON, except on Apple, on non-AMD64 architectures
# and under the "undefined" sanitizer.
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined")
    set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
endif ()

option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT})

# Nothing below is configured when the option is off.
if (NOT ENABLE_EMBEDDED_COMPILER)
    message (STATUS "Not using LLVM")
    return ()
endif ()

set (LLVM_FOUND 1)
set (LLVM_VERSION "12.0.0bundled")

# Source and build locations of the LLVM subproject; the include/library paths derive from them.
set (LLVM_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm/llvm")
set (LLVM_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/llvm/llvm")

set (LLVM_INCLUDE_DIRS "${LLVM_SOURCE_DIR}/include" "${LLVM_BINARY_DIR}/include")
set (LLVM_LIBRARY_DIRS "${LLVM_BINARY_DIR}")

# This list was generated by listing all LLVM libraries, compiling the binary
# and removing all libraries while it still compiles.
set (REQUIRED_LLVM_LIBRARIES
    LLVMExecutionEngine
    LLVMRuntimeDyld
    LLVMAsmPrinter
    LLVMDebugInfoDWARF
    LLVMGlobalISel
    LLVMSelectionDAG
    LLVMMCDisassembler
    LLVMPasses
    LLVMCodeGen
    LLVMipo
    LLVMBitWriter
    LLVMInstrumentation
    LLVMScalarOpts
    LLVMAggressiveInstCombine
    LLVMInstCombine
    LLVMVectorize
    LLVMTransformUtils
    LLVMTarget
    LLVMAnalysis
    LLVMProfileData
    LLVMObject
    LLVMBitReader
    LLVMCore
    LLVMRemarks
    LLVMBitstreamReader
    LLVMMCParser
    LLVMMC
    LLVMBinaryFormat
    LLVMDebugInfoCodeView
    LLVMSupport
    LLVMDemangle
)

# Add the backend libraries matching the architecture being built.
if (ARCH_AMD64)
    list (APPEND REQUIRED_LLVM_LIBRARIES
        LLVMX86Info
        LLVMX86Desc
        LLVMX86CodeGen)
elseif (ARCH_AARCH64)
    list (APPEND REQUIRED_LLVM_LIBRARIES
        LLVMAArch64Info
        LLVMAArch64Desc
        LLVMAArch64CodeGen)
endif ()

message (STATUS "LLVM include Directory: ${LLVM_INCLUDE_DIRS}")
message (STATUS "LLVM library Directory: ${LLVM_LIBRARY_DIRS}")
message (STATUS "LLVM C++ compiler flags: ${LLVM_CXXFLAGS}")

# ld: unknown option: --color-diagnostics
set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "")

# Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind
set (CMAKE_INSTALL_RPATH "ON")

# LLVM switches that are forced on.
foreach (llvm_switch IN ITEMS LLVM_COMPILER_CHECKED LLVM_ENABLE_EH LLVM_ENABLE_RTTI)
    set (${llvm_switch} 1 CACHE INTERNAL "")
endforeach ()

set (LLVM_ENABLE_PIC 0 CACHE INTERNAL "")
set (LLVM_TARGETS_TO_BUILD "X86;AArch64" CACHE STRING "")

# Need to use C++17 since the compilation is not possible with C++20 currently, due to ambiguous operator != etc.
# LLVM project will set its default value for the -std=... but our global setting from CMake will override it.
set (CMAKE_CXX_STANDARD 17)

add_subdirectory ("${LLVM_SOURCE_DIR}" "${LLVM_BINARY_DIR}")

# Due to llvm crosscompile cmake does not know how to clean it, and on clean
# will lead to the following error:
#
#     ninja: error: remove(contrib/llvm/llvm/NATIVE): Directory not empty
#
set_property (DIRECTORY PROPERTY ADDITIONAL_CLEAN_FILES "${LLVM_BINARY_DIR}")

# llvm's cmake configures this file only when cmake runs, and after clean cmake
# will not know that it should re-run, so depend on llvm-config.h explicitly.
set_property (DIRECTORY PROPERTY CMAKE_CONFIGURE_DEPENDS "${LLVM_BINARY_DIR}/include/llvm/Config/llvm-config.h")

# Interface target that carries the LLVM libraries and include directories to consumers.
add_library (_llvm INTERFACE)
add_library (ch_contrib::llvm ALIAS _llvm)
target_include_directories (_llvm SYSTEM BEFORE INTERFACE ${LLVM_INCLUDE_DIRS})
target_link_libraries (_llvm INTERFACE ${REQUIRED_LLVM_LIBRARIES})

1
contrib/llvm-project vendored Submodule

@ -0,0 +1 @@
Subproject commit 6ca2b5b3927226f6bcf6c656f502ff5d012ad9b6

View File

@ -0,0 +1,122 @@
# Build configuration for the bundled LLVM (contrib/llvm-project).
# Exposes the interface target `ch_contrib::llvm` when the embedded compiler is enabled.

# The embedded compiler defaults to ON, except on Apple, on non-AMD64 architectures,
# under the "undefined" sanitizer and when static libraries are not used.
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined" OR NOT USE_STATIC_LIBRARIES)
    set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
endif ()

option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT})

# Nothing below is configured when the option is off.
if (NOT ENABLE_EMBEDDED_COMPILER)
    message (STATUS "Not using LLVM")
    return ()
endif ()

# TODO: Enable shared library build
# TODO: Enable compilation on AArch64

set (LLVM_VERSION "13.0.0bundled")

# Source and build locations of the LLVM subproject; the include/library paths derive from them.
set (LLVM_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm")
set (LLVM_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm")

set (LLVM_INCLUDE_DIRS "${LLVM_SOURCE_DIR}/include" "${LLVM_BINARY_DIR}/include")
set (LLVM_LIBRARY_DIRS "${LLVM_BINARY_DIR}")

# This list was generated by listing all LLVM libraries, compiling the binary
# and removing all libraries while it still compiles.
# Only the X86 backend libraries are listed at the end; an AArch64 build would need
# LLVMAArch64Info, LLVMAArch64Desc and LLVMAArch64CodeGen instead (see the TODO above).
set (REQUIRED_LLVM_LIBRARIES
    LLVMExecutionEngine
    LLVMRuntimeDyld
    LLVMAsmPrinter
    LLVMDebugInfoDWARF
    LLVMGlobalISel
    LLVMSelectionDAG
    LLVMMCDisassembler
    LLVMPasses
    LLVMCodeGen
    LLVMipo
    LLVMBitWriter
    LLVMInstrumentation
    LLVMScalarOpts
    LLVMAggressiveInstCombine
    LLVMInstCombine
    LLVMVectorize
    LLVMTransformUtils
    LLVMTarget
    LLVMAnalysis
    LLVMProfileData
    LLVMObject
    LLVMBitReader
    LLVMCore
    LLVMRemarks
    LLVMBitstreamReader
    LLVMMCParser
    LLVMMC
    LLVMBinaryFormat
    LLVMDebugInfoCodeView
    LLVMSupport
    LLVMDemangle
    LLVMX86Info
    LLVMX86Desc
    LLVMX86CodeGen
)

# Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind
set (CMAKE_INSTALL_RPATH "ON")

# Forced-on LLVM switches: skip internal compiler selection, build with exception handling and RTTI.
foreach (llvm_switch IN ITEMS LLVM_COMPILER_CHECKED LLVM_ENABLE_EH LLVM_ENABLE_RTTI)
    set (${llvm_switch} 1 CACHE INTERNAL "")
endforeach ()

# For x86 + ARM this would be "X86;AArch64".
set (LLVM_TARGETS_TO_BUILD "X86" CACHE STRING "")

# Forced-off LLVM switches: position-independent code plus the unnecessary stuff
# (just the options which are ON by default).
foreach (llvm_switch IN ITEMS
        LLVM_ENABLE_PIC
        LLVM_ENABLE_BACKTRACES
        LLVM_ENABLE_CRASH_OVERRIDES
        LLVM_ENABLE_TERMINFO
        LLVM_ENABLE_LIBXML2
        LLVM_ENABLE_LIBEDIT
        LLVM_ENABLE_LIBPFM
        LLVM_ENABLE_ZLIB
        LLVM_ENABLE_Z3_SOLVER
        LLVM_INCLUDE_TOOLS
        LLVM_BUILD_TOOLS
        LLVM_INCLUDE_UTILS
        LLVM_BUILD_UTILS
        LLVM_INCLUDE_RUNTIMES
        LLVM_BUILD_RUNTIMES
        LLVM_BUILD_RUNTIME
        LLVM_INCLUDE_EXAMPLES
        LLVM_INCLUDE_TESTS
        LLVM_INCLUDE_GO_TESTS
        LLVM_INCLUDE_BENCHMARKS
        LLVM_INCLUDE_DOCS
        LLVM_ENABLE_OCAMLDOC
        LLVM_ENABLE_BINDINGS)
    set (${llvm_switch} 0 CACHE INTERNAL "")
endforeach ()

# C++20 is currently not supported due to ambiguous operator != etc.
set (CMAKE_CXX_STANDARD 17)

add_subdirectory ("${LLVM_SOURCE_DIR}" "${LLVM_BINARY_DIR}")

# Due to llvm crosscompile cmake does not know how to clean it, and on clean
# will lead to an error of the form:
#
#     ninja: error: remove(contrib/llvm-project/llvm/NATIVE): Directory not empty
#
set_property (DIRECTORY PROPERTY ADDITIONAL_CLEAN_FILES "${LLVM_BINARY_DIR}")

# llvm's cmake configures this file only when cmake runs, and after clean cmake
# will not know that it should re-run, so depend on llvm-config.h explicitly.
set_property (DIRECTORY PROPERTY CMAKE_CONFIGURE_DEPENDS "${LLVM_BINARY_DIR}/include/llvm/Config/llvm-config.h")

# Interface target that carries the LLVM libraries and include directories to consumers.
add_library (_llvm INTERFACE)
add_library (ch_contrib::llvm ALIAS _llvm)
target_include_directories (_llvm SYSTEM BEFORE INTERFACE ${LLVM_INCLUDE_DIRS})
target_link_libraries (_llvm INTERFACE ${REQUIRED_LLVM_LIBRARIES})

View File

@ -13,25 +13,28 @@ sysctl kernel.core_pattern='core.%e.%p-%P'
# Thread Fuzzer allows to check more permutations of possible thread scheduling # Thread Fuzzer allows to check more permutations of possible thread scheduling
# and find more potential issues. # and find more potential issues.
# Temporarily disable ThreadFuzzer with tsan because of https://github.com/google/sanitizers/issues/1540
is_tsan_build=$(clickhouse local -q "select value like '% -fsanitize=thread %' from system.build_options where name='CXX_FLAGS'")
if [ "$is_tsan_build" -eq "0" ]; then
export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000
export THREAD_FUZZER_SLEEP_PROBABILITY=0.1
export THREAD_FUZZER_SLEEP_TIME_US=100000
export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000 export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_SLEEP_PROBABILITY=0.1 export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_SLEEP_TIME_US=100000 export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1 export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1 export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1 export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1 export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001 export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001 export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001 export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001 fi
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US=10000
function install_packages() function install_packages()

View File

@ -1498,7 +1498,7 @@ If not set, [tmp_path](#tmp-path) is used, otherwise it is ignored.
- `move_factor` is ignored. - `move_factor` is ignored.
- `keep_free_space_bytes` is ignored. - `keep_free_space_bytes` is ignored.
- `max_data_part_size_bytes` is ignored. - `max_data_part_size_bytes` is ignored.
- Уou must have exactly one volume in that policy. - Policy should have exactly one volume with local disks.
::: :::
## uncompressed_cache_size {#server-settings-uncompressed_cache_size} ## uncompressed_cache_size {#server-settings-uncompressed_cache_size}

View File

@ -1342,12 +1342,13 @@ TCP порт для защищённого обмена данными с кли
Если политика не задана, используется [tmp_path](#tmp-path). В противном случае `tmp_path` игнорируется. Если политика не задана, используется [tmp_path](#tmp-path). В противном случае `tmp_path` игнорируется.
:::note "Примечание" :::note "Примечание"
- `move_factor` игнорируется. - `move_factor` игнорируется.
- `keep_free_space_bytes` игнорируется. - `keep_free_space_bytes` игнорируется.
- `max_data_part_size_bytes` игнорируется. - `max_data_part_size_bytes` игнорируется.
- В данной политике у вас должен быть ровно один том. - В данной политике должен быть ровно один том, содержащий только локальные диски.
::: :::
## uncompressed_cache_size {#server-settings-uncompressed_cache_size} ## uncompressed_cache_size {#server-settings-uncompressed_cache_size}
Размер кеша (в байтах) для несжатых данных, используемых движками таблиц семейства [MergeTree](../../operations/server-configuration-parameters/settings.md). Размер кеша (в байтах) для несжатых данных, используемых движками таблиц семейства [MergeTree](../../operations/server-configuration-parameters/settings.md).

View File

@ -203,7 +203,7 @@ void LocalServer::tryInitPath()
global_context->setPath(path); global_context->setPath(path);
global_context->setTemporaryStorage(path + "tmp"); global_context->setTemporaryStorage(path + "tmp", "", 0);
global_context->setFlagsPath(path + "flags"); global_context->setFlagsPath(path + "flags");
global_context->setUserFilesPath(""); // user's files are everywhere global_context->setUserFilesPath(""); // user's files are everywhere

View File

@ -209,7 +209,7 @@ try
fs::remove(it->path()); fs::remove(it->path());
} }
else else
LOG_DEBUG(log, "Skipped file in temporary path {}", it->path().string()); LOG_DEBUG(log, "Found unknown file in temporary path {}", it->path().string());
} }
} }
catch (...) catch (...)
@ -971,7 +971,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
{ {
std::string tmp_path = config().getString("tmp_path", path / "tmp/"); std::string tmp_path = config().getString("tmp_path", path / "tmp/");
std::string tmp_policy = config().getString("tmp_policy", ""); std::string tmp_policy = config().getString("tmp_policy", "");
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy); size_t tmp_max_size = config().getUInt64("tmp_max_size", 0);
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy, tmp_max_size);
for (const DiskPtr & disk : volume->getDisks()) for (const DiskPtr & disk : volume->getDisks())
setupTmpPath(log, disk->getPath()); setupTmpPath(log, disk->getPath());
} }

View File

@ -164,8 +164,10 @@ public:
auto * denominator_type = toNativeType<Denominator>(b); auto * denominator_type = toNativeType<Denominator>(b);
static constexpr size_t denominator_offset = offsetof(Fraction, denominator); static constexpr size_t denominator_offset = offsetof(Fraction, denominator);
auto * denominator_dst_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_dst_ptr, denominator_offset), denominator_type->getPointerTo()); auto * ty_aggregate_data_dst_ptr = llvm::cast<llvm::PointerType>(aggregate_data_dst_ptr->getType()->getScalarType())->getElementType();
auto * denominator_src_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_src_ptr, denominator_offset), denominator_type->getPointerTo()); auto * denominator_dst_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(ty_aggregate_data_dst_ptr, aggregate_data_dst_ptr, denominator_offset), denominator_type->getPointerTo());
auto * ty_aggregate_data_src_ptr = llvm::cast<llvm::PointerType>(aggregate_data_src_ptr->getType()->getScalarType())->getElementType();
auto * denominator_src_ptr = b.CreatePointerCast(b.CreateConstInBoundsGEP1_64(ty_aggregate_data_src_ptr, aggregate_data_src_ptr, denominator_offset), denominator_type->getPointerTo());
auto * denominator_dst_value = b.CreateLoad(denominator_type, denominator_dst_ptr); auto * denominator_dst_value = b.CreateLoad(denominator_type, denominator_dst_ptr);
auto * denominator_src_value = b.CreateLoad(denominator_type, denominator_src_ptr); auto * denominator_src_value = b.CreateLoad(denominator_type, denominator_src_ptr);
@ -184,7 +186,8 @@ public:
auto * denominator_type = toNativeType<Denominator>(b); auto * denominator_type = toNativeType<Denominator>(b);
static constexpr size_t denominator_offset = offsetof(Fraction, denominator); static constexpr size_t denominator_offset = offsetof(Fraction, denominator);
auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(nullptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo()); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(ty_aggregate_data_ptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo());
auto * denominator_value = b.CreateLoad(denominator_type, denominator_ptr); auto * denominator_value = b.CreateLoad(denominator_type, denominator_ptr);
auto * double_numerator = nativeCast<Numerator>(b, numerator_value, b.getDoubleTy()); auto * double_numerator = nativeCast<Numerator>(b, numerator_value, b.getDoubleTy());
@ -311,7 +314,8 @@ public:
auto * denominator_type = toNativeType<Denominator>(b); auto * denominator_type = toNativeType<Denominator>(b);
static constexpr size_t denominator_offset = offsetof(Fraction, denominator); static constexpr size_t denominator_offset = offsetof(Fraction, denominator);
auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(nullptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo()); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * denominator_ptr = b.CreatePointerCast(b.CreateConstGEP1_32(ty_aggregate_data_ptr, aggregate_data_ptr, denominator_offset), denominator_type->getPointerTo());
auto * denominator_value_updated = b.CreateAdd(b.CreateLoad(denominator_type, denominator_ptr), llvm::ConstantInt::get(denominator_type, 1)); auto * denominator_value_updated = b.CreateAdd(b.CreateLoad(denominator_type, denominator_ptr), llvm::ConstantInt::get(denominator_type, 1));
b.CreateStore(denominator_value_updated, denominator_ptr); b.CreateStore(denominator_value_updated, denominator_ptr);
} }

View File

@ -74,7 +74,8 @@ public:
auto * denominator_type = toNativeType<Denominator>(b); auto * denominator_type = toNativeType<Denominator>(b);
static constexpr size_t denominator_offset = offsetof(Fraction, denominator); static constexpr size_t denominator_offset = offsetof(Fraction, denominator);
auto * denominator_offset_ptr = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, denominator_offset); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * denominator_offset_ptr = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, denominator_offset);
auto * denominator_ptr = b.CreatePointerCast(denominator_offset_ptr, denominator_type->getPointerTo()); auto * denominator_ptr = b.CreatePointerCast(denominator_offset_ptr, denominator_type->getPointerTo());
auto * weight_cast_to_denominator = nativeCast(b, arguments_types[1], argument_values[1], denominator_type); auto * weight_cast_to_denominator = nativeCast(b, arguments_types[1], argument_values[1], denominator_type);

View File

@ -207,7 +207,8 @@ public:
if constexpr (result_is_nullable) if constexpr (result_is_nullable)
b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr);
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size);
this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, { removeNullable(nullable_type) }, { wrapped_value }); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, { removeNullable(nullable_type) }, { wrapped_value });
b.CreateBr(join_block); b.CreateBr(join_block);
@ -419,7 +420,8 @@ public:
if constexpr (result_is_nullable) if constexpr (result_is_nullable)
b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr);
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size);
this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, non_nullable_types, wrapped_values); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, non_nullable_types, wrapped_values);
b.CreateBr(join_block); b.CreateBr(join_block);

View File

@ -201,7 +201,8 @@ public:
static constexpr size_t value_offset_from_structure = offsetof(SingleValueDataFixed<T>, value); static constexpr size_t value_offset_from_structure = offsetof(SingleValueDataFixed<T>, value);
auto * type = toNativeType<T>(builder); auto * type = toNativeType<T>(builder);
auto * value_ptr_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, value_offset_from_structure); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * value_ptr_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, value_offset_from_structure);
auto * value_ptr = b.CreatePointerCast(value_ptr_with_offset, type->getPointerTo()); auto * value_ptr = b.CreatePointerCast(value_ptr_with_offset, type->getPointerTo());
return value_ptr; return value_ptr;

View File

@ -225,7 +225,8 @@ public:
if constexpr (result_is_nullable) if constexpr (result_is_nullable)
b.CreateMemSet(aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), this->prefix_size, llvm::assumeAligned(this->alignOfData())); b.CreateMemSet(aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), this->prefix_size, llvm::assumeAligned(this->alignOfData()));
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size);
this->nested_function->compileCreate(b, aggregate_data_ptr_with_prefix_size_offset); this->nested_function->compileCreate(b, aggregate_data_ptr_with_prefix_size_offset);
} }
@ -235,16 +236,25 @@ public:
if constexpr (result_is_nullable) if constexpr (result_is_nullable)
{ {
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
auto * aggregate_data_is_null_dst_value = b.CreateLoad(aggregate_data_dst_ptr); auto * aggregate_data_is_null_dst_value = b.CreateLoad(aggregate_data_dst_ptr);
auto * aggregate_data_is_null_src_value = b.CreateLoad(aggregate_data_src_ptr); auto * aggregate_data_is_null_src_value = b.CreateLoad(aggregate_data_src_ptr);
#ifdef __clang__
#pragma clang diagnostic pop
#endif
auto * is_src_null = nativeBoolCast(b, std::make_shared<DataTypeUInt8>(), aggregate_data_is_null_src_value); auto * is_src_null = nativeBoolCast(b, std::make_shared<DataTypeUInt8>(), aggregate_data_is_null_src_value);
auto * is_null_result_value = b.CreateSelect(is_src_null, llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_is_null_dst_value); auto * is_null_result_value = b.CreateSelect(is_src_null, llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_is_null_dst_value);
b.CreateStore(is_null_result_value, aggregate_data_dst_ptr); b.CreateStore(is_null_result_value, aggregate_data_dst_ptr);
} }
auto * aggregate_data_dst_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_dst_ptr, this->prefix_size); auto * ty_aggregate_data_dst_ptr = llvm::cast<llvm::PointerType>(aggregate_data_dst_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_src_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_src_ptr, this->prefix_size); auto * aggregate_data_dst_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_dst_ptr, aggregate_data_dst_ptr, this->prefix_size);
auto * ty_aggregate_data_src_ptr = llvm::cast<llvm::PointerType>(aggregate_data_src_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_src_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_src_ptr, aggregate_data_src_ptr, this->prefix_size);
this->nested_function->compileMerge(b, aggregate_data_dst_ptr_with_prefix_size_offset, aggregate_data_src_ptr_with_prefix_size_offset); this->nested_function->compileMerge(b, aggregate_data_dst_ptr_with_prefix_size_offset, aggregate_data_src_ptr_with_prefix_size_offset);
} }
@ -278,7 +288,8 @@ public:
b.CreateBr(join_block); b.CreateBr(join_block);
b.SetInsertPoint(if_not_null); b.SetInsertPoint(if_not_null);
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size);
auto * nested_result = this->nested_function->compileGetResult(builder, aggregate_data_ptr_with_prefix_size_offset); auto * nested_result = this->nested_function->compileGetResult(builder, aggregate_data_ptr_with_prefix_size_offset);
b.CreateStore(b.CreateInsertValue(nullable_value, nested_result, {0}), nullable_value_ptr); b.CreateStore(b.CreateInsertValue(nullable_value, nested_result, {0}), nullable_value_ptr);
b.CreateBr(join_block); b.CreateBr(join_block);
@ -374,7 +385,8 @@ public:
if constexpr (result_is_nullable) if constexpr (result_is_nullable)
b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr);
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size);
this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, { removeNullable(nullable_type) }, { wrapped_value }); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, { removeNullable(nullable_type) }, { wrapped_value });
b.CreateBr(join_block); b.CreateBr(join_block);
@ -598,7 +610,8 @@ public:
if constexpr (result_is_nullable) if constexpr (result_is_nullable)
b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr); b.CreateStore(llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_ptr);
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, this->prefix_size); auto * ty_aggregate_data_ptr = llvm::cast<llvm::PointerType>(aggregate_data_ptr->getType()->getScalarType())->getElementType();
auto * aggregate_data_ptr_with_prefix_size_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_ptr, aggregate_data_ptr, this->prefix_size);
this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, arguments_types, wrapped_values); this->nested_function->compileAdd(b, aggregate_data_ptr_with_prefix_size_offset, arguments_types, wrapped_values);
b.CreateBr(join_block); b.CreateBr(join_block);

View File

@ -399,6 +399,9 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\ M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\
M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \ M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \
\ \
M(UInt64, max_temp_data_on_disk_size_for_user, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running user queries. Zero means unlimited.", 0)\
M(UInt64, max_temp_data_on_disk_size_for_query, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running queries. Zero means unlimited.", 0)\
\
M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \ M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \
M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \ M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \
\ \

View File

@ -30,6 +30,8 @@
#include <Common/Macros.h> #include <Common/Macros.h>
#include <base/chrono_io.h> #include <base/chrono_io.h>
#include <utility>
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
@ -862,7 +864,19 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
for (const auto & id : dropped_tables) for (const auto & id : dropped_tables)
DatabaseCatalog::instance().waitTableFinallyDropped(id); DatabaseCatalog::instance().waitTableFinallyDropped(id);
for (const auto & name_and_meta : table_name_to_metadata) /// FIXME: Use proper dependency calculation instead of just moving MV to the end
/// Copy the (name, metadata) entries into a vector and reorder them so that every entry whose
/// metadata text contains "MATERIALIZED VIEW" comes after all entries that do not.
/// std::stable_sort is used instead of std::sort: the comparator only distinguishes two classes,
/// and an unstable sort would leave the relative order inside each class unspecified.
/// With a stable sort the entries of each class keep the order they had in `table_name_to_metadata`.
using NameToMetadata = std::pair<String, String>;
std::vector<NameToMetadata> table_name_to_metadata_sorted;
table_name_to_metadata_sorted.reserve(table_name_to_metadata.size());
std::move(table_name_to_metadata.begin(), table_name_to_metadata.end(), std::back_inserter(table_name_to_metadata_sorted));
std::stable_sort(table_name_to_metadata_sorted.begin(), table_name_to_metadata_sorted.end(), [](const NameToMetadata & lhs, const NameToMetadata & rhs) -> bool
{
    /// NOTE: this is a plain substring check on the metadata text, not a parsed query.
    const bool is_materialized_view_lhs = lhs.second.find("MATERIALIZED VIEW") != std::string::npos;
    const bool is_materialized_view_rhs = rhs.second.find("MATERIALIZED VIEW") != std::string::npos;
    /// false < true, so non-MV entries are ordered before MV entries.
    return is_materialized_view_lhs < is_materialized_view_rhs;
});
for (const auto & name_and_meta : table_name_to_metadata_sorted)
{ {
if (isTableExist(name_and_meta.first, getContext())) if (isTableExist(name_and_meta.first, getContext()))
{ {

View File

@ -241,4 +241,11 @@ DiskObjectStoragePtr DiskDecorator::createDiskObjectStorage()
return delegate->createDiskObjectStorage(); return delegate->createDiskObjectStorage();
} }
/// Returns the innermost disk wrapped by this decorator.
/// If the delegate is itself a DiskDecorator, the call is forwarded to it,
/// so a chain of decorators is unwrapped down to the first non-decorator disk.
DiskPtr DiskDecorator::getNestedDisk() const
{
    const auto * inner_decorator = dynamic_cast<const DiskDecorator *>(delegate.get());
    if (!inner_decorator)
        return delegate;
    return inner_decorator->getNestedDisk();
}
} }

View File

@ -107,6 +107,8 @@ public:
bool supportsChmod() const override { return delegate->supportsChmod(); } bool supportsChmod() const override { return delegate->supportsChmod(); }
void chmod(const String & path, mode_t mode) override { delegate->chmod(path, mode); } void chmod(const String & path, mode_t mode) override { delegate->chmod(path, mode); }
virtual DiskPtr getNestedDisk() const;
protected: protected:
Executor & getExecutor() override; Executor & getExecutor() override;

View File

@ -331,6 +331,20 @@ void DiskRestartProxy::getRemotePathsRecursive(
return DiskDecorator::getRemotePathsRecursive(path, paths_map); return DiskDecorator::getRemotePathsRecursive(path, paths_map);
} }
/// Same contract as DiskDecorator::getNestedDisk(), but the delegate pointer is
/// copied while holding a ReadLock on `mutex`; the lock is released before unwrapping.
DiskPtr DiskRestartProxy::getNestedDisk() const
{
    /// Snapshot the delegate under the read lock; the lock lives only inside the lambda.
    const DiskPtr snapshot = [this]
    {
        ReadLock lock (mutex);
        return delegate;
    }();

    const auto * inner_decorator = dynamic_cast<const DiskDecorator *>(snapshot.get());
    if (!inner_decorator)
        return snapshot;
    return inner_decorator->getNestedDisk();
}
void DiskRestartProxy::restart(ContextPtr context) void DiskRestartProxy::restart(ContextPtr context)
{ {
/// Speed up processing unhealthy requests. /// Speed up processing unhealthy requests.

View File

@ -71,6 +71,8 @@ public:
void restart(ContextPtr context); void restart(ContextPtr context);
DiskPtr getNestedDisk() const override;
private: private:
friend class RestartAwareReadBuffer; friend class RestartAwareReadBuffer;
friend class RestartAwareWriteBuffer; friend class RestartAwareWriteBuffer;

View File

@ -237,6 +237,7 @@ Block NativeReader::read()
else else
tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name}); tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name});
} }
tmp_res.info = res.info;
res.swap(tmp_res); res.swap(tmp_res);
} }

View File

@ -1,4 +1,4 @@
#include <Formats/TemporaryFileStream.h> #include <Formats/TemporaryFileStreamLegacy.h>
#include <Formats/NativeReader.h> #include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h> #include <Formats/NativeWriter.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
@ -12,20 +12,20 @@ namespace DB
{ {
/// To read the data that was flushed into the temporary data file. /// To read the data that was flushed into the temporary data file.
TemporaryFileStream::TemporaryFileStream(const std::string & path) TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path)
: file_in(path) : file_in(path)
, compressed_in(file_in) , compressed_in(file_in)
, block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION)) , block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{} {}
TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block & header_) TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path, const Block & header_)
: file_in(path) : file_in(path)
, compressed_in(file_in) , compressed_in(file_in)
, block_in(std::make_unique<NativeReader>(compressed_in, header_, 0)) , block_in(std::make_unique<NativeReader>(compressed_in, header_, 0))
{} {}
/// Flush data from input stream into file for future reading /// Flush data from input stream into file for future reading
TemporaryFileStream::Stat TemporaryFileStream::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec) TemporaryFileStreamLegacy::Stat TemporaryFileStreamLegacy::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{ {
WriteBufferFromFile file_buf(path); WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {})); CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));

View File

@ -9,8 +9,10 @@
namespace DB namespace DB
{ {
/// Used only in MergeJoin
/// TODO: use `TemporaryDataOnDisk` instead
/// To read the data that was flushed into the temporary data file. /// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream struct TemporaryFileStreamLegacy
{ {
struct Stat struct Stat
{ {
@ -22,8 +24,8 @@ struct TemporaryFileStream
CompressedReadBuffer compressed_in; CompressedReadBuffer compressed_in;
std::unique_ptr<NativeReader> block_in; std::unique_ptr<NativeReader> block_in;
explicit TemporaryFileStream(const std::string & path); explicit TemporaryFileStreamLegacy(const std::string & path);
TemporaryFileStream(const std::string & path, const Block & header_); TemporaryFileStreamLegacy(const std::string & path, const Block & header_);
/// Flush data from input stream into file for future reading /// Flush data from input stream into file for future reading
static Stat write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec); static Stat write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec);

View File

@ -387,15 +387,38 @@ struct StringEqualsImpl
size_t size = a_offsets.size(); size_t size = a_offsets.size();
ColumnString::Offset prev_a_offset = 0; ColumnString::Offset prev_a_offset = 0;
for (size_t i = 0; i < size; ++i) if (b_size == 0)
{ {
auto a_size = a_offsets[i] - prev_a_offset - 1; /*
* Add the fast path of string comparison if the string constant is empty
* and b_size is 0. If a_size is also 0, both of string a and b are empty
* string. There is no need to call memequalSmallAllowOverflow15() for
* string comparison.
*/
for (size_t i = 0; i < size; ++i)
{
auto a_size = a_offsets[i] - prev_a_offset - 1;
c[i] = positive == memequalSmallAllowOverflow15( if (a_size == 0)
a_data.data() + prev_a_offset, a_size, c[i] = positive;
b_data.data(), b_size); else
c[i] = !positive;
prev_a_offset = a_offsets[i]; prev_a_offset = a_offsets[i];
}
}
else
{
for (size_t i = 0; i < size; ++i)
{
auto a_size = a_offsets[i] - prev_a_offset - 1;
c[i] = positive == memequalSmallAllowOverflow15(
a_data.data() + prev_a_offset, a_size,
b_data.data(), b_size);
prev_a_offset = a_offsets[i];
}
} }
} }

View File

@ -35,6 +35,7 @@
#include <Interpreters/JIT/CompiledExpressionCache.h> #include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Core/ProtocolDefines.h> #include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileOnDisk.h> #include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
@ -59,6 +60,20 @@ namespace CurrentMetrics
extern const Metric TemporaryFilesForAggregation; extern const Metric TemporaryFilesForAggregation;
} }
/// Error codes referenced from this translation unit.
/// They are only declared here (`extern`); the definitions are not in this file.
namespace DB::ErrorCodes
{
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
    extern const int TOO_MANY_ROWS;
    extern const int EMPTY_DATA_PASSED;
    extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
    extern const int LOGICAL_ERROR;
}
namespace namespace
{ {
/** Collects observed HashMap-s sizes to avoid redundant intermediate resizes. /** Collects observed HashMap-s sizes to avoid redundant intermediate resizes.
@ -311,17 +326,6 @@ size_t getMinBytesForPrefetch()
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
extern const int NOT_ENOUGH_SPACE;
extern const int TOO_MANY_ROWS;
extern const int EMPTY_DATA_PASSED;
extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
extern const int LOGICAL_ERROR;
}
AggregatedDataVariants::~AggregatedDataVariants() AggregatedDataVariants::~AggregatedDataVariants()
{ {
if (aggregator && !aggregator->all_aggregates_has_trivial_destructor) if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
@ -566,6 +570,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_)
: header(header_) : header(header_)
, keys_positions(calculateKeysPositions(header, params_)) , keys_positions(calculateKeysPositions(header, params_))
, params(params_) , params(params_)
, tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope) : nullptr)
, min_bytes_for_prefetch(getMinBytesForPrefetch()) , min_bytes_for_prefetch(getMinBytesForPrefetch())
{ {
/// Use query-level memory tracker /// Use query-level memory tracker
@ -1562,30 +1567,28 @@ bool Aggregator::executeOnBlock(Columns columns,
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size) const void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size) const
{ {
if (!tmp_data)
throw Exception("Cannot write to temporary file because temporary file is not initialized", ErrorCodes::LOGICAL_ERROR);
Stopwatch watch; Stopwatch watch;
size_t rows = data_variants.size(); size_t rows = data_variants.size();
auto file = createTempFile(max_temp_file_size); auto & out_stream = tmp_data->createStream(getHeader(false), CurrentMetrics::TemporaryFilesForAggregation, max_temp_file_size);
const auto & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", path);
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart); ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.path());
/// Flush only two-level data and possibly overflow data. /// Flush only two-level data and possibly overflow data.
#define M(NAME) \ #define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out); writeToTemporaryFileImpl(data_variants, *data_variants.NAME, out_stream);
if (false) {} // NOLINT if (false) {} // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M) APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M #undef M
else else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); throw Exception("Unknown aggregated data variant", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
/// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones. /// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones.
data_variants.init(data_variants.type); data_variants.init(data_variants.type);
@ -1598,62 +1601,32 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
data_variants.without_key = place; data_variants.without_key = place;
} }
block_out.flush(); auto stat = out_stream.finishWriting();
compressed_buf.next();
file_buf.next(); ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, stat.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size);
double elapsed_seconds = watch.elapsedSeconds(); double elapsed_seconds = watch.elapsedSeconds();
size_t compressed_bytes = file_buf.count(); double compressed_size = stat.compressed_size;
size_t uncompressed_bytes = compressed_buf.count(); double uncompressed_size = stat.uncompressed_size;
{
std::lock_guard lock(temporary_files.mutex);
temporary_files.files.emplace_back(std::move(file));
temporary_files.sum_size_uncompressed += uncompressed_bytes;
temporary_files.sum_size_compressed += compressed_bytes;
}
ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, uncompressed_bytes);
LOG_DEBUG(log, LOG_DEBUG(log,
"Written part in {:.3f} sec., {} rows, {} uncompressed, {} compressed," "Written part in {:.3f} sec., {} rows, {} uncompressed, {} compressed,"
" {:.3f} uncompressed bytes per row, {:.3f} compressed bytes per row, compression rate: {:.3f}" " {:.3f} uncompressed bytes per row, {:.3f} compressed bytes per row, compression rate: {:.3f}"
" ({:.3f} rows/sec., {}/sec. uncompressed, {}/sec. compressed)", " ({:.3f} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
elapsed_seconds, elapsed_seconds,
rows, rows,
ReadableSize(uncompressed_bytes), ReadableSize(uncompressed_size),
ReadableSize(compressed_bytes), ReadableSize(compressed_size),
static_cast<double>(uncompressed_bytes) / rows, static_cast<double>(uncompressed_size) / rows,
static_cast<double>(compressed_bytes) / rows, static_cast<double>(compressed_size) / rows,
static_cast<double>(uncompressed_bytes) / compressed_bytes, static_cast<double>(uncompressed_size) / compressed_size,
static_cast<double>(rows) / elapsed_seconds, static_cast<double>(rows) / elapsed_seconds,
ReadableSize(static_cast<double>(uncompressed_bytes) / elapsed_seconds), ReadableSize(static_cast<double>(uncompressed_size) / elapsed_seconds),
ReadableSize(static_cast<double>(compressed_bytes) / elapsed_seconds)); ReadableSize(static_cast<double>(compressed_size) / elapsed_seconds));
} }
/// Creates a temporary file on a disk of `params.tmp_volume` for external aggregation.
/// If `max_temp_file_size` is non-zero, throws NOT_ENOUGH_SPACE when the directory of the
/// new file does not have that much free space. Returns the holder of the created file.
TemporaryFileOnDiskHolder Aggregator::createTempFile(size_t max_temp_file_size) const
{
    auto tmp_file = std::make_unique<TemporaryFileOnDisk>(params.tmp_volume->getDisk(), CurrentMetrics::TemporaryFilesForAggregation);

    /// enoughSpaceInDirectory() alone does not make this fully correct, because
    /// another process (or another aggregator thread) can consume the space in the meantime.
    ///
    /// A true reservation (IVolume::reserve()) cannot be used here either:
    /// current_memory_usage does not take compression into account, so it
    /// would reserve far more than will actually be used.
    ///
    /// Hence only a simple check is done, and only when a size limit was requested.
    if (max_temp_file_size > 0)
    {
        if (!enoughSpaceInDirectory(tmp_file->getPath(), max_temp_file_size))
            throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", tmp_file->path());
    }

    return tmp_file;
}
template <typename Method> template <typename Method>
Block Aggregator::convertOneBucketToBlock( Block Aggregator::convertOneBucketToBlock(
AggregatedDataVariants & data_variants, AggregatedDataVariants & data_variants,
@ -1703,7 +1676,7 @@ template <typename Method>
void Aggregator::writeToTemporaryFileImpl( void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants, AggregatedDataVariants & data_variants,
Method & method, Method & method,
NativeWriter & out) const TemporaryFileStream & out) const
{ {
size_t max_temporary_block_size_rows = 0; size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0; size_t max_temporary_block_size_bytes = 0;

View File

@ -29,6 +29,7 @@
#include <Interpreters/AggregateDescription.h> #include <Interpreters/AggregateDescription.h>
#include <Interpreters/AggregationCommon.h> #include <Interpreters/AggregationCommon.h>
#include <Interpreters/JIT/compileFunction.h> #include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
@ -925,7 +926,7 @@ public:
/// Return empty result when aggregating without keys on empty set. /// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set; bool empty_result_for_aggregation_by_empty_set;
VolumePtr tmp_volume; TemporaryDataOnDiskScopePtr tmp_data_scope;
/// Settings is used to determine cache size. No threads are created. /// Settings is used to determine cache size. No threads are created.
size_t max_threads; size_t max_threads;
@ -970,7 +971,7 @@ public:
size_t group_by_two_level_threshold_bytes_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_, size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_, bool empty_result_for_aggregation_by_empty_set_,
VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_scope_,
size_t max_threads_, size_t max_threads_,
size_t min_free_disk_space_, size_t min_free_disk_space_,
bool compile_aggregate_expressions_, bool compile_aggregate_expressions_,
@ -990,7 +991,7 @@ public:
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_) , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_) , max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
, empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_) , empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
, tmp_volume(tmp_volume_) , tmp_data_scope(std::move(tmp_data_scope_))
, max_threads(max_threads_) , max_threads(max_threads_)
, min_free_disk_space(min_free_disk_space_) , min_free_disk_space(min_free_disk_space_)
, compile_aggregate_expressions(compile_aggregate_expressions_) , compile_aggregate_expressions(compile_aggregate_expressions_)
@ -1071,25 +1072,9 @@ public:
/// For external aggregation. /// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const; void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const;
TemporaryFileOnDiskHolder createTempFile(size_t max_temp_file_size) const; bool hasTemporaryData() const { return tmp_data && !tmp_data->empty(); }
bool hasTemporaryFiles() const { return !temporary_files.empty(); } const TemporaryDataOnDisk & getTemporaryData() const { return *tmp_data; }
/// Bookkeeping for the temporary files produced by external aggregation.
struct TemporaryFiles
{
    /// Holders of the temporary files written so far.
    std::vector<TemporaryFileOnDiskHolder> files;
    /// Accumulated sizes of the written data, before and after compression.
    size_t sum_size_uncompressed = 0;
    size_t sum_size_compressed = 0;
    /// Taken by empty(); `mutable` so it can be locked from const methods.
    mutable std::mutex mutex;

    /// True when no temporary file has been registered; checked under `mutex`.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard(mutex);
        return files.empty();
    }
};
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
/// Get data structure of the result. /// Get data structure of the result.
Block getHeader(bool final) const; Block getHeader(bool final) const;
@ -1148,7 +1133,7 @@ private:
Poco::Logger * log = &Poco::Logger::get("Aggregator"); Poco::Logger * log = &Poco::Logger::get("Aggregator");
/// For external aggregation. /// For external aggregation.
mutable TemporaryFiles temporary_files; TemporaryDataOnDiskPtr tmp_data;
size_t min_bytes_for_prefetch = 0; size_t min_bytes_for_prefetch = 0;
@ -1251,7 +1236,7 @@ private:
void writeToTemporaryFileImpl( void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants, AggregatedDataVariants & data_variants,
Method & method, Method & method,
NativeWriter & out) const; TemporaryFileStream & out) const;
/// Merge NULL key data from hash table `src` into `dst`. /// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table> template <typename Method, typename Table>

View File

@ -30,6 +30,7 @@
#include <Storages/CompressionCodecSelector.h> #include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h> #include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h> #include <Disks/DiskLocal.h>
#include <Disks/DiskDecorator.h>
#include <Disks/ObjectStorages/IObjectStorage.h> #include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h> #include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h> #include <Disks/IO/ThreadPoolReader.h>
@ -37,6 +38,7 @@
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h> #include <Interpreters/ActionLocksManager.h>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h> #include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Core/SettingsQuirks.h> #include <Core/SettingsQuirks.h>
#include <Access/AccessControl.h> #include <Access/AccessControl.h>
@ -188,7 +190,7 @@ struct ContextSharedPart : boost::noncopyable
ConfigurationPtr config; /// Global configuration settings. ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request. String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request. TemporaryDataOnDiskScopePtr temp_data_on_disk; /// Temporary files that occur when processing the request accounted here.
mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization. mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader; mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader;
@ -681,10 +683,27 @@ Strings Context::getWarnings() const
return common_warnings; return common_warnings;
} }
/// TODO: remove, use `getTempDataOnDisk`
VolumePtr Context::getTemporaryVolume() const VolumePtr Context::getTemporaryVolume() const
{ {
auto lock = getLock(); auto lock = getLock();
return shared->tmp_volume; if (shared->temp_data_on_disk)
return shared->temp_data_on_disk->getVolume();
return nullptr;
}
/// Returns the temporary-data scope of this context if one has been set on it,
/// otherwise falls back to the scope stored in the shared part. Takes the context lock.
TemporaryDataOnDiskScopePtr Context::getTempDataOnDisk() const
{
    auto lock = getLock();
    return this->temp_data_on_disk ? this->temp_data_on_disk : shared->temp_data_on_disk;
}
void Context::setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_)
{
auto lock = getLock();
this->temp_data_on_disk = std::move(temp_data_on_disk_);
} }
void Context::setPath(const String & path) void Context::setPath(const String & path)
@ -693,7 +712,7 @@ void Context::setPath(const String & path)
shared->path = path; shared->path = path;
if (shared->tmp_path.empty() && !shared->tmp_volume) if (shared->tmp_path.empty() && !shared->temp_data_on_disk)
shared->tmp_path = shared->path + "tmp/"; shared->tmp_path = shared->path + "tmp/";
if (shared->flags_path.empty()) if (shared->flags_path.empty())
@ -712,9 +731,10 @@ void Context::setPath(const String & path)
shared->user_defined_path = shared->path + "user_defined/"; shared->user_defined_path = shared->path + "user_defined/";
} }
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name) VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name, size_t max_size)
{ {
std::lock_guard lock(shared->storage_policies_mutex); std::lock_guard lock(shared->storage_policies_mutex);
VolumePtr volume;
if (policy_name.empty()) if (policy_name.empty())
{ {
@ -723,21 +743,41 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
shared->tmp_path += '/'; shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0); auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0); volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
} }
else else
{ {
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name); StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1) if (tmp_policy->getVolumes().size() != 1)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Policy '{} is used temporary files, such policy should have exactly one volume", policy_name); "Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name);
shared->tmp_volume = tmp_policy->getVolume(0); volume = tmp_policy->getVolume(0);
} }
if (shared->tmp_volume->getDisks().empty()) if (volume->getDisks().empty())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG); throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return shared->tmp_volume; for (const auto & disk : volume->getDisks())
{
if (!disk)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Temporary disk is null");
/// Check that underlying disk is local (can be wrapped in decorator)
DiskPtr disk_ptr = disk;
if (const auto * disk_decorator = dynamic_cast<const DiskDecorator *>(disk_ptr.get()))
disk_ptr = disk_decorator->getNestedDisk();
if (dynamic_cast<const DiskLocal *>(disk_ptr.get()) == nullptr)
{
const auto * disk_raw_ptr = disk_ptr.get();
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Disk '{}' ({}) is not local and can't be used for temporary files",
disk_ptr->getName(), typeid(*disk_raw_ptr).name());
}
}
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
return volume;
} }
void Context::setFlagsPath(const String & path) void Context::setFlagsPath(const String & path)
@ -2897,14 +2937,13 @@ void Context::shutdown()
} }
} }
// Special volumes might also use disks that require shutdown. /// Special volumes might also use disks that require shutdown.
if (shared->tmp_volume) auto & tmp_data = shared->temp_data_on_disk;
if (tmp_data && tmp_data->getVolume())
{ {
auto & disks = shared->tmp_volume->getDisks(); auto & disks = tmp_data->getVolume()->getDisks();
for (auto & disk : disks) for (auto & disk : disks)
{
disk->shutdown(); disk->shutdown();
}
} }
shared->shutdown(); shared->shutdown();

View File

@ -161,6 +161,8 @@ using ReadTaskCallback = std::function<String()>;
using MergeTreeReadTaskCallback = std::function<std::optional<PartitionReadResponse>(PartitionReadRequest)>; using MergeTreeReadTaskCallback = std::function<std::optional<PartitionReadResponse>(PartitionReadRequest)>;
class TemporaryDataOnDiskScope;
using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
#if USE_ROCKSDB #if USE_ROCKSDB
class MergeTreeMetadataCache; class MergeTreeMetadataCache;
@ -362,6 +364,8 @@ private:
/// A flag, used to mark if reader needs to apply deleted rows mask. /// A flag, used to mark if reader needs to apply deleted rows mask.
bool apply_deleted_mask = true; bool apply_deleted_mask = true;
/// Temporary data for query execution accounting.
TemporaryDataOnDiskScopePtr temp_data_on_disk;
public: public:
/// Some counters for current query execution. /// Some counters for current query execution.
/// Most of them are workarounds and should be removed in the future. /// Most of them are workarounds and should be removed in the future.
@ -435,7 +439,10 @@ public:
/// A list of warnings about server configuration to place in `system.warnings` table. /// A list of warnings about server configuration to place in `system.warnings` table.
Strings getWarnings() const; Strings getWarnings() const;
VolumePtr getTemporaryVolume() const; VolumePtr getTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk`
TemporaryDataOnDiskScopePtr getTempDataOnDisk() const;
void setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_);
void setPath(const String & path); void setPath(const String & path);
void setFlagsPath(const String & path); void setFlagsPath(const String & path);
@ -446,7 +453,7 @@ public:
void addWarningMessage(const String & msg) const; void addWarningMessage(const String & msg) const;
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = ""); VolumePtr setTemporaryStorage(const String & path, const String & policy_name, size_t max_size);
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>; using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;

View File

@ -1453,7 +1453,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
settings.max_bytes_before_remerge_sort, settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio, settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort, settings.max_bytes_before_external_sort,
this->context->getTemporaryVolume(), this->context->getTempDataOnDisk(),
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties); settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_pos)); sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_pos));
@ -2354,7 +2354,7 @@ static Aggregator::Params getAggregatorParams(
settings.empty_result_for_aggregation_by_empty_set settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty() || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer.hasConstAggregationKeys()), && query_analyzer.hasConstAggregationKeys()),
context.getTemporaryVolume(), context.getTempDataOnDisk(),
settings.max_threads, settings.max_threads,
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,
@ -2616,7 +2616,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
settings.max_bytes_before_remerge_sort, settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio, settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort, settings.max_bytes_before_external_sort,
context->getTemporaryVolume(), context->getTempDataOnDisk(),
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties); settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription("Sorting for window '" + window.window_name + "'"); sorting_step->setStepDescription("Sorting for window '" + window.window_name + "'");
@ -2675,7 +2675,7 @@ void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfo
settings.max_bytes_before_remerge_sort, settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio, settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort, settings.max_bytes_before_external_sort,
context->getTemporaryVolume(), context->getTempDataOnDisk(),
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties); settings.optimize_sorting_by_input_stream_properties);

View File

@ -234,9 +234,13 @@ static void compileFunction(llvm::Module & module, const IFunctionBase & functio
auto * cur_block = b.GetInsertBlock(); auto * cur_block = b.GetInsertBlock();
for (auto & col : columns) for (auto & col : columns)
{ {
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); auto * ty_data = llvm::cast<llvm::PointerType>(col.data->getType()->getScalarType())->getElementType();
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_data, col.data, 1), cur_block);
if (col.null) if (col.null)
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); {
auto * ty_null = llvm::cast<llvm::PointerType>(col.null->getType()->getScalarType())->getElementType();
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_null, col.null, 1), cur_block);
}
} }
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
@ -293,7 +297,8 @@ static void compileCreateAggregateStatesFunctions(llvm::Module & module, const s
{ {
size_t aggregate_function_offset = function_to_compile.aggregate_data_offset; size_t aggregate_function_offset = function_to_compile.aggregate_data_offset;
const auto * aggregate_function = function_to_compile.function; const auto * aggregate_function = function_to_compile.function;
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_arg, aggregate_function_offset); auto * ty_aggregate_data_place_arg = llvm::cast<llvm::PointerType>(aggregate_data_place_arg->getType()->getScalarType())->getElementType();
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_arg, aggregate_data_place_arg, aggregate_function_offset);
aggregate_function->compileCreate(b, aggregation_place_with_offset); aggregate_function->compileCreate(b, aggregation_place_with_offset);
} }
@ -324,7 +329,8 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
b.SetInsertPoint(entry); b.SetInsertPoint(entry);
llvm::IRBuilder<> entry_builder(entry); llvm::IRBuilder<> entry_builder(entry);
auto * places_start_arg = entry_builder.CreateInBoundsGEP(nullptr, places_arg, row_start_arg); auto * ty_places_arg = llvm::cast<llvm::PointerType>(places_arg->getType()->getScalarType())->getElementType();
auto * places_start_arg = entry_builder.CreateInBoundsGEP(ty_places_arg, places_arg, row_start_arg);
std::vector<ColumnDataPlaceholder> columns; std::vector<ColumnDataPlaceholder> columns;
size_t previous_columns_size = 0; size_t previous_columns_size = 0;
@ -342,11 +348,13 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
const auto & argument_type = argument_types[column_argument_index]; const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index)); auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo()); data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg); auto * ty_data_init = llvm::cast<llvm::PointerType>(data_placeholder.data_init->getType()->getScalarType())->getElementType();
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(ty_data_init, data_placeholder.data_init, row_start_arg);
if (argument_type->isNullable()) if (argument_type->isNullable())
{ {
data_placeholder.null_init = b.CreateExtractValue(data, {1}); data_placeholder.null_init = b.CreateExtractValue(data, {1});
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg); auto * ty_null_init = llvm::cast<llvm::PointerType>(data_placeholder.null_init->getType()->getScalarType())->getElementType();
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(ty_null_init, data_placeholder.null_init, row_start_arg);
} }
else else
{ {
@ -419,7 +427,8 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
arguments_values[column_argument_index] = nullable_value; arguments_values[column_argument_index] = nullable_value;
} }
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregation_place, aggregate_function_offset); auto * ty_aggregation_place = llvm::cast<llvm::PointerType>(aggregation_place->getType()->getScalarType())->getElementType();
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregation_place, aggregation_place, aggregate_function_offset);
aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values); aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values);
previous_columns_size += function_arguments_size; previous_columns_size += function_arguments_size;
@ -430,13 +439,18 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * cur_block = b.GetInsertBlock(); auto * cur_block = b.GetInsertBlock();
for (auto & col : columns) for (auto & col : columns)
{ {
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); auto * ty_data = llvm::cast<llvm::PointerType>(col.data->getType()->getScalarType())->getElementType();
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_data, col.data, 1), cur_block);
if (col.null) if (col.null)
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); {
auto * ty_null = llvm::cast<llvm::PointerType>(col.null->getType()->getScalarType())->getElementType();
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_null, col.null, 1), cur_block);
}
} }
places_phi->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, places_phi, 1), cur_block); auto * ty_places_phi = llvm::cast<llvm::PointerType>(places_phi->getType()->getScalarType())->getElementType();
places_phi->addIncoming(b.CreateConstInBoundsGEP1_64(ty_places_phi, places_phi, 1), cur_block);
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
counter_phi->addIncoming(value, cur_block); counter_phi->addIncoming(value, cur_block);
@ -488,11 +502,13 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
const auto & argument_type = argument_types[column_argument_index]; const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index)); auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo()); data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg); auto * ty_data_init = llvm::cast<llvm::PointerType>(data_placeholder.data_init->getType()->getScalarType())->getElementType();
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(ty_data_init, data_placeholder.data_init, row_start_arg);
if (argument_type->isNullable()) if (argument_type->isNullable())
{ {
data_placeholder.null_init = b.CreateExtractValue(data, {1}); data_placeholder.null_init = b.CreateExtractValue(data, {1});
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg); auto * ty_null_init = llvm::cast<llvm::PointerType>(data_placeholder.null_init->getType()->getScalarType())->getElementType();
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(ty_null_init, data_placeholder.null_init, row_start_arg);
} }
else else
{ {
@ -560,7 +576,8 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
arguments_values[column_argument_index] = nullable_value; arguments_values[column_argument_index] = nullable_value;
} }
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, place_arg, aggregate_function_offset); auto * ty_place_arg = llvm::cast<llvm::PointerType>(place_arg->getType()->getScalarType())->getElementType();
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_place_arg, place_arg, aggregate_function_offset);
aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values); aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values);
previous_columns_size += function_arguments_size; previous_columns_size += function_arguments_size;
@ -571,10 +588,14 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * cur_block = b.GetInsertBlock(); auto * cur_block = b.GetInsertBlock();
for (auto & col : columns) for (auto & col : columns)
{ {
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); auto * ty_data = llvm::cast<llvm::PointerType>(col.data->getType()->getScalarType())->getElementType();
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_data, col.data, 1), cur_block);
if (col.null) if (col.null)
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); {
auto * ty_null = llvm::cast<llvm::PointerType>(col.null->getType()->getScalarType())->getElementType();
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_null, col.null, 1), cur_block);
}
} }
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
@ -607,8 +628,10 @@ static void compileMergeAggregatesStates(llvm::Module & module, const std::vecto
size_t aggregate_function_offset = function_to_compile.aggregate_data_offset; size_t aggregate_function_offset = function_to_compile.aggregate_data_offset;
const auto * aggregate_function_ptr = function_to_compile.function; const auto * aggregate_function_ptr = function_to_compile.function;
auto * aggregate_data_place_merge_dst_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_dst_arg, aggregate_function_offset); auto * ty_aggregate_data_place_dst_arg = llvm::cast<llvm::PointerType>(aggregate_data_place_dst_arg->getType()->getScalarType())->getElementType();
auto * aggregate_data_place_merge_src_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_src_arg, aggregate_function_offset); auto * aggregate_data_place_merge_dst_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_dst_arg, aggregate_data_place_dst_arg, aggregate_function_offset);
auto * ty_aggregate_data_place_src_arg = llvm::cast<llvm::PointerType>(aggregate_data_place_src_arg->getType()->getScalarType())->getElementType();
auto * aggregate_data_place_merge_src_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_src_arg, aggregate_data_place_src_arg, aggregate_function_offset);
aggregate_function_ptr->compileMerge(b, aggregate_data_place_merge_dst_with_offset, aggregate_data_place_merge_src_with_offset); aggregate_function_ptr->compileMerge(b, aggregate_data_place_merge_dst_with_offset, aggregate_data_place_merge_src_with_offset);
} }
@ -645,11 +668,13 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons
auto return_type = functions[i].function->getReturnType(); auto return_type = functions[i].function->getReturnType();
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, i)); auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, i));
columns[i].data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(return_type))->getPointerTo()); columns[i].data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(return_type))->getPointerTo());
columns[i].data_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].data_init, row_start_arg); auto * ty_data_init = llvm::cast<llvm::PointerType>(columns[i].data_init->getType()->getScalarType())->getElementType();
columns[i].data_init = entry_builder.CreateInBoundsGEP(ty_data_init, columns[i].data_init, row_start_arg);
if (return_type->isNullable()) if (return_type->isNullable())
{ {
columns[i].null_init = b.CreateExtractValue(data, {1}); columns[i].null_init = b.CreateExtractValue(data, {1});
columns[i].null_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].null_init, row_start_arg); auto * ty_null_init = llvm::cast<llvm::PointerType>(columns[i].null_init->getType()->getScalarType())->getElementType();
columns[i].null_init = entry_builder.CreateInBoundsGEP(ty_null_init, columns[i].null_init, row_start_arg);
} }
else else
{ {
@ -688,7 +713,8 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons
const auto * aggregate_function_ptr = functions[i].function; const auto * aggregate_function_ptr = functions[i].function;
auto * aggregate_data_place = b.CreateLoad(b.getInt8Ty()->getPointerTo(), aggregate_data_place_phi); auto * aggregate_data_place = b.CreateLoad(b.getInt8Ty()->getPointerTo(), aggregate_data_place_phi);
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place, aggregate_function_offset); auto * ty_aggregate_data_place = llvm::cast<llvm::PointerType>(aggregate_data_place->getType()->getScalarType())->getElementType();
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place, aggregate_data_place, aggregate_function_offset);
auto * final_value = aggregate_function_ptr->compileGetResult(b, aggregation_place_with_offset); auto * final_value = aggregate_function_ptr->compileGetResult(b, aggregation_place_with_offset);
@ -708,16 +734,21 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons
auto * cur_block = b.GetInsertBlock(); auto * cur_block = b.GetInsertBlock();
for (auto & col : columns) for (auto & col : columns)
{ {
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); auto * ty_col_data = llvm::cast<llvm::PointerType>(col.data->getType()->getScalarType())->getElementType();
col.data->addIncoming(b.CreateConstInBoundsGEP1_64(ty_col_data, col.data, 1), cur_block);
if (col.null) if (col.null)
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); {
auto * ty_col_null = llvm::cast<llvm::PointerType>(col.null->getType()->getScalarType())->getElementType();
col.null->addIncoming(b.CreateConstInBoundsGEP1_64(ty_col_null, col.null, 1), cur_block);
}
} }
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1), "", true, true); auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1), "", true, true);
counter_phi->addIncoming(value, cur_block); counter_phi->addIncoming(value, cur_block);
aggregate_data_place_phi->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_phi, 1), cur_block); auto * ty_aggregate_data_place_phi = llvm::cast<llvm::PointerType>(aggregate_data_place_phi->getType()->getScalarType())->getElementType();
aggregate_data_place_phi->addIncoming(b.CreateConstInBoundsGEP1_64(ty_aggregate_data_place_phi, aggregate_data_place_phi, 1), cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop); b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop);
@ -842,11 +873,20 @@ CompiledSortDescriptionFunction compileSortDescription(
auto * lhs_column_data = b.CreatePointerCast(b.CreateExtractValue(lhs_column, {0}), column_native_type_pointer); auto * lhs_column_data = b.CreatePointerCast(b.CreateExtractValue(lhs_column, {0}), column_native_type_pointer);
auto * lhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(lhs_column, {1}) : nullptr; auto * lhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(lhs_column, {1}) : nullptr;
llvm::Value * lhs_value = b.CreateLoad(b.CreateInBoundsGEP(nullptr, lhs_column_data, lhs_index_arg)); #ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
auto * ty_lhs_column_data = llvm::cast<llvm::PointerType>(lhs_column_data->getType()->getScalarType())->getElementType();
llvm::Value * lhs_value = b.CreateLoad(b.CreateInBoundsGEP(ty_lhs_column_data, lhs_column_data, lhs_index_arg));
#ifdef __clang__
#pragma clang diagnostic pop
#endif
if (lhs_column_null_data) if (lhs_column_null_data)
{ {
auto * is_null_value_pointer = b.CreateInBoundsGEP(nullptr, lhs_column_null_data, lhs_index_arg); auto * ty_lhs_column_null_data = llvm::cast<llvm::PointerType>(lhs_column_null_data->getType()->getScalarType())->getElementType();
auto * is_null_value_pointer = b.CreateInBoundsGEP(ty_lhs_column_null_data, lhs_column_null_data, lhs_index_arg);
auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), is_null_value_pointer), b.getInt8(0)); auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), is_null_value_pointer), b.getInt8(0));
auto * lhs_nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, lhs_value, {0}), is_null, {1}); auto * lhs_nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, lhs_value, {0}), is_null, {1});
lhs_value = lhs_nullable_value; lhs_value = lhs_nullable_value;
@ -856,10 +896,19 @@ CompiledSortDescriptionFunction compileSortDescription(
auto * rhs_column_data = b.CreatePointerCast(b.CreateExtractValue(rhs_column, {0}), column_native_type_pointer); auto * rhs_column_data = b.CreatePointerCast(b.CreateExtractValue(rhs_column, {0}), column_native_type_pointer);
auto * rhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(rhs_column, {1}) : nullptr; auto * rhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(rhs_column, {1}) : nullptr;
llvm::Value * rhs_value = b.CreateLoad(b.CreateInBoundsGEP(nullptr, rhs_column_data, rhs_index_arg)); #ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
auto * ty_rhs_column_data = llvm::cast<llvm::PointerType>(rhs_column_data->getType()->getScalarType())->getElementType();
llvm::Value * rhs_value = b.CreateLoad(b.CreateInBoundsGEP(ty_rhs_column_data, rhs_column_data, rhs_index_arg));
#ifdef __clang__
#pragma clang diagnostic pop
#endif
if (rhs_column_null_data) if (rhs_column_null_data)
{ {
auto * is_null_value_pointer = b.CreateInBoundsGEP(nullptr, rhs_column_null_data, rhs_index_arg); auto * ty_rhs_column_null_data = llvm::cast<llvm::PointerType>(rhs_column_null_data->getType()->getScalarType())->getElementType();
auto * is_null_value_pointer = b.CreateInBoundsGEP(ty_rhs_column_null_data, rhs_column_null_data, rhs_index_arg);
auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), is_null_value_pointer), b.getInt8(0)); auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), is_null_value_pointer), b.getInt8(0));
auto * rhs_nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, rhs_value, {0}), is_null, {1}); auto * rhs_nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, rhs_value, {0}), is_null, {1});
rhs_value = rhs_nullable_value; rhs_value = rhs_nullable_value;

View File

@ -4,12 +4,13 @@
#include <Columns/ColumnLowCardinality.h> #include <Columns/ColumnLowCardinality.h>
#include <Core/SortCursor.h> #include <Core/SortCursor.h>
#include <Formats/TemporaryFileStream.h> #include <Formats/TemporaryFileStreamLegacy.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Interpreters/MergeJoin.h> #include <Interpreters/MergeJoin.h>
#include <Interpreters/TableJoin.h> #include <Interpreters/TableJoin.h>
#include <Interpreters/JoinUtils.h> #include <Interpreters/JoinUtils.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Interpreters/sortBlock.h> #include <Interpreters/sortBlock.h>
#include <Processors/Sources/BlocksListSource.h> #include <Processors/Sources/BlocksListSource.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
@ -1032,7 +1033,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
{ {
auto load_func = [&]() -> std::shared_ptr<Block> auto load_func = [&]() -> std::shared_ptr<Block>
{ {
TemporaryFileStream input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block)); TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block));
return std::make_shared<Block>(input.block_in->read()); return std::make_shared<Block>(input.block_in->read());
}; };

View File

@ -69,7 +69,7 @@ static bool isUnlimitedQuery(const IAST * ast)
} }
ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, ContextPtr query_context) ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr query_context)
{ {
EntryPtr res; EntryPtr res;
@ -198,7 +198,11 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
auto user_process_list_it = user_to_queries.find(client_info.current_user); auto user_process_list_it = user_to_queries.find(client_info.current_user);
if (user_process_list_it == user_to_queries.end()) if (user_process_list_it == user_to_queries.end())
user_process_list_it = user_to_queries.emplace(client_info.current_user, this).first; {
user_process_list_it = user_to_queries.emplace(std::piecewise_construct,
std::forward_as_tuple(client_info.current_user),
std::forward_as_tuple(query_context->getGlobalContext(), this)).first;
}
ProcessListForUser & user_process_list = user_process_list_it->second; ProcessListForUser & user_process_list = user_process_list_it->second;
/// Actualize thread group info /// Actualize thread group info
@ -208,6 +212,11 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
std::lock_guard lock_thread_group(thread_group->mutex); std::lock_guard lock_thread_group(thread_group->mutex);
thread_group->performance_counters.setParent(&user_process_list.user_performance_counters); thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker); thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
if (user_process_list.user_temp_data_on_disk)
{
query_context->setTempDataOnDisk(std::make_shared<TemporaryDataOnDiskScope>(
user_process_list.user_temp_data_on_disk, settings.max_temp_data_on_disk_size_for_query));
}
thread_group->query = query_; thread_group->query = query_;
thread_group->one_line_query = toOneLineQuery(query_); thread_group->one_line_query = toOneLineQuery(query_);
thread_group->normalized_query_hash = normalizedQueryHash<false>(query_); thread_group->normalized_query_hash = normalizedQueryHash<false>(query_);
@ -556,9 +565,19 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev
ProcessListForUser::ProcessListForUser(ProcessList * global_process_list) ProcessListForUser::ProcessListForUser(ProcessList * global_process_list)
: ProcessListForUser(nullptr, global_process_list)
{}
ProcessListForUser::ProcessListForUser(ContextPtr global_context, ProcessList * global_process_list)
: user_overcommit_tracker(global_process_list, this) : user_overcommit_tracker(global_process_list, this)
{ {
user_memory_tracker.setOvercommitTracker(&user_overcommit_tracker); user_memory_tracker.setOvercommitTracker(&user_overcommit_tracker);
if (global_context)
{
size_t size_limit = global_context->getSettingsRef().max_temp_data_on_disk_size_for_user;
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getTempDataOnDisk(), size_limit);
}
} }

View File

@ -5,6 +5,8 @@
#include <Interpreters/CancellationCode.h> #include <Interpreters/CancellationCode.h>
#include <Interpreters/ClientInfo.h> #include <Interpreters/ClientInfo.h>
#include <Interpreters/QueryPriorities.h> #include <Interpreters/QueryPriorities.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Interpreters/Context.h>
#include <QueryPipeline/BlockIO.h> #include <QueryPipeline/BlockIO.h>
#include <QueryPipeline/ExecutionSpeedLimits.h> #include <QueryPipeline/ExecutionSpeedLimits.h>
#include <Storages/IStorage_fwd.h> #include <Storages/IStorage_fwd.h>
@ -236,6 +238,8 @@ struct ProcessListForUser
{ {
explicit ProcessListForUser(ProcessList * global_process_list); explicit ProcessListForUser(ProcessList * global_process_list);
ProcessListForUser(ContextPtr global_context, ProcessList * global_process_list);
/// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled. /// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled.
using QueryToElement = std::unordered_map<String, QueryStatus *>; using QueryToElement = std::unordered_map<String, QueryStatus *>;
QueryToElement queries; QueryToElement queries;
@ -244,6 +248,8 @@ struct ProcessListForUser
/// Limit and counter for memory of all simultaneously running queries of single user. /// Limit and counter for memory of all simultaneously running queries of single user.
MemoryTracker user_memory_tracker{VariableContext::User}; MemoryTracker user_memory_tracker{VariableContext::User};
TemporaryDataOnDiskScopePtr user_temp_data_on_disk;
UserOvercommitTracker user_overcommit_tracker; UserOvercommitTracker user_overcommit_tracker;
/// Count network usage for all simultaneously running queries of single user. /// Count network usage for all simultaneously running queries of single user.
@ -257,6 +263,7 @@ struct ProcessListForUser
/// Clears network bandwidth Throttler, so it will not count periods of inactivity. /// Clears network bandwidth Throttler, so it will not count periods of inactivity.
void resetTrackers() void resetTrackers()
{ {
/// TODO: should we drop user_temp_data_on_disk here?
user_memory_tracker.reset(); user_memory_tracker.reset();
if (user_throttler) if (user_throttler)
user_throttler.reset(); user_throttler.reset();
@ -374,7 +381,7 @@ public:
* If timeout is passed - throw an exception. * If timeout is passed - throw an exception.
* Don't count KILL QUERY queries. * Don't count KILL QUERY queries.
*/ */
EntryPtr insert(const String & query_, const IAST * ast, ContextPtr query_context); EntryPtr insert(const String & query_, const IAST * ast, ContextMutablePtr query_context);
/// Number of currently executing queries. /// Number of currently executing queries.
size_t size() const { return processes.size(); } size_t size() const { return processes.size(); }

View File

@ -5,7 +5,7 @@
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Merges/MergingSortedTransform.h> #include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Sources/TemporaryFileLazySource.h> #include <Processors/Sources/TemporaryFileLazySource.h>
#include <Formats/TemporaryFileStream.h> #include <Formats/TemporaryFileStreamLegacy.h>
#include <Disks/IVolume.h> #include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h> #include <Disks/TemporaryFileOnDisk.h>
@ -39,7 +39,7 @@ namespace
TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec) TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
{ {
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin); auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin);
auto write_stat = TemporaryFileStream::write(tmp_file->getPath(), header, std::move(pipeline), codec); auto write_stat = TemporaryFileStreamLegacy::write(tmp_file->getPath(), header, std::move(pipeline), codec);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes); ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes); ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes);

View File

@ -0,0 +1,270 @@
#include <Interpreters/TemporaryDataOnDisk.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Formats/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Core/ProtocolDefines.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Error codes thrown by the functions in this file.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
/// Thrown when the size limit for temporary files is exceeded.
extern const int TOO_MANY_ROWS_OR_BYTES;
/// Thrown on internal invariant violations (negative accounted size, writing to a finished stream).
extern const int LOGICAL_ERROR;
/// Thrown when space for a temporary file cannot be reserved on the volume.
extern const int NOT_ENOUGH_SPACE;
}
/// Accounts for a change in the amount of temporary data in this scope and in all ancestor scopes.
///
/// @param compressed_delta    change of the compressed (on-disk) size in bytes; negative means data was released.
/// @param uncompressed_delta  change of the uncompressed size in bytes; negative means data was released.
///
/// Throws LOGICAL_ERROR if a negative delta is larger than the size currently recorded in this scope.
/// Throws TOO_MANY_ROWS_OR_BYTES if a positive compressed delta would push this scope above its `limit`
/// (a `limit` of zero disables the check; only the compressed size is compared against the limit).
///
/// The delta is validated against this scope before it is propagated to the parent, and this scope's
/// counters are updated only after the parent accepted the delta. This way a rejected delta
/// (here or in any ancestor) never leaves an ancestor with consumption that this scope did not record.
void TemporaryDataOnDiskScope::deltaAllocAndCheck(int compressed_delta, int uncompressed_delta)
{
    /// Check that we don't go negative.
    const bool compressed_underflow
        = compressed_delta < 0 && stat.compressed_size < static_cast<size_t>(-compressed_delta);
    const bool uncompressed_underflow
        = uncompressed_delta < 0 && stat.uncompressed_size < static_cast<size_t>(-uncompressed_delta);
    if (compressed_underflow || uncompressed_underflow)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Negative temporary data size");

    /// Only growth can violate the limit; releasing data is always allowed.
    size_t new_consumption = stat.compressed_size + compressed_delta;
    if (compressed_delta > 0 && limit && new_consumption > limit)
        throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES, "Limit for temporary files size exceeded");

    /// Local checks passed; let the ancestors validate and record the delta (may throw).
    if (parent)
        parent->deltaAllocAndCheck(compressed_delta, uncompressed_delta);

    /// Everybody up the chain accepted the delta, commit it locally.
    stat.compressed_size += compressed_delta;
    stat.uncompressed_size += uncompressed_delta;
}
/// Creates a new temporary file on one of the volume's disks, wraps it into a stream
/// registered in `streams`, and returns a reference to that stream.
///
/// @param header         block structure passed to the new stream.
/// @param metric_scope   metric passed to the temporary file.
/// @param max_file_size  if non-zero, the disk is chosen by reserving this many bytes on the volume;
///                       if zero, the disk returned by `volume->getDisk()` is used.
///
/// Throws NOT_ENOUGH_SPACE if the reservation cannot be made.
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size)
{
    auto choose_disk = [&]() -> DiskPtr
    {
        if (max_file_size == 0)
            return volume->getDisk();

        auto reservation = volume->reserve(max_file_size);
        if (!reservation)
            throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE);
        return reservation->getDisk();
    };

    auto file_holder = std::make_unique<TemporaryFileOnDisk>(choose_disk(), metric_scope);

    /// The stream is constructed and registered while holding the mutex.
    std::lock_guard lock(mutex);
    auto & registered_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(file_holder), header, this));
    return *registered_stream;
}
std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
{
std::vector<TemporaryFileStream *> res;
std::lock_guard lock(mutex);
res.reserve(streams.size());
for (const auto & stream : streams)
res.push_back(stream.get());
return res;
}
/// Returns true if no stream has been registered; the check is made under the mutex.
bool TemporaryDataOnDisk::empty() const
{
    std::lock_guard guard(mutex);
    const bool has_no_streams = streams.empty();
    return has_no_streams;
}
/// Write side of a temporary file stream:
/// blocks -> NativeWriter -> CompressedWriteBuffer -> WriteBufferFromFile.
struct TemporaryFileStream::OutputWriter
{
    /// Declared in construction order: each member wraps the one declared before it.
    WriteBufferFromFile out_file_buf;
    CompressedWriteBuffer out_compressed_buf;
    NativeWriter out_writer;

    /// Set once finalize() has been entered; further writes are rejected.
    bool finalized = false;

    OutputWriter(const String & path, const Block & header_)
        : out_file_buf(path)
        , out_compressed_buf(out_file_buf)
        , out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
    {
    }

    /// Finalizes the buffers if that has not been done yet; exceptions are logged, not propagated.
    ~OutputWriter()
    {
        try
        {
            finalize();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    /// Serializes one block into the file. Throws LOGICAL_ERROR after finalization.
    void write(const Block & block)
    {
        if (finalized)
            throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);

        out_writer.write(block);
    }

    /// Flushes the writer and finalizes both buffers. Subsequent calls are no-ops.
    void finalize()
    {
        if (!finalized)
        {
            /// The flag is raised before doing the work: if an explicit finalize() throws,
            /// the destructor must not attempt it (and throw) a second time.
            finalized = true;

            out_writer.flush();
            out_compressed_buf.finalize();
            out_file_buf.finalize();
        }
    }
};
/// Read side of a temporary file stream.
/// Blocks are pulled through a chain of ReadBufferFromFile -> CompressedReadBuffer -> NativeReader,
/// each layer being constructed on top of the previous member (so the member order below matters).
struct TemporaryFileStream::InputReader
{
    /// Opens `path` and hands `header_` over to the native reader.
    InputReader(const String & path, const Block & header_)
        : in_file_buf(path)
        , in_compressed_buf(in_file_buf)
        , in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
    {
    }

    /// Opens `path` without passing a header to the native reader.
    explicit InputReader(const String & path)
        : in_file_buf(path)
        , in_compressed_buf(in_file_buf)
        , in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
    {
    }

    /// Returns whatever the native reader produces next.
    Block read()
    {
        return in_reader.read();
    }

    ReadBufferFromFile in_file_buf;
    CompressedReadBuffer in_compressed_buf;
    NativeReader in_reader;
};
/// Takes ownership of the temporary file and immediately opens a writer on its path,
/// so a freshly constructed stream is in the "writing" state (`isWriteFinished()` is false).
/// `parent_` is stored as a raw pointer and later receives the size accounting calls.
/// The `out_writer` initializer reads the `file` and `header` members, so it relies on them
/// being declared (and therefore initialized) before `out_writer` in the class.
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(file->path(), header))
{
}
/// Appends a block to the temporary file.
/// Throws LOGICAL_ERROR when the writer has already been released.
/// The size accounting is refreshed before the block is handed to the writer, so it reflects
/// the byte counters as they were prior to this write.
void TemporaryFileStream::write(const Block & block)
{
    const bool writing_allowed = (out_writer != nullptr);
    if (!writing_allowed)
        throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR);

    updateAllocAndCheck();

    out_writer->write(block);
}
/// Ends the writing phase and returns the statistics of the file.
/// A repeated call just returns the already collected statistics.
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
    if (!out_writer)
        return stat;

    out_writer->finalize();

    /// Finalization may flush buffered data, which changes the amount of written bytes,
    /// so the statistics are refreshed once more while the writer is still alive.
    updateAllocAndCheck();

    /// No reader is created here: it appears on the first read call, not to consume memory before it is needed.
    out_writer.reset();

    return stat;
}
/// True once the writer has been released. A reader and a writer are never expected to exist together.
bool TemporaryFileStream::isWriteFinished() const
{
    const bool writer_released = (out_writer == nullptr);
    assert(writer_released || in_reader == nullptr);
    return writer_released;
}
/// Reads the next block from the temporary file.
/// Throws LOGICAL_ERROR while writing is still in progress.
/// Returns an empty block when the stream is already finalized or when the reader has nothing more to give;
/// in the latter case the stream finalizes itself right away.
Block TemporaryFileStream::read()
{
    if (!isWriteFinished())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");

    if (isFinalized())
        return {};

    /// The reader is created lazily on the first read.
    if (in_reader == nullptr)
        in_reader = std::make_unique<InputReader>(file->path(), header);

    Block result = in_reader->read();

    const bool exhausted = !result;
    if (exhausted)
    {
        /// Release resources as early as possible instead of waiting for the destructor.
        finalize();
    }

    return result;
}
/// Synchronizes the per-file statistics with the byte counters of the compressed write buffer
/// and reports the growth since the previous call to the parent.
///
/// Must only be called while the writer exists.
/// Throws LOGICAL_ERROR if one of the counters went backwards. An exception thrown by
/// `parent->deltaAllocAndCheck` is propagated, and `stat` is left untouched in that case
/// because it is assigned only after the call returns.
void TemporaryFileStream::updateAllocAndCheck()
{
    assert(out_writer);

    const size_t new_compressed_size = out_writer->out_compressed_buf.getCompressedBytes();
    const size_t new_uncompressed_size = out_writer->out_compressed_buf.getUncompressedBytes();

    if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
    {
        /// The message reads "previous -> current", so the previously accounted size goes first.
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
            file->path(), stat.compressed_size, new_compressed_size, stat.uncompressed_size, new_uncompressed_size);
    }

    /// NOTE(review): the differences are `size_t` while the callee is declared with `int` parameters,
    /// so a growth of 2 GiB or more between two calls would not survive the conversion - confirm and widen the type.
    parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);

    stat.compressed_size = new_compressed_size;
    stat.uncompressed_size = new_uncompressed_size;
}
/// A stream counts as finalized as soon as it no longer holds its temporary file.
bool TemporaryFileStream::isFinalized() const
{
    return !file;
}
void TemporaryFileStream::finalize()
{
if (file)
{
file.reset();
parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size);
}
if (in_reader)
in_reader.reset();
if (out_writer)
{
out_writer->finalize();
out_writer.reset();
}
}
/// Finalizes the stream on destruction. Exceptions never leave the destructor:
/// they are logged, and builds with assertions enabled additionally stop on `assert(false)`.
/// NOTE(review): `finalize()` also finalizes a still existing writer, which looks like it may throw
/// for reasons unrelated to the accounting mentioned below - confirm that the assert is intended for that case too.
TemporaryFileStream::~TemporaryFileStream()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(false); /// deltaAllocAndCheck with negative can't throw exception
}
}
}

View File

@ -0,0 +1,139 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Interpreters/Context.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IVolume.h>
namespace DB
{
class TemporaryDataOnDiskScope;
using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
class TemporaryDataOnDisk;
using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;
class TemporaryFileStream;
using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
/*
 * Used to account amount of temporary data written to disk.
 * If limit is set, throws exception if limit is exceeded.
 * Data can be nested, so parent scope accounts all data written by children.
 * Scopes are: global -> per-user -> per-query -> per-purpose (sorting, aggregation, etc).
 */
class TemporaryDataOnDiskScope : boost::noncopyable
{
public:
    /// Amount of data currently accounted in a scope.
    /// The counters are zero-initialized explicitly: a default-constructed `std::atomic`
    /// holds a defined value only starting from C++20, and accounting must always start from zero.
    struct StatAtomic
    {
        std::atomic<size_t> compressed_size{0};
        std::atomic<size_t> uncompressed_size{0};
    };

    /// Root scope: works on the given volume. `limit_` is stored as the limit of this scope.
    explicit TemporaryDataOnDiskScope(VolumePtr volume_, size_t limit_)
        : volume(std::move(volume_)), limit(limit_)
    {}

    /// Nested scope: shares the volume of `parent_`.
    /// `parent_` is dereferenced here, so it must not be null.
    /// The `volume` initializer reads the `parent` member, which is declared (and initialized) before it.
    explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_)
        : parent(std::move(parent_)), volume(parent->volume), limit(limit_)
    {}

    /// TODO: remove
    /// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
    VolumePtr getVolume() const { return volume; }

protected:
    /// Applies a change of the accounted sizes; negative values release previously accounted data.
    /// NOTE(review): the deltas are `int` while the sizes are `size_t`, so a single change
    /// of 2 GiB or more cannot be represented - consider a wider signed type.
    void deltaAllocAndCheck(int compressed_delta, int uncompressed_delta);

    TemporaryDataOnDiskScopePtr parent = nullptr;
    VolumePtr volume;

    StatAtomic stat;
    size_t limit = 0;
};
/*
 * Holds the set of temporary files.
 * New file stream is created with `createStream`.
 * Streams are owned by this object and will be deleted when it is deleted.
 * It's a leaf node in the temporary data scope tree.
 */
class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
{
friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
public:
using TemporaryDataOnDiskScope::StatAtomic;
/// Attaches this object to `parent_`; the base scope is constructed with 0 passed as its limit.
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
: TemporaryDataOnDiskScope(std::move(parent_), 0)
{}
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
TemporaryFileStream & createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size = 0);
/// Non-owning pointers to the streams created so far.
std::vector<TemporaryFileStream *> getStreams() const;
/// True when no stream has been created yet.
bool empty() const;
/// Sizes accounted in this object (the `stat` member inherited from the scope).
const StatAtomic & getStat() const { return stat; }
private:
/// Protects `streams`; mutable so that const accessors can lock it.
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
};
/*
 * Data can be written into this stream and then read.
 * After finish writing, call `finishWriting` and then `read` to read the data.
 * Account amount of data written to disk in parent scope.
 */
class TemporaryFileStream : boost::noncopyable
{
public:
struct Stat
{
/// Statistics for file
/// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
};
/// Takes ownership of `file_`; `parent_` is kept as a raw pointer and receives the size accounting.
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
/// Appends a block; not allowed after `finishWriting`.
void write(const Block & block);
/// Ends the writing phase and returns the collected statistics.
Stat finishWriting();
bool isWriteFinished() const;
/// Returns the next block, or an empty block when there is nothing more to read; requires writing to be finished.
Block read();
/// Path of the underlying temporary file.
/// NOTE(review): dereferences `file`, which is reset on finalization - presumably must not be called after the stream was read to the end; confirm against callers.
const String & path() const { return file->getPath(); }
/// Returns a copy of the header the stream was created with.
Block getHeader() const { return header; }
~TemporaryFileStream();
private:
/// Refreshes `stat` from the writer and reports the growth to `parent`.
void updateAllocAndCheck();
/// Finalize everything, close reader and writer, delete file
void finalize();
/// True when the stream no longer holds its file.
bool isFinalized() const;
/// Not owned.
TemporaryDataOnDisk * parent;
Block header;
TemporaryFileOnDiskHolder file;
Stat stat;
/// Exists from construction until writing is finished.
struct OutputWriter;
std::unique_ptr<OutputWriter> out_writer;
/// Created on the first `read` call.
struct InputReader;
std::unique_ptr<InputReader> in_reader;
};
}

View File

@ -177,7 +177,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.group_by_two_level_threshold_bytes, transform_params->params.group_by_two_level_threshold_bytes,
transform_params->params.max_bytes_before_external_group_by, transform_params->params.max_bytes_before_external_group_by,
transform_params->params.empty_result_for_aggregation_by_empty_set, transform_params->params.empty_result_for_aggregation_by_empty_set,
transform_params->params.tmp_volume, transform_params->params.tmp_data_scope,
transform_params->params.max_threads, transform_params->params.max_threads,
transform_params->params.min_free_disk_space, transform_params->params.min_free_disk_space,
transform_params->params.compile_aggregate_expressions, transform_params->params.compile_aggregate_expressions,

View File

@ -12,6 +12,11 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static ITransformingStep::Traits getTraits(size_t limit) static ITransformingStep::Traits getTraits(size_t limit)
{ {
return ITransformingStep::Traits return ITransformingStep::Traits
@ -37,7 +42,7 @@ SortingStep::SortingStep(
size_t max_bytes_before_remerge_, size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_, double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_, size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_, size_t min_free_disk_space_,
bool optimize_sorting_by_input_stream_properties_) bool optimize_sorting_by_input_stream_properties_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_)) : ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
@ -49,10 +54,13 @@ SortingStep::SortingStep(
, max_bytes_before_remerge(max_bytes_before_remerge_) , max_bytes_before_remerge(max_bytes_before_remerge_)
, remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_) , remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_) , max_bytes_before_external_sort(max_bytes_before_external_sort_)
, tmp_volume(tmp_volume_) , tmp_data(tmp_data_)
, min_free_disk_space(min_free_disk_space_) , min_free_disk_space(min_free_disk_space_)
, optimize_sorting_by_input_stream_properties(optimize_sorting_by_input_stream_properties_) , optimize_sorting_by_input_stream_properties(optimize_sorting_by_input_stream_properties_)
{ {
if (max_bytes_before_external_sort && tmp_data == nullptr)
throw Exception("Temporary data storage for external sorting is not provided", ErrorCodes::LOGICAL_ERROR);
/// TODO: check input_stream is partially sorted by the same description. /// TODO: check input_stream is partially sorted by the same description.
output_stream->sort_description = result_description; output_stream->sort_description = result_description;
output_stream->sort_scope = DataStream::SortScope::Global; output_stream->sort_scope = DataStream::SortScope::Global;
@ -189,7 +197,7 @@ void SortingStep::mergeSorting(QueryPipelineBuilder & pipeline, const SortDescri
max_bytes_before_remerge / pipeline.getNumStreams(), max_bytes_before_remerge / pipeline.getNumStreams(),
remerge_lowered_memory_bytes_ratio, remerge_lowered_memory_bytes_ratio,
max_bytes_before_external_sort, max_bytes_before_external_sort,
tmp_volume, std::make_unique<TemporaryDataOnDisk>(tmp_data),
min_free_disk_space); min_free_disk_space);
}); });
} }

View File

@ -2,7 +2,7 @@
#include <Processors/QueryPlan/ITransformingStep.h> #include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h> #include <Core/SortDescription.h>
#include <QueryPipeline/SizeLimits.h> #include <QueryPipeline/SizeLimits.h>
#include <Disks/IVolume.h> #include <Interpreters/TemporaryDataOnDisk.h>
namespace DB namespace DB
{ {
@ -21,7 +21,7 @@ public:
size_t max_bytes_before_remerge_, size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_, double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_, size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_, size_t min_free_disk_space_,
bool optimize_sorting_by_input_stream_properties_); bool optimize_sorting_by_input_stream_properties_);
@ -85,7 +85,8 @@ private:
size_t max_bytes_before_remerge = 0; size_t max_bytes_before_remerge = 0;
double remerge_lowered_memory_bytes_ratio = 0; double remerge_lowered_memory_bytes_ratio = 0;
size_t max_bytes_before_external_sort = 0; size_t max_bytes_before_external_sort = 0;
VolumePtr tmp_volume; TemporaryDataOnDiskScopePtr tmp_data = nullptr;
size_t min_free_disk_space = 0; size_t min_free_disk_space = 0;
const bool optimize_sorting_by_input_stream_properties = false; const bool optimize_sorting_by_input_stream_properties = false;
}; };

View File

@ -1,5 +1,5 @@
#include <Processors/Sources/TemporaryFileLazySource.h> #include <Processors/Sources/TemporaryFileLazySource.h>
#include <Formats/TemporaryFileStream.h> #include <Formats/TemporaryFileStreamLegacy.h>
namespace DB namespace DB
{ {
@ -18,7 +18,7 @@ Chunk TemporaryFileLazySource::generate()
return {}; return {};
if (!stream) if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header); stream = std::make_unique<TemporaryFileStreamLegacy>(path, header);
auto block = stream->block_in->read(); auto block = stream->block_in->read();
if (!block) if (!block)

View File

@ -5,7 +5,7 @@
namespace DB namespace DB
{ {
struct TemporaryFileStream; struct TemporaryFileStreamLegacy;
class TemporaryFileLazySource : public ISource class TemporaryFileLazySource : public ISource
{ {
@ -22,7 +22,7 @@ private:
Block header; Block header;
bool done; bool done;
std::unique_ptr<TemporaryFileStream> stream; std::unique_ptr<TemporaryFileStreamLegacy> stream;
}; };
} }

View File

@ -34,7 +34,7 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
0, 0,
settings.max_bytes_before_external_group_by, settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set, settings.empty_result_for_aggregation_by_empty_set,
storage_.getContext()->getTemporaryVolume(), storage_.getContext()->getTempDataOnDisk(),
settings.max_threads, settings.max_threads,
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,

View File

@ -53,33 +53,29 @@ namespace
class SourceFromNativeStream : public ISource class SourceFromNativeStream : public ISource
{ {
public: public:
SourceFromNativeStream(const Block & header, const std::string & path) explicit SourceFromNativeStream(TemporaryFileStream * tmp_stream_)
: ISource(header), file_in(path), compressed_in(file_in), : ISource(tmp_stream_->getHeader())
block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION)) , tmp_stream(tmp_stream_)
{ {}
}
String getName() const override { return "SourceFromNativeStream"; } String getName() const override { return "SourceFromNativeStream"; }
Chunk generate() override Chunk generate() override
{ {
if (!block_in) if (!tmp_stream)
return {}; return {};
auto block = block_in->read(); auto block = tmp_stream->read();
if (!block) if (!block)
{ {
block_in.reset(); tmp_stream = nullptr;
return {}; return {};
} }
return convertToChunk(block); return convertToChunk(block);
} }
private: private:
ReadBufferFromFile file_in; TemporaryFileStream * tmp_stream;
CompressedReadBuffer compressed_in;
std::unique_ptr<NativeReader> block_in;
}; };
} }
@ -564,7 +560,7 @@ void AggregatingTransform::initGenerate()
elapsed_seconds, src_rows / elapsed_seconds, elapsed_seconds, src_rows / elapsed_seconds,
ReadableSize(src_bytes / elapsed_seconds)); ReadableSize(src_bytes / elapsed_seconds));
if (params->aggregator.hasTemporaryFiles()) if (params->aggregator.hasTemporaryData())
{ {
if (variants.isConvertibleToTwoLevel()) if (variants.isConvertibleToTwoLevel())
variants.convertToTwoLevel(); variants.convertToTwoLevel();
@ -577,7 +573,7 @@ void AggregatingTransform::initGenerate()
if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size()) if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size())
return; return;
if (!params->aggregator.hasTemporaryFiles()) if (!params->aggregator.hasTemporaryData())
{ {
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants); auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data)); auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
@ -604,25 +600,27 @@ void AggregatingTransform::initGenerate()
} }
} }
const auto & files = params->aggregator.getTemporaryFiles(); const auto & tmp_data = params->aggregator.getTemporaryData();
Pipe pipe;
Pipe pipe;
{ {
auto header = params->aggregator.getHeader(false);
Pipes pipes; Pipes pipes;
for (const auto & file : files.files) for (auto * tmp_stream : tmp_data.getStreams())
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(header, file->path()))); pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
pipe = Pipe::unitePipes(std::move(pipes)); pipe = Pipe::unitePipes(std::move(pipes));
} }
size_t num_streams = tmp_data.getStreams().size();
size_t compressed_size = tmp_data.getStat().compressed_size;
size_t uncompressed_size = tmp_data.getStat().uncompressed_size;
LOG_DEBUG( LOG_DEBUG(
log, log,
"Will merge {} temporary files of size {} compressed, {} uncompressed.", "Will merge {} temporary files of size {} compressed, {} uncompressed.",
files.files.size(), num_streams,
ReadableSize(files.sum_size_compressed), ReadableSize(compressed_size),
ReadableSize(files.sum_size_uncompressed)); ReadableSize(uncompressed_size));
addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads); addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);

View File

@ -30,21 +30,15 @@ namespace CurrentMetrics
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
class BufferingToFileTransform : public IAccumulatingTransform class BufferingToFileTransform : public IAccumulatingTransform
{ {
public: public:
BufferingToFileTransform(const Block & header, Poco::Logger * log_, std::string path_) BufferingToFileTransform(const Block & header, TemporaryFileStream & tmp_stream_, Poco::Logger * log_)
: IAccumulatingTransform(header, header), log(log_) : IAccumulatingTransform(header, header)
, path(std::move(path_)), file_buf_out(path), compressed_buf_out(file_buf_out) , tmp_stream(tmp_stream_)
, out_stream(std::make_unique<NativeWriter>(compressed_buf_out, 0, header)) , log(log_)
{ {
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", path); LOG_INFO(log, "Sorting and writing part of data into temporary file {}", tmp_stream.path());
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart); ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
} }
@ -52,71 +46,37 @@ public:
void consume(Chunk chunk) override void consume(Chunk chunk) override
{ {
out_stream->write(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); Block block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
tmp_stream.write(block);
} }
Chunk generate() override Chunk generate() override
{ {
if (out_stream) if (!tmp_stream.isWriteFinished())
{ {
out_stream->flush(); auto stat = tmp_stream.finishWriting();
compressed_buf_out.next();
file_buf_out.next();
auto stat = updateWriteStat(); ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, stat.uncompressed_size);
LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ", LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ",
path, ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size))); tmp_stream.path(), ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
out_stream.reset();
file_in = std::make_unique<ReadBufferFromFile>(path);
compressed_in = std::make_unique<CompressedReadBuffer>(*file_in);
block_in = std::make_unique<NativeReader>(*compressed_in, getOutputPort().getHeader(), 0);
} }
if (!block_in) Block block = tmp_stream.read();
return {};
auto block = block_in->read();
if (!block) if (!block)
{
block_in.reset();
return {}; return {};
}
UInt64 num_rows = block.rows(); UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows); return Chunk(block.getColumns(), num_rows);
} }
private: private:
struct Stat TemporaryFileStream & tmp_stream;
{
size_t compressed_size = 0;
size_t uncompressed_size = 0;
};
Stat updateWriteStat()
{
Stat res{compressed_buf_out.getCompressedBytes(), compressed_buf_out.getUncompressedBytes()};
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, res.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, res.uncompressed_size);
return res;
}
Poco::Logger * log; Poco::Logger * log;
std::string path;
WriteBufferFromFile file_buf_out;
CompressedWriteBuffer compressed_buf_out;
std::unique_ptr<NativeWriter> out_stream;
std::unique_ptr<ReadBufferFromFile> file_in;
std::unique_ptr<CompressedReadBuffer> compressed_in;
std::unique_ptr<NativeReader> block_in;
}; };
MergeSortingTransform::MergeSortingTransform( MergeSortingTransform::MergeSortingTransform(
@ -128,13 +88,13 @@ MergeSortingTransform::MergeSortingTransform(
size_t max_bytes_before_remerge_, size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_, double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_, size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_, TemporaryDataOnDiskPtr tmp_data_,
size_t min_free_disk_space_) size_t min_free_disk_space_)
: SortingTransform(header, description_, max_merged_block_size_, limit_, increase_sort_description_compile_attempts) : SortingTransform(header, description_, max_merged_block_size_, limit_, increase_sort_description_compile_attempts)
, max_bytes_before_remerge(max_bytes_before_remerge_) , max_bytes_before_remerge(max_bytes_before_remerge_)
, remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_) , remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_) , max_bytes_before_external_sort(max_bytes_before_external_sort_)
, tmp_volume(tmp_volume_) , tmp_data(std::move(tmp_data_))
, min_free_disk_space(min_free_disk_space_) , min_free_disk_space(min_free_disk_space_)
{ {
} }
@ -209,17 +169,12 @@ void MergeSortingTransform::consume(Chunk chunk)
*/ */
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort) if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{ {
size_t size = sum_bytes_in_blocks + min_free_disk_space; /// If there's less free disk space than reserve_size, an exception will be thrown
auto reservation = tmp_volume->reserve(size); size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
if (!reservation) auto & tmp_stream = tmp_data->createStream(header_without_constants, CurrentMetrics::TemporaryFilesForSort, reserve_size);
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
temporary_files.emplace_back(std::make_unique<TemporaryFileOnDisk>(reservation->getDisk(), CurrentMetrics::TemporaryFilesForSort)); merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, tmp_stream, log);
const std::string & path = temporary_files.back()->path();
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
processors.emplace_back(current_processor); processors.emplace_back(current_processor);
@ -261,13 +216,14 @@ void MergeSortingTransform::generate()
{ {
if (!generated_prefix) if (!generated_prefix)
{ {
if (temporary_files.empty()) size_t num_tmp_files = tmp_data ? tmp_data->getStreams().size() : 0;
if (num_tmp_files == 0)
merge_sorter merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit); = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
else else
{ {
ProfileEvents::increment(ProfileEvents::ExternalSortMerge); ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are {} temporary sorted parts to merge", temporary_files.size()); LOG_INFO(log, "There are {} temporary sorted parts to merge", num_tmp_files);
processors.emplace_back(std::make_shared<MergeSorterSource>( processors.emplace_back(std::make_shared<MergeSorterSource>(
header_without_constants, std::move(chunks), description, max_merged_block_size, limit)); header_without_constants, std::move(chunks), description, max_merged_block_size, limit));

View File

@ -4,6 +4,7 @@
#include <Core/SortDescription.h> #include <Core/SortDescription.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Disks/TemporaryFileOnDisk.h> #include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
@ -28,7 +29,7 @@ public:
size_t max_bytes_before_remerge_, size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_, double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_, size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_, TemporaryDataOnDiskPtr tmp_data_,
size_t min_free_disk_space_); size_t min_free_disk_space_);
String getName() const override { return "MergeSortingTransform"; } String getName() const override { return "MergeSortingTransform"; }
@ -44,7 +45,7 @@ private:
size_t max_bytes_before_remerge; size_t max_bytes_before_remerge;
double remerge_lowered_memory_bytes_ratio; double remerge_lowered_memory_bytes_ratio;
size_t max_bytes_before_external_sort; size_t max_bytes_before_external_sort;
VolumePtr tmp_volume; TemporaryDataOnDiskPtr tmp_data;
size_t min_free_disk_space; size_t min_free_disk_space;
size_t sum_rows_in_blocks = 0; size_t sum_rows_in_blocks = 0;
@ -55,9 +56,6 @@ private:
/// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore. /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
bool remerge_is_useful = true; bool remerge_is_useful = true;
/// Everything below is for external sorting.
std::vector<TemporaryFileOnDiskHolder> temporary_files;
/// Merge all accumulated blocks to keep no more than limit rows. /// Merge all accumulated blocks to keep no more than limit rows.
void remerge(); void remerge();

View File

@ -310,7 +310,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.group_by_two_level_threshold_bytes, settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by, settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(), context->getTempDataOnDisk(),
settings.max_threads, settings.max_threads,
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,

View File

@ -99,6 +99,7 @@ def get_run_commands(
return [ return [
f"readelf -s {build_path}/usr/bin/clickhouse | grep '@GLIBC_' > {result_folder}/glibc.log", f"readelf -s {build_path}/usr/bin/clickhouse | grep '@GLIBC_' > {result_folder}/glibc.log",
f"readelf -s {build_path}/usr/bin/clickhouse-odbc-bridge | grep '@GLIBC_' >> {result_folder}/glibc.log", f"readelf -s {build_path}/usr/bin/clickhouse-odbc-bridge | grep '@GLIBC_' >> {result_folder}/glibc.log",
f"readelf -s {build_path}/usr/bin/clickhouse-library-bridge | grep '@GLIBC_' >> {result_folder}/glibc.log",
f"docker run --network=host --volume={build_path}/usr/bin/clickhouse:/clickhouse " f"docker run --network=host --volume={build_path}/usr/bin/clickhouse:/clickhouse "
f"--volume={build_path}/etc/clickhouse-server:/config " f"--volume={build_path}/etc/clickhouse-server:/config "
f"--volume={server_log_folder}:/var/log/clickhouse-server {image_ubuntu} > {result_folder}/ubuntu:12.04", f"--volume={server_log_folder}:/var/log/clickhouse-server {image_ubuntu} > {result_folder}/ubuntu:12.04",

View File

@ -196,6 +196,7 @@ def test_update_metadata(start_cluster):
node1.query("ALTER TABLE update_metadata MODIFY COLUMN col1 String") node1.query("ALTER TABLE update_metadata MODIFY COLUMN col1 String")
node1.query("ALTER TABLE update_metadata ADD COLUMN col2 INT") node1.query("ALTER TABLE update_metadata ADD COLUMN col2 INT")
node3.query("SYSTEM SYNC REPLICA update_metadata")
for i in range(1, 11): for i in range(1, 11):
node3.query( node3.query(
"INSERT INTO update_metadata VALUES ({}, '{}', {})".format( "INSERT INTO update_metadata VALUES ({}, '{}', {})".format(

View File

@ -30,7 +30,7 @@ SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM (S
SELECT '*** External aggregation.'; SELECT '*** External aggregation.';
SET max_bytes_before_external_group_by=1000000; SET max_bytes_before_external_group_by = 1000000;
SET group_by_two_level_threshold = 100000; SET group_by_two_level_threshold = 100000;
SELECT '**** totals_mode = after_having_auto'; SELECT '**** totals_mode = after_having_auto';