Make atomic settings

This commit is contained in:
alesapin 2019-08-09 16:02:01 +03:00
parent 50cabe4ab0
commit 2803fcc2ba
13 changed files with 230 additions and 110 deletions

View File

@ -350,7 +350,7 @@ if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_L
endif ()
# Add Libc. GLIBC is actually a collection of interdependent libraries.
set (DEFAULT_LIBS "${DEFAULT_LIBS} -lrt -ldl -lpthread -lm -lc")
set (DEFAULT_LIBS "${DEFAULT_LIBS} -lrt -ldl -lpthread -lm -lc -latomic")
# Note: we'd rather use Musl libc library, but it's little bit more difficult to use.

View File

@ -323,7 +323,7 @@ private:
insert_format = "Values";
/// Setting value from cmd arg overrides one from config
if (context.getSettingsRef().max_insert_block_size.changed)
if (context.getSettingsRef().max_insert_block_size.isChanged())
insert_format_max_block_size = context.getSettingsRef().max_insert_block_size;
else
insert_format_max_block_size = config().getInt("insert_format_max_block_size", context.getSettingsRef().max_insert_block_size);

View File

@ -708,7 +708,7 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c
auto set_default_value = [] (auto && setting, auto && default_value)
{
setting = setting.changed ? setting.getValue() : default_value;
setting = setting.isChanged() ? setting.getValue() : default_value;
};
/// Override important settings

View File

@ -182,11 +182,11 @@ void TCPHandler::runImpl()
/// Should we send internal logs to client?
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& client_logs_level.value != LogsLevel::none)
&& client_logs_level != LogsLevel::none)
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level.value);
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
}
query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
@ -329,7 +329,7 @@ void TCPHandler::readData(const Settings & connection_settings)
const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
/// Poll interval should not be greater than receive_timeout
const size_t default_poll_interval = connection_settings.poll_interval.value * 1000000;
const size_t default_poll_interval = connection_settings.poll_interval * 1000000;
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
constexpr size_t min_poll_interval = 5000; // 5 ms
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));

View File

