Reduce deps from SettingsEnums

This commit is contained in:
Raúl Marín 2024-10-24 13:57:26 +02:00
parent 8566916cc3
commit 369c58c0f2
19 changed files with 397 additions and 87 deletions

View File

@ -4,6 +4,7 @@
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteBufferFromFile.h>

View File

@ -1,10 +1,9 @@
#pragma once
#include <memory>
#include <time.h>
#include <Compression/CompressedReadBufferBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <time.h>
#include <memory>
namespace DB

View File

@ -4,6 +4,7 @@
#include <Core/BaseSettingsFwdMacros.h>
#include <Core/BaseSettingsFwdMacrosImpl.h>
#include <Core/BaseSettingsProgramOptions.h>
#include <Core/DistributedCacheProtocol.h>
#include <Core/FormatFactorySettings.h>
#include <Core/Settings.h>
#include <Core/SettingsChangesHistory.h>

View File

@ -5,9 +5,7 @@
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>
#include <Core/SettingsWriteFormat.h>
#include <Core/ParallelReplicasMode.h>
#include <base/types.h>
#include <Common/SettingConstraintWritability.h>
#include <Common/SettingsChanges.h>
#include <string_view>

View File

@ -12,7 +12,9 @@
#include <Core/ShortCircuitFunctionEvaluation.h>
#include <Core/StreamingHandleErrorMode.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadSettings.h>
#include <IO/DistributedCacheLogMode.h>
#include <IO/DistributedCachePoolBehaviourOnLimit.h>
#include <IO/ReadMethod.h>
#include <Parsers/IdentifierQuotingStyle.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/ShellCommandSettings.h>

View File

@ -1,13 +1,13 @@
#pragma once
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <string>
#include <memory>
#include <string>
#include <IO/ReadBufferFromFileBase.h>
namespace DB
{
struct ReadSettings;
/** Create an object to read data from a file.
*

View File

@ -0,0 +1,15 @@
#pragma once
#include <cstdint>
namespace DB
{
enum class DistributedCacheLogMode
{
LOG_NOTHING,
LOG_ON_ERROR,
LOG_ALL,
};
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <cstdint>
namespace DB
{
enum class DistributedCachePoolBehaviourOnLimit
{
WAIT,
ALLOCATE_NEW_BYPASSING_POOL,
};
}

View File

@ -1,25 +1,13 @@
#pragma once
#include <Core/Types.h>
#include <Core/DistributedCacheProtocol.h>
#include <Core/Types.h>
#include <IO/DistributedCacheLogMode.h>
#include <IO/DistributedCachePoolBehaviourOnLimit.h>
namespace DB
{
enum class DistributedCachePoolBehaviourOnLimit
{
WAIT,
ALLOCATE_NEW_BYPASSING_POOL,
};
enum class DistributedCacheLogMode
{
LOG_NOTHING,
LOG_ON_ERROR,
LOG_ALL,
};
struct DistributedCacheSettings
{
bool throw_on_error = false;

58
src/IO/ReadMethod.h Normal file
View File

@ -0,0 +1,58 @@
#pragma once
#include <cstdint>
namespace DB
{
enum class LocalFSReadMethod : uint8_t
{
/**
* Simple synchronous reads with 'read'.
* Can use direct IO after specified size.
* Can use prefetch by asking OS to perform readahead.
*/
read,
/**
* Simple synchronous reads with 'pread'.
* In contrast to 'read', shares single file descriptor from multiple threads.
* Can use direct IO after specified size.
* Can use prefetch by asking OS to perform readahead.
*/
pread,
/**
* Use mmap after specified size or simple synchronous reads with 'pread'.
* Can use prefetch by asking OS to perform readahead.
*/
mmap,
/**
* Use the io_uring Linux subsystem for asynchronous reads.
* Can use direct IO after specified size.
* Can do prefetch with double buffering.
*/
io_uring,
/**
* Checks if data is in page cache with 'preadv2' on modern Linux kernels.
* If data is in page cache, read from the same thread.
* If not, offload IO to separate threadpool.
* Can do prefetch with double buffering.
* Can use specified priorities and limit the number of concurrent reads.
*/
pread_threadpool,
/// Use asynchronous reader with fake backend that in fact synchronous.
/// @attention Use only for testing purposes.
pread_fake_async
};
enum class RemoteFSReadMethod : uint8_t
{
read,
threadpool,
};
}

