From 4e9d894e24041f7c462a486394f19d602403ae15 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 23 Oct 2022 01:32:35 +0200 Subject: [PATCH 001/118] Compress marks and primary key by default --- src/Storages/MergeTree/MergeTreeSettings.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index a0db39a97f1..210846b9bb2 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -147,8 +147,8 @@ struct Settings; M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \ \ /** Compress marks and primary key. */ \ - M(Bool, compress_marks, false, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \ - M(Bool, compress_primary_key, false, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \ + M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \ + M(Bool, compress_primary_key, true, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \ M(String, marks_compression_codec, "ZSTD(3)", "Compression encoding used by marks, marks are small enough and cached, so the default compression is ZSTD(3).", 0) \ M(String, primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key is small enough and cached, so the default compression is ZSTD(3).", 0) \ M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \ From d8e8dc39e8b929ce0299f96a1be4065a74b45b99 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 3 Apr 2023 01:29:55 +0200 Subject: [PATCH 002/118] Fix test --- .../test_vertical_merges_from_compact_parts.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py b/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py index 1781ed7c976..6fdf0942ee4 100644 --- a/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py +++ b/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py @@ -41,7 +41,9 @@ def test_vertical_merges_from_comapact_parts(start_cluster): vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1, min_bytes_for_wide_part = 0, - min_rows_for_wide_part = 100; + min_rows_for_wide_part = 100, + compress_marks = 0, + compress_primary_key = 0; """.format( i ) From 01728d6d391c07e40c42cbd50151be5de472a62e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 4 Apr 2023 01:05:06 +0200 Subject: [PATCH 003/118] Fix tests --- .../configs/no_compress_marks.xml | 6 ++++++ .../test_backward_compatibility/configs/wide_parts_only.xml | 4 +--- tests/integration/test_backward_compatibility/test.py | 2 +- .../test_vertical_merges_from_compact_parts.py | 5 ++--- 4 files changed, 10 insertions(+), 7 deletions(-) create mode 100644 tests/integration/test_backward_compatibility/configs/no_compress_marks.xml diff --git a/tests/integration/test_backward_compatibility/configs/no_compress_marks.xml b/tests/integration/test_backward_compatibility/configs/no_compress_marks.xml new file mode 100644 index 00000000000..cc968525bbb --- /dev/null +++ b/tests/integration/test_backward_compatibility/configs/no_compress_marks.xml @@ -0,0 +1,6 @@ + + + 0 + 0 + + diff --git a/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml b/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml index c823dd02d5a..e9cf053f1c5 100644 --- a/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml +++ b/tests/integration/test_backward_compatibility/configs/wide_parts_only.xml @@ -1,7 +1,5 @@ 0 - 0 - 0 - + diff --git a/tests/integration/test_backward_compatibility/test.py b/tests/integration/test_backward_compatibility/test.py index 01ed02720f8..e4ee6aa2de9 100644 --- a/tests/integration/test_backward_compatibility/test.py +++ b/tests/integration/test_backward_compatibility/test.py @@ -12,7 +12,7 @@ node1 = cluster.add_instance( with_installed_binary=True, ) node2 = cluster.add_instance( - "node2", main_configs=["configs/wide_parts_only.xml"], with_zookeeper=True + "node2", main_configs=["configs/wide_parts_only.xml", "configs/no_compress_marks.xml"], with_zookeeper=True ) diff --git a/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py b/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py index 6fdf0942ee4..4b144a37ce9 100644 --- a/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py +++ b/tests/integration/test_backward_compatibility/test_vertical_merges_from_compact_parts.py @@ -14,6 +14,7 @@ node_old = cluster.add_instance( ) node_new = cluster.add_instance( "node2", + main_configs=["configs/no_compress_marks.xml"], with_zookeeper=True, stay_alive=True, ) @@ -41,9 +42,7 @@ def test_vertical_merges_from_comapact_parts(start_cluster): vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1, min_bytes_for_wide_part = 0, - min_rows_for_wide_part = 100, - compress_marks = 0, - compress_primary_key = 0; + min_rows_for_wide_part = 100; """.format( i ) From 806c7a5f9d1a6869cc464f79b6bfd83028874540 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Mon, 3 Apr 2023 23:13:50 +0000 Subject: [PATCH 004/118] Automatic style fix --- tests/integration/test_backward_compatibility/test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_backward_compatibility/test.py b/tests/integration/test_backward_compatibility/test.py index e4ee6aa2de9..ea1d3ab9c07 100644 --- a/tests/integration/test_backward_compatibility/test.py +++ b/tests/integration/test_backward_compatibility/test.py @@ -12,7 +12,9 @@ node1 = cluster.add_instance( with_installed_binary=True, ) node2 = cluster.add_instance( - "node2", main_configs=["configs/wide_parts_only.xml", "configs/no_compress_marks.xml"], with_zookeeper=True + "node2", + main_configs=["configs/wide_parts_only.xml", "configs/no_compress_marks.xml"], + with_zookeeper=True, ) From cf5d9a175ae7c61ad0a7293fd49a86a4f7e09f01 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Fri, 14 Apr 2023 16:34:19 +0200 Subject: [PATCH 005/118] Revert "Merge pull request #48760 from ClickHouse/revert-46089-background-memory-tracker" This reverts commit a61ed332239e4a35af1b9cd31479560da6f08cca, reversing changes made to 5f01b8a2b59f6f340c65bfbc4ed0dd655d997ff9. --- .../settings.md | 33 ++++++++++++++-- programs/server/Server.cpp | 20 ++++++++++ src/Common/CurrentMetrics.cpp | 1 + src/Common/MemoryTracker.cpp | 8 ++++ src/Common/MemoryTracker.h | 19 ++++++++++ src/Core/ServerSettings.h | 2 + src/Interpreters/ThreadStatusExt.cpp | 1 + src/Storages/MergeTree/MergeList.cpp | 5 +++ src/Storages/MergeTree/MergeList.h | 2 + src/Storages/StorageMergeTree.cpp | 11 +++++- src/Storages/StorageReplicatedMergeTree.cpp | 10 ++++- .../test_merges_memory_limit/__init__.py | 0 .../test_merges_memory_limit/test.py | 38 +++++++++++++++++++ 13 files changed, 145 insertions(+), 5 deletions(-) create mode 100644 tests/integration/test_merges_memory_limit/__init__.py create mode 100644 tests/integration/test_merges_memory_limit/test.py diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index 7c97d0ab640..a9f0cc276ff 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -1045,7 +1045,7 @@ Default value: `0`. ## background_pool_size {#background_pool_size} -Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance. +Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance. Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation). @@ -1063,8 +1063,8 @@ Default value: 16. ## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio} -Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and -`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server. +Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals to 2 and +`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operations could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server. The same as for `background_pool_size` setting `background_merges_mutations_concurrency_ratio` could be applied from the `default` profile for backward compatibility. Possible values: @@ -1079,6 +1079,33 @@ Default value: 2. 3 ``` +## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit} + +Sets the limit on how much RAM is allowed to use for performing merge and mutation operations. +Zero means unlimited. +If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks. + +Possible values: + +- Any positive integer. + +**Example** + +```xml +0 +``` + +## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio} + +The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`. + +Default value: `0.5`. + +**See also** + +- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage) +- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit) + ## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy} Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart. diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 8c0d50bae55..cba7a4c4778 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -135,6 +135,7 @@ namespace CurrentMetrics extern const Metric Revision; extern const Metric VersionInteger; extern const Metric MemoryTracking; + extern const Metric MergesMutationsMemoryTracking; extern const Metric MaxDDLEntryID; extern const Metric MaxPushedDDLEntryID; } @@ -1225,6 +1226,25 @@ try total_memory_tracker.setDescription("(total)"); total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); + size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit; + + size_t default_merges_mutations_server_memory_usage = static_cast(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio); + if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage) + { + merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage; + LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}" + " ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)", + formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit), + formatReadableSizeWithBinarySuffix(memory_amount), + server_settings_.merges_mutations_memory_usage_to_ram_ratio); + } + + LOG_INFO(log, "Merges and mutations memory limit is set to {}", + formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit)); + background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit); + background_memory_tracker.setDescription("(background)"); + background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking); + total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory); auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker(); diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 81c1481e2de..ea248d996a7 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -53,6 +53,7 @@ M(QueryThread, "Number of query processing threads") \ M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \ M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \ + M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \ M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \ M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \ M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \ diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 674d8d469af..9bff365483d 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -96,6 +96,7 @@ using namespace std::chrono_literals; static constexpr size_t log_peak_memory_usage_every = 1ULL << 30; MemoryTracker total_memory_tracker(nullptr, VariableContext::Global); +MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User); std::atomic MemoryTracker::free_memory_in_allocator_arenas; @@ -528,3 +529,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value) while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value)) ; } + +bool canEnqueueBackgroundTask() +{ + auto limit = background_memory_tracker.getSoftLimit(); + auto amount = background_memory_tracker.get(); + return limit == 0 || amount < limit; +} diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 0d7748856bd..260005fd536 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -110,6 +110,22 @@ public: return amount.load(std::memory_order_relaxed); } + // Merges and mutations may pass memory ownership to other threads thus in the end of execution + // MemoryTracker for background task may have a non-zero counter. + // This method is intended to fix the counter inside of background_memory_tracker. + // NOTE: We can't use alloc/free methods to do it, because they also will change the value inside + // of total_memory_tracker. + void adjustOnBackgroundTaskEnd(const MemoryTracker * child) + { + auto background_memory_consumption = child->amount.load(std::memory_order_relaxed); + amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed); + + // Also fix CurrentMetrics::MergesMutationsMemoryTracking + auto metric_loaded = metric.load(std::memory_order_relaxed); + if (metric_loaded != CurrentMetrics::end()) + CurrentMetrics::sub(metric_loaded, background_memory_consumption); + } + Int64 getPeak() const { return peak.load(std::memory_order_relaxed); @@ -220,3 +236,6 @@ public: }; extern MemoryTracker total_memory_tracker; +extern MemoryTracker background_memory_tracker; + +bool canEnqueueBackgroundTask(); diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index aabc89cc6d7..2d8c37783db 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -40,6 +40,8 @@ namespace DB M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \ M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \ M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \ + M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \ + M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. Allows to lower memory limit on low-memory systems.", 0) \ M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \ \ M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \ diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index fd4a6b5e996..559652fe56c 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex group->memory_tracker.setProfilerStep(settings.memory_profiler_step); group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability); group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator); + group->memory_tracker.setParent(&background_memory_tracker); if (settings.memory_tracker_fault_probability > 0.0) group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability); diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 0bf662921ad..d54079bc7a5 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -78,4 +78,9 @@ MergeInfo MergeListElement::getInfo() const return res; } +MergeListElement::~MergeListElement() +{ + background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker()); +} + } diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 9c8c2ebd1e4..d8271a66b45 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -113,6 +113,8 @@ struct MergeListElement : boost::noncopyable MergeListElement * ptr() { return this; } MergeListElement & ref() { return *this; } + + ~MergeListElement(); }; /** Maintains a list of currently running merges. diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 34bf5d55270..6cb3ce35e5b 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,7 @@ #include #include #include +#include namespace DB { @@ -918,7 +920,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT; - if (partition_id.empty()) + if (!canEnqueueBackgroundTask()) + { + if (out_disable_reason) + *out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})", + formatReadableSizeWithBinarySuffix(background_memory_tracker.get()), + formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit())); + } + else if (partition_id.empty()) { UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5cd02c33d55..4a36cf03c2a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -3223,7 +3224,14 @@ void StorageReplicatedMergeTree::mergeSelectingTask() auto merges_and_mutations_queued = queue.countMergesAndPartMutations(); size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations; - if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue) + if (!canEnqueueBackgroundTask()) + { + LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate." + "Current background tasks memory usage: {}.", + formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()), + formatReadableSizeWithBinarySuffix(background_memory_tracker.get())); + } + else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue) { LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})" " is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.", diff --git a/tests/integration/test_merges_memory_limit/__init__.py b/tests/integration/test_merges_memory_limit/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_merges_memory_limit/test.py b/tests/integration/test_merges_memory_limit/test.py new file mode 100644 index 00000000000..04729f3a01c --- /dev/null +++ b/tests/integration/test_merges_memory_limit/test.py @@ -0,0 +1,38 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node = cluster.add_instance("node") + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_memory_limit_success(): + node.query( + "CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)" + ) + node.query("SYSTEM STOP MERGES test_merge_oom") + node.query( + "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)" + ) + node.query( + "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)" + ) + node.query( + "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)" + ) + _, error = node.query_and_get_answer_with_error( + "SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL" + ) + + assert not error + node.query("DROP TABLE test_merge_oom") From a1f21a5fc427e860199aa9e1db3c1c4177a340f7 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Fri, 14 Apr 2023 16:00:32 +0200 Subject: [PATCH 006/118] Do not log peak for background memory tracker --- src/Common/MemoryTracker.cpp | 8 ++++++-- src/Common/MemoryTracker.h | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 9bff365483d..81cac2617c5 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -96,13 +96,17 @@ using namespace std::chrono_literals; static constexpr size_t log_peak_memory_usage_every = 1ULL << 30; MemoryTracker total_memory_tracker(nullptr, VariableContext::Global); -MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User); +MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false); std::atomic MemoryTracker::free_memory_in_allocator_arenas; MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {} MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {} - +MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_) + : parent(parent_) + , log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_) + , level(level_) +{} MemoryTracker::~MemoryTracker() { diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 260005fd536..4e29d40c953 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -98,6 +98,7 @@ public: explicit MemoryTracker(VariableContext level_ = VariableContext::Thread); explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread); + MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_); ~MemoryTracker(); From 6f501f983ed0f9d63df4abdbbe25afe64283776b Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 17 Apr 2023 17:47:03 +0200 Subject: [PATCH 007/118] Update test.py --- tests/integration/test_merges_memory_limit/test.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/integration/test_merges_memory_limit/test.py b/tests/integration/test_merges_memory_limit/test.py index 04729f3a01c..22bf143f4f7 100644 --- a/tests/integration/test_merges_memory_limit/test.py +++ b/tests/integration/test_merges_memory_limit/test.py @@ -27,9 +27,6 @@ def test_memory_limit_success(): node.query( "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)" ) - node.query( - "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)" - ) _, error = node.query_and_get_answer_with_error( "SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL" ) From 8cc425cd2946243f3756eed577d128705e781ceb Mon Sep 17 00:00:00 2001 From: Manas Alekar Date: Tue, 18 Apr 2023 00:14:35 -0700 Subject: [PATCH 008/118] INTO OUTFILE enhancements --- src/Client/ClientBase.cpp | 14 +++++++++++++- src/Parsers/ASTQueryWithOutput.h | 1 + src/Parsers/ParserQueryWithOutput.cpp | 6 ++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 64a18479ca9..91c8c7e6e79 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -571,6 +571,12 @@ try CompressionMethod compression_method = chooseCompressionMethod(out_file, compression_method_string); UInt64 compression_level = 3; + if (query_with_output->is_outfile_append && compression_method != CompressionMethod::None) { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Cannot append to compressed file. Please use uncompressed file or remove APPEND keyword."); + } + if (query_with_output->compression_level) { const auto & compression_level_node = query_with_output->compression_level->as(); @@ -585,8 +591,14 @@ try range.second); } + auto flags = O_WRONLY | O_EXCL; + if (query_with_output->is_outfile_append) + flags |= O_APPEND; + else + flags |= O_CREAT; + out_file_buf = wrapWriteBufferWithCompressionMethod( - std::make_unique(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT), + std::make_unique(out_file, DBMS_DEFAULT_BUFFER_SIZE, flags), compression_method, static_cast(compression_level) ); diff --git a/src/Parsers/ASTQueryWithOutput.h b/src/Parsers/ASTQueryWithOutput.h index 892d911e2e2..09f08772468 100644 --- a/src/Parsers/ASTQueryWithOutput.h +++ b/src/Parsers/ASTQueryWithOutput.h @@ -16,6 +16,7 @@ class ASTQueryWithOutput : public IAST public: ASTPtr out_file; bool is_into_outfile_with_stdout; + bool is_outfile_append; ASTPtr format; ASTPtr settings_ast; ASTPtr compression; diff --git a/src/Parsers/ParserQueryWithOutput.cpp b/src/Parsers/ParserQueryWithOutput.cpp index 2fb7c406d74..1ba44fc9939 100644 --- a/src/Parsers/ParserQueryWithOutput.cpp +++ b/src/Parsers/ParserQueryWithOutput.cpp @@ -100,6 +100,12 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec if (!out_file_p.parse(pos, query_with_output.out_file, expected)) return false; + ParserKeyword s_append("APPEND"); + if (s_append.ignore(pos, expected)) + { + query_with_output.is_outfile_append = true; + } + ParserKeyword s_stdout("AND STDOUT"); if (s_stdout.ignore(pos, expected)) { From 2efc7492270a7583654d88cb4a5391d01dccab92 Mon Sep 17 00:00:00 2001 From: Manas Alekar Date: Tue, 18 Apr 2023 00:30:00 -0700 Subject: [PATCH 009/118] Add test for INTO FILE ... APPEND. --- .../02001_append_output_file.reference | 2 ++ .../0_stateless/02001_append_output_file.sh | 15 +++++++++++++++ 2 files changed, 17 insertions(+) create mode 100644 tests/queries/0_stateless/02001_append_output_file.reference create mode 100755 tests/queries/0_stateless/02001_append_output_file.sh diff --git a/tests/queries/0_stateless/02001_append_output_file.reference b/tests/queries/0_stateless/02001_append_output_file.reference new file mode 100644 index 00000000000..6f51dfc24e1 --- /dev/null +++ b/tests/queries/0_stateless/02001_append_output_file.reference @@ -0,0 +1,2 @@ +Hello, World! From client. +Hello, World! From local. diff --git a/tests/queries/0_stateless/02001_append_output_file.sh b/tests/queries/0_stateless/02001_append_output_file.sh new file mode 100755 index 00000000000..5cbae5b8cb3 --- /dev/null +++ b/tests/queries/0_stateless/02001_append_output_file.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -e + +[ -e "${CLICKHOUSE_TMP}"/test_append_to_output_file] && rm "${CLICKHOUSE_TMP}"/test_append_to_output_file + +${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_append_to_output_file'" +${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_append_to_output_file' APPEND" +cat ${CLICKHOUSE_TMP}/test_append_to_output_file + +rm -f "${CLICKHOUSE_TMP}/test_append_to_output_file" From 8874ede98a05a5e41670e5c9a666c2eaa6faced0 Mon Sep 17 00:00:00 2001 From: Manas Alekar Date: Tue, 18 Apr 2023 00:56:14 -0700 Subject: [PATCH 010/118] Fix style in test script. --- tests/queries/0_stateless/02001_append_output_file.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02001_append_output_file.sh b/tests/queries/0_stateless/02001_append_output_file.sh index 5cbae5b8cb3..47ac0183d91 100755 --- a/tests/queries/0_stateless/02001_append_output_file.sh +++ b/tests/queries/0_stateless/02001_append_output_file.sh @@ -6,7 +6,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) set -e -[ -e "${CLICKHOUSE_TMP}"/test_append_to_output_file] && rm "${CLICKHOUSE_TMP}"/test_append_to_output_file +[ -e "${CLICKHOUSE_TMP}"/test_append_to_output_file ] && rm "${CLICKHOUSE_TMP}"/test_append_to_output_file ${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_append_to_output_file'" ${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.') INTO OUTFILE '${CLICKHOUSE_TMP}/test_append_to_output_file' APPEND" From 966902fdfda744bfb386cd555c21ae168af02d53 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 16 Feb 2023 16:25:07 +0800 Subject: [PATCH 011/118] extend firt_value/last_value to accept null --- .../AggregateFunctionAny.cpp | 14 +- .../AggregateFunctionFactory.cpp | 2 +- .../AggregateFunctionMinMaxAny.h | 165 +++++++++++++++--- src/AggregateFunctions/HelpersMinMaxAny.h | 48 ++++- 4 files changed, 199 insertions(+), 30 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionAny.cpp b/src/AggregateFunctions/AggregateFunctionAny.cpp index 9bc6e6af14f..219d530c58d 100644 --- a/src/AggregateFunctions/AggregateFunctionAny.cpp +++ b/src/AggregateFunctions/AggregateFunctionAny.cpp @@ -14,11 +14,21 @@ AggregateFunctionPtr createAggregateFunctionAny(const std::string & name, const return AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); } +AggregateFunctionPtr createAggregateFunctionNullableAny(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) +{ + return AggregateFunctionPtr(createAggregateFunctionSingleNullableValue(name, argument_types, parameters, settings)); +} + AggregateFunctionPtr createAggregateFunctionAnyLast(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) { return AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); } +AggregateFunctionPtr createAggregateFunctionNullableAnyLast(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) +{ + return AggregateFunctionPtr(createAggregateFunctionSingleNullableValue(name, argument_types, parameters, settings)); +} + AggregateFunctionPtr createAggregateFunctionAnyHeavy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings) { return AggregateFunctionPtr(createAggregateFunctionSingleValue(name, argument_types, parameters, settings)); @@ -36,10 +46,10 @@ void registerAggregateFunctionsAny(AggregateFunctionFactory & factory) // Synonyms for use as window functions. factory.registerFunction("first_value", - { createAggregateFunctionAny, properties }, + { createAggregateFunctionNullableAny, properties }, AggregateFunctionFactory::CaseInsensitive); factory.registerFunction("last_value", - { createAggregateFunctionAnyLast, properties }, + { createAggregateFunctionNullableAnyLast, properties }, AggregateFunctionFactory::CaseInsensitive); } diff --git a/src/AggregateFunctions/AggregateFunctionFactory.cpp b/src/AggregateFunctions/AggregateFunctionFactory.cpp index 6cacf66500f..dc8b410dd9a 100644 --- a/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -106,7 +106,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get( // nullability themselves. Another special case is functions from Nothing // that are rewritten to AggregateFunctionNothing, in this case // nested_function is nullptr. - if (!nested_function || !nested_function->isOnlyWindowFunction()) + if (!nested_function->getResultType()->isNullable() && (!nested_function || !nested_function->isOnlyWindowFunction())) return combinator->transformAggregateFunction(nested_function, out_properties, types_without_low_cardinality, parameters); } diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h index b984772c8ea..76cdd5690a5 100644 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -766,19 +766,24 @@ static_assert( /// For any other value types. +template struct SingleValueDataGeneric { private: using Self = SingleValueDataGeneric; Field value; + bool has_value = false; public: - static constexpr bool is_nullable = false; + static constexpr bool is_nullable = IS_NULLABLE; static constexpr bool is_any = false; + static constexpr bool is_null_greater = IS_NULL_GREATER; bool has() const { + if constexpr (is_nullable) + return has_value; return !value.isNull(); } @@ -813,11 +818,15 @@ public: void change(const IColumn & column, size_t row_num, Arena *) { column.get(row_num, value); + if constexpr (is_nullable) + has_value = true; } void change(const Self & to, Arena *) { value = to.value; + if constexpr (is_nullable) + has_value = true; } bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) @@ -833,7 +842,7 @@ public: bool changeFirstTime(const Self & to, Arena * arena) { - if (!has() && to.has()) + if (!has() && (is_nullable || to.has())) { change(to, arena); return true; @@ -868,27 +877,86 @@ public: } else { - Field new_value; - column.get(row_num, new_value); - if (new_value < value) + if constexpr (is_nullable) { - value = new_value; - return true; + Field new_value; + column.get(row_num, new_value); + if constexpr (!is_null_greater) + { + if (!value.isNull() && (new_value.isNull() || new_value < value)) + { + value = new_value; + return true; + } + else + return false; + } + else + { + if ((value.isNull() && !new_value.isNull()) || (!new_value.isNull() && new_value < value)) + { + value = new_value; + return true; + } + + return false; + } } else - return false; + { + Field new_value; + column.get(row_num, new_value); + if (new_value < value) + { + value = new_value; + return true; + } + else + return false; + } } } bool changeIfLess(const Self & to, Arena * arena) { - if (to.has() && (!has() || to.value < value)) + if (!to.has()) + return false; + if constexpr (is_nullable) { - change(to, arena); - return true; + if (!has()) + { + change(to, arena); + return true; + } + if constexpr (!is_null_greater) + { + if (to.value.isNull() || (!value.isNull() && to.value < value)) + { + value = to.value; + return true; + } + return false; + } + else + { + if ((value.isNull() && !to.value.isNull()) || (!to.value.isNull() || to.value < value)) + { + value = to.value; + return true; + } + return false; + } } else - return false; + { + if (!has() || to.value < value) + { + change(to, arena); + return true; + } + else + return false; + } } bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) @@ -900,27 +968,80 @@ public: } else { - Field new_value; - column.get(row_num, new_value); - if (new_value > value) + if constexpr (is_nullable) { - value = new_value; - return true; + Field new_value; + column.get(row_num, new_value); + if constexpr (is_null_greater) + { + if (!value.isNull() && (new_value.isNull() || value < new_value)) + { + value = new_value; + return true; + } + return false; + } + else + { + if ((value.isNull() && !new_value.isNull()) || (!new_value.isNull() && value < new_value)) + { + value = new_value; + return true; + } + return false; + } } else - return false; + { + Field new_value; + column.get(row_num, new_value); + if (new_value > value) + { + value = new_value; + return true; + } + else + return false; + } } } bool changeIfGreater(const Self & to, Arena * arena) { - if (to.has() && (!has() || to.value > value)) + if (!to.has()) + return false; + if constexpr (is_nullable) { - change(to, arena); - return true; + if constexpr (is_null_greater) + { + if (!value.isNull() && (to.value.isNull() || value < to.value)) + { + value = to.value; + return true; + } + return false; + } + else + { + if ((value.isNull() && !to.value.isNull()) || (!to.value.isNull() && value < to.value)) + { + value = to.value; + return true; + } + return false; + + } } else - return false; + { + if (!has() || to.value > value) + { + change(to, arena); + return true; + } + else + return false; + } } bool isEqualTo(const IColumn & column, size_t row_num) const diff --git a/src/AggregateFunctions/HelpersMinMaxAny.h b/src/AggregateFunctions/HelpersMinMaxAny.h index 026a206b109..f8fac616d2c 100644 --- a/src/AggregateFunctions/HelpersMinMaxAny.h +++ b/src/AggregateFunctions/HelpersMinMaxAny.h @@ -9,9 +9,12 @@ #include #include - namespace DB { +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} struct Settings; /// min, max, any, anyLast, anyHeavy, etc... @@ -22,7 +25,6 @@ static IAggregateFunction * createAggregateFunctionSingleValue(const String & na assertUnary(name, argument_types); const DataTypePtr & argument_type = argument_types[0]; - WhichDataType which(argument_type); #define DISPATCH(TYPE) \ if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate>>(argument_type); /// NOLINT @@ -46,7 +48,43 @@ static IAggregateFunction * createAggregateFunctionSingleValue(const String & na if (which.idx == TypeIndex::String) return new AggregateFunctionTemplate>(argument_type); - return new AggregateFunctionTemplate>(argument_type); + return new AggregateFunctionTemplate>>(argument_type); +} + +template