mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Make settings values atomic to avoid race conditions
This commit is contained in:
parent
e0d18c0fe8
commit
a03fcd9f12
@ -708,7 +708,7 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c
|
||||
|
||||
auto set_default_value = [] (auto && setting, auto && default_value)
|
||||
{
|
||||
setting = setting.changed ? setting.value : default_value;
|
||||
setting = setting.changed ? setting.getValue() : default_value;
|
||||
};
|
||||
|
||||
/// Override important settings
|
||||
|
@ -34,19 +34,19 @@ namespace ErrorCodes
|
||||
template <typename Type>
|
||||
String SettingNumber<Type>::toString() const
|
||||
{
|
||||
return DB::toString(value);
|
||||
return DB::toString(value.load(std::memory_order_relaxed));
|
||||
}
|
||||
|
||||
template <typename Type>
|
||||
Field SettingNumber<Type>::toField() const
|
||||
{
|
||||
return value;
|
||||
return value.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
template <typename Type>
|
||||
void SettingNumber<Type>::set(Type x)
|
||||
{
|
||||
value = x;
|
||||
value.store(x, std::memory_order_relaxed);
|
||||
changed = true;
|
||||
}
|
||||
|
||||
@ -136,17 +136,17 @@ template struct SettingNumber<bool>;
|
||||
String SettingMaxThreads::toString() const
|
||||
{
|
||||
/// Instead of the `auto` value, we output the actual value to make it easier to see.
|
||||
return is_auto ? ("auto(" + DB::toString(value) + ")") : DB::toString(value);
|
||||
return is_auto ? ("auto(" + DB::toString(getValue()) + ")") : DB::toString(getValue());
|
||||
}
|
||||
|
||||
Field SettingMaxThreads::toField() const
|
||||
{
|
||||
return is_auto ? 0 : value;
|
||||
return is_auto ? 0 : getValue();
|
||||
}
|
||||
|
||||
void SettingMaxThreads::set(UInt64 x)
|
||||
{
|
||||
value = x ? x : getAutoValue();
|
||||
value.store(x ? x : getAutoValue(), std::memory_order_relaxed);
|
||||
is_auto = x == 0;
|
||||
changed = true;
|
||||
}
|
||||
@ -169,7 +169,7 @@ void SettingMaxThreads::set(const String & x)
|
||||
|
||||
void SettingMaxThreads::serialize(WriteBuffer & buf) const
|
||||
{
|
||||
writeVarUInt(is_auto ? 0 : value, buf);
|
||||
writeVarUInt(is_auto ? 0 : getValue(), buf);
|
||||
}
|
||||
|
||||
void SettingMaxThreads::deserialize(ReadBuffer & buf)
|
||||
@ -195,18 +195,21 @@ UInt64 SettingMaxThreads::getAutoValue() const
|
||||
template <SettingTimespanIO io_unit>
|
||||
String SettingTimespan<io_unit>::toString() const
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
return DB::toString(value.totalMicroseconds() / microseconds_per_io_unit);
|
||||
}
|
||||
|
||||
template <SettingTimespanIO io_unit>
|
||||
Field SettingTimespan<io_unit>::toField() const
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
return value.totalMicroseconds() / microseconds_per_io_unit;
|
||||
}
|
||||
|
||||
template <SettingTimespanIO io_unit>
|
||||
void SettingTimespan<io_unit>::set(const Poco::Timespan & x)
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
value = x;
|
||||
changed = true;
|
||||
}
|
||||
@ -235,6 +238,7 @@ void SettingTimespan<io_unit>::set(const String & x)
|
||||
template <SettingTimespanIO io_unit>
|
||||
void SettingTimespan<io_unit>::serialize(WriteBuffer & buf) const
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
writeVarUInt(value.totalMicroseconds() / microseconds_per_io_unit, buf);
|
||||
}
|
||||
|
||||
@ -252,16 +256,19 @@ template struct SettingTimespan<SettingTimespanIO::MILLISECOND>;
|
||||
|
||||
String SettingString::toString() const
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
return value;
|
||||
}
|
||||
|
||||
Field SettingString::toField() const
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
return value;
|
||||
}
|
||||
|
||||
void SettingString::set(const String & x)
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
value = x;
|
||||
changed = true;
|
||||
}
|
||||
@ -273,6 +280,7 @@ void SettingString::set(const Field & x)
|
||||
|
||||
void SettingString::serialize(WriteBuffer & buf) const
|
||||
{
|
||||
std::lock_guard lock(m);
|
||||
writeBinary(value, buf);
|
||||
}
|
||||
|
||||
@ -296,7 +304,7 @@ Field SettingChar::toField() const
|
||||
|
||||
void SettingChar::set(char x)
|
||||
{
|
||||
value = x;
|
||||
value.store(x, std::memory_order_relaxed);
|
||||
changed = true;
|
||||
}
|
||||
|
||||
@ -351,7 +359,7 @@ void SettingEnum<EnumType, Tag>::deserialize(ReadBuffer & buf)
|
||||
{ \
|
||||
using EnumType = ENUM_NAME; \
|
||||
using UnderlyingType = std::underlying_type<EnumType>::type; \
|
||||
switch (static_cast<UnderlyingType>(value)) \
|
||||
switch (static_cast<UnderlyingType>(getValue())) \
|
||||
{ \
|
||||
LIST_OF_NAMES_MACRO(IMPLEMENT_SETTING_ENUM_TO_STRING_HELPER_) \
|
||||
} \
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Core/Types.h>
|
||||
#include <ext/singleton.h>
|
||||
#include <unordered_map>
|
||||
#include <atomic>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -32,13 +33,23 @@ namespace ErrorCodes
|
||||
template <typename Type>
|
||||
struct SettingNumber
|
||||
{
|
||||
Type value;
|
||||
public:
|
||||
/// The value is atomic, because we want to avoid race conditions on value precisely.
|
||||
/// It doesn't gurantee atomicy on whole structure. It just helps to avoid the most common
|
||||
/// case: when we change setting from one thread and use in another (for example in MergeTreeSettings).
|
||||
///
|
||||
std::atomic<Type> value;
|
||||
|
||||
bool changed = false;
|
||||
|
||||
SettingNumber(Type x = 0) : value(x) {}
|
||||
SettingNumber(const SettingNumber & o) : value(o.getValue()), changed(o.changed) { }
|
||||
|
||||
operator Type() const { return getValue(); }
|
||||
Type getValue() const { return value.load(std::memory_order_relaxed); }
|
||||
|
||||
operator Type() const { return value; }
|
||||
SettingNumber & operator= (Type x) { set(x); return *this; }
|
||||
SettingNumber & operator= (SettingNumber o) { set(o.getValue()); return *this; }
|
||||
|
||||
/// Serialize to a test string.
|
||||
String toString() const;
|
||||
@ -73,14 +84,23 @@ using SettingBool = SettingNumber<bool>;
|
||||
*/
|
||||
struct SettingMaxThreads
|
||||
{
|
||||
UInt64 value;
|
||||
std::atomic<UInt64> value;
|
||||
bool is_auto;
|
||||
bool changed = false;
|
||||
|
||||
SettingMaxThreads(UInt64 x = 0) : value(x ? x : getAutoValue()), is_auto(x == 0) {}
|
||||
SettingMaxThreads(const SettingMaxThreads & o)
|
||||
: is_auto(o.is_auto)
|
||||
, changed(o.changed)
|
||||
{
|
||||
value.store(o.value, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
operator UInt64() const { return getValue(); }
|
||||
UInt64 getValue() const { return value.load(std::memory_order_relaxed); }
|
||||
|
||||
operator UInt64() const { return value; }
|
||||
SettingMaxThreads & operator= (UInt64 x) { set(x); return *this; }
|
||||
SettingMaxThreads & operator= (const SettingMaxThreads & o) { set(o.getValue()); return *this; }
|
||||
|
||||
String toString() const;
|
||||
Field toField() const;
|
||||
@ -102,16 +122,20 @@ enum class SettingTimespanIO { MILLISECOND, SECOND };
|
||||
template <SettingTimespanIO io_unit>
|
||||
struct SettingTimespan
|
||||
{
|
||||
mutable std::mutex m;
|
||||
Poco::Timespan value;
|
||||
bool changed = false;
|
||||
|
||||
SettingTimespan(UInt64 x = 0) : value(x * microseconds_per_io_unit) {}
|
||||
SettingTimespan(const SettingTimespan & o) : value(o.value), changed(o.changed) {}
|
||||
|
||||
operator Poco::Timespan() const { return value; }
|
||||
operator Poco::Timespan() const { return getValue(); }
|
||||
Poco::Timespan getValue() const { std::lock_guard guard(m); return value; }
|
||||
SettingTimespan & operator= (const Poco::Timespan & x) { set(x); return *this; }
|
||||
SettingTimespan & operator= (const SettingTimespan & o) { set(o.value); return *this; }
|
||||
|
||||
Poco::Timespan::TimeDiff totalSeconds() const { return value.totalSeconds(); }
|
||||
Poco::Timespan::TimeDiff totalMilliseconds() const { return value.totalMilliseconds(); }
|
||||
Poco::Timespan::TimeDiff totalSeconds() const { return getValue().totalSeconds(); }
|
||||
Poco::Timespan::TimeDiff totalMilliseconds() const { return getValue().totalMilliseconds(); }
|
||||
|
||||
String toString() const;
|
||||
Field toField() const;
|
||||
@ -134,13 +158,18 @@ using SettingMilliseconds = SettingTimespan<SettingTimespanIO::MILLISECOND>;
|
||||
|
||||
struct SettingString
|
||||
{
|
||||
mutable std::mutex m;
|
||||
String value;
|
||||
bool changed = false;
|
||||
|
||||
SettingString(const String & x = String{}) : value(x) {}
|
||||
SettingString(const SettingString & o) : value(o.value), changed(o.changed) {}
|
||||
|
||||
operator String() const { return getValue(); }
|
||||
String getValue() const { std::lock_guard guard(m); return value; }
|
||||
|
||||
operator String() const { return value; }
|
||||
SettingString & operator= (const String & x) { set(x); return *this; }
|
||||
SettingString & operator= (const SettingString & o) { set(o.value); return *this; }
|
||||
|
||||
String toString() const;
|
||||
Field toField() const;
|
||||
@ -156,13 +185,15 @@ struct SettingString
|
||||
struct SettingChar
|
||||
{
|
||||
public:
|
||||
char value;
|
||||
std::atomic<char> value;
|
||||
bool changed = false;
|
||||
|
||||
SettingChar(char x = '\0') : value(x) {}
|
||||
SettingChar(const SettingChar & o) : value(o.getValue()), changed(o.changed) { }
|
||||
|
||||
operator char() const { return value; }
|
||||
operator char() const { return getValue(); }
|
||||
SettingChar & operator= (char x) { set(x); return *this; }
|
||||
SettingChar & operator= (const SettingChar & o) { set(o.getValue()); return *this; }
|
||||
|
||||
String toString() const;
|
||||
Field toField() const;
|
||||
@ -173,6 +204,8 @@ public:
|
||||
|
||||
void serialize(WriteBuffer & buf) const;
|
||||
void deserialize(ReadBuffer & buf);
|
||||
|
||||
char getValue() const { return value.load(std::memory_order_relaxed); }
|
||||
};
|
||||
|
||||
|
||||
@ -180,23 +213,27 @@ public:
|
||||
template <typename EnumType, typename Tag = void>
|
||||
struct SettingEnum
|
||||
{
|
||||
EnumType value;
|
||||
std::atomic<EnumType> value;
|
||||
bool changed = false;
|
||||
|
||||
SettingEnum(EnumType x) : value(x) {}
|
||||
SettingEnum(const SettingEnum & o) : value(o.getValue()), changed(o.changed) { }
|
||||
|
||||
operator EnumType() const { return value; }
|
||||
operator EnumType() const { return getValue(); }
|
||||
SettingEnum & operator= (EnumType x) { set(x); return *this; }
|
||||
SettingEnum & operator= (SettingEnum o) { set(o.getValue()); return *this; }
|
||||
|
||||
String toString() const;
|
||||
Field toField() const { return toString(); }
|
||||
|
||||
void set(EnumType x) { value = x; changed = true; }
|
||||
void set(EnumType x) { value.store(x, std::memory_order_relaxed); changed = true; }
|
||||
void set(const Field & x) { set(safeGet<const String &>(x)); }
|
||||
void set(const String & x);
|
||||
|
||||
void serialize(WriteBuffer & buf) const;
|
||||
void deserialize(ReadBuffer & buf);
|
||||
|
||||
EnumType getValue() const { return value.load(std::memory_order_relaxed); }
|
||||
};
|
||||
|
||||
|
||||
|
@ -30,8 +30,8 @@ bool LogicalExpressionsOptimizer::OrWithExpression::operator<(const OrWithExpres
|
||||
return std::tie(this->or_function, this->expression) < std::tie(rhs.or_function, rhs.expression);
|
||||
}
|
||||
|
||||
LogicalExpressionsOptimizer::LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, ExtractedSettings && settings_)
|
||||
: select_query(select_query_), settings(settings_)
|
||||
LogicalExpressionsOptimizer::LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, UInt64 optimize_min_equality_disjunction_chain_length)
|
||||
: select_query(select_query_), settings(optimize_min_equality_disjunction_chain_length)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,7 @@ class LogicalExpressionsOptimizer final
|
||||
|
||||
public:
|
||||
/// Constructor. Accepts the root of the query DAG.
|
||||
LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, ExtractedSettings && settings_);
|
||||
LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, UInt64 optimize_min_equality_disjunction_chain_length);
|
||||
|
||||
/** Replace all rather long homogeneous OR-chains expr = x1 OR ... OR expr = xN
|
||||
* on the expressions `expr` IN (x1, ..., xN).
|
||||
|
@ -596,7 +596,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
|
||||
InJoinSubqueriesPreprocessor(context).visit(query);
|
||||
|
||||
/// Optimizes logical expressions.
|
||||
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length.value).perform();
|
||||
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length).perform();
|
||||
}
|
||||
|
||||
/// Creates a dictionary `aliases`: alias -> ASTPtr
|
||||
|
@ -400,8 +400,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
|
||||
if (relative_sample_size == RelativeSize(0))
|
||||
relative_sample_size = 1;
|
||||
|
||||
relative_sample_size /= settings.parallel_replicas_count.value;
|
||||
relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.value);
|
||||
relative_sample_size /= settings.parallel_replicas_count.getValue();
|
||||
relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.getValue());
|
||||
}
|
||||
|
||||
if (relative_sample_offset >= RelativeSize(1))
|
||||
|
@ -69,7 +69,7 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
if (std::find_if(changes.begin(), changes.end(), \
|
||||
[](const SettingChange & c) { return c.name == #NAME; }) \
|
||||
== changes.end()) \
|
||||
changes.push_back(SettingChange{#NAME, NAME.value});
|
||||
changes.push_back(SettingChange{#NAME, NAME.getValue()});
|
||||
|
||||
APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(ADD_IF_ABSENT)
|
||||
#undef ADD_IF_ABSENT
|
||||
|
Loading…
Reference in New Issue
Block a user