diff --git a/contrib/icu-cmake/CMakeLists.txt b/contrib/icu-cmake/CMakeLists.txt index afaa189701d..5714fef8347 100644 --- a/contrib/icu-cmake/CMakeLists.txt +++ b/contrib/icu-cmake/CMakeLists.txt @@ -481,6 +481,11 @@ if (ARCH_S390X) else() set(ICUDATA_SOURCE_FILE "${ICUDATA_SOURCE_DIR}/icudt75l_dat.S" ) endif() +# ^^ you might be confused how for different little endian platforms (x86, ARM) the same assembly files can be used. +# These files are indeed assembly but they only contain data ('.long' directive), which makes them portable accross CPUs. +# Only the endianness and the character set (ASCII, EBCDIC) makes a difference, also see +# https://unicode-org.github.io/icu/userguide/icu_data/#sharing-icu-data-between-platforms, 'Sharing ICU Data Between Platforms') +# (and as an experiment, try re-generating the data files on x86 vs. ARM, ... you'll get exactly the same files) set(ICUDATA_SOURCES "${ICUDATA_SOURCE_FILE}" diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index ced661d9772..3007df60765 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -384,6 +385,9 @@ try LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds()); }); + MemoryWorker memory_worker(config().getUInt64("memory_worker_period_ms", 0)); + memory_worker.start(); + static ServerErrorHandler error_handler; Poco::ErrorHandler::set(&error_handler); @@ -425,8 +429,9 @@ try for (const auto & server : *servers) metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()}); return metrics; - } - ); + }, + /*update_jemalloc_epoch_=*/memory_worker.getSource() != MemoryWorker::MemoryUsageSource::Jemalloc, + /*update_rss_=*/memory_worker.getSource() == MemoryWorker::MemoryUsageSource::None); std::vector listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); @@ -655,7 +660,6 @@ try GWPAsan::initFinished(); #endif - LOG_INFO(log, "Ready for connections."); waitForTerminationRequest(); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index fb5717ba33f..c69d822e383 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include @@ -25,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -111,6 +111,8 @@ #include #include +#include + #include "config.h" #include @@ -449,9 +451,12 @@ void checkForUsersNotInMainConfig( } } +namespace +{ + /// Unused in other builds #if defined(OS_LINUX) -static String readLine(const String & path) +String readLine(const String & path) { ReadBufferFromFile in(path); String contents; @@ -459,7 +464,7 @@ static String readLine(const String & path) return contents; } -static int readNumber(const String & path) +int readNumber(const String & path) { ReadBufferFromFile in(path); int result; @@ -469,7 +474,7 @@ static int readNumber(const String & path) #endif -static void sanityChecks(Server & server) +void sanityChecks(Server & server) { std::string data_path = getCanonicalPath(server.config().getString("path", DBMS_DEFAULT_PATH)); std::string logs_path = server.config().getString("logger.log", ""); @@ -590,6 +595,8 @@ static void sanityChecks(Server & server) } } +} + void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, ContextMutablePtr context, Poco::Logger * log) { try @@ -906,6 +913,8 @@ try LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds()); }); + MemoryWorker memory_worker(global_context->getServerSettings().memory_worker_period_ms); + /// This object will periodically calculate some metrics. ServerAsynchronousMetrics async_metrics( global_context, @@ -924,8 +933,9 @@ try for (const auto & server : servers) metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()}); return metrics; - } - ); + }, + /*update_jemalloc_epoch_=*/memory_worker.getSource() != MemoryWorker::MemoryUsageSource::Jemalloc, + /*update_rss_=*/memory_worker.getSource() == MemoryWorker::MemoryUsageSource::None); /// NOTE: global context should be destroyed *before* GlobalThreadPool::shutdown() /// Otherwise GlobalThreadPool::shutdown() will hang, since Context holds some threads. @@ -1204,6 +1214,8 @@ try FailPointInjection::enableFromGlobalConfig(config()); + memory_worker.start(); + int default_oom_score = 0; #if !defined(NDEBUG) @@ -1547,15 +1559,6 @@ try total_memory_tracker.setDescription("(total)"); total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); - if (cgroups_memory_usage_observer) - { - double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio; - double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio; - cgroups_memory_usage_observer->setMemoryUsageLimits( - static_cast(max_server_memory_usage * hard_limit_ratio), - static_cast(max_server_memory_usage * soft_limit_ratio)); - } - size_t merges_mutations_memory_usage_soft_limit = new_server_settings.merges_mutations_memory_usage_soft_limit; size_t default_merges_mutations_server_memory_usage = static_cast(current_physical_server_memory * new_server_settings.merges_mutations_memory_usage_to_ram_ratio); @@ -1584,8 +1587,6 @@ try background_memory_tracker.setDescription("(background)"); background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking); - total_memory_tracker.setAllowUseJemallocMemory(new_server_settings.allow_use_jemalloc_memory); - auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker(); total_memory_tracker.setOvercommitTracker(global_overcommit_tracker); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1889bba3b39..f0410eee9fe 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -176,7 +176,7 @@ add_library (clickhouse_new_delete STATIC Common/new_delete.cpp) target_link_libraries (clickhouse_new_delete PRIVATE clickhouse_common_io) if (TARGET ch_contrib::jemalloc) target_link_libraries (clickhouse_new_delete PRIVATE ch_contrib::jemalloc) - target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::jemalloc) + target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::jemalloc) target_link_libraries (clickhouse_storages_system PRIVATE ch_contrib::jemalloc) endif() diff --git a/src/Common/AsynchronousMetrics.cpp b/src/Common/AsynchronousMetrics.cpp index 9b6a7428411..a92d321f8aa 100644 --- a/src/Common/AsynchronousMetrics.cpp +++ b/src/Common/AsynchronousMetrics.cpp @@ -1,5 +1,3 @@ -#include - #include #include #include @@ -8,8 +6,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -69,10 +69,14 @@ static void openCgroupv2MetricFile(const std::string & filename, std::optional(jemalloc_full_name.c_str()); values[clickhouse_full_name] = AsynchronousMetricValue(value, "An internal metric of the low-level memory allocator (jemalloc). See https://jemalloc.net/jemalloc.3.html"); return value; } @@ -768,8 +770,11 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update) // 'epoch' is a special mallctl -- it updates the statistics. Without it, all // the following calls will return stale values. It increments and returns // the current epoch number, which might be useful to log as a sanity check. - auto epoch = updateJemallocEpoch(); - new_values["jemalloc.epoch"] = { epoch, "An internal incremental update number of the statistics of jemalloc (Jason Evans' memory allocator), used in all other `jemalloc` metrics." }; + auto epoch = update_jemalloc_epoch ? updateJemallocEpoch() : getJemallocValue("epoch"); + new_values["jemalloc.epoch"] + = {epoch, + "An internal incremental update number of the statistics of jemalloc (Jason Evans' memory allocator), used in all other " + "`jemalloc` metrics."}; // Collect the statistics themselves. saveJemallocMetric(new_values, "allocated"); @@ -782,10 +787,10 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update) saveJemallocMetric(new_values, "background_thread.num_threads"); saveJemallocMetric(new_values, "background_thread.num_runs"); saveJemallocMetric(new_values, "background_thread.run_intervals"); - saveJemallocProf(new_values, "active"); + saveJemallocProf(new_values, "active"); saveAllArenasMetric(new_values, "pactive"); - [[maybe_unused]] size_t je_malloc_pdirty = saveAllArenasMetric(new_values, "pdirty"); - [[maybe_unused]] size_t je_malloc_pmuzzy = saveAllArenasMetric(new_values, "pmuzzy"); + saveAllArenasMetric(new_values, "pdirty"); + saveAllArenasMetric(new_values, "pmuzzy"); saveAllArenasMetric(new_values, "dirty_purged"); saveAllArenasMetric(new_values, "muzzy_purged"); #endif @@ -814,41 +819,8 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update) " It is unspecified whether it includes the per-thread stacks and most of the allocated memory, that is allocated with the 'mmap' system call." " This metric exists only for completeness reasons. I recommend to use the `MemoryResident` metric for monitoring."}; - /// We must update the value of total_memory_tracker periodically. - /// Otherwise it might be calculated incorrectly - it can include a "drift" of memory amount. - /// See https://github.com/ClickHouse/ClickHouse/issues/10293 - { - Int64 amount = total_memory_tracker.get(); - Int64 peak = total_memory_tracker.getPeak(); - Int64 rss = data.resident; - Int64 free_memory_in_allocator_arenas = 0; - -#if USE_JEMALLOC - /// According to jemalloc man, pdirty is: - /// - /// Number of pages within unused extents that are potentially - /// dirty, and for which madvise() or similar has not been called. - /// - /// So they will be subtracted from RSS to make accounting more - /// accurate, since those pages are not really RSS but a memory - /// that can be used at anytime via jemalloc. - free_memory_in_allocator_arenas = je_malloc_pdirty * getPageSize(); -#endif - - Int64 difference = rss - amount; - - /// Log only if difference is high. This is for convenience. The threshold is arbitrary. - if (difference >= 1048576 || difference <= -1048576) - LOG_TRACE(log, - "MemoryTracking: was {}, peak {}, free memory in arenas {}, will set to {} (RSS), difference: {}", - ReadableSize(amount), - ReadableSize(peak), - ReadableSize(free_memory_in_allocator_arenas), - ReadableSize(rss), - ReadableSize(difference)); - - MemoryTracker::setRSS(rss, free_memory_in_allocator_arenas); - } + if (update_rss) + MemoryTracker::updateRSS(data.resident); } { diff --git a/src/Common/AsynchronousMetrics.h b/src/Common/AsynchronousMetrics.h index 78d07ef4b6c..215dc6e1337 100644 --- a/src/Common/AsynchronousMetrics.h +++ b/src/Common/AsynchronousMetrics.h @@ -1,15 +1,14 @@ #pragma once +#include #include #include #include #include #include -#include #include #include -#include #include #include #include @@ -69,7 +68,9 @@ public: AsynchronousMetrics( unsigned update_period_seconds, - const ProtocolServerMetricsFunc & protocol_server_metrics_func_); + const ProtocolServerMetricsFunc & protocol_server_metrics_func_, + bool update_jemalloc_epoch_, + bool update_rss_); virtual ~AsynchronousMetrics(); @@ -112,6 +113,9 @@ private: MemoryStatisticsOS memory_stat TSA_GUARDED_BY(data_mutex); #endif + [[maybe_unused]] const bool update_jemalloc_epoch; + [[maybe_unused]] const bool update_rss; + #if defined(OS_LINUX) std::optional meminfo TSA_GUARDED_BY(data_mutex); std::optional loadavg TSA_GUARDED_BY(data_mutex); diff --git a/src/Common/CgroupsMemoryUsageObserver.cpp b/src/Common/CgroupsMemoryUsageObserver.cpp index 83b04360164..28bb861865a 100644 --- a/src/Common/CgroupsMemoryUsageObserver.cpp +++ b/src/Common/CgroupsMemoryUsageObserver.cpp @@ -14,239 +14,21 @@ #include #include -#include -#include -#include - -#include "config.h" -#if USE_JEMALLOC -# include -#define STRINGIFY_HELPER(x) #x -#define STRINGIFY(x) STRINGIFY_HELPER(x) -#endif using namespace DB; -namespace fs = std::filesystem; - -namespace DB -{ - -namespace ErrorCodes -{ -extern const int FILE_DOESNT_EXIST; -extern const int INCORRECT_DATA; -} - -} - -namespace -{ - -/// Format is -/// kernel 5 -/// rss 15 -/// [...] -using Metrics = std::map; - -Metrics readAllMetricsFromStatFile(ReadBufferFromFile & buf) -{ - Metrics metrics; - while (!buf.eof()) - { - std::string current_key; - readStringUntilWhitespace(current_key, buf); - - assertChar(' ', buf); - - uint64_t value = 0; - readIntText(value, buf); - assertChar('\n', buf); - - auto [_, inserted] = metrics.emplace(std::move(current_key), value); - chassert(inserted, "Duplicate keys in stat file"); - } - return metrics; -} - -uint64_t readMetricFromStatFile(ReadBufferFromFile & buf, const std::string & key) -{ - const auto all_metrics = readAllMetricsFromStatFile(buf); - if (const auto it = all_metrics.find(key); it != all_metrics.end()) - return it->second; - throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot find '{}' in '{}'", key, buf.getFileName()); -} - -struct CgroupsV1Reader : ICgroupsReader -{ - explicit CgroupsV1Reader(const fs::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { } - - uint64_t readMemoryUsage() override - { - std::lock_guard lock(mutex); - buf.rewind(); - return readMetricFromStatFile(buf, "rss"); - } - - std::string dumpAllStats() override - { - std::lock_guard lock(mutex); - buf.rewind(); - return fmt::format("{}", readAllMetricsFromStatFile(buf)); - } - -private: - std::mutex mutex; - ReadBufferFromFile buf TSA_GUARDED_BY(mutex); -}; - -struct CgroupsV2Reader : ICgroupsReader -{ - explicit CgroupsV2Reader(const fs::path & stat_file_dir) - : current_buf(stat_file_dir / "memory.current"), stat_buf(stat_file_dir / "memory.stat") - { - } - - uint64_t readMemoryUsage() override - { - std::lock_guard lock(mutex); - current_buf.rewind(); - stat_buf.rewind(); - - int64_t mem_usage = 0; - /// memory.current contains a single number - /// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667 - readIntText(mem_usage, current_buf); - mem_usage -= readMetricFromStatFile(stat_buf, "inactive_file"); - chassert(mem_usage >= 0, "Negative memory usage"); - return mem_usage; - } - - std::string dumpAllStats() override - { - std::lock_guard lock(mutex); - stat_buf.rewind(); - return fmt::format("{}", readAllMetricsFromStatFile(stat_buf)); - } - -private: - std::mutex mutex; - ReadBufferFromFile current_buf TSA_GUARDED_BY(mutex); - ReadBufferFromFile stat_buf TSA_GUARDED_BY(mutex); -}; - -/// Caveats: -/// - All of the logic in this file assumes that the current process is the only process in the -/// containing cgroup (or more precisely: the only process with significant memory consumption). -/// If this is not the case, then other processe's memory consumption may affect the internal -/// memory tracker ... -/// - Cgroups v1 and v2 allow nested cgroup hierarchies. As v1 is deprecated for over half a -/// decade and will go away at some point, hierarchical detection is only implemented for v2. -/// - I did not test what happens if a host has v1 and v2 simultaneously enabled. I believe such -/// systems existed only for a short transition period. - -std::optional getCgroupsV1Path() -{ - auto path = default_cgroups_mount / "memory/memory.stat"; - if (!fs::exists(path)) - return {}; - return {default_cgroups_mount / "memory"}; -} - -std::pair getCgroupsPath() -{ - auto v2_path = getCgroupsV2PathContainingFile("memory.current"); - if (v2_path.has_value()) - return {*v2_path, CgroupsMemoryUsageObserver::CgroupsVersion::V2}; - - auto v1_path = getCgroupsV1Path(); - if (v1_path.has_value()) - return {*v1_path, CgroupsMemoryUsageObserver::CgroupsVersion::V1}; - - throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file"); -} - -} namespace DB { CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_) : log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_) -{ - const auto [cgroup_path, version] = getCgroupsPath(); - - cgroup_reader = createCgroupsReader(version, cgroup_path); - - LOG_INFO( - log, - "Will read the current memory usage from '{}' (cgroups version: {}), wait time is {} sec", - cgroup_path, - (version == CgroupsVersion::V1) ? "v1" : "v2", - wait_time.count()); -} +{} CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver() { stopThread(); } -void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_) -{ - std::lock_guard limit_lock(limit_mutex); - - if (hard_limit_ == hard_limit && soft_limit_ == soft_limit) - return; - - hard_limit = hard_limit_; - soft_limit = soft_limit_; - - on_hard_limit = [this, hard_limit_](bool up) - { - if (up) - { - LOG_WARNING(log, "Exceeded hard memory limit ({})", ReadableSize(hard_limit_)); - - /// Update current usage in memory tracker. Also reset free_memory_in_allocator_arenas to zero though we don't know if they are - /// really zero. Trying to avoid OOM ... - MemoryTracker::setRSS(hard_limit_, 0); - } - else - { - LOG_INFO(log, "Dropped below hard memory limit ({})", ReadableSize(hard_limit_)); - } - }; - - on_soft_limit = [this, soft_limit_](bool up) - { - if (up) - { - LOG_WARNING(log, "Exceeded soft memory limit ({})", ReadableSize(soft_limit_)); - -# if USE_JEMALLOC - LOG_INFO(log, "Purging jemalloc arenas"); - mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0); -# endif - /// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them. - uint64_t memory_usage = cgroup_reader->readMemoryUsage(); - LOG_TRACE( - log, - "Read current memory usage {} bytes ({}) from cgroups, full available stats: {}", - memory_usage, - ReadableSize(memory_usage), - cgroup_reader->dumpAllStats()); - MemoryTracker::setRSS(memory_usage, 0); - - LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage)); - } - else - { - LOG_INFO(log, "Dropped below soft memory limit ({})", ReadableSize(soft_limit_)); - } - }; - - LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_)); -} - void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_) { std::lock_guard memory_amount_available_changed_lock(memory_amount_available_changed_mutex); @@ -300,35 +82,6 @@ void CgroupsMemoryUsageObserver::runThread() std::lock_guard memory_amount_available_changed_lock(memory_amount_available_changed_mutex); on_memory_amount_available_changed(); } - - std::lock_guard limit_lock(limit_mutex); - if (soft_limit > 0 && hard_limit > 0) - { - uint64_t memory_usage = cgroup_reader->readMemoryUsage(); - LOG_TRACE(log, "Read current memory usage {} bytes ({}) from cgroups", memory_usage, ReadableSize(memory_usage)); - if (memory_usage > hard_limit) - { - if (last_memory_usage <= hard_limit) - on_hard_limit(true); - } - else - { - if (last_memory_usage > hard_limit) - on_hard_limit(false); - } - - if (memory_usage > soft_limit) - { - if (last_memory_usage <= soft_limit) - on_soft_limit(true); - } - else - { - if (last_memory_usage > soft_limit) - on_soft_limit(false); - } - last_memory_usage = memory_usage; - } } catch (...) { @@ -337,13 +90,6 @@ void CgroupsMemoryUsageObserver::runThread() } } -std::unique_ptr createCgroupsReader(CgroupsMemoryUsageObserver::CgroupsVersion version, const fs::path & cgroup_path) -{ - if (version == CgroupsMemoryUsageObserver::CgroupsVersion::V2) - return std::make_unique(cgroup_path); - else - return std::make_unique(cgroup_path); -} } #endif diff --git a/src/Common/CgroupsMemoryUsageObserver.h b/src/Common/CgroupsMemoryUsageObserver.h index 7f888fe631b..3de83d6b437 100644 --- a/src/Common/CgroupsMemoryUsageObserver.h +++ b/src/Common/CgroupsMemoryUsageObserver.h @@ -3,53 +3,27 @@ #include #include -#include #include namespace DB { -struct ICgroupsReader -{ - virtual ~ICgroupsReader() = default; - - virtual uint64_t readMemoryUsage() = 0; - - virtual std::string dumpAllStats() = 0; -}; - -/// Does two things: -/// 1. Periodically reads the memory usage of the process from Linux cgroups. -/// You can specify soft or hard memory limits: -/// - When the soft memory limit is hit, drop jemalloc cache. -/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster. -/// The goal of this is to avoid that the process hits the maximum allowed memory limit at which there is a good -/// chance that the Limux OOM killer terminates it. All of this is done is because internal memory tracking in -/// ClickHouse can unfortunately under-estimate the actually used memory. -/// 2. Periodically reads the the maximum memory available to the process (which can change due to cgroups settings). -/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server -/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit' -/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.). -/// The goal of this is to provide elasticity when the container is scaled-up/scaled-down. The mechanism (polling -/// cgroups) is quite implicit, unfortunately there is currently no better way to communicate memory threshold changes -/// to the database. +/// Periodically reads the the maximum memory available to the process (which can change due to cgroups settings). +/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server +/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit' +/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.). +/// The goal of this is to provide elasticity when the container is scaled-up/scaled-down. The mechanism (polling +/// cgroups) is quite implicit, unfortunately there is currently no better way to communicate memory threshold changes +/// to the database. #if defined(OS_LINUX) class CgroupsMemoryUsageObserver { public: - using OnMemoryLimitFn = std::function; using OnMemoryAmountAvailableChangedFn = std::function; - enum class CgroupsVersion : uint8_t - { - V1, - V2 - }; - explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_); ~CgroupsMemoryUsageObserver(); - void setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_); void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_); void startThread(); @@ -60,32 +34,22 @@ private: const std::chrono::seconds wait_time; std::mutex limit_mutex; - size_t hard_limit TSA_GUARDED_BY(limit_mutex) = 0; - size_t soft_limit TSA_GUARDED_BY(limit_mutex) = 0; - OnMemoryLimitFn on_hard_limit TSA_GUARDED_BY(limit_mutex); - OnMemoryLimitFn on_soft_limit TSA_GUARDED_BY(limit_mutex); std::mutex memory_amount_available_changed_mutex; OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex); - uint64_t last_memory_usage = 0; /// how much memory does the process use uint64_t last_available_memory_amount; /// how much memory can the process use void stopThread(); void runThread(); - std::unique_ptr cgroup_reader; - std::mutex thread_mutex; std::condition_variable cond; ThreadFromGlobalPool thread; bool quit = false; }; -std::unique_ptr -createCgroupsReader(CgroupsMemoryUsageObserver::CgroupsVersion version, const std::filesystem::path & cgroup_path); - #else class CgroupsMemoryUsageObserver { diff --git a/src/Common/Jemalloc.cpp b/src/Common/Jemalloc.cpp index d7cc246db6a..d8ff9268cca 100644 --- a/src/Common/Jemalloc.cpp +++ b/src/Common/Jemalloc.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #define STRINGIFY_HELPER(x) #x #define STRINGIFY(x) STRINGIFY_HELPER(x) @@ -26,7 +25,6 @@ namespace ErrorCodes void purgeJemallocArenas() { - LOG_TRACE(getLogger("SystemJemalloc"), "Purging unused memory"); Stopwatch watch; mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0); ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge); @@ -46,20 +44,6 @@ void checkJemallocProfilingEnabled() "set: MALLOC_CONF=background_thread:true,prof:true"); } -template -void setJemallocValue(const char * name, T value) -{ - T old_value; - size_t old_value_size = sizeof(T); - if (mallctl(name, &old_value, &old_value_size, reinterpret_cast(&value), sizeof(T))) - { - LOG_WARNING(getLogger("Jemalloc"), "mallctl for {} failed", name); - return; - } - - LOG_INFO(getLogger("Jemalloc"), "Value for {} set to {} (from {})", name, value, old_value); -} - void setJemallocProfileActive(bool value) { checkJemallocProfilingEnabled(); diff --git a/src/Common/Jemalloc.h b/src/Common/Jemalloc.h index 499a906fd3d..22a94a44eba 100644 --- a/src/Common/Jemalloc.h +++ b/src/Common/Jemalloc.h @@ -5,6 +5,8 @@ #if USE_JEMALLOC #include +#include +#include namespace DB { @@ -21,6 +23,59 @@ void setJemallocBackgroundThreads(bool enabled); void setJemallocMaxBackgroundThreads(size_t max_threads); +template +void setJemallocValue(const char * name, T value) +{ + T old_value; + size_t old_value_size = sizeof(T); + mallctl(name, &old_value, &old_value_size, reinterpret_cast(&value), sizeof(T)); + LOG_INFO(getLogger("Jemalloc"), "Value for {} set to {} (from {})", name, value, old_value); +} + +template +T getJemallocValue(const char * name) +{ + T value; + size_t value_size = sizeof(T); + mallctl(name, &value, &value_size, nullptr, 0); + return value; +} + +/// Each mallctl call consists of string name lookup which can be expensive. +/// This can be avoided by translating name to "Management Information Base" (MIB) +/// and using it in mallctlbymib calls +template +struct JemallocMibCache +{ + explicit JemallocMibCache(const char * name) + { + mallctlnametomib(name, mib, &mib_length); + } + + void setValue(T value) + { + mallctlbymib(mib, mib_length, nullptr, nullptr, reinterpret_cast(&value), sizeof(T)); + } + + T getValue() + { + T value; + size_t value_size = sizeof(T); + mallctlbymib(mib, mib_length, &value, &value_size, nullptr, 0); + return value; + } + + void run() + { + mallctlbymib(mib, mib_length, nullptr, nullptr, nullptr, 0); + } + +private: + static constexpr size_t max_mib_length = 4; + size_t mib[max_mib_length]; + size_t mib_length = max_mib_length; +}; + } #endif diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 7c0115467c6..7bf665ea7a0 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -20,13 +20,9 @@ #if USE_JEMALLOC # include -#define STRINGIFY_HELPER(x) #x -#define STRINGIFY(x) STRINGIFY_HELPER(x) - #endif #include -#include #include #include #include @@ -115,8 +111,6 @@ void AllocationTrace::onFreeImpl(void * ptr, size_t size) const namespace ProfileEvents { extern const Event QueryMemoryLimitExceeded; - extern const Event MemoryAllocatorPurge; - extern const Event MemoryAllocatorPurgeTimeMicroseconds; } using namespace std::chrono_literals; @@ -126,15 +120,13 @@ static constexpr size_t log_peak_memory_usage_every = 1ULL << 30; MemoryTracker total_memory_tracker(nullptr, VariableContext::Global); MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false); -std::atomic MemoryTracker::free_memory_in_allocator_arenas; - MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {} MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {} + MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_) - : parent(parent_) - , log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_) - , level(level_) -{} + : parent(parent_), log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_), level(level_) +{ +} MemoryTracker::~MemoryTracker() { @@ -204,10 +196,14 @@ void MemoryTracker::debugLogBigAllocationWithoutCheck(Int64 size [[maybe_unused] return; MemoryTrackerBlockerInThread blocker(VariableContext::Global); - LOG_TEST(getLogger("MemoryTracker"), "Too big allocation ({} bytes) without checking memory limits, " - "it may lead to OOM. Stack trace: {}", size, StackTrace().toString()); + LOG_TEST( + getLogger("MemoryTracker"), + "Too big allocation ({} bytes) without checking memory limits, " + "it may lead to OOM. Stack trace: {}", + size, + StackTrace().toString()); #else - return; /// Avoid trash logging in release builds + /// Avoid trash logging in release builds #endif } @@ -228,6 +224,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed { /// For global memory tracker always update memory usage. amount.fetch_add(size, std::memory_order_relaxed); + rss.fetch_add(size, std::memory_order_relaxed); auto metric_loaded = metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end()) @@ -249,6 +246,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed * So, we allow over-allocations. */ Int64 will_be = size ? size + amount.fetch_add(size, std::memory_order_relaxed) : amount.load(std::memory_order_relaxed); + Int64 will_be_rss = size ? size + rss.fetch_add(size, std::memory_order_relaxed) : rss.load(std::memory_order_relaxed); auto metric_loaded = metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end() && size) @@ -275,6 +273,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed { /// Revert amount.fetch_sub(size, std::memory_order_relaxed); + rss.fetch_sub(size, std::memory_order_relaxed); /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); @@ -297,33 +296,8 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed } } - Int64 limit_to_check = current_hard_limit; - -#if USE_JEMALLOC - if (level == VariableContext::Global && allow_use_jemalloc_memory.load(std::memory_order_relaxed)) - { - /// Jemalloc arenas may keep some extra memory. - /// This memory was substucted from RSS to decrease memory drift. - /// In case memory is close to limit, try to pugre the arenas. - /// This is needed to avoid OOM, because some allocations are directly done with mmap. - Int64 current_free_memory_in_allocator_arenas = free_memory_in_allocator_arenas.load(std::memory_order_relaxed); - - if (current_free_memory_in_allocator_arenas > 0 && current_hard_limit && current_free_memory_in_allocator_arenas + will_be > current_hard_limit) - { - if (free_memory_in_allocator_arenas.exchange(-current_free_memory_in_allocator_arenas) > 0) - { - Stopwatch watch; - mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0); - ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge); - ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds()); - } - } - - limit_to_check += abs(current_free_memory_in_allocator_arenas); - } -#endif - - if (unlikely(current_hard_limit && will_be > limit_to_check)) + if (unlikely( + current_hard_limit && (will_be > current_hard_limit || (level == VariableContext::Global && will_be_rss > current_hard_limit)))) { if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded) { @@ -335,6 +309,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed { /// Revert amount.fetch_sub(size, std::memory_order_relaxed); + rss.fetch_sub(size, std::memory_order_relaxed); /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); @@ -343,12 +318,13 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed throw DB::Exception( DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED, "Memory limit{}{} exceeded: " - "would use {} (attempt to allocate chunk of {} bytes), maximum: {}." + "would use {} (attempt to allocate chunk of {} bytes), current RSS {}, maximum: {}." "{}{}", description ? " " : "", description ? description : "", formatReadableSizeWithBinarySuffix(will_be), size, + formatReadableSizeWithBinarySuffix(rss.load(std::memory_order_relaxed)), formatReadableSizeWithBinarySuffix(current_hard_limit), overcommit_result == OvercommitResult::NONE ? "" : " OvercommitTracker decision: ", toDescription(overcommit_result)); @@ -442,6 +418,7 @@ AllocationTrace MemoryTracker::free(Int64 size, double _sample_probability) { /// For global memory tracker always update memory usage. amount.fetch_sub(size, std::memory_order_relaxed); + rss.fetch_sub(size, std::memory_order_relaxed); auto metric_loaded = metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end()) CurrentMetrics::sub(metric_loaded, size); @@ -455,7 +432,12 @@ AllocationTrace MemoryTracker::free(Int64 size, double _sample_probability) } Int64 accounted_size = size; - if (level == VariableContext::Thread || level == VariableContext::Global) + if (level == VariableContext::Global) + { + amount.fetch_sub(accounted_size, std::memory_order_relaxed); + rss.fetch_sub(accounted_size, std::memory_order_relaxed); + } + else if (level == VariableContext::Thread) { /// Could become negative if memory allocated in this thread is freed in another one amount.fetch_sub(accounted_size, std::memory_order_relaxed); @@ -529,21 +511,29 @@ void MemoryTracker::reset() } -void MemoryTracker::setRSS(Int64 rss_, Int64 free_memory_in_allocator_arenas_) +void MemoryTracker::updateRSS(Int64 rss_) { - Int64 new_amount = rss_; + total_memory_tracker.rss.store(rss_, std::memory_order_relaxed); +} + +void MemoryTracker::updateAllocated(Int64 allocated_) +{ + Int64 new_amount = allocated_; + LOG_INFO( + getLogger("MemoryTracker"), + "Correcting the value of global memory tracker from {} to {}", + ReadableSize(total_memory_tracker.amount.load(std::memory_order_relaxed)), + ReadableSize(allocated_)); total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed); - free_memory_in_allocator_arenas.store(free_memory_in_allocator_arenas_, std::memory_order_relaxed); auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end()) CurrentMetrics::set(metric_loaded, new_amount); bool log_memory_usage = true; - total_memory_tracker.updatePeak(rss_, log_memory_usage); + total_memory_tracker.updatePeak(new_amount, log_memory_usage); } - void MemoryTracker::setSoftLimit(Int64 value) { soft_limit.store(value, std::memory_order_relaxed); diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index fd32b631774..f15465a20c1 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -57,9 +56,8 @@ private: std::atomic soft_limit {0}; std::atomic hard_limit {0}; std::atomic profiler_limit {0}; - std::atomic_bool allow_use_jemalloc_memory {true}; - static std::atomic free_memory_in_allocator_arenas; + std::atomic rss{0}; Int64 profiler_step = 0; @@ -122,6 +120,11 @@ public: return amount.load(std::memory_order_relaxed); } + Int64 getRSS() const + { + return rss.load(std::memory_order_relaxed); + } + // Merges and mutations may pass memory ownership to other threads thus in the end of execution // MemoryTracker for background task may have a non-zero counter. // This method is intended to fix the counter inside of background_memory_tracker. @@ -154,14 +157,6 @@ public: { return soft_limit.load(std::memory_order_relaxed); } - void setAllowUseJemallocMemory(bool value) - { - allow_use_jemalloc_memory.store(value, std::memory_order_relaxed); - } - bool getAllowUseJemallocMmemory() const - { - return allow_use_jemalloc_memory.load(std::memory_order_relaxed); - } /** Set limit if it was not set. * Otherwise, set limit to new value, if new value is greater than previous limit. @@ -249,10 +244,9 @@ public: /// Reset the accumulated data. void reset(); - /// Reset current counter to an RSS value. - /// Jemalloc may have pre-allocated arenas, they are accounted in RSS. - /// We can free this arenas in case of exception to avoid OOM. - static void setRSS(Int64 rss_, Int64 free_memory_in_allocator_arenas_); + /// update values based on external information (e.g. jemalloc's stat) + static void updateRSS(Int64 rss_); + static void updateAllocated(Int64 allocated_); /// Prints info about peak memory consumption into log. void logPeakMemoryUsage(); diff --git a/src/Common/MemoryWorker.cpp b/src/Common/MemoryWorker.cpp new file mode 100644 index 00000000000..11f3bff348c --- /dev/null +++ b/src/Common/MemoryWorker.cpp @@ -0,0 +1,333 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace fs = std::filesystem; + +namespace ProfileEvents +{ + extern const Event MemoryAllocatorPurge; + extern const Event MemoryAllocatorPurgeTimeMicroseconds; + extern const Event MemoryWorkerRun; + extern const Event MemoryWorkerRunElapsedMicroseconds; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int LOGICAL_ERROR; +} + +#if defined(OS_LINUX) +namespace +{ + +using Metrics = std::map; + +/// Format is +/// kernel 5 +/// rss 15 +/// [...] +Metrics readAllMetricsFromStatFile(ReadBufferFromFile & buf) +{ + Metrics metrics; + while (!buf.eof()) + { + std::string current_key; + readStringUntilWhitespace(current_key, buf); + + assertChar(' ', buf); + + uint64_t value = 0; + readIntText(value, buf); + assertChar('\n', buf); + + auto [_, inserted] = metrics.emplace(std::move(current_key), value); + chassert(inserted, "Duplicate keys in stat file"); + } + return metrics; +} + +uint64_t readMetricFromStatFile(ReadBufferFromFile & buf, std::string_view key) +{ + while (!buf.eof()) + { + std::string current_key; + readStringUntilWhitespace(current_key, buf); + if (current_key != key) + { + std::string dummy; + readStringUntilNewlineInto(dummy, buf); + buf.ignore(); + continue; + } + + assertChar(' ', buf); + uint64_t value = 0; + readIntText(value, buf); + return value; + } + LOG_ERROR(getLogger("CgroupsReader"), "Cannot find '{}' in '{}'", key, buf.getFileName()); + return 0; +} + +struct CgroupsV1Reader : ICgroupsReader +{ + explicit CgroupsV1Reader(const fs::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { } + + uint64_t readMemoryUsage() override + { + std::lock_guard lock(mutex); + buf.rewind(); + return readMetricFromStatFile(buf, "rss"); + } + + std::string dumpAllStats() override + { + std::lock_guard lock(mutex); + buf.rewind(); + return fmt::format("{}", readAllMetricsFromStatFile(buf)); + } + +private: + std::mutex mutex; + ReadBufferFromFile buf TSA_GUARDED_BY(mutex); +}; + +struct CgroupsV2Reader : ICgroupsReader +{ + explicit CgroupsV2Reader(const fs::path & stat_file_dir) : stat_buf(stat_file_dir / "memory.stat") { } + + uint64_t readMemoryUsage() override + { + std::lock_guard lock(mutex); + stat_buf.rewind(); + return readMetricFromStatFile(stat_buf, "anon"); + } + + std::string dumpAllStats() override + { + std::lock_guard lock(mutex); + stat_buf.rewind(); + return fmt::format("{}", readAllMetricsFromStatFile(stat_buf)); + } + +private: + std::mutex mutex; + ReadBufferFromFile stat_buf TSA_GUARDED_BY(mutex); +}; + +/// Caveats: +/// - All of the logic in this file assumes that the current process is the only process in the +/// containing cgroup (or more precisely: the only process with significant memory consumption). +/// If this is not the case, then other processe's memory consumption may affect the internal +/// memory tracker ... +/// - Cgroups v1 and v2 allow nested cgroup hierarchies. As v1 is deprecated for over half a +/// decade and will go away at some point, hierarchical detection is only implemented for v2. +/// - I did not test what happens if a host has v1 and v2 simultaneously enabled. I believe such +/// systems existed only for a short transition period. + +std::optional getCgroupsV1Path() +{ + auto path = default_cgroups_mount / "memory/memory.stat"; + if (!fs::exists(path)) + return {}; + return {default_cgroups_mount / "memory"}; +} + +std::pair getCgroupsPath() +{ + auto v2_path = getCgroupsV2PathContainingFile("memory.current"); + if (v2_path.has_value()) + return {*v2_path, ICgroupsReader::CgroupsVersion::V2}; + + auto v1_path = getCgroupsV1Path(); + if (v1_path.has_value()) + return {*v1_path, ICgroupsReader::CgroupsVersion::V1}; + + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file"); +} + +} + +std::shared_ptr ICgroupsReader::createCgroupsReader(ICgroupsReader::CgroupsVersion version, const std::filesystem::path & cgroup_path) +{ + if (version == CgroupsVersion::V2) + return std::make_shared(cgroup_path); + else + { + chassert(version == CgroupsVersion::V1); + return std::make_shared(cgroup_path); + } +} +#endif + +namespace +{ + +std::string_view sourceToString(MemoryWorker::MemoryUsageSource source) +{ + switch (source) + { + case MemoryWorker::MemoryUsageSource::Cgroups: return "Cgroups"; + case MemoryWorker::MemoryUsageSource::Jemalloc: return "Jemalloc"; + case MemoryWorker::MemoryUsageSource::None: return "None"; + } +} + +} + +/// We try to pick the best possible supported source for reading memory usage. +/// Supported sources in order of priority +/// - reading from cgroups' pseudo-files (fastest and most accurate) +/// - reading jemalloc's resident stat (doesn't take into account allocations that didn't use jemalloc) +/// Also, different tick rates are used because not all options are equally fast +MemoryWorker::MemoryWorker(uint64_t period_ms_) + : log(getLogger("MemoryWorker")) + , period_ms(period_ms_) +{ +#if defined(OS_LINUX) + try + { + static constexpr uint64_t cgroups_memory_usage_tick_ms{50}; + + const auto [cgroup_path, version] = getCgroupsPath(); + LOG_INFO( + getLogger("CgroupsReader"), + "Will create cgroup reader from '{}' (cgroups version: {})", + cgroup_path, + (version == ICgroupsReader::CgroupsVersion::V1) ? "v1" : "v2"); + + cgroups_reader = ICgroupsReader::createCgroupsReader(version, cgroup_path); + source = MemoryUsageSource::Cgroups; + if (period_ms == 0) + period_ms = cgroups_memory_usage_tick_ms; + + return; + } + catch (...) + { + tryLogCurrentException(log, "Cannot use cgroups reader"); + } +#endif + +#if USE_JEMALLOC + static constexpr uint64_t jemalloc_memory_usage_tick_ms{100}; + + source = MemoryUsageSource::Jemalloc; + if (period_ms == 0) + period_ms = jemalloc_memory_usage_tick_ms; +#endif +} + +MemoryWorker::MemoryUsageSource MemoryWorker::getSource() +{ + return source; +} + +void MemoryWorker::start() +{ + if (source == MemoryUsageSource::None) + return; + + LOG_INFO( + getLogger("MemoryWorker"), + "Starting background memory thread with period of {}ms, using {} as source", + period_ms, + sourceToString(source)); + background_thread = ThreadFromGlobalPool([this] { backgroundThread(); }); +} + +MemoryWorker::~MemoryWorker() +{ + { + std::unique_lock lock(mutex); + shutdown = true; + } + cv.notify_all(); + + if (background_thread.joinable()) + background_thread.join(); +} + +uint64_t MemoryWorker::getMemoryUsage() +{ + switch (source) + { + case MemoryUsageSource::Cgroups: + return cgroups_reader != nullptr ? cgroups_reader->readMemoryUsage() : 0; + case MemoryUsageSource::Jemalloc: +#if USE_JEMALLOC + return resident_mib.getValue(); +#else + return 0; +#endif + case MemoryUsageSource::None: + throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Trying to fetch memory usage while no memory source can be used"); + } +} + +void MemoryWorker::backgroundThread() +{ + std::chrono::milliseconds chrono_period_ms{period_ms}; + [[maybe_unused]] bool first_run = true; + std::unique_lock lock(mutex); + while (true) + { + cv.wait_for(lock, chrono_period_ms, [this] { return shutdown; }); + if (shutdown) + return; + + Stopwatch total_watch; + +#if USE_JEMALLOC + if (source == MemoryUsageSource::Jemalloc) + epoch_mib.setValue(0); +#endif + + Int64 resident = getMemoryUsage(); + MemoryTracker::updateRSS(resident); + +#if USE_JEMALLOC + if (resident > total_memory_tracker.getHardLimit()) + { + Stopwatch purge_watch; + purge_mib.run(); + ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge); + ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, purge_watch.elapsedMicroseconds()); + } +#endif + +#if USE_JEMALLOC + if (unlikely(first_run || total_memory_tracker.get() < 0)) + { + if (source != MemoryUsageSource::Jemalloc) + epoch_mib.setValue(0); + + MemoryTracker::updateAllocated(allocated_mib.getValue()); + } +#endif + + ProfileEvents::increment(ProfileEvents::MemoryWorkerRun); + ProfileEvents::increment(ProfileEvents::MemoryWorkerRunElapsedMicroseconds, total_watch.elapsedMicroseconds()); + first_run = false; + } +} + +} diff --git a/src/Common/MemoryWorker.h b/src/Common/MemoryWorker.h new file mode 100644 index 00000000000..f4b0fed23ec --- /dev/null +++ b/src/Common/MemoryWorker.h @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +struct ICgroupsReader +{ + enum class CgroupsVersion : uint8_t + { + V1, + V2 + }; + +#if defined(OS_LINUX) + static std::shared_ptr + createCgroupsReader(ICgroupsReader::CgroupsVersion version, const std::filesystem::path & cgroup_path); +#endif + + virtual ~ICgroupsReader() = default; + + virtual uint64_t readMemoryUsage() = 0; + + virtual std::string dumpAllStats() = 0; +}; + + +/// Correct MemoryTracker based on external information (e.g. Cgroups or stats.resident from jemalloc) +/// The worker spawns a background thread which periodically reads current resident memory from the source, +/// whose value is sent to global MemoryTracker. +/// It can do additional things like purging jemalloc dirty pages if the current memory usage is higher than global hard limit. +class MemoryWorker +{ +public: + explicit MemoryWorker(uint64_t period_ms_); + + enum class MemoryUsageSource : uint8_t + { + None, + Cgroups, + Jemalloc + }; + + MemoryUsageSource getSource(); + + void start(); + + ~MemoryWorker(); +private: + uint64_t getMemoryUsage(); + + void backgroundThread(); + + ThreadFromGlobalPool background_thread; + + std::mutex mutex; + std::condition_variable cv; + bool shutdown = false; + + LoggerPtr log; + + uint64_t period_ms; + + MemoryUsageSource source{MemoryUsageSource::None}; + + std::shared_ptr cgroups_reader; + +#if USE_JEMALLOC + JemallocMibCache epoch_mib{"epoch"}; + JemallocMibCache resident_mib{"stats.resident"}; + JemallocMibCache allocated_mib{"stats.allocated"}; + +#define STRINGIFY_HELPER(x) #x +#define STRINGIFY(x) STRINGIFY_HELPER(x) + JemallocMibCache purge_mib{"arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge"}; +#undef STRINGIFY +#undef STRINGIFY_HELPER +#endif +}; + +} diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index af1b7fbeb4a..467dfe60cd7 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -827,6 +827,9 @@ The server successfully detected this situation and will download merged part fr M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \ M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \ M(GWPAsanFree, "Number of free operations done by GWPAsan") \ + \ + M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background") \ + M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work") \ #ifdef APPLY_FOR_EXTERNAL_EVENTS diff --git a/src/Common/tests/gtest_cgroups_reader.cpp b/src/Common/tests/gtest_cgroups_reader.cpp index 2de25bb42ce..e24b91a59b8 100644 --- a/src/Common/tests/gtest_cgroups_reader.cpp +++ b/src/Common/tests/gtest_cgroups_reader.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include using namespace DB; @@ -126,7 +126,7 @@ const std::string EXPECTED[2] "\"workingset_restore_anon\": 0, \"workingset_restore_file\": 0, \"zswap\": 0, \"zswapped\": 0, \"zswpin\": 0, \"zswpout\": 0}"}; -class CgroupsMemoryUsageObserverFixture : public ::testing::TestWithParam +class CgroupsMemoryUsageObserverFixture : public ::testing::TestWithParam { void SetUp() override { @@ -138,7 +138,7 @@ class CgroupsMemoryUsageObserverFixture : public ::testing::TestWithParamreadMemoryUsage(), - version == CgroupsMemoryUsageObserver::CgroupsVersion::V1 ? /* rss from memory.stat */ 2232029184 - : /* value from memory.current - inactive_file */ 20952338432); + version == ICgroupsReader::CgroupsVersion::V1 ? /* rss from memory.stat */ 2232029184 + : /* anon from memory.stat */ 10429399040); } TEST_P(CgroupsMemoryUsageObserverFixture, DumpAllStatsTest) { const auto version = GetParam(); - auto reader = createCgroupsReader(version, tmp_dir); + auto reader = ICgroupsReader::createCgroupsReader(version, tmp_dir); ASSERT_EQ(reader->dumpAllStats(), EXPECTED[static_cast(version)]); } @@ -173,6 +173,6 @@ TEST_P(CgroupsMemoryUsageObserverFixture, DumpAllStatsTest) INSTANTIATE_TEST_SUITE_P( CgroupsMemoryUsageObserverTests, CgroupsMemoryUsageObserverFixture, - ::testing::Values(CgroupsMemoryUsageObserver::CgroupsVersion::V1, CgroupsMemoryUsageObserver::CgroupsVersion::V2)); + ::testing::Values(ICgroupsReader::CgroupsVersion::V1, ICgroupsReader::CgroupsVersion::V2)); #endif diff --git a/src/Coordination/KeeperAsynchronousMetrics.cpp b/src/Coordination/KeeperAsynchronousMetrics.cpp index 86166ffe31b..157858f3c44 100644 --- a/src/Coordination/KeeperAsynchronousMetrics.cpp +++ b/src/Coordination/KeeperAsynchronousMetrics.cpp @@ -114,8 +114,13 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM } KeeperAsynchronousMetrics::KeeperAsynchronousMetrics( - ContextPtr context_, unsigned update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_) - : AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_), context(std::move(context_)) + ContextPtr context_, + unsigned update_period_seconds, + const ProtocolServerMetricsFunc & protocol_server_metrics_func_, + bool update_jemalloc_epoch_, + bool update_rss_) + : AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, update_jemalloc_epoch_, update_rss_) + , context(std::move(context_)) { } diff --git a/src/Coordination/KeeperAsynchronousMetrics.h b/src/Coordination/KeeperAsynchronousMetrics.h index ec0e60cbb6e..a2ab7cab756 100644 --- a/src/Coordination/KeeperAsynchronousMetrics.h +++ b/src/Coordination/KeeperAsynchronousMetrics.h @@ -13,9 +13,13 @@ class KeeperAsynchronousMetrics : public AsynchronousMetrics { public: KeeperAsynchronousMetrics( - ContextPtr context_, unsigned update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_); - ~KeeperAsynchronousMetrics() override; + ContextPtr context_, + unsigned update_period_seconds, + const ProtocolServerMetricsFunc & protocol_server_metrics_func_, + bool update_jemalloc_epoch_, + bool update_rss_); + ~KeeperAsynchronousMetrics() override; private: ContextPtr context; diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 4a350077596..893bb8e6082 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -148,7 +148,14 @@ void KeeperDispatcher::requestThread() Int64 mem_soft_limit = keeper_context->getKeeperMemorySoftLimit(); if (configuration_and_settings->standalone_keeper && isExceedingMemorySoftLimit() && checkIfRequestIncreaseMem(request.request)) { - LOG_WARNING(log, "Processing requests refused because of max_memory_usage_soft_limit {}, the total used memory is {}, request type is {}", ReadableSize(mem_soft_limit), ReadableSize(total_memory_tracker.get()), request.request->getOpNum()); + LOG_WARNING( + log, + "Processing requests refused because of max_memory_usage_soft_limit {}, the total allocated memory is {}, RSS is {}, request type " + "is {}", + ReadableSize(mem_soft_limit), + ReadableSize(total_memory_tracker.get()), + ReadableSize(total_memory_tracker.getRSS()), + request.request->getOpNum()); addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS); continue; } diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index f09ea56391a..918f24efb2c 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -602,7 +602,7 @@ bool KeeperServer::isLeaderAlive() const bool KeeperServer::isExceedingMemorySoftLimit() const { Int64 mem_soft_limit = keeper_context->getKeeperMemorySoftLimit(); - return mem_soft_limit > 0 && total_memory_tracker.get() >= mem_soft_limit; + return mem_soft_limit > 0 && std::max(total_memory_tracker.get(), total_memory_tracker.getRSS()) >= mem_soft_limit; } /// TODO test whether taking failed peer in count diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 5b2aaf5407a..18ee096569a 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -170,6 +170,7 @@ namespace DB M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \ M(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \ M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \ + M(UInt64, memory_worker_period_ms, 0, "Tick period of background memory worker which corrects memory tracker memory usages and cleans up unused pages during higher memory usage. If set to 0, default value will be used depending on the memory usage source", 0) \ M(Bool, disable_insertion_and_mutation, false, "Disable all insert/alter/delete queries. This setting will be enabled if someone needs read-only nodes to prevent insertion and mutation affect reading performance.", 0) /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 373cc91ebcb..7adfb42fb51 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index fb5337158ba..858b4a78430 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -152,6 +152,7 @@ class ServerType; template class MergeTreeBackgroundExecutor; class AsyncLoader; +struct ICgroupsReader; struct TemporaryTableHolder; using TemporaryTablesMapping = std::map>; diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 11df92d071f..916dee01431 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -1224,9 +1224,18 @@ void MutationsInterpreter::Source::read( createReadFromPartStep( MergeTreeSequentialSourceType::Mutation, - plan, *data, storage_snapshot, - part, alter_conversions, required_columns, - apply_deleted_mask_, std::move(filter), context_, + plan, + *data, + storage_snapshot, + part, + alter_conversions, + required_columns, + nullptr, + apply_deleted_mask_, + std::move(filter), + false, + false, + context_, getLogger("MutationsInterpreter")); } else diff --git a/src/Interpreters/ServerAsynchronousMetrics.cpp b/src/Interpreters/ServerAsynchronousMetrics.cpp index 872a9f864df..079029695c9 100644 --- a/src/Interpreters/ServerAsynchronousMetrics.cpp +++ b/src/Interpreters/ServerAsynchronousMetrics.cpp @@ -55,9 +55,11 @@ ServerAsynchronousMetrics::ServerAsynchronousMetrics( ContextPtr global_context_, unsigned update_period_seconds, unsigned heavy_metrics_update_period_seconds, - const ProtocolServerMetricsFunc & protocol_server_metrics_func_) + const ProtocolServerMetricsFunc & protocol_server_metrics_func_, + bool update_jemalloc_epoch_, + bool update_rss_) : WithContext(global_context_) - , AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_) + , AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, update_jemalloc_epoch_, update_rss_) , heavy_metric_update_period(heavy_metrics_update_period_seconds) { /// sanity check diff --git a/src/Interpreters/ServerAsynchronousMetrics.h b/src/Interpreters/ServerAsynchronousMetrics.h index e3c83dc748e..5fab419a32b 100644 --- a/src/Interpreters/ServerAsynchronousMetrics.h +++ b/src/Interpreters/ServerAsynchronousMetrics.h @@ -14,7 +14,10 @@ public: ContextPtr global_context_, unsigned update_period_seconds, unsigned heavy_metrics_update_period_seconds, - const ProtocolServerMetricsFunc & protocol_server_metrics_func_); + const ProtocolServerMetricsFunc & protocol_server_metrics_func_, + bool update_jemalloc_epoch_, + bool update_rss_); + ~ServerAsynchronousMetrics() override; private: diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 95b76c60063..c656a1a797b 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -45,7 +45,7 @@ Chunk Squashing::squash(Chunk && input_chunk) Chunk Squashing::add(Chunk && input_chunk) { - if (!input_chunk) + if (!input_chunk || input_chunk.getNumRows() == 0) return {}; /// Just read block is already enough. diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 7365b9334aa..5c993504245 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -38,6 +38,11 @@ #include #include #include +#include +#include +#include +#include +#include #include #include #include @@ -190,7 +195,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu } } -bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() +bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const { ProfileEvents::increment(ProfileEvents::Merge); @@ -664,7 +669,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPart } -bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // NOLINT +bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() const { /// In case if there are no projections we didn't construct a task if (!ctx->merge_projection_parts_task_ptr) @@ -683,7 +688,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // N return true; } -bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() +bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const { Stopwatch watch(CLOCK_MONOTONIC_COARSE); UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds(); @@ -812,33 +817,154 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch; if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end()) - ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name); + ctx->prepared_pipeline = createPipelineForReadingOneColumn(ctx->it_name_and_type->name); return false; } -Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String & column_name) const +/// Gathers values from all parts for one column using rows sources temporary file +class ColumnGathererStep : public ITransformingStep { - Pipes pipes; +public: + ColumnGathererStep( + const DataStream & input_stream_, + CompressedReadBufferFromFile * rows_sources_read_buf_, + UInt64 merge_block_size_rows_, + UInt64 merge_block_size_bytes_, + bool is_result_sparse_) + : ITransformingStep(input_stream_, input_stream_.header, getTraits()) + , rows_sources_read_buf(rows_sources_read_buf_) + , merge_block_size_rows(merge_block_size_rows_) + , merge_block_size_bytes(merge_block_size_bytes_) + , is_result_sparse(is_result_sparse_) + {} + + String getName() const override { return "ColumnGatherer"; } + + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override + { + const auto &header = pipeline.getHeader(); + const auto input_streams_count = pipeline.getNumStreams(); + + rows_sources_read_buf->seek(0, 0); + + auto transform = std::make_unique( + header, + input_streams_count, + *rows_sources_read_buf, + merge_block_size_rows, + merge_block_size_bytes, + is_result_sparse); + + pipeline.addTransform(std::move(transform)); + } + + void updateOutputStream() override + { + output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits()); + } + +private: + static Traits getTraits() + { + return ITransformingStep::Traits + { + { + .returns_single_stream = true, + .preserves_number_of_streams = true, + .preserves_sorting = true, + }, + { + .preserves_number_of_rows = false, + } + }; + } + + MergeTreeData::MergingParams merging_params{}; + CompressedReadBufferFromFile * rows_sources_read_buf; + const UInt64 merge_block_size_rows; + const UInt64 merge_block_size_bytes; + const bool is_result_sparse; +}; + +MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::VerticalMergeStage::createPipelineForReadingOneColumn(const String & column_name) const +{ + /// Read from all parts + std::vector plans; for (size_t part_num = 0; part_num < global_ctx->future_part->parts.size(); ++part_num) { - Pipe pipe = createMergeTreeSequentialSource( + auto plan_for_part = std::make_unique(); + createReadFromPartStep( MergeTreeSequentialSourceType::Merge, + *plan_for_part, *global_ctx->data, global_ctx->storage_snapshot, global_ctx->future_part->parts[part_num], global_ctx->alter_conversions[part_num], Names{column_name}, - /*mark_ranges=*/ {}, global_ctx->input_rows_filtered, /*apply_deleted_mask=*/ true, + std::nullopt, ctx->read_with_direct_io, - ctx->use_prefetch); + ctx->use_prefetch, + global_ctx->context, + getLogger("VerticalMergeStage")); - pipes.emplace_back(std::move(pipe)); + plans.emplace_back(std::move(plan_for_part)); } - return Pipe::unitePipes(std::move(pipes)); + QueryPlan merge_column_query_plan; + + /// Union of all parts streams + { + DataStreams input_streams; + input_streams.reserve(plans.size()); + for (auto & plan : plans) + input_streams.emplace_back(plan->getCurrentDataStream()); + + auto union_step = std::make_unique(std::move(input_streams)); + merge_column_query_plan.unitePlans(std::move(union_step), std::move(plans)); + } + + /// Add column gatherer step + { + bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE; + const auto data_settings = global_ctx->data->getSettings(); + auto merge_step = std::make_unique( + merge_column_query_plan.getCurrentDataStream(), + ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name, + data_settings->merge_max_block_size, + data_settings->merge_max_block_size_bytes, + is_result_sparse); + merge_step->setStepDescription("Gather column"); + merge_column_query_plan.addStep(std::move(merge_step)); + } + + /// Add expression step for indexes + MergeTreeIndices indexes_to_recalc; + IndicesDescription indexes_to_recalc_description; + { + auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name); + + if (indexes_it != global_ctx->skip_indexes_by_column.end()) + { + indexes_to_recalc_description = indexes_it->second; + indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second); + + auto indices_expression_dag = indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone(); + indices_expression_dag.addMaterializingOutputActions(); /// Const columns cannot be written without materialization. + auto calculate_indices_expression_step = std::make_unique( + merge_column_query_plan.getCurrentDataStream(), + std::move(indices_expression_dag)); + merge_column_query_plan.addStep(std::move(calculate_indices_expression_step)); + } + } + + auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context); + auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); + auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings); + + return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)}; } void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const @@ -848,50 +974,22 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed); global_ctx->column_progress = std::make_unique(ctx->progress_before, ctx->column_sizes->columnWeight(column_name)); - Pipe pipe; - if (ctx->prepared_pipe) + VerticalMergeRuntimeContext::PreparedColumnPipeline column_pipepline; + if (ctx->prepared_pipeline) { - pipe = std::move(*ctx->prepared_pipe); + column_pipepline = std::move(*ctx->prepared_pipeline); + /// Prepare next column pipeline to initiate prefetching auto next_column_it = std::next(ctx->it_name_and_type); if (next_column_it != global_ctx->gathering_columns.end()) - ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name); + ctx->prepared_pipeline = createPipelineForReadingOneColumn(next_column_it->name); } else { - pipe = createPipeForReadingOneColumn(column_name); + column_pipepline = createPipelineForReadingOneColumn(column_name); } - ctx->rows_sources_read_buf->seek(0, 0); - bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE; - - const auto data_settings = global_ctx->data->getSettings(); - auto transform = std::make_unique( - pipe.getHeader(), - pipe.numOutputPorts(), - *ctx->rows_sources_read_buf, - data_settings->merge_max_block_size, - data_settings->merge_max_block_size_bytes, - is_result_sparse); - - pipe.addTransform(std::move(transform)); - - MergeTreeIndices indexes_to_recalc; - auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name); - - if (indexes_it != global_ctx->skip_indexes_by_column.end()) - { - indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second); - - pipe.addTransform(std::make_shared( - pipe.getHeader(), - indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), - global_ctx->data->getContext()))); - - pipe.addTransform(std::make_shared(pipe.getHeader())); - } - - ctx->column_parts_pipeline = QueryPipeline(std::move(pipe)); + ctx->column_parts_pipeline = std::move(column_pipepline.pipeline); /// Dereference unique_ptr ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback( @@ -909,12 +1007,16 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const global_ctx->metadata_snapshot, columns_list, ctx->compression_codec, - indexes_to_recalc, + column_pipepline.indexes_to_recalc, getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot), &global_ctx->written_offset_columns, global_ctx->to->getIndexGranularity()); ctx->column_elems_written = 0; + + /// rows_sources_read_buf is reused for each column so we need to rewind it explicitly each time + /// This sharing also prevents from from running multiple merge of individual columns in parallel. + ctx->rows_sources_read_buf->seek(0, 0); } @@ -1219,12 +1321,200 @@ bool MergeTask::execute() } -void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() +/// Apply merge strategy (Ordinary, Colapsing, Aggregating, etc) to the stream +class MergePartsStep : public ITransformingStep +{ +public: + MergePartsStep( + const DataStream & input_stream_, + const SortDescription & sort_description_, + const Names partition_key_columns_, + const MergeTreeData::MergingParams & merging_params_, + WriteBuffer * rows_sources_write_buf_, + UInt64 merge_block_size_rows_, + UInt64 merge_block_size_bytes_, + bool blocks_are_granules_size_, + bool cleanup_, + time_t time_of_merge_) + : ITransformingStep(input_stream_, input_stream_.header, getTraits()) + , sort_description(sort_description_) + , partition_key_columns(partition_key_columns_) + , merging_params(merging_params_) + , rows_sources_write_buf(rows_sources_write_buf_) + , merge_block_size_rows(merge_block_size_rows_) + , merge_block_size_bytes(merge_block_size_bytes_) + , blocks_are_granules_size(blocks_are_granules_size_) + , cleanup(cleanup_) + , time_of_merge(time_of_merge_) + {} + + String getName() const override { return "MergeParts"; } + + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override + { + /// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number. + /// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part, + /// that is going in insertion order. + ProcessorPtr merged_transform; + + const auto &header = pipeline.getHeader(); + const auto input_streams_count = pipeline.getNumStreams(); + + switch (merging_params.mode) + { + case MergeTreeData::MergingParams::Ordinary: + merged_transform = std::make_shared( + header, + input_streams_count, + sort_description, + merge_block_size_rows, + merge_block_size_bytes, + SortingQueueStrategy::Default, + /* limit_= */0, + /* always_read_till_end_= */false, + rows_sources_write_buf, + blocks_are_granules_size); + break; + + case MergeTreeData::MergingParams::Collapsing: + merged_transform = std::make_shared( + header, input_streams_count, sort_description, merging_params.sign_column, false, + merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size); + break; + + case MergeTreeData::MergingParams::Summing: + merged_transform = std::make_shared( + header, input_streams_count, sort_description, merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes); + break; + + case MergeTreeData::MergingParams::Aggregating: + merged_transform = std::make_shared(header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes); + break; + + case MergeTreeData::MergingParams::Replacing: + merged_transform = std::make_shared( + header, input_streams_count, sort_description, merging_params.is_deleted_column, merging_params.version_column, + merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size, + cleanup); + break; + + case MergeTreeData::MergingParams::Graphite: + merged_transform = std::make_shared( + header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes, + merging_params.graphite_params, time_of_merge); + break; + + case MergeTreeData::MergingParams::VersionedCollapsing: + merged_transform = std::make_shared( + header, input_streams_count, sort_description, merging_params.sign_column, + merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size); + break; + } + + pipeline.addTransform(std::move(merged_transform)); + +#ifndef NDEBUG + if (!sort_description.empty()) + { + pipeline.addSimpleTransform([&](const Block & header_) + { + auto transform = std::make_shared(header_, sort_description); + return transform; + }); + } +#endif + } + + void updateOutputStream() override + { + output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits()); + } + +private: + static Traits getTraits() + { + return ITransformingStep::Traits + { + { + .returns_single_stream = true, + .preserves_number_of_streams = true, + .preserves_sorting = true, + }, + { + .preserves_number_of_rows = false, + } + }; + } + + const SortDescription sort_description; + const Names partition_key_columns; + const MergeTreeData::MergingParams merging_params{}; + WriteBuffer * rows_sources_write_buf; + const UInt64 merge_block_size_rows; + const UInt64 merge_block_size_bytes; + const bool blocks_are_granules_size; + const bool cleanup{false}; + const time_t time_of_merge{0}; +}; + +class TTLStep : public ITransformingStep +{ +public: + TTLStep( + const DataStream & input_stream_, + const ContextPtr & context_, + const MergeTreeData & storage_, + const StorageMetadataPtr & metadata_snapshot_, + const MergeTreeData::MutableDataPartPtr & data_part_, + time_t current_time, + bool force_) + : ITransformingStep(input_stream_, input_stream_.header, getTraits()) + { + transform = std::make_shared(context_, input_stream_.header, storage_, metadata_snapshot_, data_part_, current_time, force_); + subqueries_for_sets = transform->getSubqueries(); + } + + String getName() const override { return "TTL"; } + + PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); } + + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override + { + pipeline.addTransform(transform); + } + + void updateOutputStream() override + { + output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits()); + } + +private: + static Traits getTraits() + { + return ITransformingStep::Traits + { + { + .returns_single_stream = true, + .preserves_number_of_streams = true, + .preserves_sorting = true, + }, + { + .preserves_number_of_rows = false, + } + }; + } + + std::shared_ptr transform; + PreparedSets::Subqueries subqueries_for_sets; +}; + + +void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const { /** Read from all parts, merge and write into a new one. * In passing, we calculate expression for sorting. */ - Pipes pipes; + global_ctx->watch_prev_elapsed = 0; /// We count total amount of bytes in parts @@ -1251,143 +1541,91 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() global_ctx->horizontal_stage_progress = std::make_unique( ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0); + /// Read from all parts + std::vector plans; for (size_t i = 0; i < global_ctx->future_part->parts.size(); ++i) { - Pipe pipe = createMergeTreeSequentialSource( + if (global_ctx->future_part->parts[i]->getMarksCount() == 0) + LOG_TRACE(ctx->log, "Part {} is empty", global_ctx->future_part->parts[i]->name); + + auto plan_for_part = std::make_unique(); + createReadFromPartStep( MergeTreeSequentialSourceType::Merge, + *plan_for_part, *global_ctx->data, global_ctx->storage_snapshot, global_ctx->future_part->parts[i], global_ctx->alter_conversions[i], global_ctx->merging_columns.getNames(), - /*mark_ranges=*/ {}, global_ctx->input_rows_filtered, /*apply_deleted_mask=*/ true, + /*filter=*/ std::nullopt, ctx->read_with_direct_io, - /*prefetch=*/ false); + /*prefetch=*/ false, + global_ctx->context, + ctx->log); - if (global_ctx->metadata_snapshot->hasSortingKey()) - { - pipe.addSimpleTransform([this](const Block & header) - { - return std::make_shared(header, global_ctx->metadata_snapshot->getSortingKey().expression); - }); - } - - pipes.emplace_back(std::move(pipe)); + plans.emplace_back(std::move(plan_for_part)); } + QueryPlan merge_parts_query_plan; - Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns(); - SortDescription sort_description; - sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description; - sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description; - - size_t sort_columns_size = sort_columns.size(); - sort_description.reserve(sort_columns_size); - - Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names; - - Block header = pipes.at(0).getHeader(); - for (size_t i = 0; i < sort_columns_size; ++i) - sort_description.emplace_back(sort_columns[i], 1, 1); - -#ifndef NDEBUG - if (!sort_description.empty()) + /// Union of all parts streams { - for (size_t i = 0; i < pipes.size(); ++i) - { - auto & pipe = pipes[i]; - pipe.addSimpleTransform([&](const Block & header_) - { - auto transform = std::make_shared(header_, sort_description); - transform->setDescription(global_ctx->future_part->parts[i]->name); - return transform; - }); - } + DataStreams input_streams; + input_streams.reserve(plans.size()); + for (auto & plan : plans) + input_streams.emplace_back(plan->getCurrentDataStream()); + + auto union_step = std::make_unique(std::move(input_streams)); + merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans)); } -#endif - /// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number. - /// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part, - /// that is going in insertion order. - ProcessorPtr merged_transform; - - /// If merge is vertical we cannot calculate it - ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical); - - /// There is no sense to have the block size bigger than one granule for merge operations. - const UInt64 merge_block_size_rows = data_settings->merge_max_block_size; - const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes; - - switch (ctx->merging_params.mode) + if (global_ctx->metadata_snapshot->hasSortingKey()) { - case MergeTreeData::MergingParams::Ordinary: - merged_transform = std::make_shared( - header, - pipes.size(), - sort_description, - merge_block_size_rows, - merge_block_size_bytes, - SortingQueueStrategy::Default, - /* limit_= */0, - /* always_read_till_end_= */false, - ctx->rows_sources_write_buf.get(), - ctx->blocks_are_granules_size); - break; - - case MergeTreeData::MergingParams::Collapsing: - merged_transform = std::make_shared( - header, pipes.size(), sort_description, ctx->merging_params.sign_column, false, - merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size); - break; - - case MergeTreeData::MergingParams::Summing: - merged_transform = std::make_shared( - header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes); - break; - - case MergeTreeData::MergingParams::Aggregating: - merged_transform = std::make_shared(header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes); - break; - - case MergeTreeData::MergingParams::Replacing: - if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup) - throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed"); - - merged_transform = std::make_shared( - header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column, - merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size, - global_ctx->cleanup); - break; - - case MergeTreeData::MergingParams::Graphite: - merged_transform = std::make_shared( - header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes, - ctx->merging_params.graphite_params, global_ctx->time_of_merge); - break; - - case MergeTreeData::MergingParams::VersionedCollapsing: - merged_transform = std::make_shared( - header, pipes.size(), sort_description, ctx->merging_params.sign_column, - merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size); - break; + /// Calculate sorting key expressions so that they are available for merge sorting. + auto sorting_key_expression_dag = global_ctx->metadata_snapshot->getSortingKey().expression->getActionsDAG().clone(); + auto calculate_sorting_key_expression_step = std::make_unique( + merge_parts_query_plan.getCurrentDataStream(), + std::move(sorting_key_expression_dag)); + merge_parts_query_plan.addStep(std::move(calculate_sorting_key_expression_step)); } - auto builder = std::make_unique(); - builder->init(Pipe::unitePipes(std::move(pipes))); - builder->addTransform(std::move(merged_transform)); - -#ifndef NDEBUG - if (!sort_description.empty()) + /// Merge { - builder->addSimpleTransform([&](const Block & header_) - { - auto transform = std::make_shared(header_, sort_description); - return transform; - }); + Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns(); + SortDescription sort_description; + sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description; + sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description; + + size_t sort_columns_size = sort_columns.size(); + sort_description.reserve(sort_columns_size); + + Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names; + + for (size_t i = 0; i < sort_columns_size; ++i) + sort_description.emplace_back(sort_columns[i], 1, 1); + + /// If merge is vertical we cannot calculate it + ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical); + + if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup) + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed"); + + auto merge_step = std::make_unique( + merge_parts_query_plan.getCurrentDataStream(), + sort_description, + partition_key_columns, + ctx->merging_params, + ctx->rows_sources_write_buf.get(), + data_settings->merge_max_block_size, + data_settings->merge_max_block_size_bytes, + ctx->blocks_are_granules_size, + global_ctx->cleanup, + global_ctx->time_of_merge); + merge_step->setStepDescription("Merge sorted parts"); + merge_parts_query_plan.addStep(std::move(merge_step)); } -#endif if (global_ctx->deduplicate) { @@ -1406,37 +1644,50 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() } } - if (DistinctSortedTransform::isApplicable(header, sort_description, global_ctx->deduplicate_by_columns)) - builder->addTransform(std::make_shared( - builder->getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns)); - else - builder->addTransform(std::make_shared( - builder->getHeader(), SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns)); + auto deduplication_step = std::make_unique( + merge_parts_query_plan.getCurrentDataStream(), + SizeLimits(), 0 /*limit_hint*/, + global_ctx->deduplicate_by_columns, + false /*pre_distinct*/, + true /*optimize_distinct_in_order TODO: looks like it should be enabled*/); + deduplication_step->setStepDescription("Deduplication step"); + merge_parts_query_plan.addStep(std::move(deduplication_step)); } PreparedSets::Subqueries subqueries; + /// TTL step if (ctx->need_remove_expired_values) { - auto transform = std::make_shared(global_ctx->context, builder->getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl); - subqueries = transform->getSubqueries(); - builder->addTransform(std::move(transform)); + auto ttl_step = std::make_unique( + merge_parts_query_plan.getCurrentDataStream(), global_ctx->context, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl); + subqueries = ttl_step->getSubqueries(); + ttl_step->setStepDescription("TTL step"); + merge_parts_query_plan.addStep(std::move(ttl_step)); } + /// Secondary indices expressions if (!global_ctx->merging_skip_indexes.empty()) { - builder->addTransform(std::make_shared( - builder->getHeader(), - global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), - global_ctx->data->getContext()))); - - builder->addTransform(std::make_shared(builder->getHeader())); + auto indices_expression_dag = global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone(); + indices_expression_dag.addMaterializingOutputActions(); /// Const columns cannot be written without materialization. + auto calculate_indices_expression_step = std::make_unique( + merge_parts_query_plan.getCurrentDataStream(), + std::move(indices_expression_dag)); + merge_parts_query_plan.addStep(std::move(calculate_indices_expression_step)); } if (!subqueries.empty()) - builder = addCreatingSetsTransform(std::move(builder), std::move(subqueries), global_ctx->context); + addCreatingSetsStep(merge_parts_query_plan, std::move(subqueries), global_ctx->context); + + { + auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context); + auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); + auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings); + + global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + } - global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); /// Dereference unique_ptr and pass horizontal_stage_progress by reference global_ctx->merged_pipeline.setProgressCallback(MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress)); /// Is calculated inside MergeProgressCallback. diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 9450fda7b08..84f00d521fd 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -269,12 +269,12 @@ private: { bool execute() override; - bool prepare(); - bool executeImpl(); + bool prepare() const; + bool executeImpl() const; void finalize() const; /// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable - using ExecuteAndFinalizeHorizontalPartSubtasks = std::array; + using ExecuteAndFinalizeHorizontalPartSubtasks = std::array; const ExecuteAndFinalizeHorizontalPartSubtasks subtasks { @@ -289,10 +289,10 @@ private: void calculateProjections(const Block & block) const; void finalizeProjections() const; void constructTaskForProjectionPartsMerge() const; - bool executeMergeProjections(); + bool executeMergeProjections() const; MergeAlgorithm chooseMergeAlgorithm() const; - void createMergedStream(); + void createMergedStream() const; void extractMergingAndGatheringColumns() const; void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override @@ -334,7 +334,16 @@ private: Float64 progress_before = 0; std::unique_ptr column_to{nullptr}; - std::optional prepared_pipe; + + /// Used for prefetching. Right before starting merge of a column we create a pipeline for the next column + /// and it initiates prefetching of the first range of that column. + struct PreparedColumnPipeline + { + QueryPipeline pipeline; + MergeTreeIndices indexes_to_recalc; + }; + + std::optional prepared_pipeline; size_t max_delayed_streams = 0; bool use_prefetch = false; std::list> delayed_streams; @@ -379,7 +388,7 @@ private: bool executeVerticalMergeForOneColumn() const; void finalizeVerticalMergeForOneColumn() const; - Pipe createPipeForReadingOneColumn(const String & column_name) const; + VerticalMergeRuntimeContext::PreparedColumnPipeline createPipelineForReadingOneColumn(const String & column_name) const; VerticalMergeRuntimeContextPtr ctx; GlobalRuntimeContextPtr global_ctx; diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index a120716d180..78ba02aa7ac 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -354,8 +354,11 @@ public: MergeTreeData::DataPartPtr data_part_, AlterConversionsPtr alter_conversions_, Names columns_to_read_, + std::shared_ptr> filtered_rows_count_, bool apply_deleted_mask_, std::optional filter_, + bool read_with_direct_io_, + bool prefetch_, ContextPtr context_, LoggerPtr log_) : ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)}) @@ -365,8 +368,11 @@ public: , data_part(std::move(data_part_)) , alter_conversions(std::move(alter_conversions_)) , columns_to_read(std::move(columns_to_read_)) + , filtered_rows_count(std::move(filtered_rows_count_)) , apply_deleted_mask(apply_deleted_mask_) , filter(std::move(filter_)) + , read_with_direct_io(read_with_direct_io_) + , prefetch(prefetch_) , context(std::move(context_)) , log(log_) { @@ -410,25 +416,28 @@ public: alter_conversions, columns_to_read, std::move(mark_ranges), - /*filtered_rows_count=*/ nullptr, + filtered_rows_count, apply_deleted_mask, - /*read_with_direct_io=*/ false, - /*prefetch=*/ false); + read_with_direct_io, + prefetch); pipeline.init(Pipe(std::move(source))); } private: - MergeTreeSequentialSourceType type; + const MergeTreeSequentialSourceType type; const MergeTreeData & storage; - StorageSnapshotPtr storage_snapshot; - MergeTreeData::DataPartPtr data_part; - AlterConversionsPtr alter_conversions; - Names columns_to_read; - bool apply_deleted_mask; - std::optional filter; - ContextPtr context; - LoggerPtr log; + const StorageSnapshotPtr storage_snapshot; + const MergeTreeData::DataPartPtr data_part; + const AlterConversionsPtr alter_conversions; + const Names columns_to_read; + const std::shared_ptr> filtered_rows_count; + const bool apply_deleted_mask; + const std::optional filter; + const bool read_with_direct_io; + const bool prefetch; + const ContextPtr context; + const LoggerPtr log; }; void createReadFromPartStep( @@ -439,16 +448,28 @@ void createReadFromPartStep( MergeTreeData::DataPartPtr data_part, AlterConversionsPtr alter_conversions, Names columns_to_read, + std::shared_ptr> filtered_rows_count, bool apply_deleted_mask, std::optional filter, + bool read_with_direct_io, + bool prefetch, ContextPtr context, LoggerPtr log) { - auto reading = std::make_unique(type, - storage, storage_snapshot, - std::move(data_part), std::move(alter_conversions), - std::move(columns_to_read), apply_deleted_mask, - std::move(filter), std::move(context), log); + auto reading = std::make_unique( + type, + storage, + storage_snapshot, + std::move(data_part), + std::move(alter_conversions), + std::move(columns_to_read), + filtered_rows_count, + apply_deleted_mask, + std::move(filter), + read_with_direct_io, + prefetch, + std::move(context), + log); plan.addStep(std::move(reading)); } diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index 5b7c80385f6..d2ed1394dbd 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -39,8 +39,11 @@ void createReadFromPartStep( MergeTreeData::DataPartPtr data_part, AlterConversionsPtr alter_conversions, Names columns_to_read, + std::shared_ptr> filtered_rows_count, bool apply_deleted_mask, std::optional filter, + bool read_with_direct_io, + bool prefetch, ContextPtr context, LoggerPtr log); diff --git a/src/Storages/System/StorageSystemServerSettings.cpp b/src/Storages/System/StorageSystemServerSettings.cpp index d242b6de4ec..ee99c472620 100644 --- a/src/Storages/System/StorageSystemServerSettings.cpp +++ b/src/Storages/System/StorageSystemServerSettings.cpp @@ -63,7 +63,6 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context /// current setting values, one needs to ask the components directly. std::unordered_map> changeable_settings = { {"max_server_memory_usage", {std::to_string(total_memory_tracker.getHardLimit()), ChangeableWithoutRestart::Yes}}, - {"allow_use_jemalloc_memory", {std::to_string(total_memory_tracker.getAllowUseJemallocMmemory()), ChangeableWithoutRestart::Yes}}, {"max_table_size_to_drop", {std::to_string(context->getMaxTableSizeToDrop()), ChangeableWithoutRestart::Yes}}, {"max_partition_size_to_drop", {std::to_string(context->getMaxPartitionSizeToDrop()), ChangeableWithoutRestart::Yes}}, diff --git a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml index 25ececea3e8..e71b93379d0 100644 --- a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml +++ b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml @@ -16,7 +16,7 @@ az-zoo2 1 - 20000000 + 200000000 10000 diff --git a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml index 81e343b77c9..cf4a4686f2c 100644 --- a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml +++ b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml @@ -13,7 +13,7 @@ 2181 3 - 20000000 + 200000000 10000 diff --git a/tests/integration/test_memory_limit/test.py b/tests/integration/test_memory_limit/test.py index 6d6745711da..db68a38c1b1 100644 --- a/tests/integration/test_memory_limit/test.py +++ b/tests/integration/test_memory_limit/test.py @@ -13,7 +13,6 @@ node = cluster.add_instance( "configs/async_metrics_no.xml", ], mem_limit="4g", - env_variables={"MALLOC_CONF": "dirty_decay_ms:0"}, ) diff --git a/tests/queries/0_stateless/03175_sparse_and_skip_index.reference b/tests/queries/0_stateless/03175_sparse_and_skip_index.reference new file mode 100644 index 00000000000..619e98a152a --- /dev/null +++ b/tests/queries/0_stateless/03175_sparse_and_skip_index.reference @@ -0,0 +1,4 @@ +key Sparse +value Sparse +1000 +1 diff --git a/tests/queries/0_stateless/03175_sparse_and_skip_index.sql b/tests/queries/0_stateless/03175_sparse_and_skip_index.sql new file mode 100644 index 00000000000..4de6d1ac6df --- /dev/null +++ b/tests/queries/0_stateless/03175_sparse_and_skip_index.sql @@ -0,0 +1,45 @@ +DROP TABLE IF EXISTS t_bloom_filter; +CREATE TABLE t_bloom_filter( + key UInt64, + value UInt64, + + INDEX key_bf key TYPE bloom_filter(0.01) GRANULARITY 2147483648, -- bloom filter on sorting key column + INDEX value_bf value TYPE bloom_filter(0.01) GRANULARITY 2147483648 -- bloom filter on no-sorting column +) ENGINE=MergeTree ORDER BY key +SETTINGS + -- settings to trigger sparse serialization and vertical merge + ratio_of_defaults_for_sparse_serialization = 0.0 + ,vertical_merge_algorithm_min_rows_to_activate = 1 + ,vertical_merge_algorithm_min_columns_to_activate = 1 + ,allow_vertical_merges_from_compact_to_wide_parts = 1 + ,min_bytes_for_wide_part=0 +; + +SYSTEM STOP MERGES t_bloom_filter; + +-- Create at least one part +INSERT INTO t_bloom_filter +SELECT + number % 100 as key, -- 100 unique keys + rand() % 100 as value -- 100 unique values +FROM numbers(50_000); + +-- And another part +INSERT INTO t_bloom_filter +SELECT + number % 100 as key, -- 100 unique keys + rand() % 100 as value -- 100 unique values +FROM numbers(50_000, 50_000); + +SYSTEM START MERGES t_bloom_filter; + +-- Merge everything into a single part +OPTIMIZE TABLE t_bloom_filter FINAL; + +-- Check sparse serialization +SELECT column, serialization_kind FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_bloom_filter' AND active ORDER BY column; + +SELECT COUNT() FROM t_bloom_filter WHERE key = 1; + +-- Check bloom filter non-zero size +SELECT COUNT() FROM system.parts WHERE database = currentDatabase() AND table = 't_bloom_filter' AND secondary_indices_uncompressed_bytes > 200 AND active; diff --git a/tests/queries/0_stateless/03236_squashing_high_memory.reference b/tests/queries/0_stateless/03236_squashing_high_memory.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03236_squashing_high_memory.sql b/tests/queries/0_stateless/03236_squashing_high_memory.sql new file mode 100644 index 00000000000..f6e5dbdef03 --- /dev/null +++ b/tests/queries/0_stateless/03236_squashing_high_memory.sql @@ -0,0 +1,27 @@ +-- Tags: no-fasttest, no-asan, no-tsan, no-msan, no-ubsan +-- reason: test requires too many rows to read + +SET max_rows_to_read = '501G'; + +DROP TABLE IF EXISTS id_values; + +DROP TABLE IF EXISTS test_table; + +CREATE TABLE id_values ENGINE MergeTree ORDER BY id1 AS + SELECT arrayJoin(range(500000)) AS id1, arrayJoin(range(1000)) AS id2; + +SET max_memory_usage = '1G'; + +CREATE TABLE test_table ENGINE MergeTree ORDER BY id AS +SELECT id_values.id1 AS id, + string_values.string_val1 AS string_val1, + string_values.string_val2 AS string_val2 +FROM id_values + JOIN (SELECT arrayJoin(range(10)) AS id1, + 'qwe' AS string_val1, + 'asd' AS string_val2) AS string_values + ON id_values.id1 = string_values.id1 + SETTINGS join_algorithm = 'hash'; + +DROP TABLE IF EXISTS id_values; +DROP TABLE IF EXISTS test_table;