mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merge branch 'add-test-for-deadlock-system-tables' of github.com:yandex/ClickHouse into add-test-for-deadlock-system-tables
This commit is contained in:
commit
2b6d27d587
@ -97,7 +97,7 @@ void PerformanceTestInfo::applySettings(XMLConfigurationPtr config)
|
||||
}
|
||||
|
||||
extractSettings(config, "settings", config_settings, settings_to_apply);
|
||||
settings.loadFromChanges(settings_to_apply);
|
||||
settings.applyChanges(settings_to_apply);
|
||||
|
||||
if (settings_contain("average_rows_speed_precision"))
|
||||
TestStats::avg_rows_speed_precision =
|
||||
|
@ -446,7 +446,7 @@ namespace ErrorCodes
|
||||
extern const int VIOLATED_CONSTRAINT = 469;
|
||||
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW = 470;
|
||||
extern const int SETTINGS_ARE_NOT_SUPPORTED = 471;
|
||||
extern const int IMMUTABLE_SETTING = 472;
|
||||
extern const int READONLY_SETTING = 472;
|
||||
extern const int DEADLOCK_AVOIDED = 473;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
|
@ -61,6 +61,7 @@ public:
|
||||
InternalTextLogsQueueWeakPtr logs_queue_ptr;
|
||||
|
||||
std::vector<UInt32> thread_numbers;
|
||||
std::vector<UInt32> os_thread_ids;
|
||||
|
||||
/// The first thread created this thread group
|
||||
UInt32 master_thread_number = 0;
|
||||
|
@ -42,8 +42,7 @@ struct Settings : public SettingsCollection<Settings>
|
||||
* but we are not going to do it, because settings is used everywhere as static struct fields.
|
||||
*/
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_SETTINGS(M, IM) \
|
||||
#define LIST_OF_SETTINGS(M) \
|
||||
M(SettingUInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
|
||||
M(SettingUInt64, max_compress_block_size, 1048576, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
|
||||
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \
|
||||
|
@ -17,11 +17,6 @@ class Field;
|
||||
class ReadBuffer;
|
||||
class WriteBuffer;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int IMMUTABLE_SETTING;
|
||||
}
|
||||
|
||||
/** One setting for any type.
|
||||
* Stores a value within itself, as well as a flag - whether the value was changed.
|
||||
* This is done so that you can send to the remote servers only changed settings (or explicitly specified in the config) values.
|
||||
@ -317,15 +312,12 @@ private:
|
||||
using DeserializeFunction = void (*)(Derived &, ReadBuffer & buf);
|
||||
using CastValueWithoutApplyingFunction = Field (*)(const Field &);
|
||||
|
||||
|
||||
struct MemberInfo
|
||||
{
|
||||
IsChangedFunction is_changed;
|
||||
StringRef name;
|
||||
StringRef description;
|
||||
/// Can be updated after first load for config/definition.
|
||||
/// Non updatable settings can be `changed`,
|
||||
/// if they were overwritten in config/definition.
|
||||
const bool updateable;
|
||||
GetStringFunction get_string;
|
||||
GetFieldFunction get_field;
|
||||
SetStringFunction set_string;
|
||||
@ -405,7 +397,6 @@ public:
|
||||
const_reference(const const_reference & src) = default;
|
||||
const StringRef & getName() const { return member->name; }
|
||||
const StringRef & getDescription() const { return member->description; }
|
||||
bool isUpdateable() const { return member->updateable; }
|
||||
bool isChanged() const { return member->isChanged(*collection); }
|
||||
Field getValue() const { return member->get_field(*collection); }
|
||||
String getValueAsString() const { return member->get_string(*collection); }
|
||||
@ -425,18 +416,6 @@ public:
|
||||
reference(const const_reference & src) : const_reference(src) {}
|
||||
void setValue(const Field & value) { this->member->set_field(*const_cast<Derived *>(this->collection), value); }
|
||||
void setValue(const String & value) { this->member->set_string(*const_cast<Derived *>(this->collection), value); }
|
||||
void updateValue(const Field & value)
|
||||
{
|
||||
if (!this->member->updateable)
|
||||
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
|
||||
setValue(value);
|
||||
}
|
||||
void updateValue(const String & value)
|
||||
{
|
||||
if (!this->member->updateable)
|
||||
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
|
||||
setValue(value);
|
||||
}
|
||||
};
|
||||
|
||||
/// Iterator to iterating through all the settings.
|
||||
@ -519,15 +498,6 @@ public:
|
||||
void set(size_t index, const String & value) { (*this)[index].setValue(value); }
|
||||
void set(const String & name, const String & value) { (*this)[name].setValue(value); }
|
||||
|
||||
/// Updates setting's value. Checks it' mutability.
|
||||
void update(size_t index, const Field & value) { (*this)[index].updateValue(value); }
|
||||
|
||||
void update(const String & name, const Field & value) { (*this)[name].updateValue(value); }
|
||||
|
||||
void update(size_t index, const String & value) { (*this)[index].updateValue(value); }
|
||||
|
||||
void update(const String & name, const String & value) { (*this)[name].updateValue(value); }
|
||||
|
||||
/// Returns value of a setting.
|
||||
Field get(size_t index) const { return (*this)[index].getValue(); }
|
||||
Field get(const String & name) const { return (*this)[name].getValue(); }
|
||||
@ -591,35 +561,19 @@ public:
|
||||
return found_changes;
|
||||
}
|
||||
|
||||
/// Applies change to the settings. Doesn't check settings mutability.
|
||||
void loadFromChange(const SettingChange & change)
|
||||
/// Applies change to concrete setting.
|
||||
void applyChange(const SettingChange & change)
|
||||
{
|
||||
set(change.name, change.value);
|
||||
}
|
||||
|
||||
/// Applies changes to the settings. Should be used in initial settings loading.
|
||||
/// (on table creation or loading from config)
|
||||
void loadFromChanges(const SettingsChanges & changes)
|
||||
/// Applies changes to the settings.
|
||||
void applyChanges(const SettingsChanges & changes)
|
||||
{
|
||||
for (const SettingChange & change : changes)
|
||||
loadFromChange(change);
|
||||
applyChange(change);
|
||||
}
|
||||
|
||||
/// Applies change to the settings, checks settings mutability.
|
||||
void updateFromChange(const SettingChange & change)
|
||||
{
|
||||
update(change.name, change.value);
|
||||
}
|
||||
|
||||
/// Applies changes to the settings. Should be used for settigns update.
|
||||
/// (ALTER MODIFY SETTINGS)
|
||||
void updateFromChanges(const SettingsChanges & changes)
|
||||
{
|
||||
for (const SettingChange & change : changes)
|
||||
updateFromChange(change);
|
||||
}
|
||||
|
||||
|
||||
void copyChangesFrom(const Derived & src)
|
||||
{
|
||||
for (const auto & member : members())
|
||||
@ -663,7 +617,7 @@ public:
|
||||
};
|
||||
|
||||
#define DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS_MACRO) \
|
||||
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_, DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
|
||||
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
|
||||
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION(DERIVED_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
|
||||
@ -673,9 +627,9 @@ public:
|
||||
using Derived = DERIVED_CLASS_NAME; \
|
||||
struct Functions \
|
||||
{ \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
|
||||
}; \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_) \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_) \
|
||||
}
|
||||
|
||||
|
||||
@ -690,22 +644,14 @@ public:
|
||||
static void NAME##_setField(Derived & collection, const Field & value) { collection.NAME.set(value); } \
|
||||
static void NAME##_serialize(const Derived & collection, WriteBuffer & buf) { collection.NAME.serialize(buf); } \
|
||||
static void NAME##_deserialize(Derived & collection, ReadBuffer & buf) { collection.NAME.deserialize(buf); } \
|
||||
static Field NAME##_castValueWithoutApplying(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); }
|
||||
static Field NAME##_castValueWithoutApplying(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); } \
|
||||
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
add({[](const Derived & d) { return d.NAME.changed; }, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), true, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), \
|
||||
&Functions::NAME##_getString, &Functions::NAME##_getField, \
|
||||
&Functions::NAME##_setString, &Functions::NAME##_setField, \
|
||||
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
|
||||
&Functions::NAME##_castValueWithoutApplying });
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
add({[](const Derived & d) { return d.NAME.changed; }, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), false, \
|
||||
&Functions::NAME##_getString, &Functions::NAME##_getField, \
|
||||
&Functions::NAME##_setString, &Functions::NAME##_setField, \
|
||||
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
|
||||
&Functions::NAME##_castValueWithoutApplying });
|
||||
}
|
||||
|
@ -1132,7 +1132,7 @@ void Context::updateSettingsChanges(const SettingsChanges & changes)
|
||||
if (change.name == "profile")
|
||||
setProfile(change.value.safeGet<String>());
|
||||
else
|
||||
settings.updateFromChange(change);
|
||||
settings.applyChange(change);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -444,6 +444,7 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
|
||||
{
|
||||
std::lock_guard lock(thread_group->mutex);
|
||||
res.thread_numbers = thread_group->thread_numbers;
|
||||
res.os_thread_ids = thread_group->os_thread_ids;
|
||||
}
|
||||
|
||||
if (get_profile_events)
|
||||
|
@ -66,6 +66,7 @@ struct QueryStatusInfo
|
||||
|
||||
/// Optional fields, filled by request
|
||||
std::vector<UInt32> thread_numbers;
|
||||
std::vector<UInt32> os_thread_ids;
|
||||
std::shared_ptr<ProfileEvents::Counters> profile_counters;
|
||||
std::shared_ptr<Settings> query_settings;
|
||||
};
|
||||
|
@ -78,6 +78,7 @@ Block QueryLogElement::createBlock()
|
||||
{std::make_shared<DataTypeUInt32>(), "revision"},
|
||||
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()), "thread_numbers"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()), "os_thread_ids"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "ProfileEvents.Names"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "ProfileEvents.Values"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Settings.Names"},
|
||||
@ -123,6 +124,14 @@ void QueryLogElement::appendToBlock(Block & block) const
|
||||
columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
{
|
||||
Array threads_array;
|
||||
threads_array.reserve(os_thread_ids.size());
|
||||
for (const UInt32 thread_number : os_thread_ids)
|
||||
threads_array.emplace_back(UInt64(thread_number));
|
||||
columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
if (profile_counters)
|
||||
{
|
||||
auto column_names = columns[i++].get();
|
||||
|
@ -60,6 +60,7 @@ struct QueryLogElement
|
||||
ClientInfo client_info;
|
||||
|
||||
std::vector<UInt32> thread_numbers;
|
||||
std::vector<UInt32> os_thread_ids;
|
||||
std::shared_ptr<ProfileEvents::Counters> profile_counters;
|
||||
std::shared_ptr<Settings> query_settings;
|
||||
|
||||
|
@ -61,6 +61,7 @@ void ThreadStatus::initializeQuery()
|
||||
thread_group->memory_tracker.setDescription("(for query)");
|
||||
|
||||
thread_group->thread_numbers.emplace_back(thread_number);
|
||||
thread_group->os_thread_ids.emplace_back(os_thread_id);
|
||||
thread_group->master_thread_number = thread_number;
|
||||
thread_group->master_thread_os_id = os_thread_id;
|
||||
|
||||
@ -99,6 +100,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
|
||||
|
||||
/// NOTE: A thread may be attached multiple times if it is reused from a thread pool.
|
||||
thread_group->thread_numbers.emplace_back(thread_number);
|
||||
thread_group->os_thread_ids.emplace_back(os_thread_id);
|
||||
}
|
||||
|
||||
if (query_context)
|
||||
|
@ -400,6 +400,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
}
|
||||
|
||||
elem.thread_numbers = std::move(info.thread_numbers);
|
||||
elem.os_thread_ids = std::move(info.os_thread_ids);
|
||||
elem.profile_counters = std::move(info.profile_counters);
|
||||
|
||||
if (log_queries)
|
||||
@ -437,6 +438,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
|
||||
|
||||
elem.thread_numbers = std::move(info.thread_numbers);
|
||||
elem.os_thread_ids = std::move(info.os_thread_ids);
|
||||
elem.profile_counters = std::move(info.profile_counters);
|
||||
}
|
||||
|
||||
|
@ -543,15 +543,8 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
|
||||
}
|
||||
}
|
||||
else if (command.type == AlterCommand::MODIFY_SETTING)
|
||||
{
|
||||
for (const auto & change : command.settings_changes)
|
||||
{
|
||||
if (!table.hasSetting(change.name))
|
||||
{
|
||||
throw Exception{"Storage '" + table.getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
}
|
||||
}
|
||||
table.checkSettingCanBeChanged(change.name);
|
||||
}
|
||||
|
||||
/** Existing defaulted columns may require default expression extensions with a type conversion,
|
||||
|
@ -309,11 +309,10 @@ bool IStorage::isVirtualColumn(const String & column_name) const
|
||||
return getColumns().get(column_name).is_virtual;
|
||||
}
|
||||
|
||||
bool IStorage::hasSetting(const String & /* setting_name */) const
|
||||
void IStorage::checkSettingCanBeChanged(const String & /* setting_name */) const
|
||||
{
|
||||
if (!supportsSettings())
|
||||
throw Exception("Storage '" + getName() + "' doesn't support settings.", ErrorCodes::SETTINGS_ARE_NOT_SUPPORTED);
|
||||
return false;
|
||||
}
|
||||
|
||||
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
|
||||
@ -381,16 +380,13 @@ IDatabase::ASTModifier IStorage::getSettingsModifier(const SettingsChanges & new
|
||||
/// Make storage settings unique
|
||||
for (const auto & change : new_changes)
|
||||
{
|
||||
if (hasSetting(change.name))
|
||||
{
|
||||
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
|
||||
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
|
||||
it->value = change.value;
|
||||
else
|
||||
storage_changes.push_back(change);
|
||||
}
|
||||
checkSettingCanBeChanged(change.name);
|
||||
|
||||
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
|
||||
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
|
||||
it->value = change.value;
|
||||
else
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
storage_changes.push_back(change);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -139,8 +139,8 @@ public: /// thread-unsafe part. lockStructure must be acquired
|
||||
/// If |need_all| is set, then checks that all the columns of the table are in the block.
|
||||
void check(const Block & block, bool need_all = false) const;
|
||||
|
||||
/// Check storage has setting. Exception will be thrown if it doesn't support settings at all.
|
||||
virtual bool hasSetting(const String & setting_name) const;
|
||||
/// Check storage has setting and setting can be modified.
|
||||
virtual void checkSettingCanBeChanged(const String & setting_name) const;
|
||||
|
||||
protected: /// still thread-unsafe part.
|
||||
void setIndices(IndicesDescription indices_);
|
||||
@ -150,7 +150,7 @@ protected: /// still thread-unsafe part.
|
||||
virtual bool isVirtualColumn(const String & column_name) const;
|
||||
|
||||
/// Returns modifier of settings in storage definition
|
||||
virtual IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
|
||||
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
|
||||
|
||||
private:
|
||||
ColumnsDescription columns; /// combined real and virtual columns
|
||||
|
@ -22,7 +22,7 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
try
|
||||
{
|
||||
loadFromChanges(storage_def.settings->changes);
|
||||
applyChanges(storage_def.settings->changes);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -15,18 +15,17 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
|
||||
{
|
||||
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_KAFKA_SETTINGS(M, IM) \
|
||||
IM(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
|
||||
IM(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
|
||||
IM(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
|
||||
IM(SettingString, kafka_format, "", "The message format for Kafka engine.") \
|
||||
IM(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
|
||||
IM(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
|
||||
IM(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
|
||||
IM(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
|
||||
IM(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
|
||||
IM(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
|
||||
#define LIST_OF_KAFKA_SETTINGS(M) \
|
||||
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
|
||||
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
|
||||
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
|
||||
M(SettingString, kafka_format, "", "The message format for Kafka engine.") \
|
||||
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
|
||||
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
|
||||
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
|
||||
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
|
||||
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
|
||||
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
|
||||
|
||||
DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS)
|
||||
|
||||
|
@ -44,6 +44,8 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
extern const int READONLY_SETTING;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -410,14 +412,12 @@ bool StorageKafka::streamToViews()
|
||||
}
|
||||
|
||||
|
||||
bool StorageKafka::hasSetting(const String & setting_name) const
|
||||
void StorageKafka::checkSettingCanBeChanged(const String & setting_name) const
|
||||
{
|
||||
return KafkaSettings::findIndex(setting_name) != KafkaSettings::npos;
|
||||
}
|
||||
if (KafkaSettings::findIndex(setting_name) == KafkaSettings::npos)
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
|
||||
IDatabase::ASTModifier StorageKafka::getSettingsModifier(const SettingsChanges & /* new_changes */) const
|
||||
{
|
||||
throw Exception("Storage '" + getName() + "' doesn't support settings alter", ErrorCodes::UNSUPPORTED_METHOD);
|
||||
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
|
||||
}
|
||||
|
||||
void registerStorageKafka(StorageFactory & factory)
|
||||
|
@ -57,8 +57,7 @@ public:
|
||||
const auto & getSchemaName() const { return schema_name; }
|
||||
const auto & skipBroken() const { return skip_broken; }
|
||||
|
||||
bool hasSetting(const String & setting_name) const override;
|
||||
|
||||
void checkSettingCanBeChanged(const String & setting_name) const override;
|
||||
|
||||
protected:
|
||||
StorageKafka(
|
||||
@ -71,7 +70,6 @@ protected:
|
||||
size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
|
||||
bool intermediate_commit_);
|
||||
|
||||
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const override;
|
||||
private:
|
||||
// Configuration and state
|
||||
String table_name;
|
||||
|
@ -91,6 +91,7 @@ namespace ErrorCodes
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int BAD_DATA_PART_NAME;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
extern const int READONLY_SETTING;
|
||||
}
|
||||
|
||||
|
||||
@ -1324,10 +1325,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
|
||||
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast, /* only_check = */ true);
|
||||
|
||||
for (const auto & setting : new_changes)
|
||||
{
|
||||
if (!hasSetting(setting.name))
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
checkSettingCanBeChanged(setting.name);
|
||||
|
||||
/// Check that type conversions are possible.
|
||||
ExpressionActionsPtr unused_expression;
|
||||
@ -1654,14 +1652,18 @@ void MergeTreeData::changeSettings(
|
||||
if (!new_changes.empty())
|
||||
{
|
||||
MergeTreeSettings copy = *getSettings();
|
||||
copy.updateFromChanges(new_changes);
|
||||
copy.applyChanges(new_changes);
|
||||
storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
|
||||
}
|
||||
}
|
||||
|
||||
bool MergeTreeData::hasSetting(const String & setting_name) const
|
||||
void MergeTreeData::checkSettingCanBeChanged(const String & setting_name) const
|
||||
{
|
||||
return MergeTreeSettings::findIndex(setting_name) != MergeTreeSettings::npos;
|
||||
if (MergeTreeSettings::findIndex(setting_name) == MergeTreeSettings::npos)
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
if (MergeTreeSettings::isReadonlySetting(setting_name))
|
||||
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
|
||||
|
||||
}
|
||||
|
||||
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
|
||||
|
@ -543,7 +543,7 @@ public:
|
||||
TableStructureWriteLockHolder & table_lock_holder);
|
||||
|
||||
/// All MergeTreeData children have settings.
|
||||
bool hasSetting(const String & setting_name) const override;
|
||||
void checkSettingCanBeChanged(const String & setting_name) const override;
|
||||
|
||||
/// Remove columns, that have been markedd as empty after zeroing values with expired ttl
|
||||
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
|
||||
|
@ -46,7 +46,7 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
try
|
||||
{
|
||||
loadFromChanges(storage_def.settings->changes);
|
||||
applyChanges(storage_def.settings->changes);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Core/Defines.h>
|
||||
#include <Core/SettingsCommon.h>
|
||||
#include <Common/SettingsChanges.h>
|
||||
|
||||
|
||||
namespace Poco
|
||||
@ -24,9 +25,8 @@ class ASTStorage;
|
||||
struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
{
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_MERGE_TREE_SETTINGS(M, IM) \
|
||||
IM(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
|
||||
#define LIST_OF_MERGE_TREE_SETTINGS(M) \
|
||||
M(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
|
||||
\
|
||||
/** Merge settings. */ \
|
||||
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \
|
||||
@ -80,7 +80,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
M(SettingBool, use_minimalistic_part_header_in_zookeeper, false, "Store part header (checksums and columns) in a compact format and a single part znode instead of separate znodes (<part>/columns and <part>/checksums). This can dramatically reduce snapshot size in ZooKeeper. Before enabling check that all replicas support new format.") \
|
||||
M(SettingUInt64, finished_mutations_to_keep, 100, "How many records about mutations that are done to keep. If zero, then keep all of them.") \
|
||||
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
|
||||
IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
||||
M(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
||||
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
|
||||
M(SettingBool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.") \
|
||||
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
|
||||
@ -99,6 +99,12 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
|
||||
/// NOTE: will rewrite the AST to add immutable settings.
|
||||
void loadFromQuery(ASTStorage & storage_def);
|
||||
|
||||
/// We check settings after storage creation
|
||||
static bool isReadonlySetting(const String & name)
|
||||
{
|
||||
return name == "index_granularity" || name == "index_granularity_bytes";
|
||||
}
|
||||
};
|
||||
|
||||
using MergeTreeSettingsPtr = std::shared_ptr<const MergeTreeSettings>;
|
||||
|
@ -58,6 +58,7 @@ NamesAndTypesList StorageSystemProcesses::getNamesAndTypes()
|
||||
{"query", std::make_shared<DataTypeString>()},
|
||||
|
||||
{"thread_numbers", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())},
|
||||
{"os_thread_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())},
|
||||
{"ProfileEvents.Names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
|
||||
{"ProfileEvents.Values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
|
||||
{"Settings.Names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
|
||||
@ -120,6 +121,14 @@ void StorageSystemProcesses::fillData(MutableColumns & res_columns, const Contex
|
||||
res_columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
{
|
||||
Array threads_array;
|
||||
threads_array.reserve(process.os_thread_ids.size());
|
||||
for (const UInt32 thread_number : process.os_thread_ids)
|
||||
threads_array.emplace_back(thread_number);
|
||||
res_columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
{
|
||||
IColumn * column_profile_events_names = res_columns[i++].get();
|
||||
IColumn * column_profile_events_values = res_columns[i++].get();
|
||||
|
Loading…
Reference in New Issue
Block a user