View File

@ -2,64 +2,16 @@
#include <cstddef>
#include <Core/Defines.h>
#include <IO/DistributedCacheSettings.h>
#include <IO/ReadMethod.h>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Interpreters/Cache/UserInfo.h>
#include <Common/Priority.h>
#include <Common/Scheduler/ResourceLink.h>
#include <IO/DistributedCacheSettings.h>
#include <Interpreters/Cache/UserInfo.h>
#include <Common/Throttler_fwd.h>
namespace DB
{
enum class LocalFSReadMethod : uint8_t
{
/**
* Simple synchronous reads with 'read'.
* Can use direct IO after specified size.
* Can use prefetch by asking OS to perform readahead.
*/
read,
/**
* Simple synchronous reads with 'pread'.
* In contrast to 'read', shares single file descriptor from multiple threads.
* Can use direct IO after specified size.
* Can use prefetch by asking OS to perform readahead.
*/
pread,
/**
* Use mmap after specified size or simple synchronous reads with 'pread'.
* Can use prefetch by asking OS to perform readahead.
*/
mmap,
/**
* Use the io_uring Linux subsystem for asynchronous reads.
* Can use direct IO after specified size.
* Can do prefetch with double buffering.
*/
io_uring,
/**
* Checks if data is in page cache with 'preadv2' on modern Linux kernels.
* If data is in page cache, read from the same thread.
* If not, offload IO to separate threadpool.
* Can do prefetch with double buffering.
* Can use specified priorities and limit the number of concurrent reads.
*/
pread_threadpool,
/// Use asynchronous reader with fake backend that in fact synchronous.
/// @attention Use only for testing purposes.
pread_fake_async
};
enum class RemoteFSReadMethod : uint8_t
{
read,
threadpool,
};
class MMappedFileCache;
class PageCache;

View File

@ -6,8 +6,6 @@
#include <unordered_map>
#include <boost/functional/hash.hpp>
#include <IO/ReadSettings.h>
#include <Common/callOnce.h>
#include <Common/ThreadPool.h>
#include <Common/StatusFile.h>
@ -25,6 +23,7 @@
namespace DB
{
struct ReadSettings;
/// Track acquired space in cache during reservation
/// to make error messages when no space left more informative.

View File

@ -1,6 +1,7 @@
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/QueryLimit.h>
#include <IO/ReadSettings.h>
#include <Common/CurrentThread.h>
namespace DB

View File

@ -7,14 +7,13 @@
#include <mutex>
#include <set>
#include <Core/BackgroundSchedulePool.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Storages/Cache/IRemoteFileMetadata.h>
#include <Storages/Cache/RemoteCacheController.h>

View File

@ -11,8 +11,290 @@ namespace DB
DECLARE(UInt64, refresh_retry_max_backoff_ms, 60'000, "Limit on the exponential growth of delay between refresh attempts, if they keep failing and refresh_retries is positive.", 0) \
DECLARE(Bool, all_replicas, /* do not change or existing tables will break */ false, "If the materialized view is in a Replicated database, and APPEND is enabled, this flag controls whether all replicas or one replica will refresh.", 0) \
DECLARE_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(RefreshSettingsTraits, LIST_OF_REFRESH_SETTINGS)
struct RefreshSettingsTraits
{
struct Data
{
SettingFieldInt64 refresh_retries{ 2 };
SettingFieldUInt64 refresh_retry_initial_backoff_ms{ 100 };
SettingFieldUInt64 refresh_retry_max_backoff_ms{ 60'000 };
SettingFieldBool all_replicas{ false };
};
class Accessor
{
public:
static const Accessor& instance();
size_t size() const
{
return field_infos.size();
}
size_t find(std::string_view name) const;
const String& getName(size_t index) const
{
return field_infos[index].name;
}
const char* getTypeName(size_t index) const
{
return field_infos[index].type;
}
const char* getDescription(size_t index) const
{
return field_infos[index].description;
}
bool isImportant(size_t index) const
{
return field_infos[index].is_important;
}
bool isObsolete(size_t index) const
{
return field_infos[index].is_obsolete;
}
Field castValueUtil(size_t index, const Field& value) const
{
return field_infos[index].cast_value_util_function(value);
}
String valueToStringUtil(size_t index, const Field& value) const
{
return field_infos[index].value_to_string_util_function(value);
}
Field stringToValueUtil(size_t index, const String& str) const
{
return field_infos[index].string_to_value_util_function(str);
}
void setValue(Data& data, size_t index, const Field& value) const
{
return field_infos[index].set_value_function(data, value);
}
Field getValue(const Data& data, size_t index) const
{
return field_infos[index].get_value_function(data);
}
void setValueString(Data& data, size_t index, const String& str) const
{
return field_infos[index].set_value_string_function(data, str);
}
String getValueString(const Data& data, size_t index) const
{
return field_infos[index].get_value_string_function(data);
}
bool isValueChanged(const Data& data, size_t index) const
{
return field_infos[index].is_value_changed_function(data);
}
void resetValueToDefault(Data& data, size_t index) const
{
return field_infos[index].reset_value_to_default_function(data);
}
void writeBinary(const Data& data, size_t index, WriteBuffer& out) const
{
return field_infos[index].write_binary_function(data, out);
}
void readBinary(Data& data, size_t index, ReadBuffer& in) const
{
return field_infos[index].read_binary_function(data, in);
}
Field getDefaultValue(size_t index) const
{
return field_infos[index].get_default_value_function();
}
String getDefaultValueString(size_t index) const
{
return field_infos[index].get_default_value_string_function();
}
private:
Accessor();
struct FieldInfo
{
String name;
const char* type;
const char* description;
bool is_important;
bool is_obsolete;
Field (* cast_value_util_function)(const Field&);
String (* value_to_string_util_function)(const Field&);
Field (* string_to_value_util_function)(const String&);
void (* set_value_function)(Data&, const Field&);
Field (* get_value_function)(const Data&);
void (* set_value_string_function)(Data&, const String&);
String (* get_value_string_function)(const Data&);
bool (* is_value_changed_function)(const Data&);
void (* reset_value_to_default_function)(Data&);
void (* write_binary_function)(const Data&, WriteBuffer&);
void (* read_binary_function)(Data&, ReadBuffer&);
Field (* get_default_value_function)();
String (* get_default_value_string_function)();
};
std::vector<FieldInfo> field_infos;
std::unordered_map<std::string_view, size_t> name_to_index_map;
};
static constexpr bool allow_custom_settings = 0;
static inline const AliasMap aliases_to_settings = DefineAliases().setName("refresh_retries").setName(
"refresh_retry_initial_backoff_ms").setName("refresh_retry_max_backoff_ms").setName("all_replicas");
using SettingsToAliasesMap = std::unordered_map<std::string_view, std::vector<std::string_view>>;
static inline const SettingsToAliasesMap& settingsToAliases()
{
static SettingsToAliasesMap setting_to_aliases_mapping = []
{
std::unordered_map<std::string_view, std::vector<std::string_view>> map;
for (const auto& [alias, destination] : aliases_to_settings)map[destination].push_back(alias);
return map;
}();
return setting_to_aliases_mapping;
}
static std::string_view resolveName(std::string_view name)
{
if (auto it = aliases_to_settings.find(name);it != aliases_to_settings.end())return it->second;
return name;
}
};
const RefreshSettingsTraits::Accessor& RefreshSettingsTraits::Accessor::instance()
{
static const Accessor the_instance = []
{
Accessor res;
constexpr int IMPORTANT = 0x01;
UNUSED(IMPORTANT);
res.field_infos.emplace_back(FieldInfo{ "refresh_retries", "Int64",
"How many times to retry refresh query if it fails. If all attempts fail, wait for the next refresh time according to schedule. 0 to disable retries. -1 for infinite retries.",
(0) & IMPORTANT,
static_cast<bool>((0) & BaseSettingsHelpers::Flags::OBSOLETE),
[](const Field& value) -> Field
{ return static_cast<Field>(SettingFieldInt64{ value }); },
[](const Field& value) -> String
{ return SettingFieldInt64{ value }.toString(); },
[](const String& str) -> Field
{
SettingFieldInt64 temp;
temp.parseFromString(str);
return static_cast<Field>(temp);
}, [](Data& data, const Field& value)
{ data.refresh_retries = value; }, [](const Data& data) -> Field
{ return static_cast<Field>(data.refresh_retries); },
[](Data& data, const String& str)
{ data.refresh_retries.parseFromString(str); },
[](const Data& data) -> String
{ return data.refresh_retries.toString(); },
[](const Data& data) -> bool
{ return data.refresh_retries.changed; }, [](Data& data)
{ data.refresh_retries = SettingFieldInt64{ 2 }; },
[](const Data& data, WriteBuffer& out)
{ data.refresh_retries.writeBinary(out); },
[](Data& data, ReadBuffer& in)
{ data.refresh_retries.readBinary(in); }, []() -> Field
{ return static_cast<Field>(SettingFieldInt64{ 2 }); }, []() -> String
{ return SettingFieldInt64{ 2 }.toString(); }});
res.field_infos.emplace_back(FieldInfo{ "refresh_retry_initial_backoff_ms", "UInt64",
"Delay before the first retry if refresh query fails (if refresh_retries setting is not zero). Each subsequent retry doubles the delay, up to refresh_retry_max_backoff_ms.",
(0) & IMPORTANT,
static_cast<bool>((0) & BaseSettingsHelpers::Flags::OBSOLETE),
[](const Field& value) -> Field
{ return static_cast<Field>(SettingFieldUInt64{ value }); },
[](const Field& value) -> String
{ return SettingFieldUInt64{ value }.toString(); },
[](const String& str) -> Field
{
SettingFieldUInt64 temp;
temp.parseFromString(str);
return static_cast<Field>(temp);
}, [](Data& data, const Field& value)
{ data.refresh_retry_initial_backoff_ms = value; },
[](const Data& data) -> Field
{ return static_cast<Field>(data.refresh_retry_initial_backoff_ms); },
[](Data& data, const String& str)
{ data.refresh_retry_initial_backoff_ms.parseFromString(str); },
[](const Data& data) -> String
{ return data.refresh_retry_initial_backoff_ms.toString(); },
[](const Data& data) -> bool
{ return data.refresh_retry_initial_backoff_ms.changed; },
[](Data& data)
{ data.refresh_retry_initial_backoff_ms = SettingFieldUInt64{ 100 }; },
[](const Data& data, WriteBuffer& out)
{ data.refresh_retry_initial_backoff_ms.writeBinary(out); },
[](Data& data, ReadBuffer& in)
{ data.refresh_retry_initial_backoff_ms.readBinary(in); }, []() -> Field
{ return static_cast<Field>(SettingFieldUInt64{ 100 }); },
[]() -> String
{ return SettingFieldUInt64{ 100 }.toString(); }});
res.field_infos.emplace_back(FieldInfo{ "refresh_retry_max_backoff_ms", "UInt64",
"Limit on the exponential growth of delay between refresh attempts, if they keep failing and refresh_retries is positive.",
(0) & IMPORTANT,
static_cast<bool>((0) & BaseSettingsHelpers::Flags::OBSOLETE),
[](const Field& value) -> Field
{ return static_cast<Field>(SettingFieldUInt64{ value }); },
[](const Field& value) -> String
{ return SettingFieldUInt64{ value }.toString(); },
[](const String& str) -> Field
{
SettingFieldUInt64 temp;
temp.parseFromString(str);
return static_cast<Field>(temp);
}, [](Data& data, const Field& value)
{ data.refresh_retry_max_backoff_ms = value; },
[](const Data& data) -> Field
{ return static_cast<Field>(data.refresh_retry_max_backoff_ms); },
[](Data& data, const String& str)
{ data.refresh_retry_max_backoff_ms.parseFromString(str); },
[](const Data& data) -> String
{ return data.refresh_retry_max_backoff_ms.toString(); },
[](const Data& data) -> bool
{ return data.refresh_retry_max_backoff_ms.changed; }, [](Data& data)
{ data.refresh_retry_max_backoff_ms = SettingFieldUInt64{ 60'000 }; },
[](const Data& data, WriteBuffer& out)
{ data.refresh_retry_max_backoff_ms.writeBinary(out); },
[](Data& data, ReadBuffer& in)
{ data.refresh_retry_max_backoff_ms.readBinary(in); }, []() -> Field
{ return static_cast<Field>(SettingFieldUInt64{ 60'000 }); },
[]() -> String
{ return SettingFieldUInt64{ 60'000 }.toString(); }});
res.field_infos.emplace_back(FieldInfo{ "all_replicas", "Bool",
"If the materialized view is in a Replicated database, and APPEND is enabled, this flag controls whether all replicas or one replica will refresh.",
(0) & IMPORTANT,
static_cast<bool>((0) & BaseSettingsHelpers::Flags::OBSOLETE),
[](const Field& value) -> Field
{ return static_cast<Field>(SettingFieldBool{ value }); },
[](const Field& value) -> String
{ return SettingFieldBool{ value }.toString(); },
[](const String& str) -> Field
{
SettingFieldBool temp;
temp.parseFromString(str);
return static_cast<Field>(temp);
}, [](Data& data, const Field& value)
{ data.all_replicas = value; }, [](const Data& data) -> Field
{ return static_cast<Field>(data.all_replicas); },
[](Data& data, const String& str)
{ data.all_replicas.parseFromString(str); },
[](const Data& data) -> String
{ return data.all_replicas.toString(); }, [](const Data& data) -> bool
{ return data.all_replicas.changed; }, [](Data& data)
{ data.all_replicas = SettingFieldBool{ false }; },
[](const Data& data, WriteBuffer& out)
{ data.all_replicas.writeBinary(out); }, [](Data& data, ReadBuffer& in)
{ data.all_replicas.readBinary(in); }, []() -> Field
{ return static_cast<Field>(SettingFieldBool{ false }); },
[]() -> String
{ return SettingFieldBool{ false }.toString(); }});
for (size_t i : collections::range(res.field_infos.size()))
{
const auto& info = res.field_infos[i];
res.name_to_index_map.emplace(info.name, i);
}
return res;
}();
return the_instance;
}
RefreshSettingsTraits::Accessor::Accessor()
{
}
size_t RefreshSettingsTraits::Accessor::find(std::string_view name) const
{
auto it = name_to_index_map.find(name);
if (it != name_to_index_map.end())return it->second;
return static_cast<size_t>(-1);
}
template
class BaseSettings<RefreshSettingsTraits>;
struct RefreshSettingsImpl : public BaseSettings<RefreshSettingsTraits>
{

View File

@ -1,5 +1,4 @@
#pragma once
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <IO/WriteBufferFromFileBase.h>
#include <base/types.h>
@ -16,7 +15,7 @@
namespace DB
{
struct ReadSettings;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;

View File

@ -2,6 +2,7 @@
#include <cstddef>
#include <Compression/CompressionFactory.h>
#include <Compression/ICompressionCodec.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
@ -32,7 +33,7 @@ struct MergeTreeReaderSettings
bool checksum_on_read = true;
/// True if we read in order of sorting key.
bool read_in_order = false;
/// Use one buffer for each column or for all columns while reading from compact.
/// Use one buffer for each column or for all columns while reading from compact.P
CompactPartsReadMethod compact_parts_read_method = CompactPartsReadMethod::SingleBuffer;
/// True if we read stream for dictionary of LowCardinality type.
bool is_low_cardinality_dictionary = false;

View File

@ -1,9 +1,8 @@
#pragma once
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool_fwd.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
#include <Common/ThreadPool_fwd.h>
namespace DB
@ -11,6 +10,7 @@ namespace DB
struct MergeTreeIndexGranularityInfo;
using MarksPtr = MarkCache::MappedPtr;
struct ReadSettings;
class Threadpool;
/// Class that helps to get marks by indexes.

View File

@ -9,6 +9,7 @@
#include <IO/Operators.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Compression/CompressedReadBufferFromFile.h>