@ -34,20 +34,19 @@ namespace ErrorCodes
template <typename Type>
String SettingNumber<Type>::toString() const
{
return DB::toString(value.load(std::memory_order_relaxed));
return DB::toString(getValue());
}
template <typename Type>
Field SettingNumber<Type>::toField() const
{
return value.load(std::memory_order_relaxed);
return getValue();
}
template <typename Type>
void SettingNumber<Type>::set(Type x)
{
value.store(x, std::memory_order_relaxed);
changed = true;
data.store(Data{x, true}, std::memory_order_relaxed);
}
template <typename Type>
@ -59,6 +58,14 @@ void SettingNumber<Type>::set(const Field & x)
set(applyVisitor(FieldVisitorConvertToNumber<Type>(), x));
}
template <typename Type>
SettingNumber<Type> & SettingNumber<Type>::operator= (const SettingNumber & o)
{
data.store(o.data.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this;
}
template <typename Type>
void SettingNumber<Type>::set(const String & x)
{
@ -93,9 +100,9 @@ template <typename Type>
void SettingNumber<Type>::serialize(WriteBuffer & buf) const
{
if constexpr (std::is_integral_v<Type> && std::is_unsigned_v<Type>)
writeVarUInt(static_cast<UInt64>(value), buf);
writeVarUInt(static_cast<UInt64>(getValue()), buf);
else if constexpr (std::is_integral_v<Type> && std::is_signed_v<Type>)
writeVarInt(static_cast<Int64>(value), buf);
writeVarInt(static_cast<Int64>(getValue()), buf);
else
{
static_assert(std::is_floating_point_v<Type>);
@ -133,22 +140,28 @@ template struct SettingNumber<float>;
template struct SettingNumber<bool>;
SettingMaxThreads & SettingMaxThreads::operator= (const SettingMaxThreads & o)
{
data.store(o.data.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this;
}
String SettingMaxThreads::toString() const
{
auto d = data.load(std::memory_order_relaxed);
/// Instead of the `auto` value, we output the actual value to make it easier to see.
return is_auto ? ("auto(" + DB::toString(getValue()) + ")") : DB::toString(getValue());
return d.is_auto ? ("auto(" + DB::toString(d.value) + ")") : DB::toString(d.value);
}
Field SettingMaxThreads::toField() const
{
return is_auto ? 0 : getValue();
auto d = data.load(std::memory_order_relaxed);
return d.is_auto ? 0 : d.value;
}
void SettingMaxThreads::set(UInt64 x)
{
value.store(x ? x : getAutoValue(), std::memory_order_relaxed);
is_auto = x == 0;
changed = true;
data.store({x ? x : getAutoValue(), x == 0, true});
}
void SettingMaxThreads::set(const Field & x)
@ -169,7 +182,8 @@ void SettingMaxThreads::set(const String & x)
void SettingMaxThreads::serialize(WriteBuffer & buf) const
{
writeVarUInt(is_auto ? 0 : getValue(), buf);
auto d = data.load(std::memory_order_relaxed);
writeVarUInt(d.is_auto ? 0 : d.value, buf);
}
void SettingMaxThreads::deserialize(ReadBuffer & buf)
@ -181,8 +195,7 @@ void SettingMaxThreads::deserialize(ReadBuffer & buf)
void SettingMaxThreads::setAuto()
{
value = getAutoValue();
is_auto = true;
data.store({getAutoValue(), true, isChanged()});
}
UInt64 SettingMaxThreads::getAutoValue() const
@ -191,25 +204,54 @@ UInt64 SettingMaxThreads::getAutoValue() const
return res;
}
void SettingMaxThreads::setChanged(bool changed)
{
auto d = data.load(std::memory_order_relaxed);
data.store({d.value, d.is_auto, changed});
}
template <SettingTimespanIO io_unit>
SettingTimespan<io_unit> & SettingTimespan<io_unit>::operator= (const SettingTimespan & o)
{
std::shared_lock lock_o(o.mutex);
value = o.value;
changed = o.changed;
return *this;
}
template <SettingTimespanIO io_unit>
SettingTimespan<io_unit>::SettingTimespan(const SettingTimespan & o)
{
std::shared_lock lock_o(o.mutex);
value = o.value;
changed = o.changed;
}
template <SettingTimespanIO io_unit>
void SettingTimespan<io_unit>::setChanged(bool c)
{
std::unique_lock lock(mutex);
changed = c;
}
template <SettingTimespanIO io_unit>
String SettingTimespan<io_unit>::toString() const
{
std::lock_guard lock(m);
return DB::toString(value.totalMicroseconds() / microseconds_per_io_unit);
return DB::toString(getValue().totalMicroseconds() / microseconds_per_io_unit);
}
template <SettingTimespanIO io_unit>
Field SettingTimespan<io_unit>::toField() const
{
std::lock_guard lock(m);
return value.totalMicroseconds() / microseconds_per_io_unit;
return getValue().totalMicroseconds() / microseconds_per_io_unit;
}
template <SettingTimespanIO io_unit>
void SettingTimespan<io_unit>::set(const Poco::Timespan & x)
{
std::lock_guard lock(m);
std::unique_lock lock(mutex);
value = x;
changed = true;
}
@ -238,8 +280,7 @@ void SettingTimespan<io_unit>::set(const String & x)
template <SettingTimespanIO io_unit>
void SettingTimespan<io_unit>::serialize(WriteBuffer & buf) const
{
std::lock_guard lock(m);
writeVarUInt(value.totalMicroseconds() / microseconds_per_io_unit, buf);
writeVarUInt(getValue().totalMicroseconds() / microseconds_per_io_unit, buf);
}
template <SettingTimespanIO io_unit>
@ -253,26 +294,47 @@ void SettingTimespan<io_unit>::deserialize(ReadBuffer & buf)
template struct SettingTimespan<SettingTimespanIO::SECOND>;
template struct SettingTimespan<SettingTimespanIO::MILLISECOND>;
SettingString & SettingString::operator= (const SettingString & o)
{
std::shared_lock lock_o(o.mutex);
value = o.value;
changed = o.changed;
return *this;
}
SettingString::SettingString(const SettingString & o)
{
std::shared_lock lock(o.mutex);
value = o.value;
changed = o.changed;
}
String SettingString::toString() const
{
std::lock_guard lock(m);
std::shared_lock lock(mutex);
return value;
}
Field SettingString::toField() const
{
std::lock_guard lock(m);
std::shared_lock lock(mutex);
return value;
}
void SettingString::set(const String & x)
{
std::lock_guard lock(m);
std::unique_lock lock(mutex);
value = x;
changed = true;
}
void SettingString::setChanged(bool c)
{
std::unique_lock lock(mutex);
changed = c;
}
void SettingString::set(const Field & x)
{
set(safeGet<const String &>(x));
@ -280,7 +342,6 @@ void SettingString::set(const Field & x)
void SettingString::serialize(WriteBuffer & buf) const
{
std::lock_guard lock(m);
writeBinary(value, buf);
}
@ -291,10 +352,15 @@ void SettingString::deserialize(ReadBuffer & buf)
set(s);
}
SettingChar & SettingChar::operator= (const SettingChar & o)
{
data.store(o.data.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this;
}
String SettingChar::toString() const
{
return String(1, value);
return String(1, getValue());
}
Field SettingChar::toField() const
@ -304,8 +370,7 @@ Field SettingChar::toField() const
void SettingChar::set(char x)
{
value.store(x, std::memory_order_relaxed);
changed = true;
data.store({x, true});
}
void SettingChar::set(const String & x)
@ -335,6 +400,19 @@ void SettingChar::deserialize(ReadBuffer & buf)
}
template <typename EnumType, typename Tag>
SettingEnum<EnumType, Tag> & SettingEnum<EnumType, Tag>::operator= (const SettingEnum & o)
{
data.store(o.data.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this;
}
template <typename EnumType, typename Tag>
void SettingEnum<EnumType, Tag>::set(EnumType x)
{
data.store({x, true}, std::memory_order_relaxed);
}
template <typename EnumType, typename Tag>
void SettingEnum<EnumType, Tag>::serialize(WriteBuffer & buf) const
{
@ -350,6 +428,7 @@ void SettingEnum<EnumType, Tag>::deserialize(ReadBuffer & buf)
}
#define IMPLEMENT_SETTING_ENUM(ENUM_NAME, LIST_OF_NAMES_MACRO, ERROR_CODE_FOR_UNEXPECTED_NAME) \
IMPLEMENT_SETTING_ENUM_WITH_TAG(ENUM_NAME, void, LIST_OF_NAMES_MACRO, ERROR_CODE_FOR_UNEXPECTED_NAME)

View File

@ -9,6 +9,7 @@
#include <ext/singleton.h>
#include <unordered_map>
#include <atomic>
#include <shared_mutex>
namespace DB
@ -33,22 +34,25 @@ namespace ErrorCodes
template <typename Type>
struct SettingNumber
{
public:
/// The value is atomic, because we want to avoid race conditions on value precisely.
/// It doesn't gurantee atomicy on whole structure. It just helps to avoid the most common
/// case: when we change setting from one thread and use in another (for example in MergeTreeSettings).
std::atomic<Type> value;
struct Data
{
Type value;
bool changed;
};
bool changed = false;
std::atomic<Data> data;
SettingNumber(Type x = 0) : value(x) {}
SettingNumber(const SettingNumber & o) : value(o.getValue()), changed(o.changed) { }
SettingNumber(Type x = 0) : data{{x, false}} {}
SettingNumber(const SettingNumber & o) : data{o.data.load(std::memory_order_relaxed)} {}
bool isChanged() const { return data.load(std::memory_order_relaxed).changed; }
void setChanged(bool changed) { data.store({getValue(), changed}, std::memory_order_relaxed); }
operator Type() const { return getValue(); }
Type getValue() const { return value.load(std::memory_order_relaxed); }
Type getValue() const { return data.load(std::memory_order_relaxed).value; }
SettingNumber & operator= (Type x) { set(x); return *this; }
SettingNumber & operator= (SettingNumber o) { set(o.getValue()); return *this; }
SettingNumber & operator= (const SettingNumber & o);
/// Serialize to a test string.
String toString() const;
@ -83,23 +87,26 @@ using SettingBool = SettingNumber<bool>;
*/
struct SettingMaxThreads
{
std::atomic<UInt64> value;
bool is_auto;
bool changed = false;
SettingMaxThreads(UInt64 x = 0) : value(x ? x : getAutoValue()), is_auto(x == 0) {}
SettingMaxThreads(const SettingMaxThreads & o)
: is_auto(o.is_auto)
, changed(o.changed)
struct Data
{
value.store(o.value, std::memory_order_relaxed);
}
UInt64 value;
bool is_auto;
bool changed;
};
std::atomic<Data> data;
SettingMaxThreads(UInt64 x = 0) : data{{x ? x : getAutoValue(), x == 0, false}} {}
SettingMaxThreads(const SettingMaxThreads & o) : data{o.data.load(std::memory_order_relaxed)} {}
bool isChanged() const { return data.load(std::memory_order_relaxed).changed; }
void setChanged(bool changed);
operator UInt64() const { return getValue(); }
UInt64 getValue() const { return value.load(std::memory_order_relaxed); }
UInt64 getValue() const { return data.load(std::memory_order_relaxed).value; }
SettingMaxThreads & operator= (UInt64 x) { set(x); return *this; }
SettingMaxThreads & operator= (const SettingMaxThreads & o) { set(o.getValue()); return *this; }
SettingMaxThreads & operator= (const SettingMaxThreads & o);
String toString() const;
Field toField() const;
@ -111,6 +118,7 @@ struct SettingMaxThreads
void serialize(WriteBuffer & buf) const;
void deserialize(ReadBuffer & buf);
bool isAuto() const { return data.load(std::memory_order_relaxed).is_auto; }
void setAuto();
UInt64 getAutoValue() const;
};
@ -121,20 +129,37 @@ enum class SettingTimespanIO { MILLISECOND, SECOND };
template <SettingTimespanIO io_unit>
struct SettingTimespan
{
mutable std::mutex m;
mutable std::shared_mutex mutex;
Poco::Timespan value;
bool changed = false;
SettingTimespan(UInt64 x = 0) : value(x * microseconds_per_io_unit) {}
SettingTimespan(const SettingTimespan & o) : value(o.value), changed(o.changed) {}
SettingTimespan(const SettingTimespan & o);
operator Poco::Timespan() const { return getValue(); }
Poco::Timespan getValue() const { std::lock_guard guard(m); return value; }
Poco::Timespan getValue() const { std::shared_lock lock(mutex); return value; }
SettingTimespan & operator= (const Poco::Timespan & x) { set(x); return *this; }
SettingTimespan & operator= (const SettingTimespan & o) { set(o.value); return *this; }
SettingTimespan & operator= (const SettingTimespan & o);
Poco::Timespan::TimeDiff totalSeconds() const { return getValue().totalSeconds(); }
Poco::Timespan::TimeDiff totalMilliseconds() const { return getValue().totalMilliseconds(); }
Poco::Timespan::TimeDiff totalSeconds() const
{
std::shared_lock lock(mutex);
return value.totalSeconds();
}
Poco::Timespan::TimeDiff totalMilliseconds() const
{
std::shared_lock lock(mutex);
return value.totalMilliseconds();
}
bool isChanged() const
{
std::shared_lock lock(mutex);
return changed;
}
void setChanged(bool changed);
String toString() const;
Field toField() const;
@ -157,18 +182,19 @@ using SettingMilliseconds = SettingTimespan<SettingTimespanIO::MILLISECOND>;
struct SettingString
{
mutable std::mutex m;
mutable std::shared_mutex mutex;
String value;
bool changed = false;
SettingString(const String & x = String{}) : value(x) {}
SettingString(const SettingString & o) : value(o.value), changed(o.changed) {}
SettingString(const SettingString & o);
operator String() const { return getValue(); }
String getValue() const { std::lock_guard guard(m); return value; }
String getValue() const { std::shared_lock lock(mutex); return value; }
SettingString & operator= (const String & x) { set(x); return *this; }
SettingString & operator= (const SettingString & o) { set(o.value); return *this; }
SettingString & operator= (const SettingString & o);
bool isChanged() const { std::shared_lock lock(mutex); return changed; }
void setChanged(bool changed);
String toString() const;
Field toField() const;
@ -184,15 +210,25 @@ struct SettingString
struct SettingChar
{
public:
std::atomic<char> value;
bool changed = false;
struct Data
{
char value;
bool changed;
};
SettingChar(char x = '\0') : value(x) {}
SettingChar(const SettingChar & o) : value(o.getValue()), changed(o.changed) { }
std::atomic<Data> data;
SettingChar(char x = '\0') : data({x, false}) {}
SettingChar(const SettingChar & o) : data{o.data.load(std::memory_order_relaxed)} {}
operator char() const { return getValue(); }
char getValue() const { return data.load(std::memory_order_relaxed).value; }
SettingChar & operator= (char x) { set(x); return *this; }
SettingChar & operator= (const SettingChar & o) { set(o.getValue()); return *this; }
SettingChar & operator= (const SettingChar & o);
bool isChanged() const { return data.load(std::memory_order_relaxed).changed; }
void setChanged(bool changed) { data.store({getValue(), changed}, std::memory_order_relaxed);}
String toString() const;
Field toField() const;
@ -203,8 +239,6 @@ public:
void serialize(WriteBuffer & buf) const;
void deserialize(ReadBuffer & buf);
char getValue() const { return value.load(std::memory_order_relaxed); }
};
@ -212,27 +246,35 @@ public:
template <typename EnumType, typename Tag = void>
struct SettingEnum
{
std::atomic<EnumType> value;
bool changed = false;
struct Data
{
EnumType value;
bool changed;
};
SettingEnum(EnumType x) : value(x) {}
SettingEnum(const SettingEnum & o) : value(o.getValue()), changed(o.changed) { }
std::atomic<Data> data;
SettingEnum(EnumType x) : data({x, false}) {}
SettingEnum(const SettingEnum & o) : data{o.data.load(std::memory_order_relaxed)} {}
operator EnumType() const { return getValue(); }
EnumType getValue() const { return data.load(std::memory_order_relaxed).value; }
SettingEnum & operator= (EnumType x) { set(x); return *this; }
SettingEnum & operator= (SettingEnum o) { set(o.getValue()); return *this; }
SettingEnum & operator= (const SettingEnum & o);
bool isChanged() const { return data.load(std::memory_order_relaxed).changed; }
void setChanged(bool changed) { data.store({getValue(), changed}, std::memory_order_relaxed);}
String toString() const;
Field toField() const { return toString(); }
void set(EnumType x) { value.store(x, std::memory_order_relaxed); changed = true; }
void set(EnumType x);
void set(const Field & x) { set(safeGet<const String &>(x)); }
void set(const String & x);
void serialize(WriteBuffer & buf) const;
void deserialize(ReadBuffer & buf);
EnumType getValue() const { return value.load(std::memory_order_relaxed); }
};
@ -344,6 +386,7 @@ private:
Derived & castToDerived() { return *static_cast<Derived *>(this); }
const Derived & castToDerived() const { return *static_cast<const Derived *>(this); }
using IsChangedFunction = bool (*)(const Derived &);
using GetStringFunction = String (*)(const Derived &);
using GetFieldFunction = Field (*)(const Derived &);
using SetStringFunction = void (*)(Derived &, const String &);
@ -354,7 +397,7 @@ private:
struct MemberInfo
{
size_t offset_of_changed;
IsChangedFunction is_changed;
StringRef name;
StringRef description;
/// Can be updated after first load for config/definition.
@ -369,7 +412,7 @@ private:
DeserializeFunction deserialize;
CastValueWithoutApplyingFunction cast_value_without_applying;
bool isChanged(const Derived & collection) const { return *reinterpret_cast<const bool*>(reinterpret_cast<const UInt8*>(&collection) + offset_of_changed); }
bool isChanged(const Derived & collection) const { return is_changed(collection); }
};
class MemberInfos
@ -729,8 +772,7 @@ public:
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
static_assert(std::is_same_v<decltype(std::declval<Derived>().NAME.changed), bool>); \
add({offsetof(Derived, NAME.changed), \
add({[](const Derived & d) { return d.NAME.isChanged(); }, \
StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), true, \
&Functions::NAME##_getString, &Functions::NAME##_getField, \
&Functions::NAME##_setString, &Functions::NAME##_setField, \
@ -738,8 +780,7 @@ public:
&Functions::NAME##_castValueWithoutApplying });
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
static_assert(std::is_same_v<decltype(std::declval<Derived>().NAME.changed), bool>); \
add({offsetof(Derived, NAME.changed), \
add({[](const Derived & d) { return d.NAME.isChanged(); }, \
StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), false, \
&Functions::NAME##_getString, &Functions::NAME##_getField, \
&Functions::NAME##_setString, &Functions::NAME##_setField, \

View File

@ -26,9 +26,9 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
new_settings.max_memory_usage_for_all_queries = 0;
/// Set as unchanged to avoid sending to remote server.
new_settings.max_concurrent_queries_for_user.changed = false;
new_settings.max_memory_usage_for_user.changed = false;
new_settings.max_memory_usage_for_all_queries.changed = false;
new_settings.max_concurrent_queries_for_user.setChanged(false);
new_settings.max_memory_usage_for_user.setChanged(false);
new_settings.max_memory_usage_for_all_queries.setChanged(false);
Context new_context(context);
new_context.setSettings(new_settings);

View File

@ -596,7 +596,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
InJoinSubqueriesPreprocessor(context).visit(query);
/// Optimizes logical expressions.
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length).perform();
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length.getValue()).perform();
}
/// Creates a dictionary `aliases`: alias -> ASTPtr

View File

@ -251,7 +251,7 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
{
elem.client_info = query_context->getClientInfo();
if (query_context->getSettingsRef().log_profile_events.value != 0)
if (query_context->getSettingsRef().log_profile_events != 0)
{
/// NOTE: Here we are in the same thread, so we can make memcpy()
elem.profile_counters = std::make_shared<ProfileEvents::Counters>(performance_counters.getPartiallyAtomicSnapshot());

View File

@ -199,7 +199,7 @@ BufferPtr StorageKafka::createBuffer()
const Settings & settings = global_context.getSettingsRef();
size_t batch_size = max_block_size;
if (!batch_size)
batch_size = settings.max_block_size.value;
batch_size = settings.max_block_size;
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
return std::make_shared<DelimitedReadBuffer>(
@ -350,7 +350,7 @@ bool StorageKafka::streamToViews()
const Settings & settings = global_context.getSettingsRef();
size_t block_size = max_block_size;
if (block_size == 0)
block_size = settings.max_block_size.value;
block_size = settings.max_block_size;
// Create a stream for each consumer and join them in a union stream
InterpreterInsertQuery interpreter{insert, global_context};
@ -436,7 +436,7 @@ void registerStorageKafka(StorageFactory & factory)
#define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME) \
/* One of the four required arguments is not specified */ \
if (args_count < ARG_NUM && ARG_NUM <= 4 && \
!kafka_settings.PAR_NAME.changed) \
!kafka_settings.PAR_NAME.isChanged()) \
{ \
throw Exception( \
"Required parameter '" #PAR_NAME "' " \
@ -445,7 +445,7 @@ void registerStorageKafka(StorageFactory & factory)
} \
/* The same argument is given in two places */ \
if (has_settings && \
kafka_settings.PAR_NAME.changed && \
kafka_settings.PAR_NAME.isChanged() && \
args_count >= ARG_NUM) \
{ \
throw Exception( \
@ -469,7 +469,7 @@ void registerStorageKafka(StorageFactory & factory)
#undef CHECK_KAFKA_STORAGE_ARGUMENT
// Get and check broker list
String brokers = kafka_settings.kafka_broker_list.value;
String brokers = kafka_settings.kafka_broker_list;
if (args_count >= 1)
{
const auto * ast = engine_args[0]->as<ASTLiteral>();
@ -524,7 +524,7 @@ void registerStorageKafka(StorageFactory & factory)
}
// Parse row delimiter (optional)
char row_delimiter = kafka_settings.kafka_row_delimiter.value;
char row_delimiter = kafka_settings.kafka_row_delimiter;
if (args_count >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
@ -571,7 +571,7 @@ void registerStorageKafka(StorageFactory & factory)
}
// Parse number of consumers (optional)
UInt64 num_consumers = kafka_settings.kafka_num_consumers.value;
UInt64 num_consumers = kafka_settings.kafka_num_consumers;
if (args_count >= 7)
{
const auto * ast = engine_args[6]->as<ASTLiteral>();
@ -586,7 +586,7 @@ void registerStorageKafka(StorageFactory & factory)
}
// Parse max block size (optional)
UInt64 max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size.value);
UInt64 max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size);
if (args_count >= 8)
{
const auto * ast = engine_args[7]->as<ASTLiteral>();
@ -601,7 +601,7 @@ void registerStorageKafka(StorageFactory & factory)
}
}
size_t skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages.value);
size_t skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages);
if (args_count >= 9)
{
const auto * ast = engine_args[8]->as<ASTLiteral>();

View File

@ -100,8 +100,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
std::sort(entries.begin(), entries.end());
String min_saved_record_log_str = entries[
entries.size() > storage.settings.max_replicated_logs_to_keep.value
? entries.size() - storage.settings.max_replicated_logs_to_keep.value
entries.size() > storage.settings.max_replicated_logs_to_keep
? entries.size() - storage.settings.max_replicated_logs_to_keep
: 0];
/// Replicas that were marked is_lost but are active.
@ -203,7 +203,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
min_saved_log_pointer = std::min(min_saved_log_pointer, min_log_pointer_lost_candidate);
/// We will not touch the last `min_replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage.settings.min_replicated_logs_to_keep.value), entries.end());
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage.settings.min_replicated_logs_to_keep), entries.end());
/// We will not touch records that are no less than `min_saved_log_pointer`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
@ -299,7 +299,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold{{}, time_threshold};
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage.settings.replicated_deduplication_window.value);
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage.settings.replicated_deduplication_window);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);

View File

@ -329,7 +329,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
const auto & settings = context.getSettingsRef();
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync.value)
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync)
{
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
ErrorCodes::BAD_ARGUMENTS);

View File

@ -164,8 +164,8 @@ void registerStorageJoin(StorageFactory & factory)
args.database_name,
args.table_name,
key_names,
join_use_nulls.value,
SizeLimits{max_rows_in_join.value, max_bytes_in_join.value, join_overflow_mode.value},
join_use_nulls,
SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode},
kind,
strictness,
args.columns,