From a11e67d4aae4433dd0f3d8ee46ba40e1cd73fdd5 Mon Sep 17 00:00:00 2001 From: Alexander Sapin Date: Fri, 16 Feb 2024 16:41:58 +0100 Subject: [PATCH 001/101] Make max_insert_delayed_streams_for_parallel_write actually work --- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 1fb2393948a..f5494e56049 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -323,6 +323,9 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk chunk) if (!temp_part.part) continue; + if (!support_parallel_write && temp_part.part->getDataPartStorage().supportParallelWrite()) + support_parallel_write = true; + BlockIDsType block_id; if constexpr (async_insert) From 458793cc50b92361848c91803d07105a91acea85 Mon Sep 17 00:00:00 2001 From: Alexander Sapin Date: Fri, 16 Feb 2024 17:13:37 +0100 Subject: [PATCH 002/101] Review fix --- src/Storages/MergeTree/MergeTreeSink.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 36816904a81..ebc49e22d03 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -109,9 +109,14 @@ void MergeTreeSink::consume(Chunk chunk) } } - size_t max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE; - if (!support_parallel_write || settings.max_insert_delayed_streams_for_parallel_write.changed) + size_t max_insert_delayed_streams_for_parallel_write; + + if (settings.max_insert_delayed_streams_for_parallel_write.changed) max_insert_delayed_streams_for_parallel_write = settings.max_insert_delayed_streams_for_parallel_write; + else if (support_parallel_write) + max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE; + else + max_insert_delayed_streams_for_parallel_write = 0; /// In case of too much columns/parts in block, flush explicitly. streams += temp_part.streams.size(); From f7b524465c60b15c85f579ca22c48d4c165bf6f2 Mon Sep 17 00:00:00 2001 From: Alexander Sapin Date: Fri, 16 Feb 2024 17:14:36 +0100 Subject: [PATCH 003/101] Followup --- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index f5494e56049..3cbdcf5106e 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -368,9 +368,13 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk chunk) profile_events_scope.reset(); UInt64 elapsed_ns = watch.elapsed(); - size_t max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE; - if (!support_parallel_write || settings.max_insert_delayed_streams_for_parallel_write.changed) + size_t max_insert_delayed_streams_for_parallel_write; + if (settings.max_insert_delayed_streams_for_parallel_write.changed) max_insert_delayed_streams_for_parallel_write = settings.max_insert_delayed_streams_for_parallel_write; + else if (support_parallel_write) + max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE; + else + max_insert_delayed_streams_for_parallel_write = 0; /// In case of too much columns/parts in block, flush explicitly. 
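     /// (A limit of 0, chosen above when parallel write is not supported, disables delaying entirely: each part is flushed as soon as it is written.)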
streams += temp_part.streams.size(); From 1768b4477f4ff5db238cd4cc553587b136ed015d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 3 Mar 2024 11:50:22 +0100 Subject: [PATCH 004/101] Revert "Merge pull request #60690 from ClickHouse/remove-bad-test-8" This reverts commit c77eb8b1427f98daf63f7087bbdc0530b07db825, reversing changes made to bae4783fe9bd25decc41383a1234b0e936284c21. --- ..._external_tables_memory_tracking.reference | 16 ++++++ ...52_http_external_tables_memory_tracking.sh | 51 +++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 tests/queries/0_stateless/02152_http_external_tables_memory_tracking.reference create mode 100755 tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh diff --git a/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.reference b/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.reference new file mode 100644 index 00000000000..1fc09c8d154 --- /dev/null +++ b/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.reference @@ -0,0 +1,16 @@ +Checking input_format_parallel_parsing=false& +1 +Checking input_format_parallel_parsing=false&cancel_http_readonly_queries_on_client_close=1&readonly=1 +1 +Checking input_format_parallel_parsing=false&send_progress_in_http_headers=true +1 +Checking input_format_parallel_parsing=false&cancel_http_readonly_queries_on_client_close=1&readonly=1&send_progress_in_http_headers=true +1 +Checking input_format_parallel_parsing=true& +1 +Checking input_format_parallel_parsing=true&cancel_http_readonly_queries_on_client_close=1&readonly=1 +1 +Checking input_format_parallel_parsing=true&send_progress_in_http_headers=true +1 +Checking input_format_parallel_parsing=true&cancel_http_readonly_queries_on_client_close=1&readonly=1&send_progress_in_http_headers=true +1 diff --git a/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh b/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh new file mode 100755 index 00000000000..5f9eb460e44 --- /dev/null +++ b/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Tags: no-tsan, no-cpu-aarch64, no-parallel +# TSan does not supports tracing. +# trace_log doesn't work on aarch64 + +# Regression for proper release of Context, +# via tracking memory of external tables. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +tmp_file=$(mktemp "$CURDIR/clickhouse.XXXXXX.csv") +trap 'rm $tmp_file' EXIT + +$CLICKHOUSE_CLIENT -q "SELECT toString(number) FROM numbers(1e6) FORMAT TSV" > "$tmp_file" + +function run_and_check() +{ + local query_id + query_id="$(${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" --data-binary @- <<<'SELECT generateUUIDv4()')" + + echo "Checking $*" + + # Run query with external table (implicit StorageMemory user) + $CLICKHOUSE_CURL -sS -F "s=@$tmp_file;" "$CLICKHOUSE_URL&s_structure=key+Int&query=SELECT+count()+FROM+s&memory_profiler_sample_probability=1&max_untracked_memory=0&query_id=$query_id&$*" -o /dev/null + + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" --data-binary @- <<<'SYSTEM FLUSH LOGS' + + # Check that temporary table had been destroyed. 
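+    # (The introspection functions below, addressToSymbol and demangle, resolve trace_log stack
+    # frames so the check can match StorageMemory::drop called from ~TemporaryTableHolder.)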
+ ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&allow_introspection_functions=1" --data-binary @- <<<" + WITH arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') AS sym + SELECT count()>0 FROM system.trace_log + WHERE + sym LIKE '%DB::StorageMemory::drop%\n%TemporaryTableHolder::~TemporaryTableHolder%' AND + query_id = '$query_id' + " +} + +for input_format_parallel_parsing in false true; do + query_args_variants=( + "" + "cancel_http_readonly_queries_on_client_close=1&readonly=1" + "send_progress_in_http_headers=true" + # nested progress callback + "cancel_http_readonly_queries_on_client_close=1&readonly=1&send_progress_in_http_headers=true" + ) + for query_args in "${query_args_variants[@]}"; do + run_and_check "input_format_parallel_parsing=$input_format_parallel_parsing&$query_args" + done +done From 048a042dc4963631a23358d3e454dcd8a9eaafa2 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 3 Mar 2024 11:50:46 +0100 Subject: [PATCH 005/101] Make 02152_http_external_tables_memory_tracking less flaky Signed-off-by: Azat Khuzhin --- .../02152_http_external_tables_memory_tracking.sh | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh b/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh index 5f9eb460e44..5494f7d59cb 100755 --- a/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh +++ b/tests/queries/0_stateless/02152_http_external_tables_memory_tracking.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Tags: no-tsan, no-cpu-aarch64, no-parallel +# Tags: no-tsan, no-cpu-aarch64, no-parallel, no-debug # TSan does not supports tracing. # trace_log doesn't work on aarch64 @@ -30,10 +30,16 @@ function run_and_check() # Check that temporary table had been destroyed. 
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&allow_introspection_functions=1" --data-binary @- <<<" WITH arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') AS sym - SELECT count()>0 FROM system.trace_log + SELECT 1 FROM system.trace_log + PREWHERE + query_id = '$query_id' AND + trace_type = 'MemorySample' AND + /* only deallocations */ + size < 0 AND + event_date >= yesterday() WHERE - sym LIKE '%DB::StorageMemory::drop%\n%TemporaryTableHolder::~TemporaryTableHolder%' AND - query_id = '$query_id' + sym LIKE '%DB::StorageMemory::drop%\n%TemporaryTableHolder::~TemporaryTableHolder%' + LIMIT 1 " } From f2a3ffe9eb79046093e77ed39f2366754e7a8ba2 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Thu, 7 Mar 2024 17:14:12 +0800 Subject: [PATCH 006/101] Replace some headers with forward decl --- src/Backups/BackupCoordinationRemote.cpp | 1 + .../NamedCollections/NamedCollectionUtils.cpp | 1 + .../ObjectStorages/ObjectStorageFactory.cpp | 3 +++ src/Formats/ReadSchemaUtils.cpp | 1 + src/Interpreters/DatabaseCatalog.cpp | 1 - src/Interpreters/DatabaseCatalog.h | 10 +++---- src/Processors/QueryPlan/AggregatingStep.cpp | 1 + src/Processors/QueryPlan/CubeStep.cpp | 1 + src/Storages/StorageAzureBlob.cpp | 4 +++ src/Storages/StorageS3.h | 27 ++++++++++--------- 10 files changed, 29 insertions(+), 21 deletions(-) diff --git a/src/Backups/BackupCoordinationRemote.cpp b/src/Backups/BackupCoordinationRemote.cpp index 9c509858b2a..b869f890f56 100644 --- a/src/Backups/BackupCoordinationRemote.cpp +++ b/src/Backups/BackupCoordinationRemote.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include diff --git a/src/Common/NamedCollections/NamedCollectionUtils.cpp b/src/Common/NamedCollections/NamedCollectionUtils.cpp index fe0f42467c7..e3ff50f5e3f 100644 --- a/src/Common/NamedCollections/NamedCollectionUtils.cpp +++ b/src/Common/NamedCollections/NamedCollectionUtils.cpp @@ -17,6 +17,7 @@ #include #include +#include namespace fs = std::filesystem; diff --git a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp index 4f198be64fe..5fae257e8d4 100644 --- a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp @@ -21,6 +21,9 @@ #include #include +#include + +namespace fs = std::filesystem; namespace DB { diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp index 5badf4301bf..736a35927c3 100644 --- a/src/Formats/ReadSchemaUtils.cpp +++ b/src/Formats/ReadSchemaUtils.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index a9fd5c852ba..a5a523b658b 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -26,7 +26,6 @@ #include #include -#include "Interpreters/Context_fwd.h" #include "config.h" #if USE_MYSQL diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index 4fe114cc493..6995fc51941 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -1,15 +1,14 @@ #pragma once #include +#include +#include #include #include -#include #include #include -#include "Common/NamePrompter.h" +#include #include -#include "Storages/IStorage.h" -#include "Databases/IDatabase.h" #include #include @@ -23,9 +22,6 @@ #include #include #include -#include - -namespace fs = std::filesystem; namespace DB { diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp 
b/src/Processors/QueryPlan/AggregatingStep.cpp index f374a7b7b10..a76bacdd97b 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp index 0c632c346c7..bf2ce148529 100644 --- a/src/Processors/QueryPlan/CubeStep.cpp +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index 1f0fba99f84..2d4f1db04a1 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -41,6 +41,10 @@ #include #include +#include + +namespace fs = std::filesystem; + using namespace Azure::Storage::Blobs; namespace CurrentMetrics diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 65fb3b51be2..bf81ead0599 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -4,27 +4,28 @@ #if USE_AWS_S3 -#include - #include - -#include -#include - -#include -#include -#include -#include -#include +#include #include +#include +#include #include #include -#include +#include +#include +#include #include +#include #include #include +#include #include -#include +#include +#include + +#include + +namespace fs = std::filesystem; namespace Aws::S3 { From 7fd13df8a5055892d2f8cdc83dcb900c19c87a95 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 7 Mar 2024 17:09:55 +0100 Subject: [PATCH 007/101] check memory limit periodically --- programs/keeper/Keeper.cpp | 10 +++++++++ programs/server/Server.cpp | 1 + src/Common/CgroupsMemoryUsageObserver.cpp | 26 +++++++++++++++++------ src/Common/CgroupsMemoryUsageObserver.h | 7 ++++-- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index 8972c82eab8..76dd8cb15a5 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -623,6 +624,15 @@ try buildLoggers(config(), logger()); main_config_reloader->start(); + std::optional observer; + auto cgroups_memory_observer_wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 1); + if (cgroups_memory_observer_wait_time > 0) + { + observer.emplace(std::chrono::seconds(cgroups_memory_observer_wait_time)); + observer->startThread(); + } + + LOG_INFO(log, "Ready for connections."); waitForTerminationRequest(); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index c45291ba52c..6b282893dee 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1362,6 +1362,7 @@ try cgroups_memory_usage_observer->setLimits( static_cast(max_server_memory_usage * hard_limit_ratio), static_cast(max_server_memory_usage * soft_limit_ratio)); + cgroups_memory_usage_observer->startThread(); } size_t merges_mutations_memory_usage_soft_limit = new_server_settings.merges_mutations_memory_usage_soft_limit; diff --git a/src/Common/CgroupsMemoryUsageObserver.cpp b/src/Common/CgroupsMemoryUsageObserver.cpp index 9bed6b191e4..5f24c2553b5 100644 --- a/src/Common/CgroupsMemoryUsageObserver.cpp +++ b/src/Common/CgroupsMemoryUsageObserver.cpp @@ -8,7 +8,9 @@ #include #include #include +#include #include +#include #include #include @@ -48,11 +50,10 @@ CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver() void 
CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_limit_) { + std::lock_guard lock(set_limit_mutex); if (hard_limit_ == hard_limit && soft_limit_ == soft_limit) return; - stopThread(); - hard_limit = hard_limit_; soft_limit = soft_limit_; @@ -94,8 +95,6 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l } }; - startThread(); - LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_)); } @@ -277,7 +276,7 @@ void CgroupsMemoryUsageObserver::stopThread() void CgroupsMemoryUsageObserver::runThread() { setThreadName("CgrpMemUsgObsr"); - + last_memory_amount = getMemoryAmount(); std::unique_lock lock(thread_mutex); while (true) { @@ -286,8 +285,21 @@ void CgroupsMemoryUsageObserver::runThread() try { - uint64_t memory_usage = file.readMemoryUsage(); - processMemoryUsage(memory_usage); + uint64_t memory_limit = getMemoryAmount(); + if (memory_limit != last_memory_amount) + { + last_memory_amount = memory_limit; + /// if we find memory amount changes, we just reload config. + /// Reloading config will check the memory amount again and calculate soft/hard limit again. + auto global_context = getContext()->getGlobalContext(); + global_context->reloadConfig(); + } + std::lock_guard set_limit_lock(set_limit_mutex); + if (soft_limit > 0 && hard_limit > 0) + { + uint64_t memory_usage = file.readMemoryUsage(); + processMemoryUsage(memory_usage); + } } catch (...) { diff --git a/src/Common/CgroupsMemoryUsageObserver.h b/src/Common/CgroupsMemoryUsageObserver.h index 28bf08c82b5..6edf2e2049d 100644 --- a/src/Common/CgroupsMemoryUsageObserver.h +++ b/src/Common/CgroupsMemoryUsageObserver.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -14,7 +15,7 @@ namespace DB /// - When the soft memory limit is hit, drop jemalloc cache. /// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster. 
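/// A minimal usage sketch (illustrative values, not defaults; limits are in bytes):
///     CgroupsMemoryUsageObserver observer(std::chrono::seconds(1)); /// polling interval
///     observer.setLimits(/*hard_limit_=*/ 8ULL << 30, /*soft_limit_=*/ 6ULL << 30);
///     observer.startThread();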
 #if defined(OS_LINUX)
-class CgroupsMemoryUsageObserver
+class CgroupsMemoryUsageObserver : public WithContext
 {
 public:
     enum class CgroupsVersion
@@ -27,6 +28,7 @@ public:
     ~CgroupsMemoryUsageObserver();
 
     void setLimits(uint64_t hard_limit_, uint64_t soft_limit_);
+    void startThread();
 
     size_t getHardLimit() const { return hard_limit; }
     size_t getSoftLimit() const { return soft_limit; }
@@ -64,16 +66,17 @@ private:
 
     File file;
 
-    void startThread();
     void stopThread();
 
     void runThread();
     void processMemoryUsage(uint64_t usage);
 
     std::mutex thread_mutex;
+    std::mutex set_limit_mutex;
     std::condition_variable cond;
     ThreadFromGlobalPool thread;
     bool quit = false;
+    uint64_t last_memory_amount;
 };
 
 #else

From f4fc65449cc3ace36f33323600fd1a47fbfb9736 Mon Sep 17 00:00:00 2001
From: Peter
Date: Fri, 8 Mar 2024 01:20:50 +0800
Subject: [PATCH 008/101] Add another example dataset for presenting usage

---
 .../example-datasets/tw-weather.md            | 293 ++++++++++++++++++
 1 file changed, 293 insertions(+)
 create mode 100644 docs/en/getting-started/example-datasets/tw-weather.md

diff --git a/docs/en/getting-started/example-datasets/tw-weather.md b/docs/en/getting-started/example-datasets/tw-weather.md
new file mode 100644
index 00000000000..e5f16c403d5
--- /dev/null
+++ b/docs/en/getting-started/example-datasets/tw-weather.md
@@ -0,0 +1,293 @@
+---
+slug: /en/getting-started/example-datasets/tw-weather
+sidebar_label: Taiwan Historical Weather Datasets
+sidebar_position: 1
+description: 131 million rows of weather observation data for the last 128 years
+---
+
+# Taiwan Historical Weather Datasets
+
+This dataset contains historical meteorological observation measurements for the last 128 years. Each row is a set of measurements for one weather station at one point in time.
+
+The origin of this dataset is available [here](https://github.com/Raingel/historical_weather) and the list of weather station numbers can be found [here](https://github.com/Raingel/weather_station_list).
+
+> The sources of the meteorological datasets include the meteorological stations established by the Central Weather Administration (station codes beginning with C0, C1, and 4) and the agricultural meteorological stations belonging to the Council of Agriculture (station codes other than those mentioned above):
+
+ - StationId
+ - MeasuredDate, the observation time
+ - StnPres, the station air pressure
+ - SeaPres, the sea level pressure
+ - Td, the dew point temperature
+ - RH, the relative humidity
+ - Other elements where available
+
+## Downloading the data
+
+- A [pre-processed version](#pre-processed-data) of the data for ClickHouse, which has been cleaned, re-structured, and enriched. This dataset covers the years from 1896 to 2023.
+- [Download the original raw data](#original-raw-data) and convert it to the format required by ClickHouse. Users who want to add their own columns may wish to explore this approach.
+
+### Pre-processed data
+
+The dataset has been re-structured from a measurement per line to a row per weather station id and measured date, i.e.
```csv
StationId,MeasuredDate,StnPres,Tx,RH,WS,WD,WSGust,WDGust,Precp,GloblRad,TxSoil0cm,TxSoil5cm,TxSoil20cm,TxSoil50cm,TxSoil100cm,SeaPres,Td,PrecpHour,SunShine,TxSoil10cm,EvapA,Visb,UVI,Cloud Amount,TxSoil30cm,TxSoil200cm,TxSoil300cm,TxSoil500cm,VaporPressure
C0X100,2016-01-01 01:00:00,1022.1,16.1,72,1.1,8.0,,,,,,,,,,,,,,,,,,,,,,,
C0X100,2016-01-01 02:00:00,1021.6,16.0,73,1.2,358.0,,,,,,,,,,,,,,,,,,,,,,,
C0X100,2016-01-01 03:00:00,1021.3,15.8,74,1.5,353.0,,,,,,,,,,,,,,,,,,,,,,,
C0X100,2016-01-01 04:00:00,1021.2,15.8,74,1.7,8.0,,,,,,,,,,,,,,,,,,,,,,,
```

This layout is easy to query, and the resulting table is less sparse; some elements are null because they are not measured at that weather station.

This dataset is available in the following Google Cloud Storage location. Either download the dataset to your local filesystem (and insert it with the ClickHouse client) or insert it directly into ClickHouse (see [Inserting from URL](#inserting-from-url)).

To download:

```bash
wget https://storage.googleapis.com/taiwan-weather-observaiton-datasets/preprocessed_weather_daily_1896_2023.tar.gz

# Option: Validate the checksum
md5sum preprocessed_weather_daily_1896_2023.tar.gz
# Checksum should be equal to: 11b484f5bd9ddafec5cfb131eb2dd008

tar -xzvf preprocessed_weather_daily_1896_2023.tar.gz
daily_weather_preprocessed_1896_2023.csv

# Option: Validate the checksum
md5sum daily_weather_preprocessed_1896_2023.csv
# Checksum should be equal to: 1132248c78195c43d93f843753881754
```

### Original raw data

The following steps describe how to download the original raw data, which you can then transform and convert as you wish.

#### Download

To download the original raw data:

```bash
mkdir tw_raw_weather_data && cd tw_raw_weather_data

wget https://storage.googleapis.com/taiwan-weather-observaiton-datasets/raw_data_weather_daily_1896_2023.tar.gz

# Option: Validate the checksum
md5sum raw_data_weather_daily_1896_2023.tar.gz
# Checksum should be equal to: b66b9f137217454d655e3004d7d1b51a

tar -xzvf raw_data_weather_daily_1896_2023.tar.gz
466920_1928.csv
466920_1929.csv
466920_1930.csv
466920_1931.csv
...

# Option: Validate the checksum
cat *.csv | md5sum
# Checksum should be equal to: b26db404bf84d4063fac42e576464ce1
```

#### Retrieve the Taiwan weather stations

```bash
wget -O weather_sta_list.csv https://github.com/Raingel/weather_station_list/raw/main/data/weather_sta_list.csv

# Option: Convert the UTF-8-BOM to UTF-8 encoding
sed -i '1s/^\xEF\xBB\xBF//' weather_sta_list.csv
```

## Create table schema

Create the MergeTree table in ClickHouse (from the ClickHouse client).
```sql
CREATE TABLE tw_weather_data (
    StationId String null,
    MeasuredDate DateTime64,
    StnPres Float64 null,
    SeaPres Float64 null,
    Tx Float64 null,
    Td Float64 null,
    RH Float64 null,
    WS Float64 null,
    WD Float64 null,
    WSGust Float64 null,
    WDGust Float64 null,
    Precp Float64 null,
    PrecpHour Float64 null,
    SunShine Float64 null,
    GloblRad Float64 null,
    TxSoil0cm Float64 null,
    TxSoil5cm Float64 null,
    TxSoil10cm Float64 null,
    TxSoil20cm Float64 null,
    TxSoil50cm Float64 null,
    TxSoil100cm Float64 null,
    TxSoil30cm Float64 null,
    TxSoil200cm Float64 null,
    TxSoil300cm Float64 null,
    TxSoil500cm Float64 null,
    VaporPressure Float64 null,
    UVI Float64 null,
    "Cloud Amount" Float64 null,
    EvapA Float64 null,
    Visb Float64 null
)
ENGINE = MergeTree
ORDER BY (MeasuredDate);
```

## Inserting into ClickHouse

### Inserting from local file

Data can be inserted from a local file as follows (from the ClickHouse client):

```sql
INSERT INTO tw_weather_data FROM INFILE '/path/to/daily_weather_preprocessed_1896_2023.csv'
```

where `/path/to` is the path to the local file on disk.

A sample response after inserting the data into ClickHouse looks like this:

```response
Query id: 90e4b524-6e14-4855-817c-7e6f98fbeabb

Ok.
131985329 rows in set. Elapsed: 71.770 sec. Processed 131.99 million rows, 10.06 GB (1.84 million rows/s., 140.14 MB/s.)
Peak memory usage: 583.23 MiB.
```

### Inserting from URL

```sql
INSERT INTO tw_weather_data SELECT *
FROM url('https://storage.googleapis.com/taiwan-weather-observaiton-datasets/daily_weather_preprocessed_1896_2023.csv', 'CSVWithNames')
```

To learn how to speed this up, see our blog post on [tuning large data loads](https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part2).

## Check data rows and sizes

1. Let's see how many rows have been inserted:

```sql
SELECT formatReadableQuantity(count())
FROM tw_weather_data;
```

```response
┌─formatReadableQuantity(count())─┐
│ 131.99 million                  │
└─────────────────────────────────┘
```

2. Let's see how much disk space is used by this table:

```sql
SELECT
    formatReadableSize(sum(bytes)) AS disk_size,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size
FROM system.parts
WHERE (`table` = 'tw_weather_data') AND active
```

```response
┌─disk_size─┬─uncompressed_size─┐
│ 2.13 GiB  │ 32.94 GiB         │
└───────────┴───────────────────┘
```

## Sample queries

### Q1: Retrieve the highest dew point temperature for each weather station in a specific year

```sql
SELECT
    StationId,
    max(Td) AS max_td
FROM tw_weather_data
WHERE (year(MeasuredDate) = 2023) AND (Td IS NOT NULL)
GROUP BY StationId

┌─StationId─┬─max_td─┐
│ 466940    │      1 │
│ 467300    │      1 │
│ 467540    │      1 │
│ 467490    │      1 │
│ 467080    │      1 │
│ 466910    │      1 │
│ 467660    │      1 │
│ 467270    │      1 │
│ 467350    │      1 │
│ 467571    │      1 │
│ 466920    │      1 │
│ 467650    │      1 │
│ 467550    │      1 │
│ 467480    │      1 │
│ 467610    │      1 │
│ 467050    │      1 │
│ 467590    │      1 │
│ 466990    │      1 │
│ 467060    │      1 │
│ 466950    │      1 │
│ 467620    │      1 │
│ 467990    │      1 │
│ 466930    │      1 │
│ 467110    │      1 │
│ 466881    │      1 │
│ 467410    │      1 │
│ 467441    │      1 │
│ 467420    │      1 │
│ 467530    │      1 │
│ 466900    │      1 │
└───────────┴────────┘

30 rows in set. Elapsed: 0.045 sec. Processed 6.41 million rows, 187.33 MB (143.92 million rows/s., 4.21 GB/s.)
```
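As a further illustration (a hypothetical query, not part of the original guide, so no measured output is shown; station `466920` is taken from the Q1 result above), the monthly mean air temperature can be computed from the `Tx` column:

```sql
SELECT
    toStartOfMonth(MeasuredDate) AS month,
    round(avg(Tx), 1) AS avg_tx
FROM tw_weather_data
WHERE (StationId = '466920') AND (year(MeasuredDate) = 2023) AND (Tx IS NOT NULL)
GROUP BY month
ORDER BY month ASC
```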
+``` + +### Q2: Raw data fetching with the specific duration time range, fields and weather station + +```sql +SELECT + StnPres, + SeaPres, + Tx, + Td, + RH, + WS, + WD, + WSGust, + WDGust, + Precp, + PrecpHour +FROM tw_weather_data +WHERE (StationId = 'C0UB10') AND (MeasuredDate >= '2023-12-23') AND (MeasuredDate < '2023-12-24') +ORDER BY MeasuredDate ASC +LIMIT 10 +``` + +```response +┌─StnPres─┬─SeaPres─┬───Tx─┬───Td─┬─RH─┬──WS─┬──WD─┬─WSGust─┬─WDGust─┬─Precp─┬─PrecpHour─┐ +│ 1029.5 │ ᴺᵁᴸᴸ │ 11.8 │ ᴺᵁᴸᴸ │ 78 │ 2.7 │ 271 │ 5.5 │ 275 │ -99.8 │ -99.8 │ +│ 1029.8 │ ᴺᵁᴸᴸ │ 12.3 │ ᴺᵁᴸᴸ │ 78 │ 2.7 │ 289 │ 5.5 │ 308 │ -99.8 │ -99.8 │ +│ 1028.6 │ ᴺᵁᴸᴸ │ 12.3 │ ᴺᵁᴸᴸ │ 79 │ 2.3 │ 251 │ 6.1 │ 289 │ -99.8 │ -99.8 │ +│ 1028.2 │ ᴺᵁᴸᴸ │ 13 │ ᴺᵁᴸᴸ │ 75 │ 4.3 │ 312 │ 7.5 │ 316 │ -99.8 │ -99.8 │ +│ 1027.8 │ ᴺᵁᴸᴸ │ 11.1 │ ᴺᵁᴸᴸ │ 89 │ 7.1 │ 310 │ 11.6 │ 322 │ -99.8 │ -99.8 │ +│ 1027.8 │ ᴺᵁᴸᴸ │ 11.6 │ ᴺᵁᴸᴸ │ 90 │ 3.1 │ 269 │ 10.7 │ 295 │ -99.8 │ -99.8 │ +│ 1027.9 │ ᴺᵁᴸᴸ │ 12.3 │ ᴺᵁᴸᴸ │ 89 │ 4.7 │ 296 │ 8.1 │ 310 │ -99.8 │ -99.8 │ +│ 1028.2 │ ᴺᵁᴸᴸ │ 12.2 │ ᴺᵁᴸᴸ │ 94 │ 2.5 │ 246 │ 7.1 │ 283 │ -99.8 │ -99.8 │ +│ 1028.4 │ ᴺᵁᴸᴸ │ 12.5 │ ᴺᵁᴸᴸ │ 94 │ 3.1 │ 265 │ 4.8 │ 297 │ -99.8 │ -99.8 │ +│ 1028.3 │ ᴺᵁᴸᴸ │ 13.6 │ ᴺᵁᴸᴸ │ 91 │ 1.2 │ 273 │ 4.4 │ 256 │ -99.8 │ -99.8 │ +└─────────┴─────────┴──────┴──────┴────┴─────┴─────┴────────┴────────┴───────┴───────────┘ + +10 rows in set. Elapsed: 0.009 sec. Processed 91.70 thousand rows, 2.33 MB (9.67 million rows/s., 245.31 MB/s.) +``` + +## Credits + +We would like to acknowledge the efforts of the Central Weather Administration and Agricultural Meteorological Observation Network (Station) of the Council of Agriculture for preparing, cleaning, and distributing this dataset. We appreciate your efforts. + +Ou, J.-H., Kuo, C.-H., Wu, Y.-F., Lin, G.-C., Lee, M.-H., Chen, R.-K., Chou, H.-P., Wu, H.-Y., Chu, S.-C., Lai, Q.-J., Tsai, Y.-C., Lin, C.-C., Kuo, C.-C., Liao, C.-T., Chen, Y.-N., Chu, Y.-W., Chen, C.-Y., 2023. Application-oriented deep learning model for early warning of rice blast in Taiwan. Ecological Informatics 73, 101950. 
https://doi.org/10.1016/j.ecoinf.2022.101950 [13/12/2022] From d6e0dd45b9cc88b9002de68138440cd24452fb17 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Fri, 8 Mar 2024 22:57:49 +0800 Subject: [PATCH 009/101] Fix build --- src/Functions/coverage.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Functions/coverage.cpp b/src/Functions/coverage.cpp index f4cac26df78..0f4cd1940b7 100644 --- a/src/Functions/coverage.cpp +++ b/src/Functions/coverage.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include From 4997f95426786a037328d4ed2bbce2245144eb1f Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 8 Mar 2024 17:30:58 +0100 Subject: [PATCH 010/101] fix build --- programs/keeper/Keeper.cpp | 3 +++ programs/server/Server.cpp | 5 +++++ src/Common/CgroupsMemoryUsageObserver.cpp | 4 +--- src/Common/CgroupsMemoryUsageObserver.h | 10 ++++++++-- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index 76dd8cb15a5..c2ad81a3227 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -630,6 +630,9 @@ try { observer.emplace(std::chrono::seconds(cgroups_memory_observer_wait_time)); observer->startThread(); + observer->setOnMemoryLimitUpdate([&](){ + main_config_reloader->reload(); + }); } diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index ee55cfd1837..c81e9b56e35 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1701,6 +1701,11 @@ try throw; } + if (cgroups_memory_usage_observer) + cgroups_memory_usage_observer->setOnMemoryLimitUpdate([&](){ + main_config_reloader->reload(); + }); + /// Reload config in SYSTEM RELOAD CONFIG query. global_context->setConfigReloadCallback([&]() { diff --git a/src/Common/CgroupsMemoryUsageObserver.cpp b/src/Common/CgroupsMemoryUsageObserver.cpp index 5f24c2553b5..3fda51a119f 100644 --- a/src/Common/CgroupsMemoryUsageObserver.cpp +++ b/src/Common/CgroupsMemoryUsageObserver.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -291,8 +290,7 @@ void CgroupsMemoryUsageObserver::runThread() last_memory_amount = memory_limit; /// if we find memory amount changes, we just reload config. /// Reloading config will check the memory amount again and calculate soft/hard limit again. - auto global_context = getContext()->getGlobalContext(); - global_context->reloadConfig(); + on_memory_limit_update(); } std::lock_guard set_limit_lock(set_limit_mutex); if (soft_limit > 0 && hard_limit > 0) diff --git a/src/Common/CgroupsMemoryUsageObserver.h b/src/Common/CgroupsMemoryUsageObserver.h index 6edf2e2049d..639433b5016 100644 --- a/src/Common/CgroupsMemoryUsageObserver.h +++ b/src/Common/CgroupsMemoryUsageObserver.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -15,7 +14,7 @@ namespace DB /// - When the soft memory limit is hit, drop jemalloc cache. /// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster. 
#if defined(OS_LINUX) -class CgroupsMemoryUsageObserver : public WithContext +class CgroupsMemoryUsageObserver { public: enum class CgroupsVersion @@ -28,6 +27,10 @@ public: ~CgroupsMemoryUsageObserver(); void setLimits(uint64_t hard_limit_, uint64_t soft_limit_); + void setOnMemoryLimitUpdate(std::function on_memory_limit_update_) + { + on_memory_limit_update = on_memory_limit_update_; + } void startThread(); size_t getHardLimit() const { return hard_limit; } @@ -46,6 +49,7 @@ private: using CallbackFn = std::function; CallbackFn on_hard_limit; CallbackFn on_soft_limit; + std::function on_memory_limit_update; uint64_t last_usage = 0; @@ -87,8 +91,10 @@ public: void setLimits(uint64_t, uint64_t) {} size_t readMemoryUsage() { return 0; } + void startThread(); size_t getHardLimit() { return 0; } size_t getSoftLimit() { return 0; } + void setOnMemoryLimitUpdate(std::function) {} }; #endif From c01a6775d747606b3aae70c9615404720208aeda Mon Sep 17 00:00:00 2001 From: Han Fei Date: Sat, 9 Mar 2024 02:28:39 +0100 Subject: [PATCH 011/101] fix style --- programs/keeper/Keeper.cpp | 3 ++- programs/server/Server.cpp | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index c2ad81a3227..31e0b7dc576 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -630,7 +630,8 @@ try { observer.emplace(std::chrono::seconds(cgroups_memory_observer_wait_time)); observer->startThread(); - observer->setOnMemoryLimitUpdate([&](){ + observer->setOnMemoryLimitUpdate([&]() + { main_config_reloader->reload(); }); } diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index c81e9b56e35..33a30a1de2c 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1702,9 +1702,12 @@ try } if (cgroups_memory_usage_observer) - cgroups_memory_usage_observer->setOnMemoryLimitUpdate([&](){ + { + cgroups_memory_usage_observer->setOnMemoryLimitUpdate([&]() + { main_config_reloader->reload(); }); + } /// Reload config in SYSTEM RELOAD CONFIG query. 
global_context->setConfigReloadCallback([&]() From c1caeaa81483e33c666679fb1ce20d6a7da154c3 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Sun, 10 Mar 2024 20:51:54 +0100 Subject: [PATCH 012/101] fix build --- src/Common/CgroupsMemoryUsageObserver.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/CgroupsMemoryUsageObserver.h b/src/Common/CgroupsMemoryUsageObserver.h index 639433b5016..fc0d847af4f 100644 --- a/src/Common/CgroupsMemoryUsageObserver.h +++ b/src/Common/CgroupsMemoryUsageObserver.h @@ -91,7 +91,7 @@ public: void setLimits(uint64_t, uint64_t) {} size_t readMemoryUsage() { return 0; } - void startThread(); + void startThread() {} size_t getHardLimit() { return 0; } size_t getSoftLimit() { return 0; } void setOnMemoryLimitUpdate(std::function) {} From 5f1991fbef2f959f1d55c62194d948814d199fa9 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 12 Mar 2024 15:53:28 +0800 Subject: [PATCH 013/101] too big translation unit in Aggregator --- src/Common/HashTable/FixedHashMap.h | 3 + .../HashTable/TwoLevelStringHashTable.h | 1 + src/Interpreters/AggregatedData.h | 142 +++ src/Interpreters/AggregatedDataVariants.cpp | 255 ++++ src/Interpreters/AggregatedDataVariants.h | 320 +++++ src/Interpreters/AggregationMethod.cpp | 215 ++++ src/Interpreters/AggregationMethod.h | 320 +++++ src/Interpreters/Aggregator.cpp | 512 ++++---- src/Interpreters/Aggregator.h | 1035 +---------------- 9 files changed, 1541 insertions(+), 1262 deletions(-) create mode 100644 src/Interpreters/AggregatedData.h create mode 100644 src/Interpreters/AggregatedDataVariants.cpp create mode 100644 src/Interpreters/AggregatedDataVariants.h create mode 100644 src/Interpreters/AggregationMethod.cpp create mode 100644 src/Interpreters/AggregationMethod.h diff --git a/src/Common/HashTable/FixedHashMap.h b/src/Common/HashTable/FixedHashMap.h index e835a6fba94..537f37a9e6c 100644 --- a/src/Common/HashTable/FixedHashMap.h +++ b/src/Common/HashTable/FixedHashMap.h @@ -109,6 +109,9 @@ public: using Base::Base; + FixedHashMap() = default; + FixedHashMap(size_t ) {} /// NOLINT + template void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func) { diff --git a/src/Common/HashTable/TwoLevelStringHashTable.h b/src/Common/HashTable/TwoLevelStringHashTable.h index 54c208c5b60..1ce6b3d02e3 100644 --- a/src/Common/HashTable/TwoLevelStringHashTable.h +++ b/src/Common/HashTable/TwoLevelStringHashTable.h @@ -38,6 +38,7 @@ public: Impl impls[NUM_BUCKETS]; TwoLevelStringHashTable() = default; + TwoLevelStringHashTable(size_t ) {} /// NOLINT template explicit TwoLevelStringHashTable(const Source & src) diff --git a/src/Interpreters/AggregatedData.h b/src/Interpreters/AggregatedData.h new file mode 100644 index 00000000000..6cd6b190801 --- /dev/null +++ b/src/Interpreters/AggregatedData.h @@ -0,0 +1,142 @@ +#pragma once +#include + +#include +#include +#include +#include +namespace DB +{ +/** Different data structures that can be used for aggregation + * For efficiency, the aggregation data itself is put into the pool. + * Data and pool ownership (states of aggregate functions) + * is acquired later - in `convertToBlocks` function, by the ColumnAggregateFunction object. + * + * Most data structures exist in two versions: normal and two-level (TwoLevel). + * A two-level hash table works a little slower with a small number of different keys, + * but with a large number of different keys scales better, because it allows + * parallelize some operations (merging, post-processing) in a natural way. 
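+ * (For example, each two-level table is split into 256 buckets, so different threads
+ * can merge or post-process different buckets independently.)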
+ * + * To ensure efficient work over a wide range of conditions, + * first single-level hash tables are used, + * and when the number of different keys is large enough, + * they are converted to two-level ones. + * + * PS. There are many different approaches to the effective implementation of parallel and distributed aggregation, + * best suited for different cases, and this approach is just one of them, chosen for a combination of reasons. + */ + +using AggregatedDataWithoutKey = AggregateDataPtr; + +using AggregatedDataWithUInt8Key = FixedImplicitZeroHashMapWithCalculatedSize; +using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap; + +using AggregatedDataWithUInt32Key = HashMap>; +using AggregatedDataWithUInt64Key = HashMap>; + +using AggregatedDataWithShortStringKey = StringHashMap; + +using AggregatedDataWithStringKey = HashMapWithSavedHash; + +using AggregatedDataWithKeys128 = HashMap; +using AggregatedDataWithKeys256 = HashMap; + +using AggregatedDataWithUInt32KeyTwoLevel = TwoLevelHashMap>; +using AggregatedDataWithUInt64KeyTwoLevel = TwoLevelHashMap>; + +using AggregatedDataWithShortStringKeyTwoLevel = TwoLevelStringHashMap; + +using AggregatedDataWithStringKeyTwoLevel = TwoLevelHashMapWithSavedHash; + +using AggregatedDataWithKeys128TwoLevel = TwoLevelHashMap; +using AggregatedDataWithKeys256TwoLevel = TwoLevelHashMap; + +/** Variants with better hash function, using more than 32 bits for hash. + * Using for merging phase of external aggregation, where number of keys may be far greater than 4 billion, + * but we keep in memory and merge only sub-partition of them simultaneously. + * TODO We need to switch for better hash function not only for external aggregation, + * but also for huge aggregation results on machines with terabytes of RAM. + */ + +using AggregatedDataWithUInt64KeyHash64 = HashMap>; +using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash; +using AggregatedDataWithKeys128Hash64 = HashMap; +using AggregatedDataWithKeys256Hash64 = HashMap; + +template +struct AggregationDataWithNullKey : public Base +{ + using Base::Base; + + bool & hasNullKeyData() { return has_null_key_data; } + AggregateDataPtr & getNullKeyData() { return null_key_data; } + bool hasNullKeyData() const { return has_null_key_data; } + const AggregateDataPtr & getNullKeyData() const { return null_key_data; } + size_t size() const { return Base::size() + (has_null_key_data ? 
1 : 0); } + bool empty() const { return Base::empty() && !has_null_key_data; } + void clear() + { + Base::clear(); + has_null_key_data = false; + } + void clearAndShrink() + { + Base::clearAndShrink(); + has_null_key_data = false; + } + +private: + bool has_null_key_data = false; + AggregateDataPtr null_key_data = nullptr; +}; + +template +struct AggregationDataWithNullKeyTwoLevel : public Base +{ + using Base::Base; + using Base::impls; + + AggregationDataWithNullKeyTwoLevel() = default; + + template + explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other) + { + impls[0].hasNullKeyData() = other.hasNullKeyData(); + impls[0].getNullKeyData() = other.getNullKeyData(); + } + + bool & hasNullKeyData() { return impls[0].hasNullKeyData(); } + AggregateDataPtr & getNullKeyData() { return impls[0].getNullKeyData(); } + bool hasNullKeyData() const { return impls[0].hasNullKeyData(); } + const AggregateDataPtr & getNullKeyData() const { return impls[0].getNullKeyData(); } +}; + +template +using HashTableWithNullKey = AggregationDataWithNullKey>; +template +using StringHashTableWithNullKey = AggregationDataWithNullKey>; + +using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey; +using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey; +using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey; + + +using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey; +using AggregatedDataWithNullableStringKey = AggregationDataWithNullKey; +using AggregatedDataWithNullableShortStringKey = AggregationDataWithNullKey; + + +using AggregatedDataWithNullableUInt32KeyTwoLevel = AggregationDataWithNullKeyTwoLevel< + TwoLevelHashMap, + TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>; +using AggregatedDataWithNullableUInt64KeyTwoLevel = AggregationDataWithNullKeyTwoLevel< + TwoLevelHashMap, + TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>; + +using AggregatedDataWithNullableShortStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel< + TwoLevelStringHashMap>; + +using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel< + TwoLevelHashMapWithSavedHash, + TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>; +} diff --git a/src/Interpreters/AggregatedDataVariants.cpp b/src/Interpreters/AggregatedDataVariants.cpp new file mode 100644 index 00000000000..0c86c58bd3e --- /dev/null +++ b/src/Interpreters/AggregatedDataVariants.cpp @@ -0,0 +1,255 @@ +#include +#include + +namespace ProfileEvents +{ + extern const Event AggregationPreallocatedElementsInHashTables; +} + +namespace DB +{ +namespace ErrorCodes +{ + extern const int UNKNOWN_AGGREGATED_DATA_VARIANT; + extern const int LOGICAL_ERROR; + +} +using ColumnsHashing::HashMethodContext; +using ColumnsHashing::HashMethodContextPtr; +using ColumnsHashing::LastElementCacheStats; + +AggregatedDataVariants::AggregatedDataVariants() : aggregates_pools(1, std::make_shared()), aggregates_pool(aggregates_pools.back().get()) {} + +AggregatedDataVariants::~AggregatedDataVariants() +{ + if (aggregator && !aggregator->all_aggregates_has_trivial_destructor) + { + try + { + aggregator->destroyAllAggregateStates(*this); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + +// The std::is_constructible trait isn't suitable here because some classes have template constructors with semantics different from providing size hints. 
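+// (Hence the hand-rolled HasConstructorOfNumberOfElements trait below, specialized only for the
+// table types whose size_t constructor really is a size hint.)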
+// Also string hash table variants are not supported due to the fact that both local perf tests and tests in CI showed slowdowns for them. +template +struct HasConstructorOfNumberOfElements : std::false_type +{ +}; + +template +struct HasConstructorOfNumberOfElements> : std::true_type +{ +}; + +template typename ImplTable> +struct HasConstructorOfNumberOfElements> : std::true_type +{ +}; + +template +struct HasConstructorOfNumberOfElements> : std::true_type +{ +}; + +template +struct HasConstructorOfNumberOfElements> : std::true_type +{ +}; + +template