diff --git a/base/base/scope_guard_safe.h b/base/base/scope_guard_safe.h index 73c85b65b57..98521dd17c1 100644 --- a/base/base/scope_guard_safe.h +++ b/base/base/scope_guard_safe.h @@ -2,7 +2,7 @@ #include #include -#include +#include /// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors. /// @@ -12,8 +12,7 @@ /// /// NOTE: it should be used with caution. #define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \ - MemoryTracker::LockExceptionInThread \ - lock_memory_tracker(VariableContext::Global); \ + LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \ __VA_ARGS__; \ ) @@ -57,8 +56,7 @@ #define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \ try \ { \ - MemoryTracker::LockExceptionInThread \ - lock_memory_tracker(VariableContext::Global); \ + LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \ __VA_ARGS__; \ } \ catch (...) \ diff --git a/base/daemon/BaseDaemon.cpp b/base/daemon/BaseDaemon.cpp index b92a68f104e..f3026d7c87a 100644 --- a/base/daemon/BaseDaemon.cpp +++ b/base/daemon/BaseDaemon.cpp @@ -51,6 +51,7 @@ #include #include #include +#include #include #include diff --git a/base/loggers/OwnSplitChannel.cpp b/base/loggers/OwnSplitChannel.cpp index 2ae1e65729c..2267b8f425d 100644 --- a/base/loggers/OwnSplitChannel.cpp +++ b/base/loggers/OwnSplitChannel.cpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include #include @@ -57,7 +59,7 @@ void OwnSplitChannel::tryLogSplit(const Poco::Message & msg) /// but let's log it into the stderr at least. catch (...) { - MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global); + LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); const std::string & exception_message = getCurrentExceptionMessage(true); const std::string & message = msg.getText(); diff --git a/debian/.pbuilderrc b/debian/.pbuilderrc index 9449be7c7d4..485906f6198 100644 --- a/debian/.pbuilderrc +++ b/debian/.pbuilderrc @@ -104,8 +104,7 @@ ALLOWUNTRUSTED=${SET_ALLOWUNTRUSTED:=${ALLOWUNTRUSTED}} if $(echo ${DEBIAN_SUITES[@]} | grep -q $DIST); then # Debian configuration OSNAME=debian - #MIRRORSITE=${SET_MIRRORSITE="http://deb.debian.org/$OSNAME/"} - MIRRORSITE=${SET_MIRRORSITE="http://mirror.yandex.ru/$OSNAME/"} + MIRRORSITE=${SET_MIRRORSITE="http://deb.debian.org/$OSNAME/"} COMPONENTS="main contrib non-free" if $(echo "$STABLE_CODENAME stable" | grep -q $DIST); then OTHERMIRROR="$OTHERMIRROR | deb $MIRRORSITE $STABLE_BACKPORTS_SUITE $COMPONENTS" @@ -125,8 +124,7 @@ elif $(echo ${UBUNTU_SUITES[@]} | grep -q $DIST); then OSNAME=ubuntu if [[ "$ARCH" == "amd64" || "$ARCH" == "i386" ]]; then - #MIRRORSITE=${SET_MIRRORSITE="http://archive.ubuntu.com/$OSNAME/"} - MIRRORSITE=${SET_MIRRORSITE="http://mirror.yandex.ru/$OSNAME/"} + MIRRORSITE=${SET_MIRRORSITE="http://archive.ubuntu.com/$OSNAME/"} else MIRRORSITE=${SET_MIRRORSITE="http://ports.ubuntu.com/ubuntu-ports/"} fi diff --git a/debian/clickhouse-server.init b/debian/clickhouse-server.init index 1dd87fe80ae..1695f6286b8 100755 --- a/debian/clickhouse-server.init +++ b/debian/clickhouse-server.init @@ -5,7 +5,7 @@ # Default-Stop: 0 1 6 # Should-Start: $time $network # Should-Stop: $network -# Short-Description: Yandex clickhouse-server daemon +# Short-Description: clickhouse-server daemon ### END INIT INFO # # NOTES: diff --git a/debian/control b/debian/control index ac75b00df22..f22980fdbc4 100644 --- a/debian/control +++ b/debian/control @@ -18,7 +18,7 @@ Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-common-static (= ${binar Replaces: clickhouse-compressor Conflicts: clickhouse-compressor Description: Client binary for ClickHouse - Yandex ClickHouse is a column-oriented database management system + ClickHouse is a column-oriented database management system that allows generating analytical data reports in real time. . This package provides clickhouse-client , clickhouse-local and clickhouse-benchmark @@ -30,7 +30,7 @@ Suggests: clickhouse-common-static-dbg Replaces: clickhouse-common, clickhouse-server-base Provides: clickhouse-common, clickhouse-server-base Description: Common files for ClickHouse - Yandex ClickHouse is a column-oriented database management system + ClickHouse is a column-oriented database management system that allows generating analytical data reports in real time. . This package provides common files for both clickhouse server and client @@ -42,7 +42,7 @@ Recommends: libcap2-bin Replaces: clickhouse-server-common, clickhouse-server-base Provides: clickhouse-server-common Description: Server binary for ClickHouse - Yandex ClickHouse is a column-oriented database management system + ClickHouse is a column-oriented database management system that allows generating analytical data reports in real time. . This package provides clickhouse common configuration files diff --git a/docs/en/sql-reference/statements/use.md b/docs/en/sql-reference/statements/use.md index 841c23d333d..41cba58bb9d 100644 --- a/docs/en/sql-reference/statements/use.md +++ b/docs/en/sql-reference/statements/use.md @@ -3,14 +3,14 @@ toc_priority: 53 toc_title: USE --- -# USE Statement {#use} +# USE 语句 {#use} ``` sql USE db ``` -Lets you set the current database for the session. +用于设置会话的当前数据库。 -The current database is used for searching for tables if the database is not explicitly defined in the query with a dot before the table name. +如果查询语句中没有在表名前面以加点的方式指明数据库名, 则用当前数据库进行搜索。 -This query can’t be made when using the HTTP protocol, since there is no concept of a session. +使用 HTTP 协议时无法进行此查询,因为没有会话的概念。 diff --git a/docs/en/whats-new/changelog/2021.md b/docs/en/whats-new/changelog/2021.md index 1c3ceca08a7..2e81d981990 100644 --- a/docs/en/whats-new/changelog/2021.md +++ b/docs/en/whats-new/changelog/2021.md @@ -1018,6 +1018,9 @@ toc_title: '2021' ### ClickHouse release 21.6, 2021-06-05 +#### Backward Incompatible Change +* uniqState / uniqHLL12State / uniqCombinedState / uniqCombined64State produce incompatible states with `UUID` type. [#33607](https://github.com/ClickHouse/ClickHouse/issues/33607). + #### Upgrade Notes * `zstd` compression library is updated to v1.5.0. You may get messages about "checksum does not match" in replication. These messages are expected due to update of compression algorithm and you can ignore them. These messages are informational and do not indicate any kinds of undesired behaviour. diff --git a/docs/zh/sql-reference/statements/index.md b/docs/zh/sql-reference/statements/index.md index ab080584c66..385639fde0b 100644 --- a/docs/zh/sql-reference/statements/index.md +++ b/docs/zh/sql-reference/statements/index.md @@ -1,8 +1,32 @@ --- -machine_translated: true -machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd -toc_folder_title: "\u8BED\u53E5" +toc_folder_title: SQL 语句 +toc_hidden: true toc_priority: 31 --- +# ClickHouse SQL 语句 {#clickhouse-sql-statements} +语句表示可以使用 SQL 查询执行的各种操作。每种类型的语句都有自己的语法和用法详细信息,这些语法和用法详细信息单独描述如下所示: + +- [SELECT](../../sql-reference/statements/select/index.md) +- [INSERT INTO](../../sql-reference/statements/insert-into.md) +- [CREATE](../../sql-reference/statements/create/index.md) +- [ALTER](../../sql-reference/statements/alter/index.md) +- [SYSTEM](../../sql-reference/statements/system.md) +- [SHOW](../../sql-reference/statements/show.md) +- [GRANT](../../sql-reference/statements/grant.md) +- [REVOKE](../../sql-reference/statements/revoke.md) +- [ATTACH](../../sql-reference/statements/attach.md) +- [CHECK TABLE](../../sql-reference/statements/check-table.md) +- [DESCRIBE TABLE](../../sql-reference/statements/describe-table.md) +- [DETACH](../../sql-reference/statements/detach.md) +- [DROP](../../sql-reference/statements/drop.md) +- [EXISTS](../../sql-reference/statements/exists.md) +- [KILL](../../sql-reference/statements/kill.md) +- [OPTIMIZE](../../sql-reference/statements/optimize.md) +- [RENAME](../../sql-reference/statements/rename.md) +- [SET](../../sql-reference/statements/set.md) +- [SET ROLE](../../sql-reference/statements/set-role.md) +- [TRUNCATE](../../sql-reference/statements/truncate.md) +- [USE](../../sql-reference/statements/use.md) +- [EXPLAIN](../../sql-reference/statements/explain.md) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index aa4747636c9..a294857ace8 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 62767e02a64..c06635cbaa9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -584,7 +584,6 @@ if (ENABLE_TESTS AND USE_GTEST) dbms clickhouse_common_config clickhouse_common_zookeeper - clickhouse_common_config string_utils) add_check(unit_tests_dbms) diff --git a/src/Common/Config/CMakeLists.txt b/src/Common/Config/CMakeLists.txt index 4d72960f727..cc41a8b2bb2 100644 --- a/src/Common/Config/CMakeLists.txt +++ b/src/Common/Config/CMakeLists.txt @@ -8,7 +8,6 @@ set (SRCS ) add_library(clickhouse_common_config ${SRCS}) - target_link_libraries(clickhouse_common_config PUBLIC clickhouse_common_zookeeper @@ -18,9 +17,17 @@ target_link_libraries(clickhouse_common_config string_utils ) -if (USE_YAML_CPP) -target_link_libraries(clickhouse_common_config +add_library(clickhouse_common_config_no_zookeeper_log ${SRCS}) +target_link_libraries(clickhouse_common_config_no_zookeeper_log + PUBLIC + clickhouse_common_zookeeper_no_log + common + Poco::XML PRIVATE - yaml-cpp + string_utils ) + +if (USE_YAML_CPP) + target_link_libraries(clickhouse_common_config PRIVATE yaml-cpp) + target_link_libraries(clickhouse_common_config_no_zookeeper_log PRIVATE yaml-cpp) endif() diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index cdf5816237c..b2987cbd73f 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -175,7 +176,7 @@ void tryLogCurrentException(const char * log_name, const std::string & start_of_ /// /// And in this case the exception will not be logged, so let's block the /// MemoryTracker until the exception will be logged. - MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global); + LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); /// Poco::Logger::get can allocate memory too tryLogCurrentExceptionImpl(&Poco::Logger::get(log_name), start_of_message); @@ -188,7 +189,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_ /// /// And in this case the exception will not be logged, so let's block the /// MemoryTracker until the exception will be logged. - MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global); + LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); tryLogCurrentExceptionImpl(logger, start_of_message); } diff --git a/src/Common/LockMemoryExceptionInThread.cpp b/src/Common/LockMemoryExceptionInThread.cpp new file mode 100644 index 00000000000..606f02abcb0 --- /dev/null +++ b/src/Common/LockMemoryExceptionInThread.cpp @@ -0,0 +1,20 @@ +#include + +/// LockMemoryExceptionInThread +thread_local uint64_t LockMemoryExceptionInThread::counter = 0; +thread_local VariableContext LockMemoryExceptionInThread::level = VariableContext::Global; +thread_local bool LockMemoryExceptionInThread::block_fault_injections = false; +LockMemoryExceptionInThread::LockMemoryExceptionInThread(VariableContext level_, bool block_fault_injections_) + : previous_level(level) + , previous_block_fault_injections(block_fault_injections) +{ + ++counter; + level = level_; + block_fault_injections = block_fault_injections_; +} +LockMemoryExceptionInThread::~LockMemoryExceptionInThread() +{ + --counter; + level = previous_level; + block_fault_injections = previous_block_fault_injections; +} diff --git a/src/Common/LockMemoryExceptionInThread.h b/src/Common/LockMemoryExceptionInThread.h new file mode 100644 index 00000000000..dc2bccf257b --- /dev/null +++ b/src/Common/LockMemoryExceptionInThread.h @@ -0,0 +1,39 @@ +#pragma once + +#include + +/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors: +/// - either configured memory limit reached +/// - or fault injected +/// +/// So this will simply ignore the configured memory limit (and avoid fault injection). +/// +/// NOTE: exception will be silently ignored, no message in log +/// (since logging from MemoryTracker::alloc() is tricky) +/// +/// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if +/// stack unwinding is currently in progress in this thread (to avoid +/// std::terminate()), so you don't need to use it in this case explicitly. +struct LockMemoryExceptionInThread +{ +private: + static thread_local uint64_t counter; + static thread_local VariableContext level; + static thread_local bool block_fault_injections; + + VariableContext previous_level; + bool previous_block_fault_injections; +public: + /// level_ - block in level and above + /// block_fault_injections_ - block in fault injection too + explicit LockMemoryExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true); + ~LockMemoryExceptionInThread(); + + LockMemoryExceptionInThread(const LockMemoryExceptionInThread &) = delete; + LockMemoryExceptionInThread & operator=(const LockMemoryExceptionInThread &) = delete; + + static bool isBlocked(VariableContext current_level, bool fault_injection) + { + return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections); + } +}; diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 013005442be..ba98ede221a 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -1,12 +1,14 @@ #include "MemoryTracker.h" #include -#include "Common/TraceCollector.h" +#include #include +#include +#include #include -#include #include #include +#include #include #include @@ -34,7 +36,7 @@ namespace /// noexcept(false)) will cause std::terminate() bool inline memoryTrackerCanThrow(VariableContext level, bool fault_injection) { - return !MemoryTracker::LockExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions(); + return !LockMemoryExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions(); } } @@ -55,41 +57,6 @@ namespace ProfileEvents static constexpr size_t log_peak_memory_usage_every = 1ULL << 30; -// BlockerInThread -thread_local uint64_t MemoryTracker::BlockerInThread::counter = 0; -thread_local VariableContext MemoryTracker::BlockerInThread::level = VariableContext::Global; -MemoryTracker::BlockerInThread::BlockerInThread(VariableContext level_) - : previous_level(level) -{ - ++counter; - level = level_; -} -MemoryTracker::BlockerInThread::~BlockerInThread() -{ - --counter; - level = previous_level; -} - -/// LockExceptionInThread -thread_local uint64_t MemoryTracker::LockExceptionInThread::counter = 0; -thread_local VariableContext MemoryTracker::LockExceptionInThread::level = VariableContext::Global; -thread_local bool MemoryTracker::LockExceptionInThread::block_fault_injections = false; -MemoryTracker::LockExceptionInThread::LockExceptionInThread(VariableContext level_, bool block_fault_injections_) - : previous_level(level) - , previous_block_fault_injections(block_fault_injections) -{ - ++counter; - level = level_; - block_fault_injections = block_fault_injections_; -} -MemoryTracker::LockExceptionInThread::~LockExceptionInThread() -{ - --counter; - level = previous_level; - block_fault_injections = previous_block_fault_injections; -} - - MemoryTracker total_memory_tracker(nullptr, VariableContext::Global); @@ -133,9 +100,9 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) if (size < 0) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size); - if (BlockerInThread::isBlocked(level)) + if (MemoryTrackerBlockerInThread::isBlocked(level)) { - /// Since the BlockerInThread should respect the level, we should go to the next parent. + /// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent. if (auto * loaded_next = parent.load(std::memory_order_relaxed)) loaded_next->allocImpl(size, throw_if_memory_exceeded); return; @@ -184,7 +151,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true) && throw_if_memory_exceeded) { /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc - BlockerInThread untrack_lock(VariableContext::Global); + MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); const auto * description = description_ptr.load(std::memory_order_relaxed); @@ -203,7 +170,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) bool allocation_traced = false; if (unlikely(current_profiler_limit && will_be > current_profiler_limit)) { - BlockerInThread untrack_lock(VariableContext::Global); + MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size); setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step); allocation_traced = true; @@ -212,7 +179,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) std::bernoulli_distribution sample(sample_probability); if (unlikely(sample_probability && sample(thread_local_rng))) { - BlockerInThread untrack_lock(VariableContext::Global); + MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size); allocation_traced = true; } @@ -220,7 +187,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded) { /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc - BlockerInThread untrack_lock(VariableContext::Global); + MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded); const auto * description = description_ptr.load(std::memory_order_relaxed); throw DB::Exception( @@ -237,7 +204,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) if (throw_if_memory_exceeded) { /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc - BlockerInThread untrack_lock(VariableContext::Global); + MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); bool log_memory_usage = true; peak_updated = updatePeak(will_be, log_memory_usage); } @@ -249,7 +216,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded) if (peak_updated && allocation_traced) { - BlockerInThread untrack_lock(VariableContext::Global); + MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); DB::TraceCollector::collect(DB::TraceType::MemoryPeak, StackTrace(), will_be); } @@ -288,9 +255,9 @@ bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage) void MemoryTracker::free(Int64 size) { - if (BlockerInThread::isBlocked(level)) + if (MemoryTrackerBlockerInThread::isBlocked(level)) { - /// Since the BlockerInThread should respect the level, we should go to the next parent. + /// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent. if (auto * loaded_next = parent.load(std::memory_order_relaxed)) loaded_next->free(size); return; @@ -299,7 +266,7 @@ void MemoryTracker::free(Int64 size) std::bernoulli_distribution sample(sample_probability); if (unlikely(sample_probability && sample(thread_local_rng))) { - BlockerInThread untrack_lock(VariableContext::Global); + MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size); } diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index ce0eef52e17..a0138b25b5f 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -31,6 +31,9 @@ extern thread_local bool memory_tracker_always_throw_logical_error_on_allocation /** Tracks memory consumption. * It throws an exception if amount of consumed memory become greater than certain limit. * The same memory tracker could be simultaneously used in different threads. + * + * @see LockMemoryExceptionInThread + * @see MemoryTrackerBlockerInThread */ class MemoryTracker { @@ -167,64 +170,6 @@ public: /// Prints info about peak memory consumption into log. void logPeakMemoryUsage() const; - - /// To be able to temporarily stop memory tracking from current thread. - struct BlockerInThread - { - private: - static thread_local uint64_t counter; - static thread_local VariableContext level; - - VariableContext previous_level; - public: - /// level_ - block in level and above - explicit BlockerInThread(VariableContext level_ = VariableContext::User); - ~BlockerInThread(); - - BlockerInThread(const BlockerInThread &) = delete; - BlockerInThread & operator=(const BlockerInThread &) = delete; - - static bool isBlocked(VariableContext current_level) - { - return counter > 0 && current_level >= level; - } - }; - - /// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors: - /// - either configured memory limit reached - /// - or fault injected - /// - /// So this will simply ignore the configured memory limit (and avoid fault injection). - /// - /// NOTE: exception will be silently ignored, no message in log - /// (since logging from MemoryTracker::alloc() is tricky) - /// - /// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if - /// stack unwinding is currently in progress in this thread (to avoid - /// std::terminate()), so you don't need to use it in this case explicitly. - struct LockExceptionInThread - { - private: - static thread_local uint64_t counter; - static thread_local VariableContext level; - static thread_local bool block_fault_injections; - - VariableContext previous_level; - bool previous_block_fault_injections; - public: - /// level_ - block in level and above - /// block_fault_injections_ - block in fault injection too - explicit LockExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true); - ~LockExceptionInThread(); - - LockExceptionInThread(const LockExceptionInThread &) = delete; - LockExceptionInThread & operator=(const LockExceptionInThread &) = delete; - - static bool isBlocked(VariableContext current_level, bool fault_injection) - { - return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections); - } - }; }; extern MemoryTracker total_memory_tracker; diff --git a/src/Common/MemoryTrackerBlockerInThread.cpp b/src/Common/MemoryTrackerBlockerInThread.cpp new file mode 100644 index 00000000000..8eb119b2fe5 --- /dev/null +++ b/src/Common/MemoryTrackerBlockerInThread.cpp @@ -0,0 +1,16 @@ +#include + +// MemoryTrackerBlockerInThread +thread_local uint64_t MemoryTrackerBlockerInThread::counter = 0; +thread_local VariableContext MemoryTrackerBlockerInThread::level = VariableContext::Global; +MemoryTrackerBlockerInThread::MemoryTrackerBlockerInThread(VariableContext level_) + : previous_level(level) +{ + ++counter; + level = level_; +} +MemoryTrackerBlockerInThread::~MemoryTrackerBlockerInThread() +{ + --counter; + level = previous_level; +} diff --git a/src/Common/MemoryTrackerBlockerInThread.h b/src/Common/MemoryTrackerBlockerInThread.h new file mode 100644 index 00000000000..caad28f636e --- /dev/null +++ b/src/Common/MemoryTrackerBlockerInThread.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +/// To be able to temporarily stop memory tracking from current thread. +struct MemoryTrackerBlockerInThread +{ +private: + static thread_local uint64_t counter; + static thread_local VariableContext level; + + VariableContext previous_level; +public: + /// level_ - block in level and above + explicit MemoryTrackerBlockerInThread(VariableContext level_ = VariableContext::User); + ~MemoryTrackerBlockerInThread(); + + MemoryTrackerBlockerInThread(const MemoryTrackerBlockerInThread &) = delete; + MemoryTrackerBlockerInThread & operator=(const MemoryTrackerBlockerInThread &) = delete; + + static bool isBlocked(VariableContext current_level) + { + return counter > 0 && current_level >= level; + } +}; diff --git a/src/Common/QueryProfiler.cpp b/src/Common/QueryProfiler.cpp index 0b2cd602b38..9718d15c072 100644 --- a/src/Common/QueryProfiler.cpp +++ b/src/Common/QueryProfiler.cpp @@ -1,9 +1,9 @@ #include "QueryProfiler.h" #include +#include #include #include -#include #include #include #include diff --git a/src/Common/ThreadStatus.cpp b/src/Common/ThreadStatus.cpp index 411f725f2db..ad42468de68 100644 --- a/src/Common/ThreadStatus.cpp +++ b/src/Common/ThreadStatus.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -11,6 +12,7 @@ #include #include +#include namespace DB diff --git a/src/Common/TraceCollector.cpp b/src/Common/TraceCollector.cpp deleted file mode 100644 index 523251fa2a2..00000000000 --- a/src/Common/TraceCollector.cpp +++ /dev/null @@ -1,187 +0,0 @@ -#include "TraceCollector.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -namespace DB -{ - -namespace -{ - /// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id. - /// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler. - /// - /// And it cannot be large, since otherwise it will not fit into PIPE_BUF. - /// The performance test query ids can be surprisingly long like - /// `aggregating_merge_tree_simple_aggregate_function_string.query100.profile100`, - /// so make some allowance for them as well. - constexpr size_t QUERY_ID_MAX_LEN = 128; - static_assert(QUERY_ID_MAX_LEN <= std::numeric_limits::max()); -} - -LazyPipeFDs pipe; - - -TraceCollector::TraceCollector(std::shared_ptr trace_log_) - : trace_log(std::move(trace_log_)) -{ - pipe.open(); - - /** Turn write end of pipe to non-blocking mode to avoid deadlocks - * when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe. - */ - pipe.setNonBlockingWrite(); - pipe.tryIncreaseSize(1 << 20); - - thread = ThreadFromGlobalPool(&TraceCollector::run, this); -} - - -TraceCollector::~TraceCollector() -{ - if (!thread.joinable()) - LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined"); - else - stop(); - - pipe.close(); -} - - -void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size) -{ - constexpr size_t buf_size = sizeof(char) /// TraceCollector stop flag - + sizeof(UInt8) /// String size - + QUERY_ID_MAX_LEN /// Maximum query_id length - + sizeof(UInt8) /// Number of stack frames - + sizeof(StackTrace::FramePointers) /// Collected stack trace, maximum capacity - + sizeof(TraceType) /// trace type - + sizeof(UInt64) /// thread_id - + sizeof(Int64); /// size - - /// Write should be atomic to avoid overlaps - /// (since recursive collect() is possible) - static_assert(PIPE_BUF >= 512); - static_assert(buf_size <= 512, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512"); - - char buffer[buf_size]; - WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer); - - StringRef query_id; - UInt64 thread_id; - - if (CurrentThread::isInitialized()) - { - query_id = CurrentThread::getQueryId(); - query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN); - - thread_id = CurrentThread::get().thread_id; - } - else - { - thread_id = MainThreadStatus::get()->thread_id; - } - - writeChar(false, out); /// true if requested to stop the collecting thread. - - writeBinary(static_cast(query_id.size), out); - out.write(query_id.data, query_id.size); - - size_t stack_trace_size = stack_trace.getSize(); - size_t stack_trace_offset = stack_trace.getOffset(); - writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out); - for (size_t i = stack_trace_offset; i < stack_trace_size; ++i) - writePODBinary(stack_trace.getFramePointers()[i], out); - - writePODBinary(trace_type, out); - writePODBinary(thread_id, out); - writePODBinary(size, out); - - out.next(); -} - - -/** Sends TraceCollector stop message - * - * Each sequence of data for TraceCollector thread starts with a boolean flag. - * If this flag is true, TraceCollector must stop reading trace_pipe and exit. - * This function sends flag with a true value to stop TraceCollector gracefully. - */ -void TraceCollector::stop() -{ - WriteBufferFromFileDescriptor out(pipe.fds_rw[1]); - writeChar(true, out); - out.next(); - thread.join(); -} - - -void TraceCollector::run() -{ - setThreadName("TraceCollector"); - - ReadBufferFromFileDescriptor in(pipe.fds_rw[0]); - - while (true) - { - char is_last; - readChar(is_last, in); - if (is_last) - break; - - std::string query_id; - UInt8 query_id_size = 0; - readBinary(query_id_size, in); - query_id.resize(query_id_size); - in.read(query_id.data(), query_id_size); - - UInt8 trace_size = 0; - readIntBinary(trace_size, in); - - Array trace; - trace.reserve(trace_size); - - for (size_t i = 0; i < trace_size; ++i) - { - uintptr_t addr = 0; - readPODBinary(addr, in); - trace.emplace_back(UInt64(addr)); - } - - TraceType trace_type; - readPODBinary(trace_type, in); - - UInt64 thread_id; - readPODBinary(thread_id, in); - - Int64 size; - readPODBinary(size, in); - - if (trace_log) - { - // time and time_in_microseconds are both being constructed from the same timespec so that the - // times will be equal up to the precision of a second. - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - - UInt64 time = UInt64(ts.tv_sec * 1000000000LL + ts.tv_nsec); - UInt64 time_in_microseconds = UInt64((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000)); - TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size}; - trace_log->add(element); - } - } -} - -} diff --git a/src/Common/TraceCollector.h b/src/Common/TraceCollector.h deleted file mode 100644 index d3bbc74726e..00000000000 --- a/src/Common/TraceCollector.h +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once - -#include "Common/PipeFDs.h" -#include - -class StackTrace; - -namespace Poco -{ - class Logger; -} - -namespace DB -{ - -class TraceLog; - -enum class TraceType : uint8_t -{ - Real, - CPU, - Memory, - MemorySample, - MemoryPeak, -}; - -class TraceCollector -{ -public: - TraceCollector(std::shared_ptr trace_log_); - ~TraceCollector(); - - /// Collect a stack trace. This method is signal safe. - /// Precondition: the TraceCollector object must be created. - /// size - for memory tracing is the amount of memory allocated; for other trace types it is 0. - static void collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size); - -private: - std::shared_ptr trace_log; - ThreadFromGlobalPool thread; - - void run(); - void stop(); -}; - -} diff --git a/src/Common/TraceSender.cpp b/src/Common/TraceSender.cpp new file mode 100644 index 00000000000..57ab3df8f96 --- /dev/null +++ b/src/Common/TraceSender.cpp @@ -0,0 +1,78 @@ +#include + +#include +#include +#include +#include + +namespace +{ + /// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id. + /// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler. + /// + /// And it cannot be large, since otherwise it will not fit into PIPE_BUF. + /// The performance test query ids can be surprisingly long like + /// `aggregating_merge_tree_simple_aggregate_function_string.query100.profile100`, + /// so make some allowance for them as well. + constexpr size_t QUERY_ID_MAX_LEN = 128; + static_assert(QUERY_ID_MAX_LEN <= std::numeric_limits::max()); +} + +namespace DB +{ + +LazyPipeFDs TraceSender::pipe; + +void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Int64 size) +{ + constexpr size_t buf_size = sizeof(char) /// TraceCollector stop flag + + sizeof(UInt8) /// String size + + QUERY_ID_MAX_LEN /// Maximum query_id length + + sizeof(UInt8) /// Number of stack frames + + sizeof(StackTrace::FramePointers) /// Collected stack trace, maximum capacity + + sizeof(TraceType) /// trace type + + sizeof(UInt64) /// thread_id + + sizeof(Int64); /// size + + /// Write should be atomic to avoid overlaps + /// (since recursive collect() is possible) + static_assert(PIPE_BUF >= 512); + static_assert(buf_size <= 512, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512"); + + char buffer[buf_size]; + WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer); + + StringRef query_id; + UInt64 thread_id; + + if (CurrentThread::isInitialized()) + { + query_id = CurrentThread::getQueryId(); + query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN); + + thread_id = CurrentThread::get().thread_id; + } + else + { + thread_id = MainThreadStatus::get()->thread_id; + } + + writeChar(false, out); /// true if requested to stop the collecting thread. + + writeBinary(static_cast(query_id.size), out); + out.write(query_id.data, query_id.size); + + size_t stack_trace_size = stack_trace.getSize(); + size_t stack_trace_offset = stack_trace.getOffset(); + writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out); + for (size_t i = stack_trace_offset; i < stack_trace_size; ++i) + writePODBinary(stack_trace.getFramePointers()[i], out); + + writePODBinary(trace_type, out); + writePODBinary(thread_id, out); + writePODBinary(size, out); + + out.next(); +} + +} diff --git a/src/Common/TraceSender.h b/src/Common/TraceSender.h new file mode 100644 index 00000000000..04c9286ad39 --- /dev/null +++ b/src/Common/TraceSender.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +class StackTrace; +class TraceCollector; + +namespace DB +{ + +enum class TraceType : uint8_t +{ + Real, + CPU, + Memory, + MemorySample, + MemoryPeak, +}; + +/// This is the second part of TraceCollector, that sends stacktrace to the pipe. +/// It has been split out to avoid dependency from interpreters part. +class TraceSender +{ +public: + /// Collect a stack trace. This method is signal safe. + /// Precondition: the TraceCollector object must be created. + /// size - for memory tracing is the amount of memory allocated; for other trace types it is 0. + static void send(TraceType trace_type, const StackTrace & stack_trace, Int64 size); + +private: + friend class TraceCollector; + static LazyPipeFDs pipe; +}; + +} diff --git a/src/Common/ZooKeeper/CMakeLists.txt b/src/Common/ZooKeeper/CMakeLists.txt index d29fba53277..7e0558dd575 100644 --- a/src/Common/ZooKeeper/CMakeLists.txt +++ b/src/Common/ZooKeeper/CMakeLists.txt @@ -2,9 +2,32 @@ include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake") add_headers_and_sources(clickhouse_common_zookeeper .) +# for clickhouse server add_library(clickhouse_common_zookeeper ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources}) +target_compile_definitions (clickhouse_common_zookeeper PRIVATE -DZOOKEEPER_LOG) +target_link_libraries (clickhouse_common_zookeeper + PUBLIC + clickhouse_common_io + common + PRIVATE + string_utils +) +# To avoid circular dependency from interpreters. +if (OS_DARWIN) + target_link_libraries (clickhouse_common_zookeeper PRIVATE -Wl,-undefined,dynamic_lookup) +else() + target_link_libraries (clickhouse_common_zookeeper PRIVATE -Wl,--unresolved-symbols=ignore-all) +endif() -target_link_libraries (clickhouse_common_zookeeper PUBLIC clickhouse_common_io common PRIVATE string_utils) +# for examples -- no logging (to avoid extra dependencies) +add_library(clickhouse_common_zookeeper_no_log ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources}) +target_link_libraries (clickhouse_common_zookeeper_no_log + PUBLIC + clickhouse_common_io + common + PRIVATE + string_utils +) if (ENABLE_EXAMPLES) add_subdirectory(examples) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index 6a449cf0122..f6c9a3d3ca0 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index f2603cc267f..0627a70193f 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1230,6 +1230,7 @@ void ZooKeeper::setZooKeeperLog(std::shared_ptr zk_log_) std::atomic_store(&zk_log, std::move(zk_log_)); } +#ifdef ZOOKEEPER_LOG void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize) { auto maybe_zk_log = std::atomic_load(&zk_log); @@ -1271,5 +1272,9 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const maybe_zk_log->add(elem); } } +#else +void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr &, const ZooKeeperResponsePtr &, bool) +{} +#endif } diff --git a/src/Common/ZooKeeper/ZooKeeperLock.cpp b/src/Common/ZooKeeper/ZooKeeperLock.cpp new file mode 100644 index 00000000000..1200dcdb533 --- /dev/null +++ b/src/Common/ZooKeeper/ZooKeeperLock.cpp @@ -0,0 +1,93 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +} + +namespace fs = std::filesystem; + +namespace zkutil +{ + +ZooKeeperLock::ZooKeeperLock( + const ZooKeeperPtr & zookeeper_, + const std::string & lock_prefix_, + const std::string & lock_name_, + const std::string & lock_message_) + : zookeeper(zookeeper_) + , lock_path(fs::path(lock_prefix_) / lock_name_) + , lock_message(lock_message_) + , log(&Poco::Logger::get("zkutil::Lock")) +{ + zookeeper->createIfNotExists(lock_prefix_, ""); +} + +ZooKeeperLock::~ZooKeeperLock() +{ + try + { + unlock(); + } + catch (...) + { + DB::tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +void ZooKeeperLock::unlock() +{ + if (!locked) + return; + + locked = false; + + if (zookeeper->expired()) + { + LOG_WARNING(log, "Lock is lost, because session was expired. Path: {}, message: {}", lock_path, lock_message); + return; + } + + Coordination::Stat stat; + /// NOTE It will throw if session expired after we checked it above + bool result = zookeeper->exists(lock_path, &stat); + + if (result && stat.ephemeralOwner == zookeeper->getClientID()) + zookeeper->remove(lock_path, -1); + else if (result) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lock is lost, it has another owner. Path: {}, message: {}, owner: {}, our id: {}", + lock_path, lock_message, stat.ephemeralOwner, zookeeper->getClientID()); + else + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lock is lost, node does not exist. Path: {}, message: {}", lock_path, lock_message); +} + +bool ZooKeeperLock::tryLock() +{ + Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral); + + if (code == Coordination::Error::ZOK) + { + locked = true; + } + else if (code != Coordination::Error::ZNODEEXISTS) + { + throw Coordination::Exception(code); + } + + return locked; +} + +std::unique_ptr createSimpleZooKeeperLock( + const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message) +{ + return std::make_unique(zookeeper, lock_prefix, lock_name, lock_message); +} + + +} diff --git a/src/Common/ZooKeeper/ZooKeeperLock.h b/src/Common/ZooKeeper/ZooKeeperLock.h new file mode 100644 index 00000000000..218f14ef132 --- /dev/null +++ b/src/Common/ZooKeeper/ZooKeeperLock.h @@ -0,0 +1,54 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace zkutil +{ + +/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases, + * and highlights your poor understanding of distributed systems. + * + * It's only correct if all the operations that are performed under lock + * are atomically checking that the lock still holds + * or if we ensure that these operations will be undone if lock is lost + * (due to ZooKeeper session loss) that's very difficult to achieve. + * + * It's Ok if every operation that we perform under lock is actually operation in ZooKeeper. + * + * In 1% of cases when you can correctly use Lock, the logic is complex enough, so you don't need this class. + * + * TLDR: Don't use this code if you are not sure. We only have a few cases of it's usage. + */ +class ZooKeeperLock +{ +public: + /// lock_prefix - path where the ephemeral lock node will be created + /// lock_name - the name of the ephemeral lock node + ZooKeeperLock( + const ZooKeeperPtr & zookeeper_, + const std::string & lock_prefix_, + const std::string & lock_name_, + const std::string & lock_message_ = ""); + + ~ZooKeeperLock(); + + void unlock(); + bool tryLock(); + +private: + zkutil::ZooKeeperPtr zookeeper; + + std::string lock_path; + std::string lock_message; + Poco::Logger * log; + bool locked = false; + +}; + +std::unique_ptr createSimpleZooKeeperLock( + const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message); + +} diff --git a/src/Common/ZooKeeper/examples/CMakeLists.txt b/src/Common/ZooKeeper/examples/CMakeLists.txt index bbfa3e1f137..8bec951e24f 100644 --- a/src/Common/ZooKeeper/examples/CMakeLists.txt +++ b/src/Common/ZooKeeper/examples/CMakeLists.txt @@ -1,14 +1,14 @@ add_executable(zkutil_test_commands zkutil_test_commands.cpp) -target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper) +target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper_no_log) add_executable(zkutil_test_commands_new_lib zkutil_test_commands_new_lib.cpp) -target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper string_utils) +target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper_no_log string_utils) add_executable(zkutil_test_async zkutil_test_async.cpp) -target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper) +target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper_no_log) add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp) -target_link_libraries (zk_many_watches_reconnect PRIVATE clickhouse_common_zookeeper clickhouse_common_config) +target_link_libraries (zk_many_watches_reconnect PRIVATE clickhouse_common_zookeeper_no_log clickhouse_common_config) add_executable (zookeeper_impl zookeeper_impl.cpp) -target_link_libraries (zookeeper_impl PRIVATE clickhouse_common_zookeeper) +target_link_libraries (zookeeper_impl PRIVATE clickhouse_common_zookeeper_no_log) diff --git a/src/Coordination/FourLetterCommand.cpp b/src/Coordination/FourLetterCommand.cpp index 294e623d803..3d0ebe86bf3 100644 --- a/src/Coordination/FourLetterCommand.cpp +++ b/src/Coordination/FourLetterCommand.cpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index a64a7d425f6..4f174e4e803 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1,16 +1,18 @@ #include #include #include -#include -#include -#include #include -#include -#include +#include +#include +#include #include #include #include -#include +#include +#include +#include +#include +#include namespace DB { diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index f61b17a88a6..11d191b7f50 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Dictionaries/CacheDictionary.cpp b/src/Dictionaries/CacheDictionary.cpp index 102421e8721..c21ea763ac3 100644 --- a/src/Dictionaries/CacheDictionary.cpp +++ b/src/Dictionaries/CacheDictionary.cpp @@ -69,7 +69,7 @@ CacheDictionary::CacheDictionary( , rnd_engine(randomSeed()) { if (!source_ptr->supportsSelectiveLoad()) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with CacheDictionary", full_name); + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with CacheDictionary", getFullName()); } template diff --git a/src/Dictionaries/DirectDictionary.cpp b/src/Dictionaries/DirectDictionary.cpp index 19bbcb6ca98..9a65d916022 100644 --- a/src/Dictionaries/DirectDictionary.cpp +++ b/src/Dictionaries/DirectDictionary.cpp @@ -28,7 +28,7 @@ DirectDictionary::DirectDictionary( , source_ptr{std::move(source_ptr_)} { if (!source_ptr->supportsSelectiveLoad()) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with DirectDictionary", full_name); + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with DirectDictionary", getFullName()); } template diff --git a/src/Dictionaries/FlatDictionary.cpp b/src/Dictionaries/FlatDictionary.cpp index d9c6b04b593..5d26ad3ebc2 100644 --- a/src/Dictionaries/FlatDictionary.cpp +++ b/src/Dictionaries/FlatDictionary.cpp @@ -370,7 +370,7 @@ void FlatDictionary::loadData() updateData(); if (configuration.require_nonempty && 0 == element_count) - throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", full_name); + throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", getFullName()); } void FlatDictionary::calculateBytesAllocated() @@ -478,7 +478,7 @@ void FlatDictionary::resize(Attribute & attribute, UInt64 key) if (key >= configuration.max_array_size) throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "{}: identifier should be less than {}", - full_name, + getFullName(), toString(configuration.max_array_size)); auto & container = std::get>(attribute.container); diff --git a/src/Dictionaries/HashedArrayDictionary.cpp b/src/Dictionaries/HashedArrayDictionary.cpp index 8d16f453328..148aaafb160 100644 --- a/src/Dictionaries/HashedArrayDictionary.cpp +++ b/src/Dictionaries/HashedArrayDictionary.cpp @@ -695,7 +695,7 @@ void HashedArrayDictionary::loadData() if (configuration.require_nonempty && 0 == element_count) throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", - full_name); + getFullName()); } template diff --git a/src/Dictionaries/HashedDictionary.cpp b/src/Dictionaries/HashedDictionary.cpp index bcc93a34d1e..7025c771e8f 100644 --- a/src/Dictionaries/HashedDictionary.cpp +++ b/src/Dictionaries/HashedDictionary.cpp @@ -583,7 +583,7 @@ void HashedDictionary::loadData() if (configuration.require_nonempty && 0 == element_count) throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", - full_name); + getFullName()); } template diff --git a/src/Dictionaries/IDictionary.h b/src/Dictionaries/IDictionary.h index 022fecab03f..042153f0971 100644 --- a/src/Dictionaries/IDictionary.h +++ b/src/Dictionaries/IDictionary.h @@ -54,11 +54,14 @@ class IDictionary : public IExternalLoadable public: explicit IDictionary(const StorageID & dictionary_id_) : dictionary_id(dictionary_id_) - , full_name(dictionary_id.getInternalDictionaryName()) { } - const std::string & getFullName() const{ return full_name; } + std::string getFullName() const + { + std::lock_guard lock{name_mutex}; + return dictionary_id.getInternalDictionaryName(); + } StorageID getDictionaryID() const { @@ -73,7 +76,11 @@ public: dictionary_id = new_name; } - const std::string & getLoadableName() const override final { return getFullName(); } + std::string getLoadableName() const override final + { + std::lock_guard lock{name_mutex}; + return dictionary_id.getInternalDictionaryName(); + } /// Specifies that no database is used. /// Sometimes we cannot simply use an empty string for that because an empty string is @@ -270,7 +277,6 @@ private: mutable StorageID dictionary_id; protected: - const String full_name; String dictionary_comment; }; diff --git a/src/Dictionaries/IPAddressDictionary.cpp b/src/Dictionaries/IPAddressDictionary.cpp index c1f244e3b81..9945ee1d4b3 100644 --- a/src/Dictionaries/IPAddressDictionary.cpp +++ b/src/Dictionaries/IPAddressDictionary.cpp @@ -336,7 +336,7 @@ void IPAddressDictionary::createAttributes() if (attribute.is_nullable) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: array or nullable attributes not supported for dictionary of type {}", - full_name, + getFullName(), getTypeName()); attribute_index_by_name.emplace(attribute.name, attributes.size()); @@ -345,7 +345,7 @@ void IPAddressDictionary::createAttributes() if (attribute.hierarchical) throw Exception(ErrorCodes::TYPE_MISMATCH, "{}: hierarchical attributes not supported for dictionary of type {}", - full_name, + getFullName(), getTypeName()); } }; @@ -520,7 +520,7 @@ void IPAddressDictionary::loadData() LOG_TRACE(logger, "{} ip records are read", ip_records.size()); if (require_nonempty && 0 == element_count) - throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", full_name); + throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", getFullName()); } template @@ -781,7 +781,7 @@ const IPAddressDictionary::Attribute & IPAddressDictionary::getAttribute(const s { const auto it = attribute_index_by_name.find(attribute_name); if (it == std::end(attribute_index_by_name)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: no such attribute '{}'", full_name, attribute_name); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: no such attribute '{}'", getFullName(), attribute_name); return attributes[it->second]; } diff --git a/src/Dictionaries/SSDCacheDictionaryStorage.h b/src/Dictionaries/SSDCacheDictionaryStorage.h index e30b0a257d9..7c7dc838436 100644 --- a/src/Dictionaries/SSDCacheDictionaryStorage.h +++ b/src/Dictionaries/SSDCacheDictionaryStorage.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Functions/FunctionBinaryArithmetic.h b/src/Functions/FunctionBinaryArithmetic.h index 69d65bfcf66..a1c3320f88a 100644 --- a/src/Functions/FunctionBinaryArithmetic.h +++ b/src/Functions/FunctionBinaryArithmetic.h @@ -7,39 +7,38 @@ #include #include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include -#include -#include +#include #include +#include #include +#include +#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include "Core/DecimalFunctions.h" -#include "IFunction.h" -#include "FunctionHelpers.h" -#include "IsOperation.h" -#include "DivisionUtils.h" -#include "castTypeToEither.h" -#include "FunctionFactory.h" -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include #include #include - -#include +#include +#include +#include #if USE_EMBEDDED_COMPILER # pragma GCC diagnostic push @@ -134,11 +133,11 @@ public: Case && IsIntegralOrExtended, LeftDataType>, Case && IsIntegralOrExtended, RightDataType>, - /// e.g Decimal * Float64 = Float64 - Case::multiply && IsDataTypeDecimal && IsFloatingPoint, - RightDataType>, - Case::multiply && IsDataTypeDecimal && IsFloatingPoint, - LeftDataType>, + /// e.g Decimal +-*/ Float, least(Decimal, Float), greatest(Decimal, Float) = Float64 + Case::allow_decimal && IsDataTypeDecimal && IsFloatingPoint, + DataTypeFloat64>, + Case::allow_decimal && IsDataTypeDecimal && IsFloatingPoint, + DataTypeFloat64>, /// Decimal Real is not supported (traditional DBs convert Decimal Real to Real) Case && !IsIntegralOrExtendedOrDecimal, InvalidType>, @@ -959,25 +958,16 @@ class FunctionBinaryArithmetic : public IFunction static constexpr const bool left_is_decimal = is_decimal; static constexpr const bool right_is_decimal = is_decimal; - static constexpr const bool result_is_decimal = IsDataTypeDecimal; typename ColVecResult::MutablePtr col_res = nullptr; - const ResultDataType type = [&] - { - if constexpr (left_is_decimal && IsFloatingPoint) - return RightDataType(); - else if constexpr (right_is_decimal && IsFloatingPoint) - return LeftDataType(); - else - return decimalResultType(left, right); - }(); + const ResultDataType type = decimalResultType(left, right); const ResultType scale_a = [&] { if constexpr (IsDataTypeDecimal && is_division) return right.getScaleMultiplier(); // the division impl uses only the scale_a - else if constexpr (result_is_decimal) + else { if constexpr (is_multiply) // the decimal impl uses scales, but if the result is decimal, both of the arguments are decimal, @@ -988,37 +978,14 @@ class FunctionBinaryArithmetic : public IFunction else return type.scaleFactorFor(left, false); } - else if constexpr (left_is_decimal) - { - if (col_left_const) - // the column will be converted to native type later, no need to scale it twice. - // the explicit type is needed to specify lambda return type - return ResultType{1}; - - return 1 / DecimalUtils::convertTo(left.getScaleMultiplier(), 0); - } - else - return 1; // the default value which won't cause any re-scale }(); const ResultType scale_b = [&] { - if constexpr (result_is_decimal) - { if constexpr (is_multiply) return ResultType{1}; else return type.scaleFactorFor(right, is_division); - } - else if constexpr (right_is_decimal) - { - if (col_right_const) - return ResultType{1}; - - return 1 / DecimalUtils::convertTo(right.getScaleMultiplier(), 0); - } - else - return 1; }(); /// non-vector result @@ -1029,20 +996,15 @@ class FunctionBinaryArithmetic : public IFunction ResultType res = {}; if (!right_nullmap || !(*right_nullmap)[0]) - res = check_decimal_overflow ? OpImplCheck::template process(const_a, const_b, scale_a, scale_b) + res = check_decimal_overflow + ? OpImplCheck::template process(const_a, const_b, scale_a, scale_b) : OpImpl::template process(const_a, const_b, scale_a, scale_b); - if constexpr (result_is_decimal) - return ResultDataType(type.getPrecision(), type.getScale()).createColumnConst( - col_left_const->size(), toField(res, type.getScale())); - else - return ResultDataType().createColumnConst(col_left_const->size(), toField(res)); + return ResultDataType(type.getPrecision(), type.getScale()) + .createColumnConst(col_left_const->size(), toField(res, type.getScale())); } - if constexpr (result_is_decimal) - col_res = ColVecResult::create(0, type.getScale()); - else - col_res = ColVecResult::create(0); + col_res = ColVecResult::create(0, type.getScale()); auto & vec_res = col_res->getData(); vec_res.resize(col_left_size); @@ -1219,8 +1181,7 @@ public: } else if constexpr ((IsDataTypeDecimal && IsFloatingPoint) || (IsDataTypeDecimal && IsFloatingPoint)) - type_res = std::make_shared, - LeftDataType, RightDataType>>(); + type_res = std::make_shared(); else if constexpr (IsDataTypeDecimal) type_res = std::make_shared(left.getPrecision(), left.getScale()); else if constexpr (IsDataTypeDecimal) @@ -1453,15 +1414,33 @@ public: else // we can't avoid the else because otherwise the compiler may assume the ResultDataType may be Invalid // and that would produce the compile error. { - using T0 = typename LeftDataType::FieldType; - using T1 = typename RightDataType::FieldType; + constexpr bool decimal_with_float = (IsDataTypeDecimal && IsFloatingPoint) + || (IsFloatingPoint && IsDataTypeDecimal); + + using T0 = std::conditional_t; + using T1 = std::conditional_t; using ResultType = typename ResultDataType::FieldType; using ColVecT0 = ColumnVectorOrDecimal; using ColVecT1 = ColumnVectorOrDecimal; using ColVecResult = ColumnVectorOrDecimal; - const auto * const col_left_raw = arguments[0].column.get(); - const auto * const col_right_raw = arguments[1].column.get(); + ColumnPtr left_col = nullptr; + ColumnPtr right_col = nullptr; + + /// When Decimal op Float32/64, convert both of them into Float64 + if constexpr (decimal_with_float) + { + const auto converted_type = std::make_shared(); + left_col = castColumn(arguments[0], converted_type); + right_col = castColumn(arguments[1], converted_type); + } + else + { + left_col = arguments[0].column; + right_col = arguments[1].column; + } + const auto * const col_left_raw = left_col.get(); + const auto * const col_right_raw = right_col.get(); const size_t col_left_size = col_left_raw->size(); @@ -1471,7 +1450,7 @@ public: const ColVecT0 * const col_left = checkAndGetColumn(col_left_raw); const ColVecT1 * const col_right = checkAndGetColumn(col_right_raw); - if constexpr (IsDataTypeDecimal || IsDataTypeDecimal) + if constexpr (IsDataTypeDecimal) { return executeNumericWithDecimal( left, right, @@ -1525,11 +1504,7 @@ public: const T1 value = col_right_const->template getValue(); OpImpl::template process( - col_left->getData().data(), - &value, - vec_res.data(), - vec_res.size(), - right_nullmap); + col_left->getData().data(), &value, vec_res.data(), vec_res.size(), right_nullmap); } else return nullptr; diff --git a/src/Functions/IsOperation.h b/src/Functions/IsOperation.h index 369978fe271..5af8ae77727 100644 --- a/src/Functions/IsOperation.h +++ b/src/Functions/IsOperation.h @@ -57,10 +57,7 @@ struct IsOperation static constexpr bool division = div_floating || div_int || div_int_or_zero; - static constexpr bool allow_decimal = - plus || minus || multiply || - div_floating || div_int || div_int_or_zero || - least || greatest; + static constexpr bool allow_decimal = plus || minus || multiply || division || least || greatest; }; } diff --git a/src/IO/WriteBuffer.h b/src/IO/WriteBuffer.h index 4d7f300a504..9440ac0a855 100644 --- a/src/IO/WriteBuffer.h +++ b/src/IO/WriteBuffer.h @@ -8,7 +8,7 @@ #include #include -#include +#include #include @@ -116,7 +116,7 @@ public: return; /// finalize() is often called from destructors. - MemoryTracker::LockExceptionInThread lock(VariableContext::Global); + LockMemoryExceptionInThread lock(VariableContext::Global); try { finalizeImpl(); diff --git a/src/Interpreters/AsynchronousMetricLog.h b/src/Interpreters/AsynchronousMetricLog.h index 6275572935c..d0f07a041b7 100644 --- a/src/Interpreters/AsynchronousMetricLog.h +++ b/src/Interpreters/AsynchronousMetricLog.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include diff --git a/src/Interpreters/CatBoostModel.h b/src/Interpreters/CatBoostModel.h index eb599b43ef2..51bf0ba94f5 100644 --- a/src/Interpreters/CatBoostModel.h +++ b/src/Interpreters/CatBoostModel.h @@ -61,7 +61,7 @@ public: const ExternalLoadableLifetime & getLifetime() const override; - const std::string & getLoadableName() const override { return name; } + std::string getLoadableName() const override { return name; } bool supportUpdates() const override { return true; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 14b0f65072a..1c71ab2cd6f 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -63,6 +63,7 @@ #include #include #include +#include #include #include #include @@ -74,7 +75,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/CrashLog.h b/src/Interpreters/CrashLog.h index ba27c1f513e..61503d95cee 100644 --- a/src/Interpreters/CrashLog.h +++ b/src/Interpreters/CrashLog.h @@ -1,6 +1,8 @@ #pragma once #include +#include +#include /// Call this function on crash. diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index ee5dc4deebb..3eeb817cbab 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -54,113 +55,6 @@ namespace ErrorCodes constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already processed"; -/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases, - * and highlights your poor understanding of distributed systems. - * - * It's only correct if all the operations that are performed under lock - * are atomically checking that the lock still holds - * or if we ensure that these operations will be undone if lock is lost - * (due to ZooKeeper session loss) that's very difficult to achieve. - * - * It's Ok if every operation that we perform under lock is actually operation in ZooKeeper. - * - * In 1% of cases when you can correctly use Lock, the logic is complex enough, so you don't need this class. - * - * TLDR: Don't use this code. - * We only have a few cases of it's usage and it will be removed. - */ -class ZooKeeperLock -{ -public: - /// lock_prefix - path where the ephemeral lock node will be created - /// lock_name - the name of the ephemeral lock node - ZooKeeperLock( - const zkutil::ZooKeeperPtr & zookeeper_, - const std::string & lock_prefix_, - const std::string & lock_name_, - const std::string & lock_message_ = "") - : - zookeeper(zookeeper_), - lock_path(fs::path(lock_prefix_) / lock_name_), - lock_message(lock_message_), - log(&Poco::Logger::get("zkutil::Lock")) - { - zookeeper->createIfNotExists(lock_prefix_, ""); - } - - ~ZooKeeperLock() - { - try - { - unlock(); - } - catch (...) - { - DB::tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - - void unlock() - { - if (!locked) - return; - - locked = false; - - if (zookeeper->expired()) - { - LOG_WARNING(log, "Lock is lost, because session was expired. Path: {}, message: {}", lock_path, lock_message); - return; - } - - Coordination::Stat stat; - std::string dummy; - /// NOTE It will throw if session expired after we checked it above - bool result = zookeeper->tryGet(lock_path, dummy, &stat); - - if (result && stat.ephemeralOwner == zookeeper->getClientID()) - zookeeper->remove(lock_path, -1); - else if (result) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Lock is lost, it has another owner. Path: {}, message: {}, owner: {}, our id: {}", - lock_path, lock_message, stat.ephemeralOwner, zookeeper->getClientID()); - else - throw Exception(ErrorCodes::LOGICAL_ERROR, "Lock is lost, node does not exist. Path: {}, message: {}", lock_path, lock_message); - } - - bool tryLock() - { - std::string dummy; - Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy); - - if (code == Coordination::Error::ZOK) - { - locked = true; - } - else if (code != Coordination::Error::ZNODEEXISTS) - { - throw Coordination::Exception(code); - } - - return locked; - } - -private: - zkutil::ZooKeeperPtr zookeeper; - - std::string lock_path; - std::string lock_message; - Poco::Logger * log; - bool locked = false; - -}; - -std::unique_ptr createSimpleZooKeeperLock( - const zkutil::ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message) -{ - return std::make_unique(zookeeper, lock_prefix, lock_name, lock_message); -} - - DDLWorker::DDLWorker( int pool_size_, const std::string & zk_root_dir, @@ -656,7 +550,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper) /// We must hold the lock until task execution status is committed to ZooKeeper, /// otherwise another replica may try to execute query again. - std::unique_ptr execute_on_leader_lock; + std::unique_ptr execute_on_leader_lock; /// Step 2: Execute query from the task. if (!task.was_executed) @@ -776,7 +670,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( const String & rewritten_query, const String & /*node_path*/, const ZooKeeperPtr & zookeeper, - std::unique_ptr & execute_on_leader_lock) + std::unique_ptr & execute_on_leader_lock) { StorageReplicatedMergeTree * replicated_storage = dynamic_cast(storage.get()); diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index 0b8b0a4a4d8..dbdf0e94f06 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -30,6 +30,11 @@ namespace Coordination struct Stat; } +namespace zkutil +{ + class ZooKeeperLock; +} + namespace DB { class ASTAlterQuery; @@ -38,7 +43,6 @@ struct DDLTaskBase; using DDLTaskPtr = std::unique_ptr; using ZooKeeperPtr = std::shared_ptr; class AccessRightsElements; -class ZooKeeperLock; class DDLWorker { @@ -95,7 +99,7 @@ protected: const String & rewritten_query, const String & node_path, const ZooKeeperPtr & zookeeper, - std::unique_ptr & execute_on_leader_lock); + std::unique_ptr & execute_on_leader_lock); bool tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper); diff --git a/src/Interpreters/IExternalLoadable.h b/src/Interpreters/IExternalLoadable.h index f49e8e86b3a..3c004508b0a 100644 --- a/src/Interpreters/IExternalLoadable.h +++ b/src/Interpreters/IExternalLoadable.h @@ -37,7 +37,7 @@ public: virtual const ExternalLoadableLifetime & getLifetime() const = 0; - virtual const std::string & getLoadableName() const = 0; + virtual std::string getLoadableName() const = 0; /// True if object can be updated when lifetime exceeded. virtual bool supportUpdates() const = 0; /// If lifetime exceeded and isModified(), ExternalLoader replace current object with the result of clone(). diff --git a/src/Interpreters/IInterpreter.cpp b/src/Interpreters/IInterpreter.cpp index 1b0e9738429..af0c06e7503 100644 --- a/src/Interpreters/IInterpreter.cpp +++ b/src/Interpreters/IInterpreter.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB { diff --git a/src/Interpreters/IInterpreterUnionOrSelectQuery.cpp b/src/Interpreters/IInterpreterUnionOrSelectQuery.cpp index 55c007e2713..fb02ba3dc5a 100644 --- a/src/Interpreters/IInterpreterUnionOrSelectQuery.cpp +++ b/src/Interpreters/IInterpreterUnionOrSelectQuery.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB { diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 2475d437acb..d01f2b05567 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index d0308d11a35..d2b77f1a439 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include diff --git a/src/Interpreters/InterpreterExplainQuery.cpp b/src/Interpreters/InterpreterExplainQuery.cpp index fdb35637a9a..37b944d72d6 100644 --- a/src/Interpreters/InterpreterExplainQuery.cpp +++ b/src/Interpreters/InterpreterExplainQuery.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -15,6 +16,7 @@ #include #include #include +#include #include #include diff --git a/src/Interpreters/InterpreterWatchQuery.cpp b/src/Interpreters/InterpreterWatchQuery.cpp index 9e23cdf2af2..8a079ee471d 100644 --- a/src/Interpreters/InterpreterWatchQuery.cpp +++ b/src/Interpreters/InterpreterWatchQuery.cpp @@ -16,6 +16,7 @@ limitations under the License. */ #include #include #include +#include namespace DB diff --git a/src/Interpreters/InterpreterWatchQuery.h b/src/Interpreters/InterpreterWatchQuery.h index ac167182a71..1ca8f18dd67 100644 --- a/src/Interpreters/InterpreterWatchQuery.h +++ b/src/Interpreters/InterpreterWatchQuery.h @@ -15,7 +15,7 @@ limitations under the License. */ #include #include #include -#include +#include #include namespace DB diff --git a/src/Interpreters/MetricLog.h b/src/Interpreters/MetricLog.h index c43c2872788..579e741c479 100644 --- a/src/Interpreters/MetricLog.h +++ b/src/Interpreters/MetricLog.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include diff --git a/src/Interpreters/OpenTelemetrySpanLog.cpp b/src/Interpreters/OpenTelemetrySpanLog.cpp index 89cce890555..4c415800a20 100644 --- a/src/Interpreters/OpenTelemetrySpanLog.cpp +++ b/src/Interpreters/OpenTelemetrySpanLog.cpp @@ -8,8 +8,10 @@ #include #include #include +#include #include +#include namespace DB diff --git a/src/Interpreters/OpenTelemetrySpanLog.h b/src/Interpreters/OpenTelemetrySpanLog.h index b287301325c..8dfc2eccc00 100644 --- a/src/Interpreters/OpenTelemetrySpanLog.h +++ b/src/Interpreters/OpenTelemetrySpanLog.h @@ -1,6 +1,8 @@ #pragma once #include +#include +#include namespace DB { diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index b2d18e4d40d..bdd1db4334a 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -1,6 +1,8 @@ #pragma once #include +#include +#include namespace DB diff --git a/src/Interpreters/QueryLog.h b/src/Interpreters/QueryLog.h index 49c38e7d2a9..f015afb9249 100644 --- a/src/Interpreters/QueryLog.h +++ b/src/Interpreters/QueryLog.h @@ -1,6 +1,8 @@ #pragma once +#include #include +#include #include #include diff --git a/src/Interpreters/QueryThreadLog.h b/src/Interpreters/QueryThreadLog.h index f826ebac4fd..3b260b71441 100644 --- a/src/Interpreters/QueryThreadLog.h +++ b/src/Interpreters/QueryThreadLog.h @@ -2,6 +2,8 @@ #include #include +#include +#include namespace ProfileEvents diff --git a/src/Interpreters/QueryViewsLog.h b/src/Interpreters/QueryViewsLog.h index 254cacd2387..c0936e52a1c 100644 --- a/src/Interpreters/QueryViewsLog.h +++ b/src/Interpreters/QueryViewsLog.h @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include #include diff --git a/src/Interpreters/SessionLog.cpp b/src/Interpreters/SessionLog.cpp index f9419088df8..d9698be1a9b 100644 --- a/src/Interpreters/SessionLog.cpp +++ b/src/Interpreters/SessionLog.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include diff --git a/src/Interpreters/SessionLog.h b/src/Interpreters/SessionLog.h index 93766d685e0..26f137565cb 100644 --- a/src/Interpreters/SessionLog.h +++ b/src/Interpreters/SessionLog.h @@ -3,6 +3,9 @@ #include #include #include +#include +#include +#include namespace DB { diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index fc2a5b620e2..66e28678ce6 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -10,17 +10,38 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include +#include +#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576 + namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; + extern const int TIMEOUT_EXCEEDED; + extern const int LOGICAL_ERROR; } namespace @@ -96,6 +117,9 @@ std::shared_ptr createSystemLog( } +/// +/// ISystemLog +/// ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextPtr context) { DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name); @@ -111,7 +135,40 @@ ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextP return old_ast; } +void ISystemLog::stopFlushThread() +{ + { + std::lock_guard lock(mutex); + if (!saving_thread.joinable()) + { + return; + } + + if (is_shutdown) + { + return; + } + + is_shutdown = true; + + /// Tell thread to shutdown. + flush_event.notify_all(); + } + + saving_thread.join(); +} + +void ISystemLog::startup() +{ + std::lock_guard lock(mutex); + saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); }); +} + + +/// +/// SystemLogs +/// SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config) { query_log = createSystemLog(global_context, "system", "query_log", config, "query_log"); @@ -193,4 +250,392 @@ void SystemLogs::shutdown() log->shutdown(); } +/// +/// SystemLog +/// +template +SystemLog::SystemLog( + ContextPtr context_, + const String & database_name_, + const String & table_name_, + const String & storage_def_, + size_t flush_interval_milliseconds_) + : WithContext(context_) + , table_id(database_name_, table_name_) + , storage_def(storage_def_) + , create_query(serializeAST(*getCreateTableQuery())) + , flush_interval_milliseconds(flush_interval_milliseconds_) +{ + assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE); + log = &Poco::Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")"); +} + + +static thread_local bool recursive_add_call = false; + +template +void SystemLog::add(const LogElement & element) +{ + /// It is possible that the method will be called recursively. + /// Better to drop these events to avoid complications. + if (recursive_add_call) + return; + recursive_add_call = true; + SCOPE_EXIT({ recursive_add_call = false; }); + + /// Memory can be allocated while resizing on queue.push_back. + /// The size of allocation can be in order of a few megabytes. + /// But this should not be accounted for query memory usage. + /// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky. + MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global); + + /// Should not log messages under mutex. + bool queue_is_half_full = false; + + { + std::unique_lock lock(mutex); + + if (is_shutdown) + return; + + if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2) + { + queue_is_half_full = true; + + // The queue more than half full, time to flush. + // We only check for strict equality, because messages are added one + // by one, under exclusive lock, so we will see each message count. + // It is enough to only wake the flushing thread once, after the message + // count increases past half available size. + const uint64_t queue_end = queue_front_index + queue.size(); + if (requested_flush_up_to < queue_end) + requested_flush_up_to = queue_end; + + flush_event.notify_all(); + } + + if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE) + { + // Ignore all further entries until the queue is flushed. + // Log a message about that. Don't spam it -- this might be especially + // problematic in case of trace log. Remember what the front index of the + // queue was when we last logged the message. If it changed, it means the + // queue was flushed, and we can log again. + if (queue_front_index != logged_queue_full_at_index) + { + logged_queue_full_at_index = queue_front_index; + + // TextLog sets its logger level to 0, so this log is a noop and + // there is no recursive logging. + lock.unlock(); + LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index); + } + + return; + } + + queue.push_back(element); + } + + if (queue_is_half_full) + LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name())); +} + +template +void SystemLog::shutdown() +{ + stopFlushThread(); + + auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext()); + if (table) + table->flushAndShutdown(); +} + +template +void SystemLog::flush(bool force) +{ + uint64_t this_thread_requested_offset; + + { + std::unique_lock lock(mutex); + + if (is_shutdown) + return; + + this_thread_requested_offset = queue_front_index + queue.size(); + + // Publish our flush request, taking care not to overwrite the requests + // made by other threads. + is_force_prepare_tables |= force; + requested_flush_up_to = std::max(requested_flush_up_to, + this_thread_requested_offset); + + flush_event.notify_all(); + } + + LOG_DEBUG(log, "Requested flush up to offset {}", + this_thread_requested_offset); + + // Use an arbitrary timeout to avoid endless waiting. 60s proved to be + // too fast for our parallel functional tests, probably because they + // heavily load the disk. + const int timeout_seconds = 180; + std::unique_lock lock(mutex); + bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), + [&] { return flushed_up_to >= this_thread_requested_offset + && !is_force_prepare_tables; }); + + if (!result) + { + throw Exception("Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.", + ErrorCodes::TIMEOUT_EXCEEDED); + } +} + + +template +void SystemLog::savingThreadFunction() +{ + setThreadName("SystemLogFlush"); + + std::vector to_flush; + bool exit_this_thread = false; + while (!exit_this_thread) + { + try + { + // The end index (exclusive, like std end()) of the messages we are + // going to flush. + uint64_t to_flush_end = 0; + // Should we prepare table even if there are no new messages. + bool should_prepare_tables_anyway = false; + + { + std::unique_lock lock(mutex); + flush_event.wait_for(lock, + std::chrono::milliseconds(flush_interval_milliseconds), + [&] () + { + return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables; + } + ); + + queue_front_index += queue.size(); + to_flush_end = queue_front_index; + // Swap with existing array from previous flush, to save memory + // allocations. + to_flush.resize(0); + queue.swap(to_flush); + + should_prepare_tables_anyway = is_force_prepare_tables; + + exit_this_thread = is_shutdown; + } + + if (to_flush.empty()) + { + if (should_prepare_tables_anyway) + { + prepareTable(); + LOG_TRACE(log, "Table created (force)"); + + std::lock_guard lock(mutex); + is_force_prepare_tables = false; + flush_event.notify_all(); + } + } + else + { + flushImpl(to_flush, to_flush_end); + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + LOG_TRACE(log, "Terminating"); +} + + +template +void SystemLog::flushImpl(const std::vector & to_flush, uint64_t to_flush_end) +{ + try + { + LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}", + to_flush.size(), to_flush_end); + + /// We check for existence of the table and create it as needed at every + /// flush. This is done to allow user to drop the table at any moment + /// (new empty table will be created automatically). BTW, flush method + /// is called from single thread. + prepareTable(); + + ColumnsWithTypeAndName log_element_columns; + auto log_element_names_and_types = LogElement::getNamesAndTypes(); + + for (const auto & name_and_type : log_element_names_and_types) + log_element_columns.emplace_back(name_and_type.type, name_and_type.name); + + Block block(std::move(log_element_columns)); + + MutableColumns columns = block.mutateColumns(); + for (const auto & elem : to_flush) + elem.appendToBlock(columns); + + block.setColumns(std::move(columns)); + + /// We write to table indirectly, using InterpreterInsertQuery. + /// This is needed to support DEFAULT-columns in table. + + std::unique_ptr insert = std::make_unique(); + insert->table_id = table_id; + ASTPtr query_ptr(insert.release()); + + // we need query context to do inserts to target table with MV containing subqueries or joins + auto insert_context = Context::createCopy(context); + insert_context->makeQueryContext(); + + InterpreterInsertQuery interpreter(query_ptr, insert_context); + BlockIO io = interpreter.execute(); + + PushingPipelineExecutor executor(io.pipeline); + + executor.start(); + executor.push(block); + executor.finish(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + { + std::lock_guard lock(mutex); + flushed_up_to = to_flush_end; + is_force_prepare_tables = false; + flush_event.notify_all(); + } + + LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end); +} + + +template +void SystemLog::prepareTable() +{ + String description = table_id.getNameForLogs(); + + auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext()); + if (table) + { + if (old_create_query.empty()) + { + old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext())); + if (old_create_query.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name)); + } + + if (old_create_query != create_query) + { + /// Rename the existing table. + int suffix = 0; + while (DatabaseCatalog::instance().isTableExist( + {table_id.database_name, table_id.table_name + "_" + toString(suffix)}, getContext())) + ++suffix; + + auto rename = std::make_shared(); + + ASTRenameQuery::Table from; + from.database = table_id.database_name; + from.table = table_id.table_name; + + ASTRenameQuery::Table to; + to.database = table_id.database_name; + to.table = table_id.table_name + "_" + toString(suffix); + + ASTRenameQuery::Element elem; + elem.from = from; + elem.to = to; + + rename->elements.emplace_back(elem); + + LOG_DEBUG( + log, + "Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.", + description, + backQuoteIfNeed(to.table), + old_create_query, + create_query); + + auto query_context = Context::createCopy(context); + query_context->makeQueryContext(); + InterpreterRenameQuery(rename, query_context).execute(); + + /// The required table will be created. + table = nullptr; + } + else if (!is_prepared) + LOG_DEBUG(log, "Will use existing table {} for {}", description, LogElement::name()); + } + + if (!table) + { + /// Create the table. + LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name()); + + auto query_context = Context::createCopy(context); + query_context->makeQueryContext(); + + auto create_query_ast = getCreateTableQuery(); + InterpreterCreateQuery interpreter(create_query_ast, query_context); + interpreter.setInternal(true); + interpreter.execute(); + + table = DatabaseCatalog::instance().getTable(table_id, getContext()); + + old_create_query.clear(); + } + + is_prepared = true; +} + + +template +ASTPtr SystemLog::getCreateTableQuery() +{ + auto create = std::make_shared(); + + create->setDatabase(table_id.database_name); + create->setTable(table_id.table_name); + + auto ordinary_columns = LogElement::getNamesAndTypes(); + auto alias_columns = LogElement::getNamesAndAliases(); + auto new_columns_list = std::make_shared(); + new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns)); + create->set(create->columns_list, new_columns_list); + + ParserStorage storage_parser; + ASTPtr storage_ast = parseQuery( + storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), + "Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); + create->set(create->storage, storage_ast); + + return create; +} + +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; +template class SystemLog; + } diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index 46254d0c3a2..3209dd2e13e 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -4,32 +4,27 @@ #include #include #include - #include #include -#include -#include + #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include #include -#include -#include +namespace Poco +{ +class Logger; +namespace Util +{ +class AbstractConfiguration; +} +} + namespace DB { @@ -58,14 +53,6 @@ namespace DB }; */ -namespace ErrorCodes -{ - extern const int TIMEOUT_EXCEEDED; - extern const int LOGICAL_ERROR; -} - -#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576 - class QueryLog; class QueryThreadLog; class PartLog; @@ -87,15 +74,33 @@ public: //// force -- force table creation (used for SYSTEM FLUSH LOGS) virtual void flush(bool force = false) = 0; virtual void prepareTable() = 0; - virtual void startup() = 0; + + /// Start the background thread. + virtual void startup(); + + /// Stop the background flush thread before destructor. No more data will be written. virtual void shutdown() = 0; + virtual ~ISystemLog() = default; + virtual void savingThreadFunction() = 0; + /// returns CREATE TABLE query, but with removed: /// - UUID /// - SETTINGS (for MergeTree) /// That way it can be used to compare with the SystemLog::getCreateTableQuery() static ASTPtr getCreateTableQueryClean(const StorageID & table_id, ContextPtr context); + +protected: + ThreadFromGlobalPool saving_thread; + + /// Data shared between callers of add()/flush()/shutdown(), and the saving thread + std::mutex mutex; + + bool is_shutdown = false; + std::condition_variable flush_event; + + void stopFlushThread(); }; @@ -156,23 +161,10 @@ public: */ void add(const LogElement & element); - void stopFlushThread(); + void shutdown() override; /// Flush data in the buffer to disk - void flush(bool force = false) override; - - /// Start the background thread. - void startup() override; - - /// Stop the background flush thread before destructor. No more data will be written. - void shutdown() override - { - stopFlushThread(); - - auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext()); - if (table) - table->flushAndShutdown(); - } + void flush(bool force) override; String getName() override { @@ -192,10 +184,7 @@ private: String old_create_query; bool is_prepared = false; const size_t flush_interval_milliseconds; - ThreadFromGlobalPool saving_thread; - /* Data shared between callers of add()/flush()/shutdown(), and the saving thread */ - std::mutex mutex; // Queue is bounded. But its size is quite large to not block in all normal cases. std::vector queue; // An always-incrementing index of the first message currently in the queue. @@ -203,10 +192,8 @@ private: // can wait until a particular message is flushed. This is used to implement // synchronous log flushing for SYSTEM FLUSH LOGS. uint64_t queue_front_index = 0; - bool is_shutdown = false; // A flag that says we must create the tables even if the queue is empty. bool is_force_prepare_tables = false; - std::condition_variable flush_event; // Requested to flush logs up to this index, exclusive uint64_t requested_flush_up_to = 0; // Flushed log up to this index, exclusive @@ -214,7 +201,7 @@ private: // Logged overflow message at this queue front index uint64_t logged_queue_full_at_index = -1; - void savingThreadFunction(); + void savingThreadFunction() override; /** Creates new table if it does not exist. * Renames old table if its structure is not suitable. @@ -226,403 +213,4 @@ private: void flushImpl(const std::vector & to_flush, uint64_t to_flush_end); }; - -template -SystemLog::SystemLog( - ContextPtr context_, - const String & database_name_, - const String & table_name_, - const String & storage_def_, - size_t flush_interval_milliseconds_) - : WithContext(context_) - , table_id(database_name_, table_name_) - , storage_def(storage_def_) - , create_query(serializeAST(*getCreateTableQuery())) - , flush_interval_milliseconds(flush_interval_milliseconds_) -{ - assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE); - log = &Poco::Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")"); -} - - -template -void SystemLog::startup() -{ - std::lock_guard lock(mutex); - saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); }); -} - - -static thread_local bool recursive_add_call = false; - -template -void SystemLog::add(const LogElement & element) -{ - /// It is possible that the method will be called recursively. - /// Better to drop these events to avoid complications. - if (recursive_add_call) - return; - recursive_add_call = true; - SCOPE_EXIT({ recursive_add_call = false; }); - - /// Memory can be allocated while resizing on queue.push_back. - /// The size of allocation can be in order of a few megabytes. - /// But this should not be accounted for query memory usage. - /// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky. - MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global); - - /// Should not log messages under mutex. - bool queue_is_half_full = false; - - { - std::unique_lock lock(mutex); - - if (is_shutdown) - return; - - if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2) - { - queue_is_half_full = true; - - // The queue more than half full, time to flush. - // We only check for strict equality, because messages are added one - // by one, under exclusive lock, so we will see each message count. - // It is enough to only wake the flushing thread once, after the message - // count increases past half available size. - const uint64_t queue_end = queue_front_index + queue.size(); - if (requested_flush_up_to < queue_end) - requested_flush_up_to = queue_end; - - flush_event.notify_all(); - } - - if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE) - { - // Ignore all further entries until the queue is flushed. - // Log a message about that. Don't spam it -- this might be especially - // problematic in case of trace log. Remember what the front index of the - // queue was when we last logged the message. If it changed, it means the - // queue was flushed, and we can log again. - if (queue_front_index != logged_queue_full_at_index) - { - logged_queue_full_at_index = queue_front_index; - - // TextLog sets its logger level to 0, so this log is a noop and - // there is no recursive logging. - lock.unlock(); - LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index); - } - - return; - } - - queue.push_back(element); - } - - if (queue_is_half_full) - LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name())); -} - - -template -void SystemLog::flush(bool force) -{ - uint64_t this_thread_requested_offset; - - { - std::unique_lock lock(mutex); - - if (is_shutdown) - return; - - this_thread_requested_offset = queue_front_index + queue.size(); - - // Publish our flush request, taking care not to overwrite the requests - // made by other threads. - is_force_prepare_tables |= force; - requested_flush_up_to = std::max(requested_flush_up_to, - this_thread_requested_offset); - - flush_event.notify_all(); - } - - LOG_DEBUG(log, "Requested flush up to offset {}", - this_thread_requested_offset); - - // Use an arbitrary timeout to avoid endless waiting. 60s proved to be - // too fast for our parallel functional tests, probably because they - // heavily load the disk. - const int timeout_seconds = 180; - std::unique_lock lock(mutex); - bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), - [&] { return flushed_up_to >= this_thread_requested_offset - && !is_force_prepare_tables; }); - - if (!result) - { - throw Exception("Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.", - ErrorCodes::TIMEOUT_EXCEEDED); - } -} - - -template -void SystemLog::stopFlushThread() -{ - { - std::lock_guard lock(mutex); - - if (!saving_thread.joinable()) - { - return; - } - - if (is_shutdown) - { - return; - } - - is_shutdown = true; - - /// Tell thread to shutdown. - flush_event.notify_all(); - } - - saving_thread.join(); -} - - -template -void SystemLog::savingThreadFunction() -{ - setThreadName("SystemLogFlush"); - - std::vector to_flush; - bool exit_this_thread = false; - while (!exit_this_thread) - { - try - { - // The end index (exclusive, like std end()) of the messages we are - // going to flush. - uint64_t to_flush_end = 0; - // Should we prepare table even if there are no new messages. - bool should_prepare_tables_anyway = false; - - { - std::unique_lock lock(mutex); - flush_event.wait_for(lock, - std::chrono::milliseconds(flush_interval_milliseconds), - [&] () - { - return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables; - } - ); - - queue_front_index += queue.size(); - to_flush_end = queue_front_index; - // Swap with existing array from previous flush, to save memory - // allocations. - to_flush.resize(0); - queue.swap(to_flush); - - should_prepare_tables_anyway = is_force_prepare_tables; - - exit_this_thread = is_shutdown; - } - - if (to_flush.empty()) - { - if (should_prepare_tables_anyway) - { - prepareTable(); - LOG_TRACE(log, "Table created (force)"); - - std::lock_guard lock(mutex); - is_force_prepare_tables = false; - flush_event.notify_all(); - } - } - else - { - flushImpl(to_flush, to_flush_end); - } - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - LOG_TRACE(log, "Terminating"); -} - - -template -void SystemLog::flushImpl(const std::vector & to_flush, uint64_t to_flush_end) -{ - try - { - LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}", - to_flush.size(), to_flush_end); - - /// We check for existence of the table and create it as needed at every - /// flush. This is done to allow user to drop the table at any moment - /// (new empty table will be created automatically). BTW, flush method - /// is called from single thread. - prepareTable(); - - ColumnsWithTypeAndName log_element_columns; - auto log_element_names_and_types = LogElement::getNamesAndTypes(); - - for (auto name_and_type : log_element_names_and_types) - log_element_columns.emplace_back(name_and_type.type, name_and_type.name); - - Block block(std::move(log_element_columns)); - - MutableColumns columns = block.mutateColumns(); - for (const auto & elem : to_flush) - elem.appendToBlock(columns); - - block.setColumns(std::move(columns)); - - /// We write to table indirectly, using InterpreterInsertQuery. - /// This is needed to support DEFAULT-columns in table. - - std::unique_ptr insert = std::make_unique(); - insert->table_id = table_id; - ASTPtr query_ptr(insert.release()); - - // we need query context to do inserts to target table with MV containing subqueries or joins - auto insert_context = Context::createCopy(context); - insert_context->makeQueryContext(); - - InterpreterInsertQuery interpreter(query_ptr, insert_context); - BlockIO io = interpreter.execute(); - - PushingPipelineExecutor executor(io.pipeline); - - executor.start(); - executor.push(block); - executor.finish(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - - { - std::lock_guard lock(mutex); - flushed_up_to = to_flush_end; - is_force_prepare_tables = false; - flush_event.notify_all(); - } - - LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end); -} - - -template -void SystemLog::prepareTable() -{ - String description = table_id.getNameForLogs(); - - auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext()); - - if (table) - { - if (old_create_query.empty()) - { - old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext())); - if (old_create_query.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name)); - } - - if (old_create_query != create_query) - { - /// Rename the existing table. - int suffix = 0; - while (DatabaseCatalog::instance().isTableExist( - {table_id.database_name, table_id.table_name + "_" + toString(suffix)}, getContext())) - ++suffix; - - auto rename = std::make_shared(); - - ASTRenameQuery::Table from; - from.database = table_id.database_name; - from.table = table_id.table_name; - - ASTRenameQuery::Table to; - to.database = table_id.database_name; - to.table = table_id.table_name + "_" + toString(suffix); - - ASTRenameQuery::Element elem; - elem.from = from; - elem.to = to; - - rename->elements.emplace_back(elem); - - LOG_DEBUG( - log, - "Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.", - description, - backQuoteIfNeed(to.table), - old_create_query, - create_query); - - auto query_context = Context::createCopy(context); - query_context->makeQueryContext(); - InterpreterRenameQuery(rename, query_context).execute(); - - /// The required table will be created. - table = nullptr; - } - else if (!is_prepared) - LOG_DEBUG(log, "Will use existing table {} for {}", description, LogElement::name()); - } - - if (!table) - { - /// Create the table. - LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name()); - - auto query_context = Context::createCopy(context); - query_context->makeQueryContext(); - - auto create_query_ast = getCreateTableQuery(); - InterpreterCreateQuery interpreter(create_query_ast, query_context); - interpreter.setInternal(true); - interpreter.execute(); - - table = DatabaseCatalog::instance().getTable(table_id, getContext()); - - old_create_query.clear(); - } - - is_prepared = true; -} - - -template -ASTPtr SystemLog::getCreateTableQuery() -{ - auto create = std::make_shared(); - - create->setDatabase(table_id.database_name); - create->setTable(table_id.table_name); - - auto ordinary_columns = LogElement::getNamesAndTypes(); - auto alias_columns = LogElement::getNamesAndAliases(); - auto new_columns_list = std::make_shared(); - new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns)); - create->set(create->columns_list, new_columns_list); - - ParserStorage storage_parser; - ASTPtr storage_ast = parseQuery( - storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), - "Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); - create->set(create->storage, storage_ast); - - return create; -} - } diff --git a/src/Interpreters/TextLog.cpp b/src/Interpreters/TextLog.cpp index 51ffbdd66ee..9aa5b26508e 100644 --- a/src/Interpreters/TextLog.cpp +++ b/src/Interpreters/TextLog.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include diff --git a/src/Interpreters/TextLog.h b/src/Interpreters/TextLog.h index d2ddd23d1e9..3026452fcc3 100644 --- a/src/Interpreters/TextLog.h +++ b/src/Interpreters/TextLog.h @@ -1,5 +1,9 @@ #pragma once + #include +#include +#include +#include namespace DB { diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index b3720b89eaa..2ea371d3d03 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -14,7 +15,8 @@ #include #include #include -#include +#include +#include #include #if defined(OS_LINUX) @@ -341,7 +343,7 @@ void ThreadStatus::finalizeQueryProfiler() void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) { - MemoryTracker::LockExceptionInThread lock(VariableContext::Global); + LockMemoryExceptionInThread lock(VariableContext::Global); if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery) { diff --git a/src/Interpreters/TraceCollector.cpp b/src/Interpreters/TraceCollector.cpp new file mode 100644 index 00000000000..84d2e25f70a --- /dev/null +++ b/src/Interpreters/TraceCollector.cpp @@ -0,0 +1,114 @@ +#include "TraceCollector.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +TraceCollector::TraceCollector(std::shared_ptr trace_log_) + : trace_log(std::move(trace_log_)) +{ + TraceSender::pipe.open(); + + /** Turn write end of pipe to non-blocking mode to avoid deadlocks + * when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe. + */ + TraceSender::pipe.setNonBlockingWrite(); + TraceSender::pipe.tryIncreaseSize(1 << 20); + + thread = ThreadFromGlobalPool(&TraceCollector::run, this); +} + + +TraceCollector::~TraceCollector() +{ + if (!thread.joinable()) + LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined"); + else + stop(); + + TraceSender::pipe.close(); +} + + +/** Sends TraceCollector stop message + * + * Each sequence of data for TraceCollector thread starts with a boolean flag. + * If this flag is true, TraceCollector must stop reading trace_pipe and exit. + * This function sends flag with a true value to stop TraceCollector gracefully. + */ +void TraceCollector::stop() +{ + WriteBufferFromFileDescriptor out(TraceSender::pipe.fds_rw[1]); + writeChar(true, out); + out.next(); + thread.join(); +} + + +void TraceCollector::run() +{ + setThreadName("TraceCollector"); + + ReadBufferFromFileDescriptor in(TraceSender::pipe.fds_rw[0]); + + while (true) + { + char is_last; + readChar(is_last, in); + if (is_last) + break; + + std::string query_id; + UInt8 query_id_size = 0; + readBinary(query_id_size, in); + query_id.resize(query_id_size); + in.read(query_id.data(), query_id_size); + + UInt8 trace_size = 0; + readIntBinary(trace_size, in); + + Array trace; + trace.reserve(trace_size); + + for (size_t i = 0; i < trace_size; ++i) + { + uintptr_t addr = 0; + readPODBinary(addr, in); + trace.emplace_back(UInt64(addr)); + } + + TraceType trace_type; + readPODBinary(trace_type, in); + + UInt64 thread_id; + readPODBinary(thread_id, in); + + Int64 size; + readPODBinary(size, in); + + if (trace_log) + { + // time and time_in_microseconds are both being constructed from the same timespec so that the + // times will be equal up to the precision of a second. + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + + UInt64 time = UInt64(ts.tv_sec * 1000000000LL + ts.tv_nsec); + UInt64 time_in_microseconds = UInt64((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000)); + TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size}; + trace_log->add(element); + } + } +} + +} diff --git a/src/Interpreters/TraceCollector.h b/src/Interpreters/TraceCollector.h new file mode 100644 index 00000000000..3a9edf676be --- /dev/null +++ b/src/Interpreters/TraceCollector.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include + +class StackTrace; + +namespace Poco +{ + class Logger; +} + +namespace DB +{ + +class TraceLog; + +class TraceCollector +{ +public: + TraceCollector(std::shared_ptr trace_log_); + ~TraceCollector(); + + static inline void collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size) + { + return TraceSender::send(trace_type, stack_trace, size); + } + +private: + std::shared_ptr trace_log; + ThreadFromGlobalPool thread; + + void run(); + void stop(); +}; + +} diff --git a/src/Interpreters/TraceLog.h b/src/Interpreters/TraceLog.h index 85400560a7b..e8836955d96 100644 --- a/src/Interpreters/TraceLog.h +++ b/src/Interpreters/TraceLog.h @@ -3,8 +3,10 @@ #include #include #include +#include #include -#include +#include +#include namespace DB diff --git a/src/Interpreters/UserDefinedExecutableFunction.h b/src/Interpreters/UserDefinedExecutableFunction.h index a4fad8ceb7b..80d6b85ad90 100644 --- a/src/Interpreters/UserDefinedExecutableFunction.h +++ b/src/Interpreters/UserDefinedExecutableFunction.h @@ -33,7 +33,7 @@ public: return lifetime; } - const std::string & getLoadableName() const override + std::string getLoadableName() const override { return configuration.name; } diff --git a/src/Interpreters/ZooKeeperLog.h b/src/Interpreters/ZooKeeperLog.h index d721081fdae..284675a7ff5 100644 --- a/src/Interpreters/ZooKeeperLog.h +++ b/src/Interpreters/ZooKeeperLog.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 9770d1a988f..870e01d3b5c 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -29,6 +30,7 @@ #include #include #include +#include #include #include @@ -38,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -193,7 +196,7 @@ static void setExceptionStackTrace(QueryLogElement & elem) { /// Disable memory tracker for stack trace. /// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string. - MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global); + MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global); try { diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index da412e4941e..826589361e9 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -15,10 +15,13 @@ #include #include #include +#include #include #include #include +#include #include +#include #include #include @@ -615,7 +618,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks /// Memory should not be limited during ATTACH TABLE query. /// This is already true at the server startup but must be also ensured for manual table ATTACH. /// Motivation: memory for index is shared between queries - not belong to the query itself. - MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global); + MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global); loadUUID(); loadColumns(require_columns_checksums); diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 366834b8f09..a03cb053e1f 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -2,8 +2,9 @@ #include #include #include -#include #include +#include +#include namespace DB diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b38a0112116..54705a3c405 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5516,6 +5516,9 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge { LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size()); + const auto settings = getSettings(); + + bool result = true; for (const auto & moving_part : moving_tagger->parts_to_move) { Stopwatch stopwatch; @@ -5535,8 +5538,41 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge try { - cloned_part = parts_mover.clonePart(moving_part); - parts_mover.swapClonedPart(cloned_part); + /// If zero-copy replication enabled than replicas shouldn't try to + /// move parts to another disk simultaneously. For this purpose we + /// use shared lock across replicas. NOTE: it's not 100% reliable, + /// because we are not checking lock while finishing part move. + /// However it's not dangerous at all, we will just have very rare + /// copies of some part. + /// + /// FIXME: this code is related to Replicated merge tree, and not + /// common for ordinary merge tree. So it's a bad design and should + /// be fixed. + auto disk = moving_part.reserved_space->getDisk(); + if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication) + { + /// If we acuqired lock than let's try to move. After one + /// replica will actually move the part from disk to some + /// zero-copy storage other replicas will just fetch + /// metainformation. + if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part, disk); lock) + { + cloned_part = parts_mover.clonePart(moving_part); + parts_mover.swapClonedPart(cloned_part); + } + else + { + /// Move will be retried but with backoff. + LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name); + result = false; + continue; + } + } + else /// Ordinary move as it should be + { + cloned_part = parts_mover.clonePart(moving_part); + parts_mover.swapClonedPart(cloned_part); + } write_part_log({}); } catch (...) @@ -5548,7 +5584,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge throw; } } - return true; + return result; } bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right) diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index f1d0abffc7a..4c58a53f368 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -27,6 +27,8 @@ #include #include #include +#include + #include #include @@ -43,6 +45,7 @@ class MergeTreeDataMergerMutator; class MutationCommands; class Context; struct JobAndPool; +struct ZeroCopyLock; /// Auxiliary struct holding information about the future merged or mutated part. struct EmergingPartInfo @@ -1189,6 +1192,10 @@ private: DataPartsVector & duplicate_parts_to_remove, MutableDataPartsVector & parts_from_wal, DataPartsLock & part_lock); + + /// Create zero-copy exclusive lock for part and disk. Useful for coordination of + /// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree. + virtual std::optional tryCreateZeroCopyExclusiveLock(const DataPartPtr &, const DiskPtr &) { return std::nullopt; } }; /// RAII struct to record big parts that are submerging or emerging. diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 03ae6688beb..2e299bb2447 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -1,4 +1,5 @@ #include +#include #include @@ -184,7 +185,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc * And otherwise it will look like excessively growing memory consumption in context of query. * (observed in long INSERT SELECTs) */ - MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; + MemoryTrackerBlockerInThread temporarily_disable_memory_tracker; /// Write index. The index contains Primary Key value for each `index_granularity` row. for (const auto & granule : granules_to_write) diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 446ae9b97a1..e7ead4dc8bb 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -47,7 +48,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() { /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. - MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; + MemoryTrackerBlockerInThread temporarily_disable_memory_tracker; size_t file_size = disk->getFileSize(mrk_path); size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark); diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index 5a889ea5e8b..190fc0d30a0 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -200,7 +200,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt auto settings = data->getSettings(); auto part = moving_part.part; auto disk = moving_part.reserved_space->getDisk(); - LOG_DEBUG(log, "Cloning part {} from {} to {}", part->name, part->volume->getDisk()->getName(), disk->getName()); + LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->volume->getDisk()->getName(), disk->getName()); const String directory_to_move = "moving"; if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 1432728d00a..4d24f491551 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1,13 +1,14 @@ #include #include -#include -#include #include #include #include #include +#include +#include #include #include +#include namespace DB diff --git a/src/Storages/MergeTree/ZeroCopyLock.cpp b/src/Storages/MergeTree/ZeroCopyLock.cpp new file mode 100644 index 00000000000..dbb12d0d610 --- /dev/null +++ b/src/Storages/MergeTree/ZeroCopyLock.cpp @@ -0,0 +1,9 @@ +#include "ZeroCopyLock.h" + +namespace DB +{ + ZeroCopyLock::ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path) + : lock(zkutil::createSimpleZooKeeperLock(zookeeper, lock_path, "part_exclusive_lock", "")) + { + } +} diff --git a/src/Storages/MergeTree/ZeroCopyLock.h b/src/Storages/MergeTree/ZeroCopyLock.h new file mode 100644 index 00000000000..96709fb01c9 --- /dev/null +++ b/src/Storages/MergeTree/ZeroCopyLock.h @@ -0,0 +1,21 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace DB +{ + +/// Very simple wrapper for zookeeper ephemeral lock. It's better to have it +/// because due to bad abstraction we use it in MergeTreeData. +struct ZeroCopyLock +{ + ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path); + + /// Actual lock + std::unique_ptr lock; +}; + +} diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index 791583e2495..e13895e60f1 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 0cc401aa93c..29772a14752 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include @@ -467,7 +467,7 @@ static void appendBlock(const Block & from, Block & to) MutableColumnPtr last_col; try { - MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; + MemoryTrackerBlockerInThread temporarily_disable_memory_tracker; if (to.rows() == 0) { @@ -496,7 +496,7 @@ static void appendBlock(const Block & from, Block & to) /// In case of rollback, it is better to ignore memory limits instead of abnormal server termination. /// So ignore any memory limits, even global (since memory tracking has drift). - MemoryTracker::BlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global); + MemoryTrackerBlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global); try { @@ -924,7 +924,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl } auto destination_metadata_snapshot = table->getInMemoryMetadataPtr(); - MemoryTracker::BlockerInThread temporarily_disable_memory_tracker; + MemoryTrackerBlockerInThread temporarily_disable_memory_tracker; auto insert = std::make_shared(); insert->table_id = destination_id; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 6f165dfb4a5..bcb12cc86b0 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -44,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -55,6 +57,7 @@ #include #include +#include #include #include #include diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 37cb238ba0f..72851472b79 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 11815d9ceef..90440ed084f 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 91a9c8567ba..a23859e7b5e 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -31,17 +31,20 @@ #include #include #include +#include #include #include +#include #include #include #include #include #include #include +#include #include #include @@ -7128,11 +7131,11 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par } -bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String & table_uuid, const String & part_name, +bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings, Poco::Logger * logger, const String & zookeeper_path_old) { - boost::replace_all(id, "/", "_"); + boost::replace_all(part_id, "/", "_"); Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old); @@ -7140,13 +7143,16 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String & for (const auto & zc_zookeeper_path : zc_zookeeper_paths) { - String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / id; - String zookeeper_node = fs::path(zookeeper_part_uniq_node) / replica_name_; + String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id; - LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_node); + /// Delete our replica node for part from zookeeper (we are not interested in it anymore) + String zookeeper_part_replica_node = fs::path(zookeeper_part_uniq_node) / replica_name_; - zookeeper_ptr->tryRemove(zookeeper_node); + LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_part_replica_node); + zookeeper_ptr->tryRemove(zookeeper_part_replica_node); + + /// Check, maybe we were the last replica and can remove part forever Strings children; zookeeper_ptr->tryGetChildren(zookeeper_part_uniq_node, children); @@ -7157,9 +7163,9 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String & continue; } - auto e = zookeeper_ptr->tryRemove(zookeeper_part_uniq_node); + auto error_code = zookeeper_ptr->tryRemove(zookeeper_part_uniq_node); - LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_uniq_node, e != Coordination::Error::ZNOTEMPTY); + LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_uniq_node, error_code != Coordination::Error::ZNOTEMPTY); /// Even when we have lock with same part name, but with different uniq, we can remove files on S3 children.clear(); @@ -7168,9 +7174,9 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String & if (children.empty()) { /// Cleanup after last uniq removing - e = zookeeper_ptr->tryRemove(zookeeper_part_node); + error_code = zookeeper_ptr->tryRemove(zookeeper_part_node); - LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_node, e != Coordination::Error::ZNOTEMPTY); + LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_node, error_code != Coordination::Error::ZNOTEMPTY); } else { @@ -7213,7 +7219,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica( zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper(); if (!zookeeper) - return best_replica; + return ""; Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk_type, getTableSharedID(), part.name, zookeeper_path); @@ -7251,7 +7257,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica( LOG_TRACE(log, "Found zookeper active replicas for part {}: {}", part.name, active_replicas.size()); if (active_replicas.empty()) - return best_replica; + return ""; /** You must select the best (most relevant) replica. * This is a replica with the maximum `log_pointer`, then with the minimum `queue` size. @@ -7305,6 +7311,30 @@ Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings } +std::optional StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const DataPartPtr & part, const DiskPtr & disk) +{ + if (!disk || !disk->supportZeroCopyReplication()) + return std::nullopt; + + zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper(); + if (!zookeeper) + return std::nullopt; + + String zc_zookeeper_path = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(), + part->name, zookeeper_path)[0]; + + /// Just recursively create ancestors for lock + zookeeper->createAncestors(zc_zookeeper_path); + zookeeper->createIfNotExists(zc_zookeeper_path, ""); + + /// Create actual lock + ZeroCopyLock lock(zookeeper, zc_zookeeper_path); + if (lock.lock->tryLock()) + return lock; + else + return std::nullopt; +} + String StorageReplicatedMergeTree::findReplicaHavingPart( const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_ptr) { diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index e390a0bcea4..c91152ca0f3 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -243,7 +243,7 @@ public: /// Unlock shared data part in zookeeper by part id /// Return true if data unlocked /// Return false if data is still used by another node - static bool unlockSharedDataByID(String id, const String & table_uuid, const String & part_name, const String & replica_name_, + static bool unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger, const String & zookeeper_path_old); @@ -758,6 +758,10 @@ private: // Create table id if needed void createTableSharedID(); + /// Create ephemeral lock in zookeeper for part and disk which support zero copy replication. + /// If somebody already holding the lock -- return std::nullopt. + std::optional tryCreateZeroCopyExclusiveLock(const DataPartPtr & part, const DiskPtr & disk) override; + protected: /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. */ diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index a81a5a9649a..96943a886c1 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include #include #include @@ -21,7 +23,9 @@ #include #include #include +#include #include +#include #include #include #include diff --git a/tests/integration/test_s3_zero_copy_ttl/__init__.py b/tests/integration/test_s3_zero_copy_ttl/__init__.py new file mode 100644 index 00000000000..e5a0d9b4834 --- /dev/null +++ b/tests/integration/test_s3_zero_copy_ttl/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_s3_zero_copy_ttl/configs/s3.xml b/tests/integration/test_s3_zero_copy_ttl/configs/s3.xml new file mode 100644 index 00000000000..c4889186e38 --- /dev/null +++ b/tests/integration/test_s3_zero_copy_ttl/configs/s3.xml @@ -0,0 +1,26 @@ + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + + + + + + +
+ default +
+ + s3_disk + +
+
+
+ +
+
diff --git a/tests/integration/test_s3_zero_copy_ttl/test.py b/tests/integration/test_s3_zero_copy_ttl/test.py new file mode 100644 index 00000000000..5f63bfbfdff --- /dev/null +++ b/tests/integration/test_s3_zero_copy_ttl/test.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +import time + +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance("node1", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True) +node2 = cluster.add_instance("node2", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True) +node3 = cluster.add_instance("node3", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True) + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + finally: + cluster.shutdown() + +def test_ttl_move_and_s3(started_cluster): + for i, node in enumerate([node1, node2, node3]): + node.query( + """ + CREATE TABLE s3_test_with_ttl (date DateTime, id UInt32, value String) + ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}') + ORDER BY id + PARTITION BY id + TTL date TO DISK 's3_disk' + SETTINGS storage_policy='s3_and_default' + """.format(i)) + + node1.query("SYSTEM STOP MOVES s3_test_with_ttl") + + node2.query("SYSTEM STOP MOVES s3_test_with_ttl") + + for i in range(30): + if i % 2 == 0: + node = node1 + else: + node = node2 + + node.query(f"INSERT INTO s3_test_with_ttl SELECT now() + 5, {i}, randomPrintableASCII(1048570)") + + node1.query("SYSTEM SYNC REPLICA s3_test_with_ttl") + node2.query("SYSTEM SYNC REPLICA s3_test_with_ttl") + node3.query("SYSTEM SYNC REPLICA s3_test_with_ttl") + + assert node1.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n" + assert node2.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n" + + node1.query("SYSTEM START MOVES s3_test_with_ttl") + node2.query("SYSTEM START MOVES s3_test_with_ttl") + + assert node1.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n" + assert node2.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n" + + time.sleep(5) + + print(node1.query("SELECT * FROM system.parts WHERE table = 's3_test_with_ttl' FORMAT Vertical")) + + minio = cluster.minio_client + objects = minio.list_objects(cluster.minio_bucket, 'data/', recursive=True) + counter = 0 + for obj in objects: + print("Objectname:", obj.object_name, "metadata:", obj.metadata) + counter += 1 + print("Total objects", counter) + assert counter == 300 diff --git a/tests/queries/0_stateless/01603_decimal_mult_float.reference b/tests/queries/0_stateless/01603_decimal_mult_float.reference index ee1eeb525ba..c2917516e99 100644 --- a/tests/queries/0_stateless/01603_decimal_mult_float.reference +++ b/tests/queries/0_stateless/01603_decimal_mult_float.reference @@ -5,10 +5,10 @@ 7.775900000000001 56.62269 598.8376688440277 -299.41883723437786 +299.41883695311844 0.7485470860550345 -2.245641373854596 +2.2456412771483882 1.641386318314034 1.641386318314034 -1.641386334333447 -1.641386334333447 +1.6413863258732018 +1.6413863258732018 diff --git a/tests/queries/0_stateless/02155_binary_op_between_float_and_decimal.reference b/tests/queries/0_stateless/02155_binary_op_between_float_and_decimal.reference new file mode 100644 index 00000000000..a024d51e285 --- /dev/null +++ b/tests/queries/0_stateless/02155_binary_op_between_float_and_decimal.reference @@ -0,0 +1,215 @@ +3 +0 +2.25 +1 +3 +0 +2.25 +1 +inf +1 +1.5 + +plus +4.5 -3.5 1 +2.5 -2.5 0 +-4.5 2.5 -2 +-2.5 3.5 1 +-4.5 -3.5 -8 +-2.5 -2.5 -5 +4.5 2.5 7 +2.5 3.5 6 +45.5 -3.5 42 +25.5 -2.5 23 +-45.5 2.5 -43 +-25.5 3.5 -22 +-45.5 -3.5 -49 +-25.5 -2.5 -27.999999999999996 +45.5 2.5 48 +25.5 3.5 28.999999999999996 +-4.5 -3.5 -8 +-2.5 -2.5 -5 +4.5 2.5 7 +2.5 3.5 6 +4.5 -3.5 1 +2.5 -2.5 0 +-4.5 2.5 -2 +-2.5 3.5 1 +-45.5 -3.5 -49 +-25.5 -2.5 -28 +45.5 2.5 48 +25.5 3.5 29 +45.5 -3.5 42 +25.5 -2.5 22.999999999999996 +-45.5 2.5 -43 +-25.5 3.5 -21.999999999999996 + +minus +4.5 -3.5 8 +2.5 -2.5 5 +-4.5 2.5 -7 +-2.5 3.5 -6 +-4.5 -3.5 -1 +-2.5 -2.5 0 +4.5 2.5 2 +2.5 3.5 -1 +45.5 -3.5 49 +25.5 -2.5 28 +-45.5 2.5 -48 +-25.5 3.5 -29 +-45.5 -3.5 -42 +-25.5 -2.5 -22.999999999999996 +45.5 2.5 43 +25.5 3.5 21.999999999999996 +-4.5 -3.5 -1 +-2.5 -2.5 0 +4.5 2.5 2 +2.5 3.5 -1 +4.5 -3.5 8 +2.5 -2.5 5 +-4.5 2.5 -7 +-2.5 3.5 -6 +-45.5 -3.5 -42 +-25.5 -2.5 -23 +45.5 2.5 43 +25.5 3.5 22 +45.5 -3.5 49 +25.5 -2.5 27.999999999999996 +-45.5 2.5 -48 +-25.5 3.5 -28.999999999999996 + +multiply +4.5 -3.5 -15.75 +2.5 -2.5 -6.25 +-4.5 2.5 -11.25 +-2.5 3.5 -8.75 +-4.5 -3.5 15.75 +-2.5 -2.5 6.25 +4.5 2.5 11.25 +2.5 3.5 8.75 +45.5 -3.5 -159.25 +25.5 -2.5 -63.75 +-45.5 2.5 -113.75 +-25.5 3.5 -89.25 +-45.5 -3.5 159.25 +-25.5 -2.5 63.74999999999999 +45.5 2.5 113.75 +25.5 3.5 89.24999999999999 +-4.5 -3.5 15.75 +-2.5 -2.5 6.25 +4.5 2.5 11.25 +2.5 3.5 8.75 +4.5 -3.5 -15.75 +2.5 -2.5 -6.25 +-4.5 2.5 -11.25 +-2.5 3.5 -8.75 +-45.5 -3.5 159.25 +-25.5 -2.5 63.75 +45.5 2.5 113.75 +25.5 3.5 89.25 +45.5 -3.5 -159.25 +25.5 -2.5 -63.74999999999999 +-45.5 2.5 -113.75 +-25.5 3.5 -89.24999999999999 + +division +4.5 -3.5 -1.2857142857142858 +2.5 -2.5 -1 +-4.5 2.5 -1.8 +-2.5 3.5 -0.7142857142857143 +-4.5 -3.5 1.2857142857142858 +-2.5 -2.5 1 +4.5 2.5 1.8 +2.5 3.5 0.7142857142857143 +45.5 -3.5 -13 +25.5 -2.5 -10.2 +-45.5 2.5 -18.2 +-25.5 3.5 -7.285714285714286 +-45.5 -3.5 13 +-25.5 -2.5 10.2 +45.5 2.5 18.2 +25.5 3.5 7.285714285714285 +-4.5 -3.5 1.2857142857142858 +-2.5 -2.5 1 +4.5 2.5 1.8 +2.5 3.5 0.7142857142857143 +4.5 -3.5 -1.2857142857142858 +2.5 -2.5 -1 +-4.5 2.5 -1.8 +-2.5 3.5 -0.7142857142857143 +-45.5 -3.5 13 +-25.5 -2.5 10.2 +45.5 2.5 18.2 +25.5 3.5 7.285714285714286 +45.5 -3.5 -13 +25.5 -2.5 -10.2 +-45.5 2.5 -18.2 +-25.5 3.5 -7.285714285714285 + +least +4.5 -3.5 -3.5 +2.5 -2.5 -2.5 +-4.5 2.5 -4.5 +-2.5 3.5 -2.5 +-4.5 -3.5 -4.5 +-2.5 -2.5 -2.5 +4.5 2.5 2.5 +2.5 3.5 2.5 +45.5 -3.5 -3.5 +25.5 -2.5 -2.5 +-45.5 2.5 -45.5 +-25.5 3.5 -25.5 +-45.5 -3.5 -45.5 +-25.5 -2.5 -25.499999999999996 +45.5 2.5 2.5 +25.5 3.5 3.5 +-4.5 -3.5 -4.5 +-2.5 -2.5 -2.5 +4.5 2.5 2.5 +2.5 3.5 2.5 +4.5 -3.5 -3.5 +2.5 -2.5 -2.5 +-4.5 2.5 -4.5 +-2.5 3.5 -2.5 +-45.5 -3.5 -45.5 +-25.5 -2.5 -25.5 +45.5 2.5 2.5 +25.5 3.5 3.5 +45.5 -3.5 -3.5 +25.5 -2.5 -2.5 +-45.5 2.5 -45.5 +-25.5 3.5 -25.499999999999996 + +greatest +4.5 -3.5 4.5 +2.5 -2.5 2.5 +-4.5 2.5 2.5 +-2.5 3.5 3.5 +-4.5 -3.5 -3.5 +-2.5 -2.5 -2.5 +4.5 2.5 4.5 +2.5 3.5 3.5 +45.5 -3.5 45.5 +25.5 -2.5 25.5 +-45.5 2.5 2.5 +-25.5 3.5 3.5 +-45.5 -3.5 -3.5 +-25.5 -2.5 -2.5 +45.5 2.5 45.5 +25.5 3.5 25.499999999999996 +-4.5 -3.5 -3.5 +-2.5 -2.5 -2.5 +4.5 2.5 4.5 +2.5 3.5 3.5 +4.5 -3.5 4.5 +2.5 -2.5 2.5 +-4.5 2.5 2.5 +-2.5 3.5 3.5 +-45.5 -3.5 -3.5 +-25.5 -2.5 -2.5 +45.5 2.5 45.5 +25.5 3.5 25.5 +45.5 -3.5 45.5 +25.5 -2.5 25.499999999999996 +-45.5 2.5 2.5 +-25.5 3.5 3.5 diff --git a/tests/queries/0_stateless/02155_binary_op_between_float_and_decimal.sql b/tests/queries/0_stateless/02155_binary_op_between_float_and_decimal.sql new file mode 100644 index 00000000000..2e8ac32462e --- /dev/null +++ b/tests/queries/0_stateless/02155_binary_op_between_float_and_decimal.sql @@ -0,0 +1,96 @@ +SELECT 1.5::Decimal32(5) + 1.5; +SELECT 1.5::Decimal32(5) - 1.5; +SELECT 1.5::Decimal32(5) * 1.5; +SELECT 1.5::Decimal32(5) / 1.5; + +SELECT 1.5 + 1.5::Decimal32(5); +SELECT 1.5 - 1.5::Decimal32(5); +SELECT 1.5 * 1.5::Decimal32(5); +SELECT 1.5 / 1.5::Decimal32(5); + +SELECT 1.0::Decimal32(5) / 0.0; + +SELECT least(1.5, 1.0::Decimal32(5)); +SELECT greatest(1.5, 1.0::Decimal32(5)); + +DROP TABLE IF EXISTS t; +CREATE TABLE t(d1 Decimal32(5), d2 Decimal64(10), d3 Decimal128(20), d4 Decimal256(40), f1 Float32, f2 Float64) ENGINE=Memory; + +INSERT INTO t values (-4.5, 4.5, -45.5, 45.5, 2.5, -3.5); +INSERT INTO t values (4.5, -4.5, 45.5, -45.5, -3.5, 2.5); +INSERT INTO t values (2.5, -2.5, 25.5, -25.5, -2.5, 3.5); +INSERT INTO t values (-2.5, 2.5, -25.5, 25.5, 3.5, -2.5); + +SELECT ''; +SELECT 'plus'; +SELECT d1, f1, d1 + f1 FROM t ORDER BY f1; +SELECT d2, f1, d2 + f1 FROM t ORDER BY f1; +SELECT d3, f1, d3 + f1 FROM t ORDER BY f1; +SELECT d4, f1, d4 + f1 FROM t ORDER BY f1; + +SELECT d1, f2, d1 + f2 FROM t ORDER BY f2; +SELECT d2, f2, d2 + f2 FROM t ORDER BY f2; +SELECT d3, f2, d3 + f2 FROM t ORDER BY f2; +SELECT d4, f2, d4 + f2 FROM t ORDER BY f2; + +SELECT ''; +SELECT 'minus'; +SELECT d1, f1, d1 - f1 FROM t ORDER BY f1; +SELECT d2, f1, d2 - f1 FROM t ORDER BY f1; +SELECT d3, f1, d3 - f1 FROM t ORDER BY f1; +SELECT d4, f1, d4 - f1 FROM t ORDER BY f1; + +SELECT d1, f2, d1 - f2 FROM t ORDER BY f2; +SELECT d2, f2, d2 - f2 FROM t ORDER BY f2; +SELECT d3, f2, d3 - f2 FROM t ORDER BY f2; +SELECT d4, f2, d4 - f2 FROM t ORDER BY f2; + +SELECT ''; +SELECT 'multiply'; +SELECT d1, f1, d1 * f1 FROM t ORDER BY f1; +SELECT d2, f1, d2 * f1 FROM t ORDER BY f1; +SELECT d3, f1, d3 * f1 FROM t ORDER BY f1; +SELECT d4, f1, d4 * f1 FROM t ORDER BY f1; + +SELECT d1, f2, d1 * f2 FROM t ORDER BY f2; +SELECT d2, f2, d2 * f2 FROM t ORDER BY f2; +SELECT d3, f2, d3 * f2 FROM t ORDER BY f2; +SELECT d4, f2, d4 * f2 FROM t ORDER BY f2; + +SELECT ''; +SELECT 'division'; +SELECT d1, f1, d1 / f1 FROM t ORDER BY f1; +SELECT d2, f1, d2 / f1 FROM t ORDER BY f1; +SELECT d3, f1, d3 / f1 FROM t ORDER BY f1; +SELECT d4, f1, d4 / f1 FROM t ORDER BY f1; + +SELECT d1, f2, d1 / f2 FROM t ORDER BY f2; +SELECT d2, f2, d2 / f2 FROM t ORDER BY f2; +SELECT d3, f2, d3 / f2 FROM t ORDER BY f2; +SELECT d4, f2, d4 / f2 FROM t ORDER BY f2; + +SELECT ''; +SELECT 'least'; +SELECT d1, f1, least(d1, f1) FROM t ORDER BY f1; +SELECT d2, f1, least(d2, f1) FROM t ORDER BY f1; +SELECT d3, f1, least(d3, f1) FROM t ORDER BY f1; +SELECT d4, f1, least(d4, f1) FROM t ORDER BY f1; + +SELECT d1, f2, least(d1, f2) FROM t ORDER BY f2; +SELECT d2, f2, least(d2, f2) FROM t ORDER BY f2; +SELECT d3, f2, least(d3, f2) FROM t ORDER BY f2; +SELECT d4, f2, least(d4, f2) FROM t ORDER BY f2; + +SELECT ''; +SELECT 'greatest'; +SELECT d1, f1, greatest(d1, f1) FROM t ORDER BY f1; +SELECT d2, f1, greatest(d2, f1) FROM t ORDER BY f1; +SELECT d3, f1, greatest(d3, f1) FROM t ORDER BY f1; +SELECT d4, f1, greatest(d4, f1) FROM t ORDER BY f1; + +SELECT d1, f2, greatest(d1, f2) FROM t ORDER BY f2; +SELECT d2, f2, greatest(d2, f2) FROM t ORDER BY f2; +SELECT d3, f2, greatest(d3, f2) FROM t ORDER BY f2; +SELECT d4, f2, greatest(d4, f2) FROM t ORDER BY f2; + +DROP TABLE t; diff --git a/utils/config-processor/CMakeLists.txt b/utils/config-processor/CMakeLists.txt index a378d66a3d3..76c10b5f2fd 100644 --- a/utils/config-processor/CMakeLists.txt +++ b/utils/config-processor/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable (config-processor config-processor.cpp) -target_link_libraries(config-processor PRIVATE clickhouse_common_config) +target_link_libraries(config-processor PRIVATE clickhouse_common_config_no_zookeeper_log) diff --git a/utils/keeper-bench/CMakeLists.txt b/utils/keeper-bench/CMakeLists.txt index 2f12194d1b7..cd65152db87 100644 --- a/utils/keeper-bench/CMakeLists.txt +++ b/utils/keeper-bench/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable(keeper-bench Generator.cpp Runner.cpp Stats.cpp main.cpp) -target_link_libraries(keeper-bench PRIVATE clickhouse_common_zookeeper) +target_link_libraries(keeper-bench PRIVATE clickhouse_common_zookeeper_no_log) diff --git a/utils/zookeeper-cli/CMakeLists.txt b/utils/zookeeper-cli/CMakeLists.txt index 2199a1b38ff..0258b0d695c 100644 --- a/utils/zookeeper-cli/CMakeLists.txt +++ b/utils/zookeeper-cli/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable(clickhouse-zookeeper-cli zookeeper-cli.cpp) -target_link_libraries(clickhouse-zookeeper-cli PRIVATE clickhouse_common_zookeeper) +target_link_libraries(clickhouse-zookeeper-cli PRIVATE clickhouse_common_zookeeper_no_log) diff --git a/utils/zookeeper-dump-tree/CMakeLists.txt b/utils/zookeeper-dump-tree/CMakeLists.txt index 9f5da351068..db86c7005b2 100644 --- a/utils/zookeeper-dump-tree/CMakeLists.txt +++ b/utils/zookeeper-dump-tree/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable (zookeeper-dump-tree main.cpp ${SRCS}) -target_link_libraries(zookeeper-dump-tree PRIVATE clickhouse_common_zookeeper clickhouse_common_io boost::program_options) +target_link_libraries(zookeeper-dump-tree PRIVATE clickhouse_common_zookeeper_no_log clickhouse_common_io boost::program_options) diff --git a/utils/zookeeper-remove-by-list/CMakeLists.txt b/utils/zookeeper-remove-by-list/CMakeLists.txt index c31b1ec3388..f24b794c629 100644 --- a/utils/zookeeper-remove-by-list/CMakeLists.txt +++ b/utils/zookeeper-remove-by-list/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable (zookeeper-remove-by-list main.cpp ${SRCS}) -target_link_libraries(zookeeper-remove-by-list PRIVATE clickhouse_common_zookeeper boost::program_options) +target_link_libraries(zookeeper-remove-by-list PRIVATE clickhouse_common_zookeeper_no_log boost::program_options) diff --git a/utils/zookeeper-test/CMakeLists.txt b/utils/zookeeper-test/CMakeLists.txt index 56a1d3e380b..f99539fed79 100644 --- a/utils/zookeeper-test/CMakeLists.txt +++ b/utils/zookeeper-test/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable(zk-test main.cpp) -target_link_libraries(zk-test PRIVATE clickhouse_common_zookeeper) +target_link_libraries(zk-test PRIVATE clickhouse_common_zookeeper_no_log)