From 369c58c0f27c9a3c494cc2ae209215e3b1c92b39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Thu, 24 Oct 2024 13:57:26 +0200 Subject: [PATCH] Reduce deps from SettingsEnums --- .../static-files-disk-uploader.cpp | 1 + .../CompressedReadBufferFromFile.h | 5 +- src/Core/Settings.cpp | 1 + src/Core/Settings.h | 2 - src/Core/SettingsEnums.h | 4 +- src/Disks/IO/createReadBufferFromFileBase.h | 6 +- src/IO/DistributedCacheLogMode.h | 15 + src/IO/DistributedCachePoolBehaviourOnLimit.h | 14 + src/IO/DistributedCacheSettings.h | 18 +- src/IO/ReadMethod.h | 58 ++++ src/IO/ReadSettings.h | 56 +--- src/Interpreters/Cache/FileCache.h | 3 +- src/Interpreters/Cache/QueryLimit.cpp | 1 + src/Storages/Cache/ExternalDataSourceCache.h | 3 +- .../MaterializedView/RefreshSettings.cpp | 286 +++++++++++++++++- src/Storages/MergeTree/IDataPartStorage.h | 3 +- src/Storages/MergeTree/MergeTreeIOSettings.h | 3 +- src/Storages/MergeTree/MergeTreeMarksLoader.h | 4 +- utils/check-marks/main.cpp | 1 + 19 files changed, 397 insertions(+), 87 deletions(-) create mode 100644 src/IO/DistributedCacheLogMode.h create mode 100644 src/IO/DistributedCachePoolBehaviourOnLimit.h create mode 100644 src/IO/ReadMethod.h diff --git a/programs/static-files-disk-uploader/static-files-disk-uploader.cpp b/programs/static-files-disk-uploader/static-files-disk-uploader.cpp index f7696dd37f1..590e0364040 100644 --- a/programs/static-files-disk-uploader/static-files-disk-uploader.cpp +++ b/programs/static-files-disk-uploader/static-files-disk-uploader.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include diff --git a/src/Compression/CompressedReadBufferFromFile.h b/src/Compression/CompressedReadBufferFromFile.h index 10b5827f4c8..a9c0bcb888a 100644 --- a/src/Compression/CompressedReadBufferFromFile.h +++ b/src/Compression/CompressedReadBufferFromFile.h @@ -1,10 +1,9 @@ #pragma once +#include +#include #include #include -#include -#include -#include namespace DB diff --git 
a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 925c2b38b4c..c1ee44874d6 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Core/Settings.h b/src/Core/Settings.h index ecfd4240a59..ac3b1fe651e 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -5,9 +5,7 @@ #include #include #include -#include #include -#include #include #include diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index c42ee0683e4..607011b505b 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -12,7 +12,9 @@ #include #include #include -#include +#include +#include +#include #include #include #include diff --git a/src/Disks/IO/createReadBufferFromFileBase.h b/src/Disks/IO/createReadBufferFromFileBase.h index e93725a967d..c062dfc0847 100644 --- a/src/Disks/IO/createReadBufferFromFileBase.h +++ b/src/Disks/IO/createReadBufferFromFileBase.h @@ -1,13 +1,13 @@ #pragma once -#include -#include -#include #include +#include +#include namespace DB { +struct ReadSettings; /** Create an object to read data from a file. 
* diff --git a/src/IO/DistributedCacheLogMode.h b/src/IO/DistributedCacheLogMode.h new file mode 100644 index 00000000000..8998ded9f96 --- /dev/null +++ b/src/IO/DistributedCacheLogMode.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace DB +{ + +enum class DistributedCacheLogMode +{ + LOG_NOTHING, + LOG_ON_ERROR, + LOG_ALL, +}; + +} diff --git a/src/IO/DistributedCachePoolBehaviourOnLimit.h b/src/IO/DistributedCachePoolBehaviourOnLimit.h new file mode 100644 index 00000000000..e0bd200ea68 --- /dev/null +++ b/src/IO/DistributedCachePoolBehaviourOnLimit.h @@ -0,0 +1,14 @@ +#pragma once + +#include + +namespace DB +{ + +enum class DistributedCachePoolBehaviourOnLimit +{ + WAIT, + ALLOCATE_NEW_BYPASSING_POOL, +}; + +} diff --git a/src/IO/DistributedCacheSettings.h b/src/IO/DistributedCacheSettings.h index f0c9080ed1b..85a8bffdd25 100644 --- a/src/IO/DistributedCacheSettings.h +++ b/src/IO/DistributedCacheSettings.h @@ -1,25 +1,13 @@ #pragma once - -#include #include +#include +#include +#include namespace DB { -enum class DistributedCachePoolBehaviourOnLimit -{ - WAIT, - ALLOCATE_NEW_BYPASSING_POOL, -}; - -enum class DistributedCacheLogMode -{ - LOG_NOTHING, - LOG_ON_ERROR, - LOG_ALL, -}; - struct DistributedCacheSettings { bool throw_on_error = false; diff --git a/src/IO/ReadMethod.h b/src/IO/ReadMethod.h new file mode 100644 index 00000000000..0898ffceac4 --- /dev/null +++ b/src/IO/ReadMethod.h @@ -0,0 +1,58 @@ +#pragma once + +#include + +namespace DB +{ + +enum class LocalFSReadMethod : uint8_t +{ + /** + * Simple synchronous reads with 'read'. + * Can use direct IO after specified size. + * Can use prefetch by asking OS to perform readahead. + */ + read, + + /** + * Simple synchronous reads with 'pread'. + * In contrast to 'read', shares single file descriptor from multiple threads. + * Can use direct IO after specified size. + * Can use prefetch by asking OS to perform readahead. 
+ */ + pread, + + /** + * Use mmap after specified size or simple synchronous reads with 'pread'. + * Can use prefetch by asking OS to perform readahead. + */ + mmap, + + /** + * Use the io_uring Linux subsystem for asynchronous reads. + * Can use direct IO after specified size. + * Can do prefetch with double buffering. + */ + io_uring, + + /** + * Checks if data is in page cache with 'preadv2' on modern Linux kernels. + * If data is in page cache, read from the same thread. + * If not, offload IO to separate threadpool. + * Can do prefetch with double buffering. + * Can use specified priorities and limit the number of concurrent reads. + */ + pread_threadpool, + + /// Use asynchronous reader with fake backend that is in fact synchronous. + /// @attention Use only for testing purposes. + pread_fake_async +}; + +enum class RemoteFSReadMethod : uint8_t +{ + read, + threadpool, +}; + +} diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index 7d6b9f10931..aa52e00e6d7 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -2,64 +2,16 @@ #include #include +#include +#include #include -#include +#include #include #include -#include -#include +#include namespace DB { -enum class LocalFSReadMethod : uint8_t -{ - /** - * Simple synchronous reads with 'read'. - * Can use direct IO after specified size. - * Can use prefetch by asking OS to perform readahead. - */ - read, - - /** - * Simple synchronous reads with 'pread'. - * In contrast to 'read', shares single file descriptor from multiple threads. - * Can use direct IO after specified size. - * Can use prefetch by asking OS to perform readahead. 
- */ - io_uring, - - /** - * Checks if data is in page cache with 'preadv2' on modern Linux kernels. - * If data is in page cache, read from the same thread. - * If not, offload IO to separate threadpool. - * Can do prefetch with double buffering. - * Can use specified priorities and limit the number of concurrent reads. - */ - pread_threadpool, - - /// Use asynchronous reader with fake backend that in fact synchronous. - /// @attention Use only for testing purposes. - pread_fake_async -}; - -enum class RemoteFSReadMethod : uint8_t -{ - read, - threadpool, -}; class MMappedFileCache; class PageCache; diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index a25c945cdf7..810ed481300 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -6,8 +6,6 @@ #include #include -#include - #include #include #include @@ -25,6 +23,7 @@ namespace DB { +struct ReadSettings; /// Track acquired space in cache during reservation /// to make error messages when no space left more informative. 
diff --git a/src/Interpreters/Cache/QueryLimit.cpp b/src/Interpreters/Cache/QueryLimit.cpp index 6a5b5bf67ca..b18d23a5b7f 100644 --- a/src/Interpreters/Cache/QueryLimit.cpp +++ b/src/Interpreters/Cache/QueryLimit.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include namespace DB diff --git a/src/Storages/Cache/ExternalDataSourceCache.h b/src/Storages/Cache/ExternalDataSourceCache.h index 3b4eff28307..c48df3bd1e0 100644 --- a/src/Storages/Cache/ExternalDataSourceCache.h +++ b/src/Storages/Cache/ExternalDataSourceCache.h @@ -7,14 +7,13 @@ #include #include #include +#include #include #include #include -#include #include #include #include -#include #include #include #include diff --git a/src/Storages/MaterializedView/RefreshSettings.cpp b/src/Storages/MaterializedView/RefreshSettings.cpp index 6e130affb78..8a6a2e4b02a 100644 --- a/src/Storages/MaterializedView/RefreshSettings.cpp +++ b/src/Storages/MaterializedView/RefreshSettings.cpp @@ -11,8 +11,290 @@ namespace DB DECLARE(UInt64, refresh_retry_max_backoff_ms, 60'000, "Limit on the exponential growth of delay between refresh attempts, if they keep failing and refresh_retries is positive.", 0) \ DECLARE(Bool, all_replicas, /* do not change or existing tables will break */ false, "If the materialized view is in a Replicated database, and APPEND is enabled, this flag controls whether all replicas or one replica will refresh.", 0) \ -DECLARE_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS) -IMPLEMENT_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS) +struct RefreshSettingsTraits +{ + struct Data + { + SettingFieldInt64 refresh_retries{ 2 }; + SettingFieldUInt64 refresh_retry_initial_backoff_ms{ 100 }; + SettingFieldUInt64 refresh_retry_max_backoff_ms{ 60'000 }; + SettingFieldBool all_replicas{ false }; + }; + class Accessor + { + public: + static const Accessor& instance(); + size_t size() const + { + return field_infos.size(); + } + size_t find(std::string_view name) const; + 
const String& getName(size_t index) const + { + return field_infos[index].name; + } + const char* getTypeName(size_t index) const + { + return field_infos[index].type; + } + const char* getDescription(size_t index) const + { + return field_infos[index].description; + } + bool isImportant(size_t index) const + { + return field_infos[index].is_important; + } + bool isObsolete(size_t index) const + { + return field_infos[index].is_obsolete; + } + Field castValueUtil(size_t index, const Field& value) const + { + return field_infos[index].cast_value_util_function(value); + } + String valueToStringUtil(size_t index, const Field& value) const + { + return field_infos[index].value_to_string_util_function(value); + } + Field stringToValueUtil(size_t index, const String& str) const + { + return field_infos[index].string_to_value_util_function(str); + } + void setValue(Data& data, size_t index, const Field& value) const + { + return field_infos[index].set_value_function(data, value); + } + Field getValue(const Data& data, size_t index) const + { + return field_infos[index].get_value_function(data); + } + void setValueString(Data& data, size_t index, const String& str) const + { + return field_infos[index].set_value_string_function(data, str); + } + String getValueString(const Data& data, size_t index) const + { + return field_infos[index].get_value_string_function(data); + } + bool isValueChanged(const Data& data, size_t index) const + { + return field_infos[index].is_value_changed_function(data); + } + void resetValueToDefault(Data& data, size_t index) const + { + return field_infos[index].reset_value_to_default_function(data); + } + void writeBinary(const Data& data, size_t index, WriteBuffer& out) const + { + return field_infos[index].write_binary_function(data, out); + } + void readBinary(Data& data, size_t index, ReadBuffer& in) const + { + return field_infos[index].read_binary_function(data, in); + } + Field getDefaultValue(size_t index) const + { + return 
field_infos[index].get_default_value_function(); + } + String getDefaultValueString(size_t index) const + { + return field_infos[index].get_default_value_string_function(); + } + private: + Accessor(); + struct FieldInfo + { + String name; + const char* type; + const char* description; + bool is_important; + bool is_obsolete; + Field (* cast_value_util_function)(const Field&); + String (* value_to_string_util_function)(const Field&); + Field (* string_to_value_util_function)(const String&); + void (* set_value_function)(Data&, const Field&); + Field (* get_value_function)(const Data&); + void (* set_value_string_function)(Data&, const String&); + String (* get_value_string_function)(const Data&); + bool (* is_value_changed_function)(const Data&); + void (* reset_value_to_default_function)(Data&); + void (* write_binary_function)(const Data&, WriteBuffer&); + void (* read_binary_function)(Data&, ReadBuffer&); + Field (* get_default_value_function)(); + String (* get_default_value_string_function)(); + }; + std::vector field_infos; + std::unordered_map name_to_index_map; + }; + static constexpr bool allow_custom_settings = 0; + static inline const AliasMap aliases_to_settings = DefineAliases().setName("refresh_retries").setName( + "refresh_retry_initial_backoff_ms").setName("refresh_retry_max_backoff_ms").setName("all_replicas"); + using SettingsToAliasesMap = std::unordered_map>; + static inline const SettingsToAliasesMap& settingsToAliases() + { + static SettingsToAliasesMap setting_to_aliases_mapping = [] + { + std::unordered_map> map; + for (const auto& [alias, destination] : aliases_to_settings)map[destination].push_back(alias); + return map; + }(); + return setting_to_aliases_mapping; + } + static std::string_view resolveName(std::string_view name) + { + if (auto it = aliases_to_settings.find(name);it != aliases_to_settings.end())return it->second; + return name; + } +}; + +const RefreshSettingsTraits::Accessor& RefreshSettingsTraits::Accessor::instance() +{ + 
static const Accessor the_instance = [] + { + Accessor res; + constexpr int IMPORTANT = 0x01; + UNUSED(IMPORTANT); + res.field_infos.emplace_back(FieldInfo{ "refresh_retries", "Int64", + "How many times to retry refresh query if it fails. If all attempts fail, wait for the next refresh time according to schedule. 0 to disable retries. -1 for infinite retries.", + (0) & IMPORTANT, + static_cast((0) & BaseSettingsHelpers::Flags::OBSOLETE), + [](const Field& value) -> Field + { return static_cast(SettingFieldInt64{ value }); }, + [](const Field& value) -> String + { return SettingFieldInt64{ value }.toString(); }, + [](const String& str) -> Field + { + SettingFieldInt64 temp; + temp.parseFromString(str); + return static_cast(temp); + }, [](Data& data, const Field& value) + { data.refresh_retries = value; }, [](const Data& data) -> Field + { return static_cast(data.refresh_retries); }, + [](Data& data, const String& str) + { data.refresh_retries.parseFromString(str); }, + [](const Data& data) -> String + { return data.refresh_retries.toString(); }, + [](const Data& data) -> bool + { return data.refresh_retries.changed; }, [](Data& data) + { data.refresh_retries = SettingFieldInt64{ 2 }; }, + [](const Data& data, WriteBuffer& out) + { data.refresh_retries.writeBinary(out); }, + [](Data& data, ReadBuffer& in) + { data.refresh_retries.readBinary(in); }, []() -> Field + { return static_cast(SettingFieldInt64{ 2 }); }, []() -> String + { return SettingFieldInt64{ 2 }.toString(); }}); + res.field_infos.emplace_back(FieldInfo{ "refresh_retry_initial_backoff_ms", "UInt64", + "Delay before the first retry if refresh query fails (if refresh_retries setting is not zero). 
Each subsequent retry doubles the delay, up to refresh_retry_max_backoff_ms.", + (0) & IMPORTANT, + static_cast((0) & BaseSettingsHelpers::Flags::OBSOLETE), + [](const Field& value) -> Field + { return static_cast(SettingFieldUInt64{ value }); }, + [](const Field& value) -> String + { return SettingFieldUInt64{ value }.toString(); }, + [](const String& str) -> Field + { + SettingFieldUInt64 temp; + temp.parseFromString(str); + return static_cast(temp); + }, [](Data& data, const Field& value) + { data.refresh_retry_initial_backoff_ms = value; }, + [](const Data& data) -> Field + { return static_cast(data.refresh_retry_initial_backoff_ms); }, + [](Data& data, const String& str) + { data.refresh_retry_initial_backoff_ms.parseFromString(str); }, + [](const Data& data) -> String + { return data.refresh_retry_initial_backoff_ms.toString(); }, + [](const Data& data) -> bool + { return data.refresh_retry_initial_backoff_ms.changed; }, + [](Data& data) + { data.refresh_retry_initial_backoff_ms = SettingFieldUInt64{ 100 }; }, + [](const Data& data, WriteBuffer& out) + { data.refresh_retry_initial_backoff_ms.writeBinary(out); }, + [](Data& data, ReadBuffer& in) + { data.refresh_retry_initial_backoff_ms.readBinary(in); }, []() -> Field + { return static_cast(SettingFieldUInt64{ 100 }); }, + []() -> String + { return SettingFieldUInt64{ 100 }.toString(); }}); + res.field_infos.emplace_back(FieldInfo{ "refresh_retry_max_backoff_ms", "UInt64", + "Limit on the exponential growth of delay between refresh attempts, if they keep failing and refresh_retries is positive.", + (0) & IMPORTANT, + static_cast((0) & BaseSettingsHelpers::Flags::OBSOLETE), + [](const Field& value) -> Field + { return static_cast(SettingFieldUInt64{ value }); }, + [](const Field& value) -> String + { return SettingFieldUInt64{ value }.toString(); }, + [](const String& str) -> Field + { + SettingFieldUInt64 temp; + temp.parseFromString(str); + return static_cast(temp); + }, [](Data& data, const Field& value) + 
{ data.refresh_retry_max_backoff_ms = value; }, + [](const Data& data) -> Field + { return static_cast(data.refresh_retry_max_backoff_ms); }, + [](Data& data, const String& str) + { data.refresh_retry_max_backoff_ms.parseFromString(str); }, + [](const Data& data) -> String + { return data.refresh_retry_max_backoff_ms.toString(); }, + [](const Data& data) -> bool + { return data.refresh_retry_max_backoff_ms.changed; }, [](Data& data) + { data.refresh_retry_max_backoff_ms = SettingFieldUInt64{ 60'000 }; }, + [](const Data& data, WriteBuffer& out) + { data.refresh_retry_max_backoff_ms.writeBinary(out); }, + [](Data& data, ReadBuffer& in) + { data.refresh_retry_max_backoff_ms.readBinary(in); }, []() -> Field + { return static_cast(SettingFieldUInt64{ 60'000 }); }, + []() -> String + { return SettingFieldUInt64{ 60'000 }.toString(); }}); + res.field_infos.emplace_back(FieldInfo{ "all_replicas", "Bool", + "If the materialized view is in a Replicated database, and APPEND is enabled, this flag controls whether all replicas or one replica will refresh.", + (0) & IMPORTANT, + static_cast((0) & BaseSettingsHelpers::Flags::OBSOLETE), + [](const Field& value) -> Field + { return static_cast(SettingFieldBool{ value }); }, + [](const Field& value) -> String + { return SettingFieldBool{ value }.toString(); }, + [](const String& str) -> Field + { + SettingFieldBool temp; + temp.parseFromString(str); + return static_cast(temp); + }, [](Data& data, const Field& value) + { data.all_replicas = value; }, [](const Data& data) -> Field + { return static_cast(data.all_replicas); }, + [](Data& data, const String& str) + { data.all_replicas.parseFromString(str); }, + [](const Data& data) -> String + { return data.all_replicas.toString(); }, [](const Data& data) -> bool + { return data.all_replicas.changed; }, [](Data& data) + { data.all_replicas = SettingFieldBool{ false }; }, + [](const Data& data, WriteBuffer& out) + { data.all_replicas.writeBinary(out); }, [](Data& data, ReadBuffer& in) + 
{ data.all_replicas.readBinary(in); }, []() -> Field + { return static_cast(SettingFieldBool{ false }); }, + []() -> String + { return SettingFieldBool{ false }.toString(); }}); + for (size_t i : collections::range(res.field_infos.size())) + { + const auto& info = res.field_infos[i]; + res.name_to_index_map.emplace(info.name, i); + } + return res; + }(); + return the_instance; +} +RefreshSettingsTraits::Accessor::Accessor() +{ +} +size_t RefreshSettingsTraits::Accessor::find(std::string_view name) const +{ + auto it = name_to_index_map.find(name); + if (it != name_to_index_map.end())return it->second; + return static_cast(-1); +} +template +class BaseSettings; struct RefreshSettingsImpl : public BaseSettings { diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index a09c24c63ab..49d9fbf2291 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -1,5 +1,4 @@ #pragma once -#include #include #include #include @@ -16,7 +15,7 @@ namespace DB { - +struct ReadSettings; class ReadBufferFromFileBase; class WriteBufferFromFileBase; diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h index fcc72815d8f..239f40ff3cc 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.h +++ b/src/Storages/MergeTree/MergeTreeIOSettings.h @@ -2,6 +2,7 @@ #include #include #include +#include #include @@ -32,7 +33,7 @@ struct MergeTreeReaderSettings bool checksum_on_read = true; /// True if we read in order of sorting key. bool read_in_order = false; - /// Use one buffer for each column or for all columns while reading from compact. + /// Use one buffer for each column or for all columns while reading from compact. CompactPartsReadMethod compact_parts_read_method = CompactPartsReadMethod::SingleBuffer; /// True if we read stream for dictionary of LowCardinality type. 
bool is_low_cardinality_dictionary = false; diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.h b/src/Storages/MergeTree/MergeTreeMarksLoader.h index 2aa4474e1c5..9c28cc65fdf 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.h +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.h @@ -1,9 +1,8 @@ #pragma once #include -#include -#include #include +#include namespace DB @@ -11,6 +10,7 @@ namespace DB struct MergeTreeIndexGranularityInfo; using MarksPtr = MarkCache::MappedPtr; +struct ReadSettings; class Threadpool; /// Class that helps to get marks by indexes. diff --git a/utils/check-marks/main.cpp b/utils/check-marks/main.cpp index b4cd44d6eb7..8f05e98ebd5 100644 --- a/utils/check-marks/main.cpp +++ b/utils/check-marks/main.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include