diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 735dcf91a36..c6fdb308162 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -190,8 +190,9 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded) { bool need_to_throw = true; - if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr != nullptr && query_tracker != nullptr) - need_to_throw = overcommit_tracker_ptr->needToStopQuery(query_tracker, size); + bool try_to_free_memory = overcommit_tracker != nullptr && query_tracker != nullptr; + if (try_to_free_memory) + need_to_throw = overcommit_tracker->needToStopQuery(query_tracker); if (need_to_throw) { @@ -210,9 +211,6 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT } else { - // If OvercommitTracker::needToStopQuery returned false, it guarantees that enough memory is freed. - // This memory is already counted in variable `amount` in the moment of `will_be` initialization. - // Now we just need to update value stored in `will_be`, because it should have changed. will_be = amount.load(std::memory_order_relaxed); } } @@ -310,8 +308,6 @@ void MemoryTracker::free(Int64 size) accounted_size += new_amount; } } - if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr) - overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size); if (auto * loaded_next = parent.load(std::memory_order_relaxed)) loaded_next->free(size); diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 73af2ab8857..dbe0e7d08b3 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -73,7 +73,7 @@ private: /// This description will be used as prefix into log messages (if isn't nullptr) std::atomic description_ptr = nullptr; - std::atomic overcommit_tracker = nullptr; + OvercommitTracker * overcommit_tracker = nullptr; bool updatePeak(Int64 will_be, bool log_memory_usage); void logMemoryUsage(Int64 current) const; @@ -188,18 +188,13 @@ public: void setOvercommitTracker(OvercommitTracker * tracker) noexcept { - overcommit_tracker.store(tracker, std::memory_order_relaxed); - } - - void resetOvercommitTracker() noexcept - { - overcommit_tracker.store(nullptr, std::memory_order_relaxed); + overcommit_tracker = tracker; } /// Reset the accumulated data void resetCounters(); - /// Reset the accumulated data. + /// Reset the accumulated data and the parent. void reset(); /// Reset current counter to a new value. diff --git a/src/Common/OvercommitTracker.cpp b/src/Common/OvercommitTracker.cpp index 8c9b8592fec..7b03b9f271d 100644 --- a/src/Common/OvercommitTracker.cpp +++ b/src/Common/OvercommitTracker.cpp @@ -11,11 +11,8 @@ constexpr std::chrono::microseconds ZERO_MICROSEC = 0us; OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_) : max_wait_time(ZERO_MICROSEC) , picked_tracker(nullptr) - , cancellation_state(QueryCancellationState::NONE) + , cancelation_state(QueryCancelationState::NONE) , global_mutex(global_mutex_) - , freed_memory(0) - , required_memory(0) - , allow_release(true) {} void OvercommitTracker::setMaxWaitTime(UInt64 wait_time) @@ -24,12 +21,12 @@ void OvercommitTracker::setMaxWaitTime(UInt64 wait_time) max_wait_time = wait_time * 1us; } -bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount) +bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker) { // NOTE: Do not change the order of locks // // global_mutex must be acquired before overcommit_m, because - // method OvercommitTracker::onQueryStop(MemoryTracker *) is + // method OvercommitTracker::unsubscribe(MemoryTracker *) is // always called with already acquired global_mutex in // ProcessListEntry::~ProcessListEntry(). std::unique_lock global_lock(global_mutex); @@ -39,82 +36,42 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount) return true; pickQueryToExclude(); - assert(cancellation_state != QueryCancellationState::NONE); + assert(cancelation_state == QueryCancelationState::RUNNING); global_lock.unlock(); // If no query was chosen we need to stop current query. // This may happen if no soft limit is set. if (picked_tracker == nullptr) { - assert(cancellation_state == QueryCancellationState::SELECTED); - cancellation_state = QueryCancellationState::NONE; + cancelation_state = QueryCancelationState::NONE; return true; } if (picked_tracker == tracker) - { - assert(cancellation_state == QueryCancellationState::SELECTED); - cancellation_state = QueryCancellationState::RUNNING; return true; - } - - allow_release = true; - - required_memory += amount; - required_per_thread[tracker] = amount; - bool timeout = !cv.wait_for(lk, max_wait_time, [this, tracker]() + bool timeout = !cv.wait_for(lk, max_wait_time, [this]() { - return required_per_thread[tracker] == 0 || cancellation_state == QueryCancellationState::NONE; + return cancelation_state == QueryCancelationState::NONE; }); - LOG_DEBUG(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : "")); - - required_memory -= amount; - Int64 still_need = required_per_thread[tracker]; // If enough memory is freed it will be 0 - required_per_thread.erase(tracker); - - // If threads where not released since last call of this method, - // we can release them now. - if (allow_release && required_memory <= freed_memory && still_need != 0) - releaseThreads(); - - // All required amount of memory is free now and selected query to stop doesn't know about it. - // As we don't need to free memory, we can continue execution of the selected query. - if (required_memory == 0 && cancellation_state == QueryCancellationState::SELECTED) - reset(); - return timeout || still_need != 0; + if (timeout) + LOG_DEBUG(getLogger(), "Need to stop query because reached waiting timeout"); + else + LOG_DEBUG(getLogger(), "Memory freed within timeout"); + return timeout; } -void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount) -{ - std::lock_guard guard(overcommit_m); - if (cancellation_state != QueryCancellationState::NONE) - { - freed_memory += amount; - if (freed_memory >= required_memory) - releaseThreads(); - } -} - -void OvercommitTracker::onQueryStop(MemoryTracker * tracker) +void OvercommitTracker::unsubscribe(MemoryTracker * tracker) { std::unique_lock lk(overcommit_m); if (picked_tracker == tracker) { LOG_DEBUG(getLogger(), "Picked query stopped"); - reset(); + picked_tracker = nullptr; + cancelation_state = QueryCancelationState::NONE; cv.notify_all(); } } -void OvercommitTracker::releaseThreads() -{ - for (auto & required : required_per_thread) - required.second = 0; - freed_memory = 0; - allow_release = false; // To avoid repeating call of this method in OvercommitTracker::needToStopQuery - cv.notify_all(); -} - UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list, DB::ProcessListForUser * user_process_list_) : OvercommitTracker(process_list->mutex) , user_process_list(user_process_list_) diff --git a/src/Common/OvercommitTracker.h b/src/Common/OvercommitTracker.h index b4de6074bbb..35997265ebc 100644 --- a/src/Common/OvercommitTracker.h +++ b/src/Common/OvercommitTracker.h @@ -34,13 +34,6 @@ struct OvercommitRatio class MemoryTracker; -enum class QueryCancellationState -{ - NONE = 0, // Hard limit is not reached, there is no selected query to kill. - SELECTED = 1, // Hard limit is reached, query to stop was chosen but it still is not aware of cancellation. - RUNNING = 2, // Hard limit is reached, selected query has started the process of cancellation. -}; - // Usually it's hard to set some reasonable hard memory limit // (especially, the default value). This class introduces new // mechanisim for the limiting of memory usage. @@ -52,11 +45,9 @@ struct OvercommitTracker : boost::noncopyable { void setMaxWaitTime(UInt64 wait_time); - bool needToStopQuery(MemoryTracker * tracker, Int64 amount); + bool needToStopQuery(MemoryTracker * tracker); - void tryContinueQueryExecutionAfterFree(Int64 amount); - - void onQueryStop(MemoryTracker * tracker); + void unsubscribe(MemoryTracker * tracker); virtual ~OvercommitTracker() = default; @@ -67,16 +58,23 @@ protected: // This mutex is used to disallow concurrent access // to picked_tracker and cancelation_state variables. - std::mutex overcommit_m; - std::condition_variable cv; + mutable std::mutex overcommit_m; + mutable std::condition_variable cv; std::chrono::microseconds max_wait_time; + enum class QueryCancelationState + { + NONE, + RUNNING, + }; + // Specifies memory tracker of the chosen to stop query. // If soft limit is not set, all the queries which reach hard limit must stop. // This case is represented as picked tracker pointer is set to nullptr and - // overcommit tracker is in SELECTED state. + // overcommit tracker is in RUNNING state. MemoryTracker * picked_tracker; + QueryCancelationState cancelation_state; virtual Poco::Logger * getLogger() = 0; @@ -84,37 +82,19 @@ private: void pickQueryToExclude() { - if (cancellation_state == QueryCancellationState::NONE) + if (cancelation_state != QueryCancelationState::RUNNING) { pickQueryToExcludeImpl(); - cancellation_state = QueryCancellationState::SELECTED; + cancelation_state = QueryCancelationState::RUNNING; } } - void reset() noexcept - { - picked_tracker = nullptr; - cancellation_state = QueryCancellationState::NONE; - freed_memory = 0; - allow_release = true; - } - - void releaseThreads(); - - QueryCancellationState cancellation_state; - - std::unordered_map required_per_thread; - // Global mutex which is used in ProcessList to synchronize // insertion and deletion of queries. // OvercommitTracker::pickQueryToExcludeImpl() implementations // require this mutex to be locked, because they read list (or sublist) // of queries. std::mutex & global_mutex; - Int64 freed_memory; - Int64 required_memory; - - bool allow_release; }; namespace DB @@ -130,7 +110,7 @@ struct UserOvercommitTracker : OvercommitTracker ~UserOvercommitTracker() override = default; protected: - void pickQueryToExcludeImpl() override; + void pickQueryToExcludeImpl() override final; Poco::Logger * getLogger() override final { return logger; } private: @@ -145,7 +125,7 @@ struct GlobalOvercommitTracker : OvercommitTracker ~GlobalOvercommitTracker() override = default; protected: - void pickQueryToExcludeImpl() override; + void pickQueryToExcludeImpl() override final; Poco::Logger * getLogger() override final { return logger; } private: diff --git a/src/Common/tests/gtest_overcommit_tracker.cpp b/src/Common/tests/gtest_overcommit_tracker.cpp deleted file mode 100644 index 07fadc6a337..00000000000 --- a/src/Common/tests/gtest_overcommit_tracker.cpp +++ /dev/null @@ -1,408 +0,0 @@ -#include -#include -#include - -#include -#include -#include - -using namespace DB; - -template -struct OvercommitTrackerForTest : BaseTracker -{ - template - explicit OvercommitTrackerForTest(Ts && ...args) - : BaseTracker(std::move(args)...) - {} - - void setCandidate(MemoryTracker * candidate) - { - tracker = candidate; - } - -protected: - void pickQueryToExcludeImpl() override - { - BaseTracker::picked_tracker = tracker; - } - - MemoryTracker * tracker; -}; - -using UserOvercommitTrackerForTest = OvercommitTrackerForTest; -using GlobalOvercommitTrackerForTest = OvercommitTrackerForTest; - -static constexpr UInt64 WAIT_TIME = 3'000'000; - -template -void free_not_continue_test(T & overcommit_tracker) -{ - overcommit_tracker.setMaxWaitTime(WAIT_TIME); - - static constexpr size_t THREADS = 5; - std::vector trackers(THREADS); - std::atomic need_to_stop = 0; - std::vector threads; - threads.reserve(THREADS); - - MemoryTracker picked; - overcommit_tracker.setCandidate(&picked); - - for (size_t i = 0; i < THREADS; ++i) - { - threads.push_back(std::thread( - [&, i]() - { - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - } - )); - } - - std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(50); }).join(); - - for (auto & thread : threads) - { - thread.join(); - } - - ASSERT_EQ(need_to_stop, THREADS); -} - -TEST(OvercommitTracker, UserFreeNotContinue) -{ - ProcessList process_list; - ProcessListForUser user_process_list(&process_list); - UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list); - free_not_continue_test(user_overcommit_tracker); -} - -TEST(OvercommitTracker, GlobalFreeNotContinue) -{ - ProcessList process_list; - GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list); - free_not_continue_test(global_overcommit_tracker); -} - -template -void free_continue_test(T & overcommit_tracker) -{ - overcommit_tracker.setMaxWaitTime(WAIT_TIME); - - static constexpr size_t THREADS = 5; - std::vector trackers(THREADS); - std::atomic need_to_stop = 0; - std::vector threads; - threads.reserve(THREADS); - - MemoryTracker picked; - overcommit_tracker.setCandidate(&picked); - - for (size_t i = 0; i < THREADS; ++i) - { - threads.push_back(std::thread( - [&, i]() - { - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - } - )); - } - - std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); }).join(); - - for (auto & thread : threads) - { - thread.join(); - } - - ASSERT_EQ(need_to_stop, 0); -} - -TEST(OvercommitTracker, UserFreeContinue) -{ - ProcessList process_list; - ProcessListForUser user_process_list(&process_list); - UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list); - free_continue_test(user_overcommit_tracker); -} - -TEST(OvercommitTracker, GlobalFreeContinue) -{ - ProcessList process_list; - GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list); - free_continue_test(global_overcommit_tracker); -} - -template -void free_continue_and_alloc_test(T & overcommit_tracker) -{ - overcommit_tracker.setMaxWaitTime(WAIT_TIME); - - static constexpr size_t THREADS = 5; - std::vector trackers(THREADS); - std::atomic need_to_stop = 0; - std::vector threads; - threads.reserve(THREADS); - - MemoryTracker picked; - overcommit_tracker.setCandidate(&picked); - - for (size_t i = 0; i < THREADS; ++i) - { - threads.push_back(std::thread( - [&, i]() - { - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - } - )); - } - - bool stopped_next = false; - std::thread( - [&]() - { - MemoryTracker failed; - overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); - stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); - } - ).join(); - - for (auto & thread : threads) - { - thread.join(); - } - - ASSERT_EQ(need_to_stop, 0); - ASSERT_EQ(stopped_next, true); -} - -TEST(OvercommitTracker, UserFreeContinueAndAlloc) -{ - ProcessList process_list; - ProcessListForUser user_process_list(&process_list); - UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list); - free_continue_and_alloc_test(user_overcommit_tracker); -} - -TEST(OvercommitTracker, GlobalFreeContinueAndAlloc) -{ - ProcessList process_list; - GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list); - free_continue_and_alloc_test(global_overcommit_tracker); -} - -template -void free_continue_and_alloc_2_test(T & overcommit_tracker) -{ - overcommit_tracker.setMaxWaitTime(WAIT_TIME); - - static constexpr size_t THREADS = 5; - std::vector trackers(THREADS); - std::atomic need_to_stop = 0; - std::vector threads; - threads.reserve(THREADS); - - MemoryTracker picked; - overcommit_tracker.setCandidate(&picked); - - for (size_t i = 0; i < THREADS; ++i) - { - threads.push_back(std::thread( - [&, i]() - { - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - } - )); - } - - bool stopped_next = false; - threads.push_back(std::thread( - [&]() - { - MemoryTracker failed; - overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); - stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); - } - )); - - overcommit_tracker.tryContinueQueryExecutionAfterFree(90); - - for (auto & thread : threads) - { - thread.join(); - } - - ASSERT_EQ(need_to_stop, 0); - ASSERT_EQ(stopped_next, true); -} - -TEST(OvercommitTracker, UserFreeContinueAndAlloc2) -{ - ProcessList process_list; - ProcessListForUser user_process_list(&process_list); - UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list); - free_continue_and_alloc_2_test(user_overcommit_tracker); -} - -TEST(OvercommitTracker, GlobalFreeContinueAndAlloc2) -{ - ProcessList process_list; - GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list); - free_continue_and_alloc_2_test(global_overcommit_tracker); -} - -template -void free_continue_and_alloc_3_test(T & overcommit_tracker) -{ - overcommit_tracker.setMaxWaitTime(WAIT_TIME); - - static constexpr size_t THREADS = 5; - std::vector trackers(THREADS); - std::atomic need_to_stop = 0; - std::vector threads; - threads.reserve(THREADS); - - MemoryTracker picked; - overcommit_tracker.setCandidate(&picked); - - for (size_t i = 0; i < THREADS; ++i) - { - threads.push_back(std::thread( - [&, i]() - { - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - } - )); - } - - bool stopped_next = false; - threads.push_back(std::thread( - [&]() - { - MemoryTracker failed; - overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); - stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); - } - )); - - overcommit_tracker.tryContinueQueryExecutionAfterFree(100); - - for (auto & thread : threads) - { - thread.join(); - } - - ASSERT_EQ(need_to_stop, 0); - ASSERT_EQ(stopped_next, false); -} - -TEST(OvercommitTracker, UserFreeContinueAndAlloc3) -{ - ProcessList process_list; - ProcessListForUser user_process_list(&process_list); - UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list); - free_continue_and_alloc_2_test(user_overcommit_tracker); -} - -TEST(OvercommitTracker, GlobalFreeContinueAndAlloc3) -{ - ProcessList process_list; - GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list); - free_continue_and_alloc_2_test(global_overcommit_tracker); -} - -template -void free_continue_2_test(T & overcommit_tracker) -{ - overcommit_tracker.setMaxWaitTime(WAIT_TIME); - - static constexpr size_t THREADS = 5; - std::vector trackers(THREADS); - std::atomic need_to_stop = 0; - std::vector threads; - threads.reserve(THREADS); - - MemoryTracker picked; - overcommit_tracker.setCandidate(&picked); - - for (size_t i = 0; i < THREADS; ++i) - { - threads.push_back(std::thread( - [&, i]() - { - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - } - )); - } - - std::thread( - [&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(300); } - ).join(); - - for (auto & thread : threads) - { - thread.join(); - } - - ASSERT_EQ(need_to_stop, 2); -} - -TEST(OvercommitTracker, UserFreeContinue2) -{ - ProcessList process_list; - ProcessListForUser user_process_list(&process_list); - UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list); - free_continue_2_test(user_overcommit_tracker); -} - -TEST(OvercommitTracker, GlobalFreeContinue2) -{ - ProcessList process_list; - GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list); - free_continue_2_test(global_overcommit_tracker); -} - -template -void query_stop_not_continue_test(T & overcommit_tracker) -{ - overcommit_tracker.setMaxWaitTime(WAIT_TIME); - - std::atomic need_to_stop = 0; - - MemoryTracker picked; - overcommit_tracker.setCandidate(&picked); - - MemoryTracker another; - auto thread = std::thread( - [&]() - { - if (overcommit_tracker.needToStopQuery(&another, 100)) - ++need_to_stop; - } - ); - overcommit_tracker.onQueryStop(&picked); - thread.join(); - - ASSERT_EQ(need_to_stop, 1); -} - -TEST(OvercommitTracker, UserQueryStopNotContinue) -{ - ProcessList process_list; - ProcessListForUser user_process_list(&process_list); - UserOvercommitTrackerForTest user_overcommit_tracker(&process_list, &user_process_list); - query_stop_not_continue_test(user_overcommit_tracker); -} - -TEST(OvercommitTracker, GlobalQueryStopNotContinue) -{ - ProcessList process_list; - GlobalOvercommitTrackerForTest global_overcommit_tracker(&process_list); - query_stop_not_continue_test(global_overcommit_tracker); -} diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index d67cc5eba0d..eb7b6c5526a 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -712,21 +712,14 @@ void Changelog::flush() current_writer->flush(force_sync); } -void Changelog::shutdown() -{ - if (!log_files_to_delete_queue.isFinished()) - log_files_to_delete_queue.finish(); - - if (clean_log_thread.joinable()) - clean_log_thread.join(); -} - Changelog::~Changelog() { try { flush(); - shutdown(); + log_files_to_delete_queue.finish(); + if (clean_log_thread.joinable()) + clean_log_thread.join(); } catch (...) { diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h index d0d9c94f515..a8532f0ba1e 100644 --- a/src/Coordination/Changelog.h +++ b/src/Coordination/Changelog.h @@ -121,8 +121,6 @@ public: /// Fsync latest log to disk and flush buffer void flush(); - void shutdown(); - uint64_t size() const { return logs.size(); diff --git a/src/Coordination/KeeperLogStore.cpp b/src/Coordination/KeeperLogStore.cpp index 3787f30626b..2736deaf2af 100644 --- a/src/Coordination/KeeperLogStore.cpp +++ b/src/Coordination/KeeperLogStore.cpp @@ -118,18 +118,4 @@ nuraft::ptr KeeperLogStore::getLatestConfigChange() const return changelog.getLatestConfigChange(); } -void KeeperLogStore::shutdownChangelog() -{ - std::lock_guard lock(changelog_lock); - changelog.shutdown(); -} - -bool KeeperLogStore::flushChangelogAndShutdown() -{ - std::lock_guard lock(changelog_lock); - changelog.flush(); - changelog.shutdown(); - return true; -} - } diff --git a/src/Coordination/KeeperLogStore.h b/src/Coordination/KeeperLogStore.h index 3e558c0508e..b14b2255c56 100644 --- a/src/Coordination/KeeperLogStore.h +++ b/src/Coordination/KeeperLogStore.h @@ -52,12 +52,6 @@ public: /// Call fsync to the stored data bool flush() override; - /// Stop background cleanup thread in change - void shutdownChangelog(); - - /// Flush logstore and call shutdown of background thread in changelog - bool flushChangelogAndShutdown(); - /// Current log storage size uint64_t size() const; diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 6961f31ed20..f4a3715accf 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -360,7 +360,7 @@ void KeeperServer::shutdownRaftServer() void KeeperServer::shutdown() { state_machine->shutdownStorage(); - state_manager->flushAndShutDownLogStore(); + state_manager->flushLogStore(); shutdownRaftServer(); } diff --git a/src/Coordination/KeeperStateManager.cpp b/src/Coordination/KeeperStateManager.cpp index e083cd5d568..2d005ab7661 100644 --- a/src/Coordination/KeeperStateManager.cpp +++ b/src/Coordination/KeeperStateManager.cpp @@ -249,9 +249,9 @@ ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const return nullptr; } -void KeeperStateManager::flushAndShutDownLogStore() +void KeeperStateManager::flushLogStore() { - log_store->flushChangelogAndShutdown(); + log_store->flush(); } void KeeperStateManager::save_config(const nuraft::cluster_config & config) diff --git a/src/Coordination/KeeperStateManager.h b/src/Coordination/KeeperStateManager.h index bdfdeeae1e5..7de9e90673e 100644 --- a/src/Coordination/KeeperStateManager.h +++ b/src/Coordination/KeeperStateManager.h @@ -52,8 +52,7 @@ public: void loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep); - /// Flush logstore and call shutdown of background thread - void flushAndShutDownLogStore(); + void flushLogStore(); /// Called on server start, in our case we don't use any separate logic for load nuraft::ptr load_config() override diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 29d6a55ab14..84f18d66196 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -455,8 +455,6 @@ struct ContextSharedPart delete_message_broker_schedule_pool.reset(); delete_ddl_worker.reset(); delete_access_control.reset(); - - total_memory_tracker.resetOvercommitTracker(); } bool hasTraceCollector() const diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index ac59d2c7235..6315b1405bd 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -344,9 +344,9 @@ QueryStatus::~QueryStatus() if (auto * memory_tracker = getMemoryTracker()) { if (user_process_list) - user_process_list->user_overcommit_tracker.onQueryStop(memory_tracker); + user_process_list->user_overcommit_tracker.unsubscribe(memory_tracker); if (auto shared_context = getContext()) - shared_context->getGlobalOvercommitTracker()->onQueryStop(memory_tracker); + shared_context->getGlobalOvercommitTracker()->unsubscribe(memory_tracker); } }