mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge branch 'master' into merge-constraints
This commit is contained in:
commit
60d660e263
@ -97,7 +97,7 @@ void PerformanceTestInfo::applySettings(XMLConfigurationPtr config)
|
||||
}
|
||||
|
||||
extractSettings(config, "settings", config_settings, settings_to_apply);
|
||||
settings.applyChanges(settings_to_apply);
|
||||
settings.loadFromChanges(settings_to_apply);
|
||||
|
||||
if (settings_contain("average_rows_speed_precision"))
|
||||
TestStats::avg_rows_speed_precision =
|
||||
|
@ -182,11 +182,11 @@ void TCPHandler::runImpl()
|
||||
/// Should we send internal logs to client?
|
||||
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
|
||||
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
|
||||
&& client_logs_level.value != LogsLevel::none)
|
||||
&& client_logs_level != LogsLevel::none)
|
||||
{
|
||||
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
|
||||
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
|
||||
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level.value);
|
||||
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
|
||||
}
|
||||
|
||||
query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
|
||||
@ -329,7 +329,7 @@ void TCPHandler::readData(const Settings & connection_settings)
|
||||
const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
|
||||
|
||||
/// Poll interval should not be greater than receive_timeout
|
||||
const size_t default_poll_interval = connection_settings.poll_interval.value * 1000000;
|
||||
const size_t default_poll_interval = connection_settings.poll_interval * 1000000;
|
||||
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
|
||||
constexpr size_t min_poll_interval = 5000; // 5 ms
|
||||
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
|
||||
|
@ -445,6 +445,8 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_PTHREAD_ATTR = 468;
|
||||
extern const int VIOLATED_CONSTRAINT = 469;
|
||||
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW = 470;
|
||||
extern const int SETTINGS_ARE_NOT_SUPPORTED = 471;
|
||||
extern const int IMMUTABLE_SETTING = 472;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
@ -42,7 +42,8 @@ struct Settings : public SettingsCollection<Settings>
|
||||
* but we are not going to do it, because settings is used everywhere as static struct fields.
|
||||
*/
|
||||
|
||||
#define LIST_OF_SETTINGS(M) \
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_SETTINGS(M, IM) \
|
||||
M(SettingUInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
|
||||
M(SettingUInt64, max_compress_block_size, 1048576, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
|
||||
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \
|
||||
|
@ -17,6 +17,10 @@ class Field;
|
||||
class ReadBuffer;
|
||||
class WriteBuffer;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int IMMUTABLE_SETTING;
|
||||
}
|
||||
|
||||
/** One setting for any type.
|
||||
* Stores a value within itself, as well as a flag - whether the value was changed.
|
||||
@ -304,6 +308,7 @@ private:
|
||||
Derived & castToDerived() { return *static_cast<Derived *>(this); }
|
||||
const Derived & castToDerived() const { return *static_cast<const Derived *>(this); }
|
||||
|
||||
using IsChangedFunction = bool (*)(const Derived &);
|
||||
using GetStringFunction = String (*)(const Derived &);
|
||||
using GetFieldFunction = Field (*)(const Derived &);
|
||||
using SetStringFunction = void (*)(Derived &, const String &);
|
||||
@ -314,9 +319,13 @@ private:
|
||||
|
||||
struct MemberInfo
|
||||
{
|
||||
size_t offset_of_changed;
|
||||
IsChangedFunction is_changed;
|
||||
StringRef name;
|
||||
StringRef description;
|
||||
/// Can be updated after first load for config/definition.
|
||||
/// Non updatable settings can be `changed`,
|
||||
/// if they were overwritten in config/definition.
|
||||
const bool updateable;
|
||||
GetStringFunction get_string;
|
||||
GetFieldFunction get_field;
|
||||
SetStringFunction set_string;
|
||||
@ -325,7 +334,7 @@ private:
|
||||
DeserializeFunction deserialize;
|
||||
CastValueWithoutApplyingFunction cast_value_without_applying;
|
||||
|
||||
bool isChanged(const Derived & collection) const { return *reinterpret_cast<const bool*>(reinterpret_cast<const UInt8*>(&collection) + offset_of_changed); }
|
||||
bool isChanged(const Derived & collection) const { return is_changed(collection); }
|
||||
};
|
||||
|
||||
class MemberInfos
|
||||
@ -396,6 +405,7 @@ public:
|
||||
const_reference(const const_reference & src) = default;
|
||||
const StringRef & getName() const { return member->name; }
|
||||
const StringRef & getDescription() const { return member->description; }
|
||||
bool isUpdateable() const { return member->updateable; }
|
||||
bool isChanged() const { return member->isChanged(*collection); }
|
||||
Field getValue() const { return member->get_field(*collection); }
|
||||
String getValueAsString() const { return member->get_string(*collection); }
|
||||
@ -415,6 +425,18 @@ public:
|
||||
reference(const const_reference & src) : const_reference(src) {}
|
||||
void setValue(const Field & value) { this->member->set_field(*const_cast<Derived *>(this->collection), value); }
|
||||
void setValue(const String & value) { this->member->set_string(*const_cast<Derived *>(this->collection), value); }
|
||||
void updateValue(const Field & value)
|
||||
{
|
||||
if (!this->member->updateable)
|
||||
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
|
||||
setValue(value);
|
||||
}
|
||||
void updateValue(const String & value)
|
||||
{
|
||||
if (!this->member->updateable)
|
||||
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
|
||||
setValue(value);
|
||||
}
|
||||
};
|
||||
|
||||
/// Iterator to iterating through all the settings.
|
||||
@ -497,6 +519,15 @@ public:
|
||||
void set(size_t index, const String & value) { (*this)[index].setValue(value); }
|
||||
void set(const String & name, const String & value) { (*this)[name].setValue(value); }
|
||||
|
||||
/// Updates setting's value. Checks it' mutability.
|
||||
void update(size_t index, const Field & value) { (*this)[index].updateValue(value); }
|
||||
|
||||
void update(const String & name, const Field & value) { (*this)[name].updateValue(value); }
|
||||
|
||||
void update(size_t index, const String & value) { (*this)[index].updateValue(value); }
|
||||
|
||||
void update(const String & name, const String & value) { (*this)[name].updateValue(value); }
|
||||
|
||||
/// Returns value of a setting.
|
||||
Field get(size_t index) const { return (*this)[index].getValue(); }
|
||||
Field get(const String & name) const { return (*this)[name].getValue(); }
|
||||
@ -560,18 +591,35 @@ public:
|
||||
return found_changes;
|
||||
}
|
||||
|
||||
/// Applies changes to the settings.
|
||||
void applyChange(const SettingChange & change)
|
||||
/// Applies change to the settings. Doesn't check settings mutability.
|
||||
void loadFromChange(const SettingChange & change)
|
||||
{
|
||||
set(change.name, change.value);
|
||||
}
|
||||
|
||||
void applyChanges(const SettingsChanges & changes)
|
||||
/// Applies changes to the settings. Should be used in initial settings loading.
|
||||
/// (on table creation or loading from config)
|
||||
void loadFromChanges(const SettingsChanges & changes)
|
||||
{
|
||||
for (const SettingChange & change : changes)
|
||||
applyChange(change);
|
||||
loadFromChange(change);
|
||||
}
|
||||
|
||||
/// Applies change to the settings, checks settings mutability.
|
||||
void updateFromChange(const SettingChange & change)
|
||||
{
|
||||
update(change.name, change.value);
|
||||
}
|
||||
|
||||
/// Applies changes to the settings. Should be used for settigns update.
|
||||
/// (ALTER MODIFY SETTINGS)
|
||||
void updateFromChanges(const SettingsChanges & changes)
|
||||
{
|
||||
for (const SettingChange & change : changes)
|
||||
updateFromChange(change);
|
||||
}
|
||||
|
||||
|
||||
void copyChangesFrom(const Derived & src)
|
||||
{
|
||||
for (const auto & member : members())
|
||||
@ -615,7 +663,7 @@ public:
|
||||
};
|
||||
|
||||
#define DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS_MACRO) \
|
||||
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
|
||||
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_, DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
|
||||
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION(DERIVED_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
|
||||
@ -625,9 +673,9 @@ public:
|
||||
using Derived = DERIVED_CLASS_NAME; \
|
||||
struct Functions \
|
||||
{ \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
|
||||
}; \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_) \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_) \
|
||||
}
|
||||
|
||||
|
||||
@ -645,13 +693,19 @@ public:
|
||||
static Field NAME##_castValueWithoutApplying(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); }
|
||||
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
static_assert(std::is_same_v<decltype(std::declval<Derived>().NAME.changed), bool>); \
|
||||
add({offsetof(Derived, NAME.changed), \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), \
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
add({[](const Derived & d) { return d.NAME.changed; }, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), true, \
|
||||
&Functions::NAME##_getString, &Functions::NAME##_getField, \
|
||||
&Functions::NAME##_setString, &Functions::NAME##_setField, \
|
||||
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
|
||||
&Functions::NAME##_castValueWithoutApplying });
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
add({[](const Derived & d) { return d.NAME.changed; }, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), false, \
|
||||
&Functions::NAME##_getString, &Functions::NAME##_getField, \
|
||||
&Functions::NAME##_setString, &Functions::NAME##_setField, \
|
||||
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
|
||||
&Functions::NAME##_castValueWithoutApplying });
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
|
||||
/// Set as unchanged to avoid sending to remote server.
|
||||
new_settings.max_concurrent_queries_for_user.changed = false;
|
||||
new_settings.max_memory_usage_for_user.changed = false;
|
||||
new_settings.max_memory_usage_for_all_queries.changed = false;
|
||||
new_settings.max_memory_usage_for_all_queries = false;
|
||||
|
||||
Context new_context(context);
|
||||
new_context.setSettings(new_settings);
|
||||
|
@ -142,7 +142,7 @@ struct ContextShared
|
||||
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
|
||||
/// Rules for selecting the compression settings, depending on the size of the part.
|
||||
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
|
||||
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
|
||||
MergeTreeSettingsPtr merge_tree_settings; /// Settings of MergeTree* engines.
|
||||
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
|
||||
size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
|
||||
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
|
||||
@ -1124,6 +1124,17 @@ void Context::applySettingsChanges(const SettingsChanges & changes)
|
||||
applySettingChange(change);
|
||||
}
|
||||
|
||||
void Context::updateSettingsChanges(const SettingsChanges & changes)
|
||||
{
|
||||
auto lock = getLock();
|
||||
for (const SettingChange & change : changes)
|
||||
{
|
||||
if (change.name == "profile")
|
||||
setProfile(change.value.safeGet<String>());
|
||||
else
|
||||
settings.updateFromChange(change);
|
||||
}
|
||||
}
|
||||
|
||||
void Context::checkSettingsConstraints(const SettingChange & change)
|
||||
{
|
||||
@ -1748,8 +1759,9 @@ const MergeTreeSettings & Context::getMergeTreeSettings() const
|
||||
if (!shared->merge_tree_settings)
|
||||
{
|
||||
auto & config = getConfigRef();
|
||||
shared->merge_tree_settings.emplace();
|
||||
shared->merge_tree_settings->loadFromConfig("merge_tree", config);
|
||||
MutableMergeTreeSettingsPtr settings_ptr = MergeTreeSettings::create();
|
||||
settings_ptr->loadFromConfig("merge_tree", config);
|
||||
shared->merge_tree_settings = std::move(settings_ptr);
|
||||
}
|
||||
|
||||
return *shared->merge_tree_settings;
|
||||
|
@ -287,6 +287,9 @@ public:
|
||||
void applySettingChange(const SettingChange & change);
|
||||
void applySettingsChanges(const SettingsChanges & changes);
|
||||
|
||||
/// Update checking that each setting is updatable
|
||||
void updateSettingsChanges(const SettingsChanges & changes);
|
||||
|
||||
/// Checks the constraints.
|
||||
void checkSettingsConstraints(const SettingChange & change);
|
||||
void checkSettingsConstraints(const SettingsChanges & changes);
|
||||
|
@ -10,7 +10,7 @@ BlockIO InterpreterSetQuery::execute()
|
||||
{
|
||||
const auto & ast = query_ptr->as<ASTSetQuery &>();
|
||||
context.checkSettingsConstraints(ast.changes);
|
||||
context.getSessionContext().applySettingsChanges(ast.changes);
|
||||
context.getSessionContext().updateSettingsChanges(ast.changes);
|
||||
return {};
|
||||
}
|
||||
|
||||
@ -19,7 +19,7 @@ void InterpreterSetQuery::executeForCurrentContext()
|
||||
{
|
||||
const auto & ast = query_ptr->as<ASTSetQuery &>();
|
||||
context.checkSettingsConstraints(ast.changes);
|
||||
context.applySettingsChanges(ast.changes);
|
||||
context.updateSettingsChanges(ast.changes);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -30,8 +30,8 @@ bool LogicalExpressionsOptimizer::OrWithExpression::operator<(const OrWithExpres
|
||||
return std::tie(this->or_function, this->expression) < std::tie(rhs.or_function, rhs.expression);
|
||||
}
|
||||
|
||||
LogicalExpressionsOptimizer::LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, ExtractedSettings && settings_)
|
||||
: select_query(select_query_), settings(settings_)
|
||||
LogicalExpressionsOptimizer::LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, UInt64 optimize_min_equality_disjunction_chain_length)
|
||||
: select_query(select_query_), settings(optimize_min_equality_disjunction_chain_length)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,7 @@ class LogicalExpressionsOptimizer final
|
||||
|
||||
public:
|
||||
/// Constructor. Accepts the root of the query DAG.
|
||||
LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, ExtractedSettings && settings_);
|
||||
LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, UInt64 optimize_min_equality_disjunction_chain_length);
|
||||
|
||||
/** Replace all rather long homogeneous OR-chains expr = x1 OR ... OR expr = xN
|
||||
* on the expressions `expr` IN (x1, ..., xN).
|
||||
|
@ -251,7 +251,7 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
|
||||
{
|
||||
elem.client_info = query_context->getClientInfo();
|
||||
|
||||
if (query_context->getSettingsRef().log_profile_events.value != 0)
|
||||
if (query_context->getSettingsRef().log_profile_events != 0)
|
||||
{
|
||||
/// NOTE: Here we are in the same thread, so we can make memcpy()
|
||||
elem.profile_counters = std::make_shared<ProfileEvents::Counters>(performance_counters.getPartiallyAtomicSnapshot());
|
||||
|
@ -45,6 +45,11 @@ ASTPtr ASTAlterCommand::clone() const
|
||||
res->ttl = ttl->clone();
|
||||
res->children.push_back(res->ttl);
|
||||
}
|
||||
if (settings_changes)
|
||||
{
|
||||
res->settings_changes = settings_changes->clone();
|
||||
res->children.push_back(res->settings_changes);
|
||||
}
|
||||
if (values)
|
||||
{
|
||||
res->values = values->clone();
|
||||
@ -222,6 +227,11 @@ void ASTAlterCommand::formatImpl(
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY TTL " << (settings.hilite ? hilite_none : "");
|
||||
ttl->formatImpl(settings, state, frame);
|
||||
}
|
||||
else if (type == ASTAlterCommand::MODIFY_SETTING)
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY SETTING " << (settings.hilite ? hilite_none : "");
|
||||
settings_changes->formatImpl(settings, state, frame);
|
||||
}
|
||||
else if (type == ASTAlterCommand::LIVE_VIEW_REFRESH)
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "REFRESH " << (settings.hilite ? hilite_none : "");
|
||||
|
@ -30,6 +30,7 @@ public:
|
||||
COMMENT_COLUMN,
|
||||
MODIFY_ORDER_BY,
|
||||
MODIFY_TTL,
|
||||
MODIFY_SETTING,
|
||||
|
||||
ADD_INDEX,
|
||||
DROP_INDEX,
|
||||
@ -107,6 +108,9 @@ public:
|
||||
/// For MODIFY TTL query
|
||||
ASTPtr ttl;
|
||||
|
||||
/// FOR MODIFY_SETTING
|
||||
ASTPtr settings_changes;
|
||||
|
||||
/** In ALTER CHANNEL, ADD, DROP, SUSPEND, RESUME, REFRESH, MODIFY queries, the list of live views is stored here
|
||||
*/
|
||||
ASTPtr values;
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Parsers/ExpressionListParsers.h>
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
#include <Parsers/ParserPartition.h>
|
||||
#include <Parsers/ParserSetQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTIndexDeclaration.h>
|
||||
#include <Parsers/ASTAlterQuery.h>
|
||||
@ -28,6 +29,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
|
||||
ParserKeyword s_comment_column("COMMENT COLUMN");
|
||||
ParserKeyword s_modify_order_by("MODIFY ORDER BY");
|
||||
ParserKeyword s_modify_ttl("MODIFY TTL");
|
||||
ParserKeyword s_modify_setting("MODIFY SETTING");
|
||||
|
||||
ParserKeyword s_add_index("ADD INDEX");
|
||||
ParserKeyword s_drop_index("DROP INDEX");
|
||||
@ -78,6 +80,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
|
||||
ParserList parser_assignment_list(
|
||||
std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma),
|
||||
/* allow_empty = */ false);
|
||||
ParserSetQuery parser_settings(true);
|
||||
ParserNameList values_p;
|
||||
|
||||
if (is_live_view)
|
||||
@ -386,8 +389,15 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
|
||||
return false;
|
||||
command->type = ASTAlterCommand::MODIFY_TTL;
|
||||
}
|
||||
else if (s_modify_setting.ignore(pos, expected))
|
||||
{
|
||||
if (!parser_settings.parse(pos, command->settings_changes, expected))
|
||||
return false;
|
||||
command->type = ASTAlterCommand::MODIFY_SETTING;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
if (command->col_decl)
|
||||
@ -408,6 +418,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
|
||||
command->children.push_back(command->comment);
|
||||
if (command->ttl)
|
||||
command->children.push_back(command->ttl);
|
||||
if (command->settings_changes)
|
||||
command->children.push_back(command->settings_changes);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ namespace DB
|
||||
* [CLEAR COLUMN [IF EXISTS] col_to_clear [IN PARTITION partition],]
|
||||
* [MODIFY COLUMN [IF EXISTS] col_to_modify type, ...]
|
||||
* [MODIFY PRIMARY KEY (a, b, c...)]
|
||||
* [MODIFY SETTING setting_name=setting_value, ...]
|
||||
* [COMMENT COLUMN [IF EXISTS] col_name string]
|
||||
* [DROP|DETACH|ATTACH PARTITION|PART partition, ...]
|
||||
* [FETCH PARTITION partition FROM ...]
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTAlterQuery.h>
|
||||
#include <Parsers/ASTColumnDeclaration.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
|
||||
@ -29,6 +30,7 @@ namespace ErrorCodes
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
}
|
||||
|
||||
|
||||
@ -199,14 +201,21 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
|
||||
command.ttl = command_ast->ttl;
|
||||
return command;
|
||||
}
|
||||
else if (command_ast->type == ASTAlterCommand::MODIFY_SETTING)
|
||||
{
|
||||
AlterCommand command;
|
||||
command.type = AlterCommand::MODIFY_SETTING;
|
||||
command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes;
|
||||
return command;
|
||||
}
|
||||
else
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
|
||||
ConstraintsDescription & constraints_description,
|
||||
ASTPtr & order_by_ast, ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast) const
|
||||
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast, ASTPtr & primary_key_ast,
|
||||
ASTPtr & ttl_table_ast, SettingsChanges & changes) const
|
||||
{
|
||||
if (type == ADD_COLUMN)
|
||||
{
|
||||
@ -373,23 +382,31 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
|
||||
{
|
||||
ttl_table_ast = ttl;
|
||||
}
|
||||
else if (type == MODIFY_SETTING)
|
||||
{
|
||||
changes.insert(changes.end(), settings_changes.begin(), settings_changes.end());
|
||||
}
|
||||
else
|
||||
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
bool AlterCommand::isMutable() const
|
||||
{
|
||||
if (type == COMMENT_COLUMN)
|
||||
if (type == COMMENT_COLUMN || type == MODIFY_SETTING)
|
||||
return false;
|
||||
if (type == MODIFY_COLUMN)
|
||||
return data_type.get() || default_expression;
|
||||
// TODO: возможно, здесь нужно дополнить
|
||||
return true;
|
||||
}
|
||||
|
||||
bool AlterCommand::isSettingsAlter() const
|
||||
{
|
||||
return type == MODIFY_SETTING;
|
||||
}
|
||||
|
||||
void AlterCommands::apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
|
||||
ConstraintsDescription & constraints_description,
|
||||
ASTPtr & order_by_ast, ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast) const
|
||||
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast, ASTPtr & primary_key_ast,
|
||||
ASTPtr & ttl_table_ast, SettingsChanges & changes) const
|
||||
{
|
||||
auto new_columns_description = columns_description;
|
||||
auto new_indices_description = indices_description;
|
||||
@ -397,10 +414,11 @@ void AlterCommands::apply(ColumnsDescription & columns_description, IndicesDescr
|
||||
auto new_order_by_ast = order_by_ast;
|
||||
auto new_primary_key_ast = primary_key_ast;
|
||||
auto new_ttl_table_ast = ttl_table_ast;
|
||||
auto new_changes = changes;
|
||||
|
||||
for (const AlterCommand & command : *this)
|
||||
if (!command.ignore)
|
||||
command.apply(new_columns_description, new_indices_description, new_constraints_description, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast);
|
||||
command.apply(new_columns_description, new_indices_description, new_constraints_description, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
|
||||
|
||||
columns_description = std::move(new_columns_description);
|
||||
indices_description = std::move(new_indices_description);
|
||||
@ -408,6 +426,7 @@ void AlterCommands::apply(ColumnsDescription & columns_description, IndicesDescr
|
||||
order_by_ast = std::move(new_order_by_ast);
|
||||
primary_key_ast = std::move(new_primary_key_ast);
|
||||
ttl_table_ast = std::move(new_ttl_table_ast);
|
||||
changes = std::move(new_changes);
|
||||
}
|
||||
|
||||
void AlterCommands::validate(const IStorage & table, const Context & context)
|
||||
@ -523,6 +542,16 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
|
||||
throw Exception{"Wrong column name. Cannot find column " + command.column_name + " to comment", ErrorCodes::ILLEGAL_COLUMN};
|
||||
}
|
||||
}
|
||||
else if (command.type == AlterCommand::MODIFY_SETTING)
|
||||
{
|
||||
for (const auto & change : command.settings_changes)
|
||||
{
|
||||
if (!table.hasSetting(change.name))
|
||||
{
|
||||
throw Exception{"Storage '" + table.getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Existing defaulted columns may require default expression extensions with a type conversion,
|
||||
@ -588,7 +617,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
|
||||
}
|
||||
}
|
||||
|
||||
void AlterCommands::apply(ColumnsDescription & columns_description) const
|
||||
void AlterCommands::applyForColumnsOnly(ColumnsDescription & columns_description) const
|
||||
{
|
||||
auto out_columns_description = columns_description;
|
||||
IndicesDescription indices_description;
|
||||
@ -596,7 +625,9 @@ void AlterCommands::apply(ColumnsDescription & columns_description) const
|
||||
ASTPtr out_order_by;
|
||||
ASTPtr out_primary_key;
|
||||
ASTPtr out_ttl_table;
|
||||
apply(out_columns_description, indices_description, constraints_description, out_order_by, out_primary_key, out_ttl_table);
|
||||
SettingsChanges out_changes;
|
||||
apply(out_columns_description, indices_description, constraints_description,
|
||||
out_order_by, out_primary_key, out_ttl_table, out_changes);
|
||||
|
||||
if (out_order_by)
|
||||
throw Exception("Storage doesn't support modifying ORDER BY expression", ErrorCodes::NOT_IMPLEMENTED);
|
||||
@ -608,10 +639,40 @@ void AlterCommands::apply(ColumnsDescription & columns_description) const
|
||||
throw Exception("Storage doesn't support modifying constraints", ErrorCodes::NOT_IMPLEMENTED);
|
||||
if (out_ttl_table)
|
||||
throw Exception("Storage doesn't support modifying TTL expression", ErrorCodes::NOT_IMPLEMENTED);
|
||||
if (!out_changes.empty())
|
||||
throw Exception("Storage doesn't support modifying settings", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
|
||||
columns_description = std::move(out_columns_description);
|
||||
}
|
||||
|
||||
|
||||
void AlterCommands::applyForSettingsOnly(SettingsChanges & changes) const
|
||||
{
|
||||
ColumnsDescription out_columns_description;
|
||||
IndicesDescription indices_description;
|
||||
ConstraintsDescription constraints_description;
|
||||
ASTPtr out_order_by;
|
||||
ASTPtr out_primary_key;
|
||||
ASTPtr out_ttl_table;
|
||||
SettingsChanges out_changes;
|
||||
apply(out_columns_description, indices_description, constraints_description, out_order_by,
|
||||
out_primary_key, out_ttl_table, out_changes);
|
||||
|
||||
if (out_columns_description.begin() != out_columns_description.end())
|
||||
throw Exception("Alter modifying columns, but only settings change applied.", ErrorCodes::LOGICAL_ERROR);
|
||||
if (out_order_by)
|
||||
throw Exception("Alter modifying ORDER BY expression, but only settings change applied.", ErrorCodes::LOGICAL_ERROR);
|
||||
if (out_primary_key)
|
||||
throw Exception("Alter modifying PRIMARY KEY expression, but only settings change applied.", ErrorCodes::LOGICAL_ERROR);
|
||||
if (!indices_description.indices.empty())
|
||||
throw Exception("Alter modifying indices, but only settings change applied.", ErrorCodes::NOT_IMPLEMENTED);
|
||||
if (out_ttl_table)
|
||||
throw Exception("Alter modifying TTL, but only settings change applied.", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
changes = std::move(out_changes);
|
||||
}
|
||||
|
||||
bool AlterCommands::isMutable() const
|
||||
{
|
||||
for (const auto & param : *this)
|
||||
@ -623,4 +684,8 @@ bool AlterCommands::isMutable() const
|
||||
return false;
|
||||
}
|
||||
|
||||
bool AlterCommands::isSettingsAlter() const
|
||||
{
|
||||
return std::all_of(begin(), end(), [](const AlterCommand & c) { return c.isSettingsAlter(); });
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,8 @@
|
||||
#include <Storages/IndicesDescription.h>
|
||||
#include <Storages/ConstraintsDescription.h>
|
||||
|
||||
#include <Common/SettingsChanges.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -30,6 +32,7 @@ struct AlterCommand
|
||||
DROP_CONSTRAINT,
|
||||
MODIFY_TTL,
|
||||
UKNOWN_TYPE,
|
||||
MODIFY_SETTING,
|
||||
};
|
||||
|
||||
Type type = UKNOWN_TYPE;
|
||||
@ -80,6 +83,9 @@ struct AlterCommand
|
||||
/// For ADD and MODIFY
|
||||
CompressionCodecPtr codec;
|
||||
|
||||
/// For MODIFY SETTING
|
||||
SettingsChanges settings_changes;
|
||||
|
||||
AlterCommand() = default;
|
||||
AlterCommand(const Type type_, const String & column_name_, const DataTypePtr & data_type_,
|
||||
const ColumnDefaultKind default_kind_, const ASTPtr & default_expression_,
|
||||
@ -93,11 +99,14 @@ struct AlterCommand
|
||||
static std::optional<AlterCommand> parse(const ASTAlterCommand * command);
|
||||
|
||||
void apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
|
||||
ConstraintsDescription & constraints_description,
|
||||
ASTPtr & order_by_ast, ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast) const;
|
||||
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast,
|
||||
ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
|
||||
|
||||
/// Checks that not only metadata touched by that command
|
||||
bool isMutable() const;
|
||||
|
||||
/// checks that only settings changed by alter
|
||||
bool isSettingsAlter() const;
|
||||
};
|
||||
|
||||
class Context;
|
||||
@ -105,15 +114,18 @@ class Context;
|
||||
class AlterCommands : public std::vector<AlterCommand>
|
||||
{
|
||||
public:
|
||||
/// Used for primitive table engines, where only columns metadata can be changed
|
||||
void applyForColumnsOnly(ColumnsDescription & columns_description) const;
|
||||
void apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
|
||||
ConstraintsDescription & constraints_description,
|
||||
ASTPtr & order_by_ast, ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast) const;
|
||||
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast, ASTPtr & primary_key_ast,
|
||||
ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
|
||||
|
||||
/// For storages that don't support MODIFY_ORDER_BY.
|
||||
void apply(ColumnsDescription & columns_description) const;
|
||||
/// Apply alter commands only for settings. Exception will be thrown if any other part of table structure will be modified.
|
||||
void applyForSettingsOnly(SettingsChanges & changes) const;
|
||||
|
||||
void validate(const IStorage & table, const Context & context);
|
||||
bool isMutable() const;
|
||||
bool isSettingsAlter() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
#include <Storages/AlterCommands.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
|
||||
#include <sparsehash/dense_hash_map>
|
||||
#include <sparsehash/dense_hash_set>
|
||||
@ -18,6 +20,8 @@ namespace ErrorCodes
|
||||
extern const int NO_SUCH_COLUMN_IN_TABLE;
|
||||
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
|
||||
extern const int TYPE_MISMATCH;
|
||||
extern const int SETTINGS_ARE_NOT_SUPPORTED;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
}
|
||||
|
||||
IStorage::IStorage(ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
|
||||
@ -304,6 +308,13 @@ bool IStorage::isVirtualColumn(const String & column_name) const
|
||||
return getColumns().get(column_name).is_virtual;
|
||||
}
|
||||
|
||||
bool IStorage::hasSetting(const String & /* setting_name */) const
|
||||
{
|
||||
if (!supportsSettings())
|
||||
throw Exception("Storage '" + getName() + "' doesn't support settings.", ErrorCodes::SETTINGS_ARE_NOT_SUPPORTED);
|
||||
return false;
|
||||
}
|
||||
|
||||
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
|
||||
{
|
||||
TableStructureReadLockHolder result;
|
||||
@ -358,6 +369,39 @@ TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
void IStorage::alterSettings(
|
||||
const SettingsChanges & new_changes,
|
||||
const String & current_database_name,
|
||||
const String & current_table_name,
|
||||
const Context & context,
|
||||
TableStructureWriteLockHolder & /* table_lock_holder */)
|
||||
{
|
||||
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
|
||||
{
|
||||
if (!new_changes.empty())
|
||||
{
|
||||
auto & storage_changes = ast.as<ASTStorage &>().settings->changes;
|
||||
/// Make storage settings unique
|
||||
for (const auto & change : new_changes)
|
||||
{
|
||||
if (hasSetting(change.name))
|
||||
{
|
||||
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
|
||||
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
|
||||
it->value = change.value;
|
||||
else
|
||||
storage_changes.push_back(change);
|
||||
}
|
||||
else
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
}
|
||||
};
|
||||
context.getDatabase(current_database_name)->alterTable(context, current_table_name, getColumns(), getIndices(), getConstraints(), storage_modifier);
|
||||
}
|
||||
|
||||
|
||||
void IStorage::alter(
|
||||
const AlterCommands & params,
|
||||
const String & database_name,
|
||||
@ -365,17 +409,22 @@ void IStorage::alter(
|
||||
const Context & context,
|
||||
TableStructureWriteLockHolder & table_lock_holder)
|
||||
{
|
||||
for (const auto & param : params)
|
||||
if (params.isSettingsAlter())
|
||||
{
|
||||
if (param.isMutable())
|
||||
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
SettingsChanges new_changes;
|
||||
params.applyForSettingsOnly(new_changes);
|
||||
alterSettings(new_changes, database_name, table_name, context, table_lock_holder);
|
||||
return;
|
||||
}
|
||||
|
||||
if (params.isMutable())
|
||||
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
|
||||
auto new_columns = getColumns();
|
||||
auto new_indices = getIndices();
|
||||
auto new_constraints = getConstraints();
|
||||
params.apply(new_columns);
|
||||
params.applyForColumnsOnly(new_columns);
|
||||
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {});
|
||||
setColumns(std::move(new_columns));
|
||||
}
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <Common/ActionLock.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/RWLock.h>
|
||||
#include <Common/SettingsChanges.h>
|
||||
#include <Storages/ConstraintsDescription.h>
|
||||
|
||||
#include <optional>
|
||||
@ -95,6 +96,9 @@ public:
|
||||
/// Returns true if the storage supports deduplication of inserted data blocks.
|
||||
virtual bool supportsDeduplication() const { return false; }
|
||||
|
||||
/// Returns true if the storage supports settings.
|
||||
virtual bool supportsSettings() const { return false; }
|
||||
|
||||
/// Optional size information of each physical column.
|
||||
/// Currently it's only used by the MergeTree family for query optimizations.
|
||||
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
|
||||
@ -134,13 +138,15 @@ public: /// thread-unsafe part. lockStructure must be acquired
|
||||
/// If |need_all| is set, then checks that all the columns of the table are in the block.
|
||||
void check(const Block & block, bool need_all = false) const;
|
||||
|
||||
/// Check storage has setting. Exception will be thrown if it doesn't support settings at all.
|
||||
virtual bool hasSetting(const String & setting_name) const;
|
||||
|
||||
protected: /// still thread-unsafe part.
|
||||
void setIndices(IndicesDescription indices_);
|
||||
|
||||
/// Returns whether the column is virtual - by default all columns are real.
|
||||
/// Initially reserved virtual column name may be shadowed by real column.
|
||||
virtual bool isVirtualColumn(const String & column_name) const;
|
||||
|
||||
private:
|
||||
ColumnsDescription columns; /// combined real and virtual columns
|
||||
const ColumnsDescription virtuals = {};
|
||||
@ -285,6 +291,15 @@ public:
|
||||
throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/** ALTER table settings if possible. Otherwise throws exception.
|
||||
*/
|
||||
virtual void alterSettings(
|
||||
const SettingsChanges & new_changes,
|
||||
const String & current_database_name,
|
||||
const String & current_table_name,
|
||||
const Context & context,
|
||||
TableStructureWriteLockHolder & table_lock_holder);
|
||||
|
||||
/** Perform any background work. For example, combining parts in a MergeTree type table.
|
||||
* Returns whether any work has been done.
|
||||
*/
|
||||
|
@ -22,7 +22,7 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
try
|
||||
{
|
||||
applyChanges(storage_def.settings->changes);
|
||||
loadFromChanges(storage_def.settings->changes);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -14,7 +14,9 @@ class ASTStorage;
|
||||
struct KafkaSettings : public SettingsCollection<KafkaSettings>
|
||||
{
|
||||
|
||||
#define LIST_OF_KAFKA_SETTINGS(M) \
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_KAFKA_SETTINGS(M, IM) \
|
||||
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
|
||||
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
|
||||
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
|
||||
|
@ -43,6 +43,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -367,7 +368,7 @@ bool StorageKafka::streamToViews()
|
||||
const Settings & settings = global_context.getSettingsRef();
|
||||
size_t block_size = max_block_size;
|
||||
if (block_size == 0)
|
||||
block_size = settings.max_block_size.value;
|
||||
block_size = settings.max_block_size;
|
||||
|
||||
// Create a stream for each consumer and join them in a union stream
|
||||
InterpreterInsertQuery interpreter{insert, global_context};
|
||||
@ -406,6 +407,22 @@ bool StorageKafka::streamToViews()
|
||||
}
|
||||
|
||||
|
||||
bool StorageKafka::hasSetting(const String & setting_name) const
|
||||
{
|
||||
return KafkaSettings::findIndex(setting_name) != KafkaSettings::npos;
|
||||
}
|
||||
|
||||
void StorageKafka::alterSettings(
|
||||
const SettingsChanges & /* new_changes */,
|
||||
const String & /* current_database_name */,
|
||||
const String & /* current_table_name */,
|
||||
const Context & /* context */,
|
||||
TableStructureWriteLockHolder & /* table_lock_holder */)
|
||||
{
|
||||
throw Exception("Storage '" + getName() + "' doesn't support settings alter", ErrorCodes::UNSUPPORTED_METHOD);
|
||||
}
|
||||
|
||||
|
||||
void registerStorageKafka(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
|
||||
@ -470,7 +487,7 @@ void registerStorageKafka(StorageFactory & factory)
|
||||
#undef CHECK_KAFKA_STORAGE_ARGUMENT
|
||||
|
||||
// Get and check broker list
|
||||
String brokers = kafka_settings.kafka_broker_list.value;
|
||||
String brokers = kafka_settings.kafka_broker_list;
|
||||
if (args_count >= 1)
|
||||
{
|
||||
const auto * ast = engine_args[0]->as<ASTLiteral>();
|
||||
@ -525,7 +542,7 @@ void registerStorageKafka(StorageFactory & factory)
|
||||
}
|
||||
|
||||
// Parse row delimiter (optional)
|
||||
char row_delimiter = kafka_settings.kafka_row_delimiter.value;
|
||||
char row_delimiter = kafka_settings.kafka_row_delimiter;
|
||||
if (args_count >= 5)
|
||||
{
|
||||
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
|
||||
@ -572,7 +589,7 @@ void registerStorageKafka(StorageFactory & factory)
|
||||
}
|
||||
|
||||
// Parse number of consumers (optional)
|
||||
UInt64 num_consumers = kafka_settings.kafka_num_consumers.value;
|
||||
UInt64 num_consumers = kafka_settings.kafka_num_consumers;
|
||||
if (args_count >= 7)
|
||||
{
|
||||
const auto * ast = engine_args[6]->as<ASTLiteral>();
|
||||
@ -587,7 +604,7 @@ void registerStorageKafka(StorageFactory & factory)
|
||||
}
|
||||
|
||||
// Parse max block size (optional)
|
||||
UInt64 max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size.value);
|
||||
UInt64 max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size);
|
||||
if (args_count >= 8)
|
||||
{
|
||||
const auto * ast = engine_args[7]->as<ASTLiteral>();
|
||||
@ -602,7 +619,7 @@ void registerStorageKafka(StorageFactory & factory)
|
||||
}
|
||||
}
|
||||
|
||||
size_t skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages.value);
|
||||
size_t skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages);
|
||||
if (args_count >= 9)
|
||||
{
|
||||
const auto * ast = engine_args[8]->as<ASTLiteral>();
|
||||
|
@ -24,6 +24,7 @@ public:
|
||||
std::string getName() const override { return "Kafka"; }
|
||||
std::string getTableName() const override { return table_name; }
|
||||
std::string getDatabaseName() const override { return database_name; }
|
||||
bool supportsSettings() const override { return true; }
|
||||
|
||||
void startup() override;
|
||||
void shutdown() override;
|
||||
@ -56,6 +57,15 @@ public:
|
||||
const auto & getSchemaName() const { return schema_name; }
|
||||
const auto & skipBroken() const { return skip_broken; }
|
||||
|
||||
bool hasSetting(const String & setting_name) const override;
|
||||
|
||||
void alterSettings(
|
||||
const SettingsChanges & new_changes,
|
||||
const String & current_database_name,
|
||||
const String & current_table_name,
|
||||
const Context & context,
|
||||
TableStructureWriteLockHolder & table_lock_holder) override;
|
||||
|
||||
protected:
|
||||
StorageKafka(
|
||||
const std::string & table_name_,
|
||||
|
@ -54,14 +54,15 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
|
||||
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
|
||||
|
||||
String part_name = params.get("part");
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
/// Validation of the input that may come from malicious replica.
|
||||
MergeTreePartInfo::fromPartName(part_name, data.format_version);
|
||||
|
||||
static std::atomic_uint total_sends {0};
|
||||
|
||||
if ((data.settings.replicated_max_parallel_sends && total_sends >= data.settings.replicated_max_parallel_sends)
|
||||
|| (data.settings.replicated_max_parallel_sends_for_table && data.current_table_sends >= data.settings.replicated_max_parallel_sends_for_table))
|
||||
if ((data_settings->replicated_max_parallel_sends && total_sends >= data_settings->replicated_max_parallel_sends)
|
||||
|| (data_settings->replicated_max_parallel_sends_for_table && data.current_table_sends >= data_settings->replicated_max_parallel_sends_for_table))
|
||||
{
|
||||
response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS));
|
||||
response.setReason("Too many concurrent fetches, try again later");
|
||||
@ -174,6 +175,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
{
|
||||
/// Validation of the input that may come from malicious replica.
|
||||
MergeTreePartInfo::fromPartName(part_name, data.format_version);
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
Poco::URI uri;
|
||||
uri.setScheme(interserver_scheme);
|
||||
@ -200,7 +202,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
timeouts,
|
||||
creds,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
data.settings.replicated_max_parallel_fetches_for_host
|
||||
data_settings->replicated_max_parallel_fetches_for_host
|
||||
};
|
||||
|
||||
static const String TMP_PREFIX = "tmp_fetch_";
|
||||
|
@ -39,7 +39,7 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
|
||||
, compute_granularity(index_granularity.empty())
|
||||
, codec(std::move(codec_))
|
||||
, skip_indices(indices_to_recalc)
|
||||
, with_final_mark(storage.settings.write_final_mark && can_use_adaptive_granularity)
|
||||
, with_final_mark(storage.getCOWSettings()->write_final_mark && can_use_adaptive_granularity)
|
||||
{
|
||||
if (blocks_are_granules_size && !index_granularity.empty())
|
||||
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
|
||||
@ -139,10 +139,11 @@ void fillIndexGranularityImpl(
|
||||
|
||||
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
|
||||
{
|
||||
const auto storage_settings = storage.getCOWSettings();
|
||||
fillIndexGranularityImpl(
|
||||
block,
|
||||
storage.settings.index_granularity_bytes,
|
||||
storage.settings.index_granularity,
|
||||
storage_settings->index_granularity_bytes,
|
||||
storage_settings->index_granularity,
|
||||
blocks_are_granules_size,
|
||||
index_offset,
|
||||
index_granularity,
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTPartition.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Parsers/ExpressionListParsers.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
@ -89,6 +90,7 @@ namespace ErrorCodes
|
||||
extern const int BAD_TTL_EXPRESSION;
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int BAD_DATA_PART_NAME;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
}
|
||||
|
||||
|
||||
@ -105,13 +107,12 @@ MergeTreeData::MergeTreeData(
|
||||
const ASTPtr & sample_by_ast_,
|
||||
const ASTPtr & ttl_table_ast_,
|
||||
const MergingParams & merging_params_,
|
||||
const MergeTreeSettings & settings_,
|
||||
MergeTreeSettingsPtr settings_,
|
||||
bool require_part_metadata_,
|
||||
bool attach,
|
||||
BrokenPartCallback broken_part_callback_)
|
||||
: global_context(context_),
|
||||
merging_params(merging_params_),
|
||||
settings(settings_),
|
||||
partition_by_ast(partition_by_ast_),
|
||||
sample_by_ast(sample_by_ast_),
|
||||
ttl_table_ast(ttl_table_ast_),
|
||||
@ -120,9 +121,11 @@ MergeTreeData::MergeTreeData(
|
||||
full_path(full_path_),
|
||||
broken_part_callback(broken_part_callback_),
|
||||
log_name(database_name + "." + table_name), log(&Logger::get(log_name)),
|
||||
guarded_settings(settings_),
|
||||
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
|
||||
data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
|
||||
{
|
||||
const auto settings = getCOWSettings();
|
||||
setProperties(order_by_ast_, primary_key_ast_, columns_, indices_, constraints_);
|
||||
|
||||
/// NOTE: using the same columns list as is read when performing actual merges.
|
||||
@ -133,7 +136,7 @@ MergeTreeData::MergeTreeData(
|
||||
sampling_expr_column_name = sample_by_ast->getColumnName();
|
||||
|
||||
if (!primary_key_sample.has(sampling_expr_column_name)
|
||||
&& !attach && !settings.compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility.
|
||||
&& !attach && !settings->compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility.
|
||||
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
auto syntax = SyntaxAnalyzer(global_context).analyze(sample_by_ast, getColumns().getAllPhysical());
|
||||
@ -729,6 +732,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
{
|
||||
LOG_DEBUG(log, "Loading data parts");
|
||||
|
||||
const auto settings = getCOWSettings();
|
||||
Strings part_file_names;
|
||||
Poco::DirectoryIterator end;
|
||||
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
|
||||
@ -750,7 +754,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
}
|
||||
|
||||
/// Parallel loading of data parts.
|
||||
size_t num_threads = std::min(size_t(settings.max_part_loading_threads), part_file_names.size());
|
||||
size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_file_names.size());
|
||||
|
||||
std::mutex mutex;
|
||||
|
||||
@ -869,12 +873,12 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
|
||||
pool.wait();
|
||||
|
||||
if (has_non_adaptive_parts && has_adaptive_parts && !settings.enable_mixed_granularity_parts)
|
||||
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
|
||||
throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
|
||||
|
||||
if (suspicious_broken_parts > settings.max_suspicious_broken_parts && !skip_sanity_checks)
|
||||
if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
|
||||
throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
|
||||
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
|
||||
|
||||
@ -961,10 +965,11 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
|
||||
if (!lock.try_lock())
|
||||
return;
|
||||
|
||||
const auto settings = getCOWSettings();
|
||||
time_t current_time = time(nullptr);
|
||||
ssize_t deadline = (custom_directories_lifetime_seconds >= 0)
|
||||
? current_time - custom_directories_lifetime_seconds
|
||||
: current_time - settings.temporary_directories_lifetime.totalSeconds();
|
||||
: current_time - settings->temporary_directories_lifetime.totalSeconds();
|
||||
|
||||
/// Delete temporary directories older than a day.
|
||||
Poco::DirectoryIterator end;
|
||||
@ -1015,7 +1020,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
|
||||
|
||||
if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example).
|
||||
part_remove_time < now &&
|
||||
now - part_remove_time > settings.old_parts_lifetime.totalSeconds())
|
||||
now - part_remove_time > getCOWSettings()->old_parts_lifetime.totalSeconds())
|
||||
{
|
||||
parts_to_delete.emplace_back(it);
|
||||
}
|
||||
@ -1099,11 +1104,12 @@ void MergeTreeData::clearOldPartsFromFilesystem()
|
||||
|
||||
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_remove)
|
||||
{
|
||||
if (parts_to_remove.size() > 1 && settings.max_part_removal_threads > 1 && parts_to_remove.size() > settings.concurrent_part_removal_threshold)
|
||||
const auto settings = getCOWSettings();
|
||||
if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
|
||||
{
|
||||
/// Parallel parts removal.
|
||||
|
||||
size_t num_threads = std::min(size_t(settings.max_part_removal_threads), parts_to_remove.size());
|
||||
size_t num_threads = std::min(size_t(settings->max_part_removal_threads), parts_to_remove.size());
|
||||
ThreadPool pool(num_threads);
|
||||
|
||||
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
|
||||
@ -1229,7 +1235,8 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
|
||||
ASTPtr new_order_by_ast = order_by_ast;
|
||||
ASTPtr new_primary_key_ast = primary_key_ast;
|
||||
ASTPtr new_ttl_table_ast = ttl_table_ast;
|
||||
commands.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast);
|
||||
SettingsChanges new_changes;
|
||||
commands.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
|
||||
if (getIndices().empty() && !new_indices.empty() &&
|
||||
!context.getSettingsRef().allow_experimental_data_skipping_indices)
|
||||
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
|
||||
@ -1316,6 +1323,12 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
|
||||
|
||||
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast, /* only_check = */ true);
|
||||
|
||||
for (const auto & setting : new_changes)
|
||||
{
|
||||
if (!hasSetting(setting.name))
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
|
||||
/// Check that type conversions are possible.
|
||||
ExpressionActionsPtr unused_expression;
|
||||
NameToNameMap unused_map;
|
||||
@ -1328,6 +1341,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
|
||||
const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression,
|
||||
NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
|
||||
{
|
||||
const auto settings = getCOWSettings();
|
||||
out_expression = nullptr;
|
||||
out_rename_map = {};
|
||||
out_force_update_metadata = false;
|
||||
@ -1335,7 +1349,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
|
||||
if (part)
|
||||
part_mrk_file_extension = part->index_granularity_info.marks_file_extension;
|
||||
else
|
||||
part_mrk_file_extension = settings.index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension();
|
||||
part_mrk_file_extension = settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension();
|
||||
|
||||
using NameToType = std::map<String, const IDataType *>;
|
||||
NameToType new_types;
|
||||
@ -1493,6 +1507,7 @@ void MergeTreeData::alterDataPart(
|
||||
bool skip_sanity_checks,
|
||||
AlterDataPartTransactionPtr & transaction)
|
||||
{
|
||||
const auto settings = getCOWSettings();
|
||||
ExpressionActionsPtr expression;
|
||||
const auto & part = transaction->getDataPart();
|
||||
bool force_update_metadata;
|
||||
@ -1508,12 +1523,12 @@ void MergeTreeData::alterDataPart(
|
||||
++num_files_to_remove;
|
||||
|
||||
if (!skip_sanity_checks
|
||||
&& (num_files_to_modify > settings.max_files_to_modify_in_alter_columns
|
||||
|| num_files_to_remove > settings.max_files_to_remove_in_alter_columns))
|
||||
&& (num_files_to_modify > settings->max_files_to_modify_in_alter_columns
|
||||
|| num_files_to_remove > settings->max_files_to_remove_in_alter_columns))
|
||||
{
|
||||
transaction->clear();
|
||||
|
||||
const bool forbidden_because_of_modify = num_files_to_modify > settings.max_files_to_modify_in_alter_columns;
|
||||
const bool forbidden_because_of_modify = num_files_to_modify > settings->max_files_to_modify_in_alter_columns;
|
||||
|
||||
std::stringstream exception_message;
|
||||
exception_message
|
||||
@ -1545,7 +1560,7 @@ void MergeTreeData::alterDataPart(
|
||||
<< " If it is not an error, you could increase merge_tree/"
|
||||
<< (forbidden_because_of_modify ? "max_files_to_modify_in_alter_columns" : "max_files_to_remove_in_alter_columns")
|
||||
<< " parameter in configuration file (current value: "
|
||||
<< (forbidden_because_of_modify ? settings.max_files_to_modify_in_alter_columns : settings.max_files_to_remove_in_alter_columns)
|
||||
<< (forbidden_because_of_modify ? settings->max_files_to_modify_in_alter_columns : settings->max_files_to_remove_in_alter_columns)
|
||||
<< ")";
|
||||
|
||||
throw Exception(exception_message.str(), ErrorCodes::TABLE_DIFFERS_TOO_MUCH);
|
||||
@ -1630,6 +1645,25 @@ void MergeTreeData::alterDataPart(
|
||||
return;
|
||||
}
|
||||
|
||||
void MergeTreeData::alterSettings(
|
||||
const SettingsChanges & new_changes,
|
||||
const String & current_database_name,
|
||||
const String & current_table_name,
|
||||
const Context & context,
|
||||
TableStructureWriteLockHolder & table_lock_holder)
|
||||
{
|
||||
std::lock_guard lock(settings_mutex);
|
||||
MutableMergeTreeSettingsPtr settings = std::move(*guarded_settings.getPtr()).mutate();
|
||||
settings->updateFromChanges(new_changes);
|
||||
IStorage::alterSettings(new_changes, current_database_name, current_table_name, context, table_lock_holder);
|
||||
guarded_settings.setPtr(std::move(settings));
|
||||
}
|
||||
|
||||
bool MergeTreeData::hasSetting(const String & setting_name) const
|
||||
{
|
||||
return MergeTreeSettings::findIndex(setting_name) != MergeTreeSettings::npos;
|
||||
}
|
||||
|
||||
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
|
||||
{
|
||||
auto & empty_columns = data_part->empty_columns;
|
||||
@ -2306,28 +2340,29 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
|
||||
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
|
||||
{
|
||||
const auto settings = getCOWSettings();
|
||||
const size_t parts_count_in_total = getPartsCount();
|
||||
if (parts_count_in_total >= settings.max_parts_in_total)
|
||||
if (parts_count_in_total >= settings->max_parts_in_total)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
|
||||
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
|
||||
if (parts_count_in_partition < settings.parts_to_delay_insert)
|
||||
if (parts_count_in_partition < settings->parts_to_delay_insert)
|
||||
return;
|
||||
|
||||
if (parts_count_in_partition >= settings.parts_to_throw_insert)
|
||||
if (parts_count_in_partition >= settings->parts_to_throw_insert)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
}
|
||||
|
||||
const size_t max_k = settings.parts_to_throw_insert - settings.parts_to_delay_insert; /// always > 0
|
||||
const size_t k = 1 + parts_count_in_partition - settings.parts_to_delay_insert; /// from 1 to max_k
|
||||
const double delay_milliseconds = ::pow(settings.max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
|
||||
const size_t max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; /// always > 0
|
||||
const size_t k = 1 + parts_count_in_partition - settings->parts_to_delay_insert; /// from 1 to max_k
|
||||
const double delay_milliseconds = ::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DelayedInserts);
|
||||
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);
|
||||
@ -2345,8 +2380,9 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
|
||||
|
||||
void MergeTreeData::throwInsertIfNeeded() const
|
||||
{
|
||||
const auto settings = getCOWSettings();
|
||||
const size_t parts_count_in_total = getPartsCount();
|
||||
if (parts_count_in_total >= settings.max_parts_in_total)
|
||||
if (parts_count_in_total >= settings->max_parts_in_total)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
|
||||
@ -2354,7 +2390,7 @@ void MergeTreeData::throwInsertIfNeeded() const
|
||||
|
||||
const size_t parts_count_in_partition = getMaxPartsCountForPartition();
|
||||
|
||||
if (parts_count_in_partition >= settings.parts_to_throw_insert)
|
||||
if (parts_count_in_partition >= settings->parts_to_throw_insert)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RejectedInserts);
|
||||
throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
|
||||
@ -3039,7 +3075,9 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
|
||||
|
||||
bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
|
||||
{
|
||||
if (!settings.enable_mixed_granularity_parts || settings.index_granularity_bytes == 0)
|
||||
const auto settings = getCOWSettings();
|
||||
|
||||
if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0)
|
||||
{
|
||||
if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive)
|
||||
return false;
|
||||
|
@ -331,7 +331,7 @@ public:
|
||||
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
|
||||
const ASTPtr & ttl_table_ast_,
|
||||
const MergingParams & merging_params_,
|
||||
const MergeTreeSettings & settings_,
|
||||
MergeTreeSettingsPtr settings_,
|
||||
bool require_part_metadata_,
|
||||
bool attach,
|
||||
BrokenPartCallback broken_part_callback_ = [](const String &){});
|
||||
@ -360,6 +360,8 @@ public:
|
||||
|| merging_params.mode == MergingParams::VersionedCollapsing;
|
||||
}
|
||||
|
||||
bool supportsSettings() const override { return true; }
|
||||
|
||||
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const override;
|
||||
|
||||
NameAndTypePair getColumn(const String & column_name) const override
|
||||
@ -535,6 +537,18 @@ public:
|
||||
bool skip_sanity_checks,
|
||||
AlterDataPartTransactionPtr& transaction);
|
||||
|
||||
/// Performs ALTER of table settings (MergeTreeSettings). Lightweight operation, affects metadata only.
|
||||
/// Not atomic, have to be done with alter intention lock.
|
||||
void alterSettings(
|
||||
const SettingsChanges & new_changes,
|
||||
const String & current_database_name,
|
||||
const String & current_table_name,
|
||||
const Context & context,
|
||||
TableStructureWriteLockHolder & table_lock_holder) override;
|
||||
|
||||
/// All MergeTreeData children have settings.
|
||||
bool hasSetting(const String & setting_name) const override;
|
||||
|
||||
/// Remove columns, that have been markedd as empty after zeroing values with expired ttl
|
||||
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
|
||||
|
||||
@ -606,8 +620,9 @@ public:
|
||||
/// Has additional constraint in replicated version
|
||||
virtual bool canUseAdaptiveGranularity() const
|
||||
{
|
||||
return settings.index_granularity_bytes != 0 &&
|
||||
(settings.enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
|
||||
const auto settings = getCOWSettings();
|
||||
return settings->index_granularity_bytes != 0 &&
|
||||
(settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
|
||||
}
|
||||
|
||||
|
||||
@ -660,8 +675,6 @@ public:
|
||||
String sampling_expr_column_name;
|
||||
Names columns_required_for_sampling;
|
||||
|
||||
MergeTreeSettings settings;
|
||||
|
||||
/// Limiting parallel sends per one table, used in DataPartsExchange
|
||||
std::atomic_uint current_table_sends {0};
|
||||
|
||||
@ -670,7 +683,17 @@ public:
|
||||
|
||||
bool has_non_adaptive_index_granularity_parts = false;
|
||||
|
||||
/// Get copy-on-write pointer to storage settings.
|
||||
/// Copy this pointer into your scope and you will
|
||||
/// get consistent settings.
|
||||
const MergeTreeSettingsPtr getCOWSettings() const
|
||||
{
|
||||
std::shared_lock lock(settings_mutex);
|
||||
return guarded_settings.copyPtr();
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
friend struct MergeTreeDataPart;
|
||||
friend class MergeTreeDataMergerMutator;
|
||||
friend class ReplicatedMergeTreeAlterThread;
|
||||
@ -698,6 +721,26 @@ protected:
|
||||
String log_name;
|
||||
Logger * log;
|
||||
|
||||
/// Just hides settings pointer from direct usage
|
||||
class MergeTreeSettingsGuard
|
||||
{
|
||||
private:
|
||||
/// Settings COW pointer. Data maybe changed at any point of time.
|
||||
/// If you need consistent settings, just copy pointer to your scope.
|
||||
MergeTreeSettingsPtr settings_ptr;
|
||||
public:
|
||||
MergeTreeSettingsGuard(MergeTreeSettingsPtr settings_ptr_)
|
||||
: settings_ptr(settings_ptr_)
|
||||
{}
|
||||
|
||||
const MergeTreeSettingsPtr copyPtr() const { return settings_ptr; }
|
||||
MergeTreeSettingsPtr getPtr() { return settings_ptr; }
|
||||
void setPtr(MergeTreeSettingsPtr ptr) { settings_ptr = ptr; }
|
||||
};
|
||||
|
||||
/// Storage settings. Don't use this field directly, if you
|
||||
/// want readonly settings. Prefer getCOWSettings() method.
|
||||
MergeTreeSettingsGuard guarded_settings;
|
||||
|
||||
/// Work with data parts
|
||||
|
||||
@ -785,6 +828,8 @@ protected:
|
||||
std::mutex grab_old_parts_mutex;
|
||||
/// The same for clearOldTemporaryDirectories.
|
||||
std::mutex clear_old_temporary_directories_mutex;
|
||||
/// Mutex for settings usage
|
||||
mutable std::shared_mutex settings_mutex;
|
||||
|
||||
void setProperties(const ASTPtr & new_order_by_ast, const ASTPtr & new_primary_key_ast,
|
||||
const ColumnsDescription & new_columns,
|
||||
|
@ -141,15 +141,16 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
|
||||
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
size_t free_entries = pool_size - pool_used;
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
UInt64 max_size = 0;
|
||||
if (free_entries >= data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge)
|
||||
max_size = data.settings.max_bytes_to_merge_at_max_space_in_pool;
|
||||
if (free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge)
|
||||
max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool;
|
||||
else
|
||||
max_size = interpolateExponential(
|
||||
data.settings.max_bytes_to_merge_at_min_space_in_pool,
|
||||
data.settings.max_bytes_to_merge_at_max_space_in_pool,
|
||||
static_cast<double>(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge);
|
||||
data_settings->max_bytes_to_merge_at_min_space_in_pool,
|
||||
data_settings->max_bytes_to_merge_at_max_space_in_pool,
|
||||
static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge);
|
||||
|
||||
return std::min(max_size, static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT));
|
||||
}
|
||||
@ -157,11 +158,13 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
|
||||
|
||||
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation()
|
||||
{
|
||||
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
size_t total_threads_in_pool = pool.getNumberOfThreads();
|
||||
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
|
||||
|
||||
/// Allow mutations only if there are enough threads, leave free threads for merges else
|
||||
if (total_threads_in_pool - busy_threads_in_pool >= data.settings.number_of_free_entries_in_pool_to_execute_mutation)
|
||||
if (total_threads_in_pool - busy_threads_in_pool >= data_settings->number_of_free_entries_in_pool_to_execute_mutation)
|
||||
return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
|
||||
|
||||
return 0;
|
||||
@ -176,6 +179,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
String * out_disable_reason)
|
||||
{
|
||||
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
if (data_parts.empty())
|
||||
{
|
||||
@ -230,7 +234,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
merge_settings.base = 1;
|
||||
|
||||
bool can_merge_with_ttl =
|
||||
(current_time - last_merge_with_ttl > data.settings.merge_with_ttl_timeout);
|
||||
(current_time - last_merge_with_ttl > data_settings->merge_with_ttl_timeout);
|
||||
|
||||
/// NOTE Could allow selection of different merge strategy.
|
||||
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
|
||||
@ -552,6 +556,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
|
||||
Names all_column_names = data.getColumns().getNamesOfPhysical();
|
||||
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
NamesAndTypesList gathering_columns, merging_columns;
|
||||
Names gathering_column_names, merging_column_names;
|
||||
@ -624,13 +629,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
/// We count total amount of bytes in parts
|
||||
/// and use direct_io + aio if there is more than min_merge_bytes_to_use_direct_io
|
||||
bool read_with_direct_io = false;
|
||||
if (data.settings.min_merge_bytes_to_use_direct_io != 0)
|
||||
if (data_settings->min_merge_bytes_to_use_direct_io != 0)
|
||||
{
|
||||
size_t total_size = 0;
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
total_size += part->bytes_on_disk;
|
||||
if (total_size >= data.settings.min_merge_bytes_to_use_direct_io)
|
||||
if (total_size >= data_settings->min_merge_bytes_to_use_direct_io)
|
||||
{
|
||||
LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT");
|
||||
read_with_direct_io = true;
|
||||
@ -727,7 +732,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
merging_columns,
|
||||
compression_codec,
|
||||
merged_column_to_size,
|
||||
data.settings.min_merge_bytes_to_use_direct_io,
|
||||
data_settings->min_merge_bytes_to_use_direct_io,
|
||||
blocks_are_granules_size};
|
||||
|
||||
merged_stream->readPrefix();
|
||||
@ -960,6 +965,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
|
||||
const auto & updated_header = mutations_interpreter.getUpdatedHeader();
|
||||
|
||||
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
Block in_header = in->getHeader();
|
||||
|
||||
@ -1034,7 +1040,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
|
||||
}
|
||||
|
||||
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
|
||||
auto mrk_extension = data.settings.index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
|
||||
|
||||
auto mrk_extension = data_settings->index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
|
||||
for (const auto & entry : updated_header)
|
||||
{
|
||||
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
|
||||
@ -1138,9 +1145,11 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer
|
||||
const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
|
||||
const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const
|
||||
{
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
if (deduplicate)
|
||||
return MergeAlgorithm::Horizontal;
|
||||
if (data.settings.enable_vertical_merge_algorithm == 0)
|
||||
if (data_settings->enable_vertical_merge_algorithm == 0)
|
||||
return MergeAlgorithm::Horizontal;
|
||||
if (need_remove_expired_values)
|
||||
return MergeAlgorithm::Horizontal;
|
||||
@ -1151,9 +1160,9 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer
|
||||
data.merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
||||
data.merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
||||
|
||||
bool enough_ordinary_cols = gathering_columns.size() >= data.settings.vertical_merge_algorithm_min_columns_to_activate;
|
||||
bool enough_ordinary_cols = gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
|
||||
|
||||
bool enough_total_rows = sum_rows_upper_bound >= data.settings.vertical_merge_algorithm_min_rows_to_activate;
|
||||
bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate;
|
||||
|
||||
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;
|
||||
|
||||
|
@ -673,6 +673,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
|
||||
size_t sum_marks = 0;
|
||||
size_t total_rows = 0;
|
||||
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
size_t adaptive_parts = 0;
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
@ -689,18 +690,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
|
||||
|
||||
size_t index_granularity_bytes = 0;
|
||||
if (adaptive_parts > parts.size() / 2)
|
||||
index_granularity_bytes = data.settings.index_granularity_bytes;
|
||||
index_granularity_bytes = data_settings->index_granularity_bytes;
|
||||
|
||||
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
|
||||
settings.merge_tree_max_rows_to_use_cache,
|
||||
settings.merge_tree_max_bytes_to_use_cache,
|
||||
data.settings.index_granularity,
|
||||
data_settings->index_granularity,
|
||||
index_granularity_bytes);
|
||||
|
||||
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
|
||||
settings.merge_tree_min_rows_for_concurrent_read,
|
||||
settings.merge_tree_min_bytes_for_concurrent_read,
|
||||
data.settings.index_granularity,
|
||||
data_settings->index_granularity,
|
||||
index_granularity_bytes);
|
||||
|
||||
if (sum_marks > max_marks_to_use_cache)
|
||||
@ -831,6 +832,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
|
||||
SortingInfoPtr sorting_info = query_info.sorting_info;
|
||||
size_t adaptive_parts = 0;
|
||||
std::vector<size_t> sum_marks_in_parts(parts.size());
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
@ -846,18 +848,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
|
||||
|
||||
size_t index_granularity_bytes = 0;
|
||||
if (adaptive_parts > parts.size() / 2)
|
||||
index_granularity_bytes = data.settings.index_granularity_bytes;
|
||||
index_granularity_bytes = data_settings->index_granularity_bytes;
|
||||
|
||||
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
|
||||
settings.merge_tree_max_rows_to_use_cache,
|
||||
settings.merge_tree_max_bytes_to_use_cache,
|
||||
data.settings.index_granularity,
|
||||
data_settings->index_granularity,
|
||||
index_granularity_bytes);
|
||||
|
||||
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
|
||||
settings.merge_tree_min_rows_for_concurrent_read,
|
||||
settings.merge_tree_min_bytes_for_concurrent_read,
|
||||
data.settings.index_granularity,
|
||||
data_settings->index_granularity,
|
||||
index_granularity_bytes);
|
||||
|
||||
if (sum_marks > max_marks_to_use_cache)
|
||||
@ -869,7 +871,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
|
||||
return streams;
|
||||
|
||||
/// Let's split ranges to avoid reading much data.
|
||||
auto split_ranges = [rows_granularity = data.settings.index_granularity, max_block_size](const auto & ranges, int direction)
|
||||
auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction)
|
||||
{
|
||||
MarkRanges new_ranges;
|
||||
const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity;
|
||||
@ -1033,6 +1035,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
|
||||
const Names & virt_columns,
|
||||
const Settings & settings) const
|
||||
{
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
size_t sum_marks = 0;
|
||||
size_t adaptive_parts = 0;
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
@ -1046,12 +1049,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
|
||||
|
||||
size_t index_granularity_bytes = 0;
|
||||
if (adaptive_parts >= parts.size() / 2)
|
||||
index_granularity_bytes = data.settings.index_granularity_bytes;
|
||||
index_granularity_bytes = data_settings->index_granularity_bytes;
|
||||
|
||||
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
|
||||
settings.merge_tree_max_rows_to_use_cache,
|
||||
settings.merge_tree_max_bytes_to_use_cache,
|
||||
data.settings.index_granularity,
|
||||
data_settings->index_granularity,
|
||||
index_granularity_bytes);
|
||||
|
||||
if (sum_marks > max_marks_to_use_cache)
|
||||
|
@ -25,12 +25,13 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(
|
||||
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
|
||||
const MergeTreeData & storage)
|
||||
{
|
||||
fixed_index_granularity = storage.settings.index_granularity;
|
||||
const auto storage_settings = storage.getCOWSettings();
|
||||
fixed_index_granularity = storage_settings->index_granularity;
|
||||
/// Granularity is fixed
|
||||
if (!storage.canUseAdaptiveGranularity())
|
||||
setNonAdaptive();
|
||||
else
|
||||
setAdaptive(storage.settings.index_granularity_bytes);
|
||||
setAdaptive(storage_settings->index_granularity_bytes);
|
||||
}
|
||||
|
||||
|
||||
|
@ -46,7 +46,7 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
try
|
||||
{
|
||||
applyChanges(storage_def.settings->changes);
|
||||
loadFromChanges(storage_def.settings->changes);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -67,7 +67,7 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
|
||||
#define ADD_IF_ABSENT(NAME) \
|
||||
if (std::find_if(changes.begin(), changes.end(), \
|
||||
[](const SettingChange & c) { return c.name == #NAME; }) \
|
||||
[](const SettingChange & c) { return c.name == #NAME; }) \
|
||||
== changes.end()) \
|
||||
changes.push_back(SettingChange{#NAME, NAME.value});
|
||||
|
||||
@ -75,4 +75,9 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
#undef ADD_IF_ABSENT
|
||||
}
|
||||
|
||||
|
||||
MergeTreeSettings::MutablePtr MergeTreeSettings::clone() const
|
||||
{
|
||||
return COW::create(*this);
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Core/Defines.h>
|
||||
#include <Core/SettingsCommon.h>
|
||||
#include <Common/COW.h>
|
||||
|
||||
|
||||
namespace Poco
|
||||
@ -21,11 +22,14 @@ class ASTStorage;
|
||||
/** Settings for the MergeTree family of engines.
|
||||
* Could be loaded from config or from a CREATE TABLE query (SETTINGS clause).
|
||||
*/
|
||||
struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>, public COW<MergeTreeSettings>
|
||||
{
|
||||
|
||||
#define LIST_OF_MERGE_TREE_SETTINGS(M) \
|
||||
M(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
|
||||
friend class COW<MergeTreeSettings>;
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_MERGE_TREE_SETTINGS(M, IM) \
|
||||
IM(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
|
||||
\
|
||||
/** Merge settings. */ \
|
||||
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \
|
||||
@ -79,7 +83,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
M(SettingBool, use_minimalistic_part_header_in_zookeeper, false, "Store part header (checksums and columns) in a compact format and a single part znode instead of separate znodes (<part>/columns and <part>/checksums). This can dramatically reduce snapshot size in ZooKeeper. Before enabling check that all replicas support new format.") \
|
||||
M(SettingUInt64, finished_mutations_to_keep, 100, "How many records about mutations that are done to keep. If zero, then keep all of them.") \
|
||||
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
|
||||
M(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
||||
IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
||||
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
|
||||
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
|
||||
M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity") \
|
||||
@ -97,6 +101,14 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
|
||||
/// NOTE: will rewrite the AST to add immutable settings.
|
||||
void loadFromQuery(ASTStorage & storage_def);
|
||||
|
||||
MutablePtr clone() const;
|
||||
private:
|
||||
MergeTreeSettings() = default;
|
||||
MergeTreeSettings(const MergeTreeSettings & o) = default;
|
||||
};
|
||||
|
||||
using MergeTreeSettingsPtr = MergeTreeSettings::Ptr;
|
||||
using MutableMergeTreeSettingsPtr = MergeTreeSettings::MutablePtr;
|
||||
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
|
||||
/// Maybe it will make sence to add settings `max_block_size_bytes`
|
||||
if (max_block_size_rows && !storage.canUseAdaptiveGranularity())
|
||||
{
|
||||
size_t fixed_index_granularity = storage.settings.index_granularity;
|
||||
size_t fixed_index_granularity = storage.getCOWSettings()->index_granularity;
|
||||
min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)
|
||||
/ max_block_size_rows * max_block_size_rows / fixed_index_granularity;
|
||||
}
|
||||
|
@ -27,8 +27,9 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
|
||||
|
||||
void ReplicatedMergeTreeCleanupThread::run()
|
||||
{
|
||||
const auto CLEANUP_SLEEP_MS = storage.settings.cleanup_delay_period * 1000
|
||||
+ std::uniform_int_distribution<UInt64>(0, storage.settings.cleanup_delay_period_random_add * 1000)(rng);
|
||||
auto storage_settings = storage.getCOWSettings();
|
||||
const auto CLEANUP_SLEEP_MS = storage_settings->cleanup_delay_period * 1000
|
||||
+ std::uniform_int_distribution<UInt64>(0, storage_settings->cleanup_delay_period_random_add * 1000)(rng);
|
||||
|
||||
try
|
||||
{
|
||||
@ -74,6 +75,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
|
||||
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
{
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
auto storage_settings = storage.getCOWSettings();
|
||||
|
||||
Coordination::Stat stat;
|
||||
if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat))
|
||||
@ -82,7 +84,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
int children_count = stat.numChildren;
|
||||
|
||||
/// We will wait for 1.1 times more records to accumulate than necessary.
|
||||
if (static_cast<double>(children_count) < storage.settings.min_replicated_logs_to_keep * 1.1)
|
||||
if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1)
|
||||
return;
|
||||
|
||||
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
|
||||
@ -100,8 +102,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
std::sort(entries.begin(), entries.end());
|
||||
|
||||
String min_saved_record_log_str = entries[
|
||||
entries.size() > storage.settings.max_replicated_logs_to_keep.value
|
||||
? entries.size() - storage.settings.max_replicated_logs_to_keep.value
|
||||
entries.size() > storage_settings->max_replicated_logs_to_keep
|
||||
? entries.size() - storage_settings->max_replicated_logs_to_keep
|
||||
: 0];
|
||||
|
||||
/// Replicas that were marked is_lost but are active.
|
||||
@ -203,7 +205,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
min_saved_log_pointer = std::min(min_saved_log_pointer, min_log_pointer_lost_candidate);
|
||||
|
||||
/// We will not touch the last `min_replicated_logs_to_keep` records.
|
||||
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage.settings.min_replicated_logs_to_keep.value), entries.end());
|
||||
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage_settings->min_replicated_logs_to_keep), entries.end());
|
||||
/// We will not touch records that are no less than `min_saved_log_pointer`.
|
||||
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
|
||||
|
||||
@ -285,6 +287,7 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
|
||||
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
||||
{
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
auto storage_settings = storage.getCOWSettings();
|
||||
|
||||
std::vector<NodeWithStat> timed_blocks;
|
||||
getBlocksSortedByTime(*zookeeper, timed_blocks);
|
||||
@ -294,12 +297,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
||||
|
||||
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
|
||||
Int64 current_time = timed_blocks.front().ctime;
|
||||
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage.settings.replicated_deduplication_window_seconds));
|
||||
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
|
||||
|
||||
/// Virtual node, all nodes that are "greater" than this one will be deleted
|
||||
NodeWithStat block_threshold{{}, time_threshold};
|
||||
|
||||
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage.settings.replicated_deduplication_window.value);
|
||||
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window);
|
||||
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
|
||||
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
|
||||
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
|
||||
@ -401,10 +404,11 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
|
||||
|
||||
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
|
||||
{
|
||||
if (!storage.settings.finished_mutations_to_keep)
|
||||
auto storage_settings = storage.getCOWSettings();
|
||||
if (!storage_settings->finished_mutations_to_keep)
|
||||
return;
|
||||
|
||||
if (storage.queue.countFinishedMutations() <= storage.settings.finished_mutations_to_keep)
|
||||
if (storage.queue.countFinishedMutations() <= storage_settings->finished_mutations_to_keep)
|
||||
{
|
||||
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
|
||||
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
|
||||
@ -431,10 +435,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
|
||||
|
||||
/// Do not remove entries that are greater than `min_pointer` (they are not done yet).
|
||||
entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
|
||||
/// Do not remove last `storage.settings.finished_mutations_to_keep` entries.
|
||||
if (entries.size() <= storage.settings.finished_mutations_to_keep)
|
||||
/// Do not remove last `storage_settings->finished_mutations_to_keep` entries.
|
||||
if (entries.size() <= storage_settings->finished_mutations_to_keep)
|
||||
return;
|
||||
entries.erase(entries.end() - storage.settings.finished_mutations_to_keep, entries.end());
|
||||
entries.erase(entries.end() - storage_settings->finished_mutations_to_keep, entries.end());
|
||||
|
||||
if (entries.empty())
|
||||
return;
|
||||
|
@ -964,7 +964,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
||||
* Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges,
|
||||
* because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
|
||||
*/
|
||||
bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data.settings.max_bytes_to_merge_at_max_space_in_pool);
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool);
|
||||
|
||||
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
|
||||
{
|
||||
|
@ -44,11 +44,12 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
|
||||
, log(&Logger::get(log_name))
|
||||
, active_node_identifier(generateActiveNodeIdentifier())
|
||||
{
|
||||
check_period_ms = storage.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
|
||||
const auto storage_settings = storage.getCOWSettings();
|
||||
check_period_ms = storage_settings->zookeeper_session_expiration_check_period.totalSeconds() * 1000;
|
||||
|
||||
/// Periodicity of checking lag of replica.
|
||||
if (check_period_ms > static_cast<Int64>(storage.settings.check_delay_period) * 1000)
|
||||
check_period_ms = storage.settings.check_delay_period * 1000;
|
||||
if (check_period_ms > static_cast<Int64>(storage_settings->check_delay_period) * 1000)
|
||||
check_period_ms = storage_settings->check_delay_period * 1000;
|
||||
|
||||
task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
|
||||
}
|
||||
@ -121,7 +122,8 @@ void ReplicatedMergeTreeRestartingThread::run()
|
||||
}
|
||||
|
||||
time_t current_time = time(nullptr);
|
||||
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.settings.check_delay_period))
|
||||
const auto storage_settings = storage.getCOWSettings();
|
||||
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage_settings->check_delay_period))
|
||||
{
|
||||
/// Find out lag of replicas.
|
||||
time_t absolute_delay = 0;
|
||||
@ -136,10 +138,10 @@ void ReplicatedMergeTreeRestartingThread::run()
|
||||
|
||||
/// We give up leadership if the relative lag is greater than threshold.
|
||||
if (storage.is_leader
|
||||
&& relative_delay > static_cast<time_t>(storage.settings.min_relative_delay_to_yield_leadership))
|
||||
&& relative_delay > static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
|
||||
{
|
||||
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
|
||||
<< storage.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
|
||||
<< storage_settings->min_relative_delay_to_yield_leadership << "). Will yield leadership.");
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
|
||||
|
||||
@ -169,6 +171,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
|
||||
activateReplica();
|
||||
|
||||
const auto & zookeeper = storage.getZooKeeper();
|
||||
const auto storage_settings = storage.getCOWSettings();
|
||||
|
||||
storage.cloneReplicaIfNeeded(zookeeper);
|
||||
|
||||
@ -181,7 +184,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
|
||||
|
||||
updateQuorumIfWeHavePart();
|
||||
|
||||
if (storage.settings.replicated_can_become_leader)
|
||||
if (storage_settings->replicated_can_become_leader)
|
||||
storage.enterLeaderElection();
|
||||
else
|
||||
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");
|
||||
|
@ -27,8 +27,9 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
date_column = data.minmax_idx_columns[data.minmax_idx_date_column_pos];
|
||||
|
||||
const auto data_settings = data.getCOWSettings();
|
||||
sampling_expression = formattedAST(data.sample_by_ast);
|
||||
index_granularity = data.settings.index_granularity;
|
||||
index_granularity = data_settings->index_granularity;
|
||||
merging_params_mode = static_cast<int>(data.merging_params.mode);
|
||||
sign_column = data.merging_params.sign_column;
|
||||
|
||||
@ -48,7 +49,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
ttl_table = formattedAST(data.ttl_table_ast);
|
||||
skip_indices = data.getIndices().toString();
|
||||
if (data.canUseAdaptiveGranularity())
|
||||
index_granularity_bytes = data.settings.index_granularity_bytes;
|
||||
index_granularity_bytes = data_settings->index_granularity_bytes;
|
||||
else
|
||||
index_granularity_bytes = 0;
|
||||
|
||||
|
@ -574,7 +574,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
ASTPtr sample_by_ast;
|
||||
ASTPtr ttl_table_ast;
|
||||
IndicesDescription indices_description;
|
||||
MergeTreeSettings storage_settings = args.context.getMergeTreeSettings();
|
||||
MutableMergeTreeSettingsPtr storage_settings = MergeTreeSettings::create(args.context.getMergeTreeSettings());
|
||||
|
||||
if (is_extended_storage_def)
|
||||
{
|
||||
@ -602,7 +602,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
indices_description.indices.push_back(
|
||||
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
|
||||
|
||||
storage_settings.loadFromQuery(*args.storage_def);
|
||||
storage_settings->loadFromQuery(*args.storage_def);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -624,7 +624,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
|
||||
const auto * ast = engine_args.back()->as<ASTLiteral>();
|
||||
if (ast && ast->value.getType() == Field::Types::UInt64)
|
||||
storage_settings.index_granularity = safeGet<UInt64>(ast->value);
|
||||
storage_settings->index_granularity = safeGet<UInt64>(ast->value);
|
||||
else
|
||||
throw Exception(
|
||||
"Index granularity must be a positive integer" + getMergeTreeVerboseHelp(is_extended_storage_def),
|
||||
@ -640,13 +640,13 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
|
||||
args.columns, indices_description, args.constraints,
|
||||
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
|
||||
sample_by_ast, ttl_table_ast, merging_params, storage_settings,
|
||||
sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
|
||||
args.has_force_restore_data_flag);
|
||||
else
|
||||
return StorageMergeTree::create(
|
||||
args.data_path, args.database_name, args.table_name, args.columns, indices_description,
|
||||
args.constraints, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
|
||||
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, storage_settings,
|
||||
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
|
||||
args.has_force_restore_data_flag);
|
||||
}
|
||||
|
||||
|
@ -705,7 +705,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_
|
||||
auto new_columns = getColumns();
|
||||
auto new_indices = getIndices();
|
||||
auto new_constraints = getConstraints();
|
||||
params.apply(new_columns);
|
||||
params.applyForColumnsOnly(new_columns);
|
||||
context.getDatabase(database_name_)->alterTable(context, table_name_, new_columns, new_indices, new_constraints, {});
|
||||
setColumns(std::move(new_columns));
|
||||
}
|
||||
|
@ -363,7 +363,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
|
||||
const auto & settings = context.getSettingsRef();
|
||||
|
||||
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
|
||||
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync.value)
|
||||
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync)
|
||||
{
|
||||
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
@ -396,7 +396,7 @@ void StorageDistributed::alter(
|
||||
auto new_columns = getColumns();
|
||||
auto new_indices = getIndices();
|
||||
auto new_constraints = getConstraints();
|
||||
params.apply(new_columns);
|
||||
params.applyForColumnsOnly(new_columns);
|
||||
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
|
||||
setColumns(std::move(new_columns));
|
||||
}
|
||||
|
@ -166,8 +166,8 @@ void registerStorageJoin(StorageFactory & factory)
|
||||
args.database_name,
|
||||
args.table_name,
|
||||
key_names,
|
||||
join_use_nulls.value,
|
||||
SizeLimits{max_rows_in_join.value, max_bytes_in_join.value, join_overflow_mode.value},
|
||||
join_use_nulls,
|
||||
SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode},
|
||||
kind,
|
||||
strictness,
|
||||
args.columns,
|
||||
|
@ -405,7 +405,7 @@ void StorageMerge::alter(
|
||||
auto new_columns = getColumns();
|
||||
auto new_indices = getIndices();
|
||||
auto new_constraints = getConstraints();
|
||||
params.apply(new_columns);
|
||||
params.applyForColumnsOnly(new_columns);
|
||||
context.getDatabase(database_name_)->alterTable(context, table_name_, new_columns, new_indices, new_constraints, {});
|
||||
setColumns(new_columns);
|
||||
}
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTPartition.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/ActiveDataPartSet.h>
|
||||
@ -36,6 +37,7 @@ namespace ErrorCodes
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int CANNOT_ASSIGN_OPTIMIZE;
|
||||
extern const int INCOMPATIBLE_COLUMNS;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
}
|
||||
|
||||
namespace ActionLocks
|
||||
@ -61,7 +63,7 @@ StorageMergeTree::StorageMergeTree(
|
||||
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
|
||||
const ASTPtr & ttl_table_ast_,
|
||||
const MergingParams & merging_params_,
|
||||
const MergeTreeSettings & settings_,
|
||||
MergeTreeSettingsPtr settings_,
|
||||
bool has_force_restore_data_flag)
|
||||
: MergeTreeData(database_name_, table_name_,
|
||||
path_ + escapeForFileName(table_name_) + '/',
|
||||
@ -250,11 +252,23 @@ void StorageMergeTree::alter(
|
||||
{
|
||||
if (!params.isMutable())
|
||||
{
|
||||
SettingsChanges new_changes;
|
||||
/// We don't need to lock table structure exclusively to ALTER settings.
|
||||
if (params.isSettingsAlter())
|
||||
{
|
||||
params.applyForSettingsOnly(new_changes);
|
||||
alterSettings(new_changes, current_database_name, current_table_name, context, table_lock_holder);
|
||||
return;
|
||||
}
|
||||
|
||||
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
|
||||
auto new_columns = getColumns();
|
||||
auto new_indices = getIndices();
|
||||
auto new_constraints = getConstraints();
|
||||
params.apply(new_columns);
|
||||
ASTPtr new_order_by_ast = order_by_ast;
|
||||
ASTPtr new_primary_key_ast = primary_key_ast;
|
||||
ASTPtr new_ttl_table_ast = ttl_table_ast;
|
||||
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
|
||||
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
|
||||
setColumns(std::move(new_columns));
|
||||
return;
|
||||
@ -271,7 +285,8 @@ void StorageMergeTree::alter(
|
||||
ASTPtr new_order_by_ast = order_by_ast;
|
||||
ASTPtr new_primary_key_ast = primary_key_ast;
|
||||
ASTPtr new_ttl_table_ast = ttl_table_ast;
|
||||
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast);
|
||||
SettingsChanges new_changes;
|
||||
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
|
||||
|
||||
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
|
||||
|
||||
@ -789,14 +804,15 @@ Int64 StorageMergeTree::getCurrentMutationVersion(
|
||||
|
||||
void StorageMergeTree::clearOldMutations()
|
||||
{
|
||||
if (!settings.finished_mutations_to_keep)
|
||||
const auto settings = getCOWSettings();
|
||||
if (!settings->finished_mutations_to_keep)
|
||||
return;
|
||||
|
||||
std::vector<MergeTreeMutationEntry> mutations_to_delete;
|
||||
{
|
||||
std::lock_guard lock(currently_merging_mutex);
|
||||
|
||||
if (current_mutations_by_version.size() <= settings.finished_mutations_to_keep)
|
||||
if (current_mutations_by_version.size() <= settings->finished_mutations_to_keep)
|
||||
return;
|
||||
|
||||
auto begin_it = current_mutations_by_version.begin();
|
||||
@ -807,10 +823,10 @@ void StorageMergeTree::clearOldMutations()
|
||||
end_it = current_mutations_by_version.upper_bound(*min_version);
|
||||
|
||||
size_t done_count = std::distance(begin_it, end_it);
|
||||
if (done_count <= settings.finished_mutations_to_keep)
|
||||
if (done_count <= settings->finished_mutations_to_keep)
|
||||
return;
|
||||
|
||||
size_t to_delete_count = done_count - settings.finished_mutations_to_keep;
|
||||
size_t to_delete_count = done_count - settings->finished_mutations_to_keep;
|
||||
|
||||
auto it = begin_it;
|
||||
for (size_t i = 0; i < to_delete_count; ++i)
|
||||
@ -849,7 +865,10 @@ void StorageMergeTree::clearColumnOrIndexInPartition(const ASTPtr & partition, c
|
||||
ASTPtr ignored_order_by_ast;
|
||||
ASTPtr ignored_primary_key_ast;
|
||||
ASTPtr ignored_ttl_table_ast;
|
||||
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast, ignored_primary_key_ast, ignored_ttl_table_ast);
|
||||
SettingsChanges ignored_settings_changes;
|
||||
|
||||
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast,
|
||||
ignored_primary_key_ast, ignored_ttl_table_ast, ignored_settings_changes);
|
||||
|
||||
auto columns_for_parts = new_columns.getAllPhysical();
|
||||
for (const auto & part : parts)
|
||||
|
@ -130,6 +130,7 @@ private:
|
||||
friend struct CurrentlyMergingPartsTagger;
|
||||
|
||||
protected:
|
||||
|
||||
/** Attach the table with the appropriate name, along the appropriate path (with / at the end),
|
||||
* (correctness of names and paths are not checked)
|
||||
* consisting of the specified columns.
|
||||
@ -152,7 +153,7 @@ protected:
|
||||
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
|
||||
const ASTPtr & ttl_table_ast_,
|
||||
const MergingParams & merging_params_,
|
||||
const MergeTreeSettings & settings_,
|
||||
MergeTreeSettingsPtr settings_,
|
||||
bool has_force_restore_data_flag);
|
||||
};
|
||||
|
||||
|
@ -39,7 +39,7 @@ void StorageNull::alter(
|
||||
ColumnsDescription new_columns = getColumns();
|
||||
IndicesDescription new_indices = getIndices();
|
||||
ConstraintsDescription new_constraints = getConstraints();
|
||||
params.apply(new_columns);
|
||||
params.applyForColumnsOnly(new_columns);
|
||||
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
|
||||
setColumns(std::move(new_columns));
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const ASTPtr & sample_by_ast_,
|
||||
const ASTPtr & ttl_table_ast_,
|
||||
const MergingParams & merging_params_,
|
||||
const MergeTreeSettings & settings_,
|
||||
MergeTreeSettingsPtr settings_,
|
||||
bool has_force_restore_data_flag)
|
||||
: MergeTreeData(database_name_, table_name_,
|
||||
path_ + escapeForFileName(table_name_) + '/',
|
||||
@ -376,7 +376,7 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
|
||||
}
|
||||
|
||||
|
||||
/** Verify that list of columns and table settings match those specified in ZK (/ metadata).
|
||||
/** Verify that list of columns and table storage_settings match those specified in ZK (/ metadata).
|
||||
* If not, throw an exception.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bool allow_alter)
|
||||
@ -637,7 +637,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
||||
for (const auto & part : parts)
|
||||
total_rows_on_filesystem += part->rows_count;
|
||||
|
||||
bool insane = unexpected_parts_rows > total_rows_on_filesystem * settings.replicated_max_ratio_of_wrong_parts;
|
||||
const auto storage_settings = getCOWSettings();
|
||||
bool insane = unexpected_parts_rows > total_rows_on_filesystem * storage_settings->replicated_max_ratio_of_wrong_parts;
|
||||
|
||||
if (insane && !skip_sanity_checks)
|
||||
{
|
||||
@ -780,12 +781,13 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
|
||||
|
||||
if (!has_been_already_added)
|
||||
{
|
||||
const auto storage_settings = getCOWSettings();
|
||||
String part_path = replica_path + "/parts/" + part_name;
|
||||
|
||||
ops.emplace_back(zkutil::makeCheckRequest(
|
||||
zookeeper_path + "/columns", expected_columns_version));
|
||||
|
||||
if (settings.use_minimalistic_part_header_in_zookeeper)
|
||||
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
|
||||
{
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
part_path, local_part_header.toString(), zkutil::CreateMode::Persistent));
|
||||
@ -862,7 +864,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
|
||||
String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const
|
||||
{
|
||||
return MinimalisticDataPartChecksums::getSerializedString(checksums,
|
||||
static_cast<bool>(settings.use_minimalistic_checksums_in_zookeeper));
|
||||
getCOWSettings()->use_minimalistic_checksums_in_zookeeper);
|
||||
}
|
||||
|
||||
|
||||
@ -1033,13 +1035,14 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
||||
parts.push_back(part);
|
||||
}
|
||||
|
||||
const auto storage_settings = getCOWSettings();
|
||||
if (!have_all_parts)
|
||||
{
|
||||
/// If you do not have all the necessary parts, try to take some already merged part from someone.
|
||||
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
|
||||
return false;
|
||||
}
|
||||
else if (entry.create_time + settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
|
||||
else if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
|
||||
{
|
||||
/// If entry is old enough, and have enough size, and part are exists in any replica,
|
||||
/// then prefer fetching of merged part from replica.
|
||||
@ -1048,7 +1051,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
||||
for (const auto & part : parts)
|
||||
sum_parts_bytes_on_disk += part->bytes_on_disk;
|
||||
|
||||
if (sum_parts_bytes_on_disk >= settings.prefer_fetch_merged_part_size_threshold)
|
||||
if (sum_parts_bytes_on_disk >= storage_settings->prefer_fetch_merged_part_size_threshold)
|
||||
{
|
||||
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
|
||||
if (!replica.empty())
|
||||
@ -1158,6 +1161,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
||||
bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
|
||||
{
|
||||
const String & source_part_name = entry.source_parts.at(0);
|
||||
const auto storage_settings = getCOWSettings();
|
||||
LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name);
|
||||
|
||||
DataPartPtr source_part = getActiveContainingPart(source_part_name);
|
||||
@ -1177,8 +1181,8 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
|
||||
/// TODO - some better heuristic?
|
||||
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
|
||||
|
||||
if (entry.create_time + settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
|
||||
&& estimated_space_for_result >= settings.prefer_fetch_merged_part_size_threshold)
|
||||
if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
|
||||
&& estimated_space_for_result >= storage_settings->prefer_fetch_merged_part_size_threshold)
|
||||
{
|
||||
/// If entry is old enough, and have enough size, and some replica has the desired part,
|
||||
/// then prefer fetching from replica.
|
||||
@ -1272,20 +1276,21 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
|
||||
bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
||||
{
|
||||
String replica = findReplicaHavingCoveringPart(entry, true);
|
||||
const auto storage_settings = getCOWSettings();
|
||||
|
||||
static std::atomic_uint total_fetches {0};
|
||||
if (settings.replicated_max_parallel_fetches && total_fetches >= settings.replicated_max_parallel_fetches)
|
||||
if (storage_settings->replicated_max_parallel_fetches && total_fetches >= storage_settings->replicated_max_parallel_fetches)
|
||||
{
|
||||
throw Exception("Too many total fetches from replicas, maximum: " + settings.replicated_max_parallel_fetches.toString(),
|
||||
throw Exception("Too many total fetches from replicas, maximum: " + storage_settings->replicated_max_parallel_fetches.toString(),
|
||||
ErrorCodes::TOO_MANY_FETCHES);
|
||||
}
|
||||
|
||||
++total_fetches;
|
||||
SCOPE_EXIT({--total_fetches;});
|
||||
|
||||
if (settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= settings.replicated_max_parallel_fetches_for_table)
|
||||
if (storage_settings->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings->replicated_max_parallel_fetches_for_table)
|
||||
{
|
||||
throw Exception("Too many fetches from replicas for table, maximum: " + settings.replicated_max_parallel_fetches_for_table.toString(),
|
||||
throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings->replicated_max_parallel_fetches_for_table.toString(),
|
||||
ErrorCodes::TOO_MANY_FETCHES);
|
||||
}
|
||||
|
||||
@ -1528,7 +1533,8 @@ void StorageReplicatedMergeTree::executeClearColumnOrIndexInPartition(const LogE
|
||||
ASTPtr ignored_order_by_ast;
|
||||
ASTPtr ignored_primary_key_ast;
|
||||
ASTPtr ignored_ttl_table_ast;
|
||||
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast, ignored_primary_key_ast, ignored_ttl_table_ast);
|
||||
SettingsChanges ignored_changes;
|
||||
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast, ignored_primary_key_ast, ignored_ttl_table_ast, ignored_changes);
|
||||
|
||||
size_t modified_parts = 0;
|
||||
auto parts = getDataParts();
|
||||
@ -2207,6 +2213,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
if (!is_leader)
|
||||
return;
|
||||
|
||||
const auto storage_settings = getCOWSettings();
|
||||
const bool deduplicate = false; /// TODO: read deduplicate option from table config
|
||||
const bool force_ttl = false;
|
||||
|
||||
@ -2225,18 +2232,19 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
/// If many merges is already queued, then will queue only small enough merges.
|
||||
/// Otherwise merge queue could be filled with only large merges,
|
||||
/// and in the same time, many small parts could be created and won't be merged.
|
||||
|
||||
auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
|
||||
size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second;
|
||||
if (merges_and_mutations_sum >= settings.max_replicated_merges_in_queue)
|
||||
if (merges_and_mutations_sum >= storage_settings->max_replicated_merges_in_queue)
|
||||
{
|
||||
LOG_TRACE(log, "Number of queued merges (" << merges_and_mutations_queued.first << ") and part mutations ("
|
||||
<< merges_and_mutations_queued.second << ") is greater than max_replicated_merges_in_queue ("
|
||||
<< settings.max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
|
||||
<< storage_settings->max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
|
||||
}
|
||||
else
|
||||
{
|
||||
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
|
||||
settings.max_replicated_merges_in_queue, merges_and_mutations_sum);
|
||||
storage_settings->max_replicated_merges_in_queue, merges_and_mutations_sum);
|
||||
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
|
||||
|
||||
FutureMergedMutatedPart future_merged_part;
|
||||
@ -2248,7 +2256,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
}
|
||||
/// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts
|
||||
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
|
||||
&& merges_and_mutations_queued.second < settings.max_replicated_mutations_in_queue)
|
||||
&& merges_and_mutations_queued.second < storage_settings->max_replicated_mutations_in_queue)
|
||||
{
|
||||
/// Choose a part to mutate.
|
||||
DataPartsVector data_parts = getDataPartsVector();
|
||||
@ -3021,10 +3029,11 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
|
||||
|
||||
BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const Context & context)
|
||||
{
|
||||
const auto storage_settings = getCOWSettings();
|
||||
assertNotReadonly();
|
||||
|
||||
const Settings & query_settings = context.getSettingsRef();
|
||||
bool deduplicate = settings.replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
|
||||
bool deduplicate = storage_settings->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
|
||||
|
||||
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
|
||||
query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate);
|
||||
@ -3058,6 +3067,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
|
||||
};
|
||||
|
||||
bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
|
||||
const auto storage_settings = getCOWSettings();
|
||||
|
||||
if (!partition && final)
|
||||
{
|
||||
@ -3090,7 +3100,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
|
||||
if (!partition)
|
||||
{
|
||||
selected = merger_mutator.selectPartsToMerge(
|
||||
future_merged_part, true, settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
|
||||
future_merged_part, true, storage_settings->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -3132,13 +3142,24 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
|
||||
|
||||
|
||||
void StorageReplicatedMergeTree::alter(
|
||||
const AlterCommands & params, const String & /*database_name*/, const String & /*table_name*/,
|
||||
const AlterCommands & params, const String & current_database_name, const String & current_table_name,
|
||||
const Context & query_context, TableStructureWriteLockHolder & table_lock_holder)
|
||||
{
|
||||
assertNotReadonly();
|
||||
|
||||
LOG_DEBUG(log, "Doing ALTER");
|
||||
|
||||
if (params.isSettingsAlter())
|
||||
{
|
||||
/// We don't replicate storage_settings ALTER. It's local operation.
|
||||
/// Also we don't upgrade alter lock to table structure lock.
|
||||
LOG_DEBUG(log, "ALTER storage_settings only");
|
||||
SettingsChanges new_changes;
|
||||
params.applyForSettingsOnly(new_changes);
|
||||
alterSettings(new_changes, current_database_name, current_table_name, query_context, table_lock_holder);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Alter is done by modifying the metadata nodes in ZK that are shared between all replicas
|
||||
/// (/columns, /metadata). We set contents of the shared nodes to the new values and wait while
|
||||
/// replicas asynchronously apply changes (see ReplicatedMergeTreeAlterThread.cpp) and modify
|
||||
@ -3182,7 +3203,8 @@ void StorageReplicatedMergeTree::alter(
|
||||
ASTPtr new_order_by_ast = order_by_ast;
|
||||
ASTPtr new_primary_key_ast = primary_key_ast;
|
||||
ASTPtr new_ttl_table_ast = ttl_table_ast;
|
||||
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast);
|
||||
SettingsChanges new_changes;
|
||||
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
|
||||
|
||||
String new_columns_str = new_columns.toString();
|
||||
if (new_columns_str != getColumns().toString())
|
||||
@ -3926,9 +3948,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
||||
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
|
||||
{
|
||||
auto zookeeper = tryGetZooKeeper();
|
||||
const auto storage_settings = getCOWSettings();
|
||||
|
||||
res.is_leader = is_leader;
|
||||
res.can_become_leader = settings.replicated_can_become_leader;
|
||||
res.can_become_leader = storage_settings->replicated_can_become_leader;
|
||||
res.is_readonly = is_readonly;
|
||||
res.is_session_expired = !zookeeper || zookeeper->expired();
|
||||
|
||||
@ -4118,13 +4141,14 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
|
||||
|
||||
out_absolute_delay = getAbsoluteDelay();
|
||||
out_relative_delay = 0;
|
||||
const auto storage_settings = getCOWSettings();
|
||||
|
||||
/** Relative delay is the maximum difference of absolute delay from any other replica,
|
||||
* (if this replica lags behind any other live replica, or zero, otherwise).
|
||||
* Calculated only if the absolute delay is large enough.
|
||||
*/
|
||||
|
||||
if (out_absolute_delay < static_cast<time_t>(settings.min_relative_delay_to_yield_leadership))
|
||||
if (out_absolute_delay < static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
|
||||
return;
|
||||
|
||||
auto zookeeper = getZooKeeper();
|
||||
@ -4382,7 +4406,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
|
||||
/// instead.
|
||||
///
|
||||
/// Mutations of individual parts are in fact pretty similar to merges, e.g. their assignment and execution
|
||||
/// is governed by the same settings. TODO: support a single "merge-mutation" operation when the data
|
||||
/// is governed by the same storage_settings. TODO: support a single "merge-mutation" operation when the data
|
||||
/// read from the the source parts is first mutated on the fly to some uniform mutation version and then
|
||||
/// merged to a resulting part.
|
||||
///
|
||||
@ -4945,6 +4969,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
|
||||
const String & block_id_path) const
|
||||
{
|
||||
const String & part_name = part->name;
|
||||
const auto storage_settings = getCOWSettings();
|
||||
|
||||
if (!block_id_path.empty())
|
||||
{
|
||||
@ -4962,7 +4987,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
|
||||
zookeeper_path + "/columns",
|
||||
columns_version));
|
||||
|
||||
if (settings.use_minimalistic_part_header_in_zookeeper)
|
||||
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
|
||||
{
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
replica_path + "/parts/" + part->name,
|
||||
@ -4991,11 +5016,12 @@ void StorageReplicatedMergeTree::updatePartHeaderInZooKeeperAndCommit(
|
||||
AlterDataPartTransaction & transaction)
|
||||
{
|
||||
String part_path = replica_path + "/parts/" + transaction.getPartName();
|
||||
const auto storage_settings = getCOWSettings();
|
||||
|
||||
bool need_delete_columns_and_checksums_nodes = false;
|
||||
try
|
||||
{
|
||||
if (settings.use_minimalistic_part_header_in_zookeeper)
|
||||
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
|
||||
{
|
||||
auto part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
|
||||
transaction.getNewColumns(), transaction.getNewChecksums());
|
||||
@ -5175,8 +5201,9 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, const C
|
||||
|
||||
bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
|
||||
{
|
||||
return settings.index_granularity_bytes != 0 &&
|
||||
(settings.enable_mixed_granularity_parts ||
|
||||
const auto storage_settings = getCOWSettings();
|
||||
return storage_settings->index_granularity_bytes != 0 &&
|
||||
(storage_settings->enable_mixed_granularity_parts ||
|
||||
(!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity));
|
||||
}
|
||||
|
||||
|
@ -540,7 +540,7 @@ protected:
|
||||
const ASTPtr & sample_by_ast_,
|
||||
const ASTPtr & table_ttl_ast_,
|
||||
const MergingParams & merging_params_,
|
||||
const MergeTreeSettings & settings_,
|
||||
MergeTreeSettingsPtr settings_,
|
||||
bool has_force_restore_data_flag);
|
||||
};
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
1
|
32
dbms/tests/queries/0_stateless/00980_alter_settings_race.sh
Executable file
32
dbms/tests/queries/0_stateless/00980_alter_settings_race.sh
Executable file
@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_for_concurrent_alter"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="CREATE TABLE table_for_concurrent_alter (id UInt64, Data String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity=4096;";
|
||||
|
||||
n=0
|
||||
while [ "$n" -lt 100 ];
|
||||
do
|
||||
n=$(( n + 1 ))
|
||||
$CLICKHOUSE_CLIENT --query="INSERT INTO table_for_concurrent_alter VALUES(1, 'Hello')" > /dev/null 2> /dev/null &
|
||||
$CLICKHOUSE_CLIENT --query="OPTIMIZE TABLE table_for_concurrent_alter FINAL" > /dev/null 2> /dev/null &
|
||||
done &
|
||||
|
||||
|
||||
q=0
|
||||
while [ "$q" -lt 100 ];
|
||||
do
|
||||
q=$(( q + 1 ))
|
||||
counter=$(( 100 + q ))
|
||||
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_for_concurrent_alter MODIFY SETTING parts_to_throw_insert = $counter, parts_to_delay_insert = $counter, min_merge_bytes_to_use_direct_io = $counter" > /dev/null 2> /dev/null &
|
||||
done &
|
||||
|
||||
sleep 4
|
||||
|
||||
# we just test race conditions, not logic
|
||||
$CLICKHOUSE_CLIENT --query "SELECT 1"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_for_concurrent_alter"
|
@ -0,0 +1,5 @@
|
||||
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096
|
||||
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 1, parts_to_delay_insert = 1
|
||||
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100
|
||||
2
|
||||
CREATE TABLE default.table_for_alter (`id` UInt64, `Data` String) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity = 4096, parts_to_throw_insert = 100, parts_to_delay_insert = 100, check_delay_period = 30
|
@ -0,0 +1,51 @@
|
||||
DROP TABLE IF EXISTS log_for_alter;
|
||||
|
||||
CREATE TABLE log_for_alter (
|
||||
id UInt64,
|
||||
Data String
|
||||
) ENGINE = Log();
|
||||
|
||||
ALTER TABLE log_for_alter MODIFY SETTING aaa=123; -- { serverError 471 }
|
||||
|
||||
DROP TABLE IF EXISTS log_for_alter;
|
||||
|
||||
DROP TABLE IF EXISTS table_for_alter;
|
||||
|
||||
CREATE TABLE table_for_alter (
|
||||
id UInt64,
|
||||
Data String
|
||||
) ENGINE = MergeTree() ORDER BY id SETTINGS index_granularity=4096;
|
||||
|
||||
ALTER TABLE table_for_alter MODIFY SETTING index_granularity=555; -- { serverError 472 }
|
||||
|
||||
SHOW CREATE TABLE table_for_alter;
|
||||
|
||||
ALTER TABLE table_for_alter MODIFY SETTING parts_to_throw_insert = 1, parts_to_delay_insert = 1;
|
||||
|
||||
SHOW CREATE TABLE table_for_alter;
|
||||
|
||||
INSERT INTO table_for_alter VALUES (1, '1');
|
||||
INSERT INTO table_for_alter VALUES (2, '2'); -- { serverError 252 }
|
||||
|
||||
DETACH TABLE table_for_alter;
|
||||
|
||||
ATTACH TABLE table_for_alter;
|
||||
|
||||
INSERT INTO table_for_alter VALUES (2, '2'); -- { serverError 252 }
|
||||
|
||||
ALTER TABLE table_for_alter MODIFY SETTING xxx_yyy=124; -- { serverError 115 }
|
||||
|
||||
ALTER TABLE table_for_alter MODIFY SETTING parts_to_throw_insert = 100, parts_to_delay_insert = 100;
|
||||
|
||||
INSERT INTO table_for_alter VALUES (2, '2');
|
||||
|
||||
SHOW CREATE TABLE table_for_alter;
|
||||
|
||||
SELECT COUNT() FROM table_for_alter;
|
||||
|
||||
ALTER TABLE table_for_alter MODIFY SETTING check_delay_period=10, check_delay_period=20, check_delay_period=30;
|
||||
|
||||
SHOW CREATE TABLE table_for_alter;
|
||||
|
||||
DROP TABLE IF EXISTS table_for_alter;
|
||||
|
@ -0,0 +1,10 @@
|
||||
CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'1\') ORDER BY id SETTINGS index_granularity = 8192
|
||||
CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'1\') ORDER BY id SETTINGS index_granularity = 8192
|
||||
4
|
||||
4
|
||||
4
|
||||
4
|
||||
6
|
||||
6
|
||||
CREATE TABLE default.replicated_table_for_alter1 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'1\') ORDER BY id SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1
|
||||
CREATE TABLE default.replicated_table_for_alter2 (`id` UInt64, `Data` String) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/replicated_table_for_alter\', \'2\') ORDER BY id SETTINGS index_granularity = 8192, parts_to_throw_insert = 1, parts_to_delay_insert = 1
|
@ -0,0 +1,62 @@
|
||||
DROP TABLE IF EXISTS replicated_table_for_alter1;
|
||||
DROP TABLE IF EXISTS replicated_table_for_alter2;
|
||||
|
||||
CREATE TABLE replicated_table_for_alter1 (
|
||||
id UInt64,
|
||||
Data String
|
||||
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_table_for_alter', '1') ORDER BY id;
|
||||
|
||||
CREATE TABLE replicated_table_for_alter2 (
|
||||
id UInt64,
|
||||
Data String
|
||||
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_table_for_alter', '2') ORDER BY id;
|
||||
|
||||
SHOW CREATE TABLE replicated_table_for_alter1;
|
||||
|
||||
ALTER TABLE replicated_table_for_alter1 MODIFY SETTING index_granularity = 4096; -- { serverError 472 }
|
||||
|
||||
SHOW CREATE TABLE replicated_table_for_alter1;
|
||||
|
||||
INSERT INTO replicated_table_for_alter2 VALUES (1, '1'), (2, '2');
|
||||
|
||||
SYSTEM SYNC REPLICA replicated_table_for_alter1;
|
||||
|
||||
ALTER TABLE replicated_table_for_alter1 MODIFY SETTING use_minimalistic_part_header_in_zookeeper = 1;
|
||||
|
||||
INSERT INTO replicated_table_for_alter1 VALUES (3, '3'), (4, '4');
|
||||
|
||||
SYSTEM SYNC REPLICA replicated_table_for_alter2;
|
||||
|
||||
SELECT COUNT() FROM replicated_table_for_alter1;
|
||||
SELECT COUNT() FROM replicated_table_for_alter2;
|
||||
|
||||
DETACH TABLE replicated_table_for_alter2;
|
||||
ATTACH TABLE replicated_table_for_alter2;
|
||||
|
||||
DETACH TABLE replicated_table_for_alter1;
|
||||
ATTACH TABLE replicated_table_for_alter1;
|
||||
|
||||
SELECT COUNT() FROM replicated_table_for_alter1;
|
||||
SELECT COUNT() FROM replicated_table_for_alter2;
|
||||
|
||||
ALTER TABLE replicated_table_for_alter2 MODIFY SETTING parts_to_throw_insert = 1, parts_to_delay_insert = 1;
|
||||
INSERT INTO replicated_table_for_alter2 VALUES (3, '1'), (4, '2'); -- { serverError 252 }
|
||||
|
||||
INSERT INTO replicated_table_for_alter1 VALUES (5, '5'), (6, '6');
|
||||
|
||||
SYSTEM SYNC REPLICA replicated_table_for_alter2;
|
||||
|
||||
SELECT COUNT() FROM replicated_table_for_alter1;
|
||||
SELECT COUNT() FROM replicated_table_for_alter2;
|
||||
|
||||
DETACH TABLE replicated_table_for_alter2;
|
||||
ATTACH TABLE replicated_table_for_alter2;
|
||||
|
||||
DETACH TABLE replicated_table_for_alter1;
|
||||
ATTACH TABLE replicated_table_for_alter1;
|
||||
|
||||
SHOW CREATE TABLE replicated_table_for_alter1;
|
||||
SHOW CREATE TABLE replicated_table_for_alter2;
|
||||
|
||||
DROP TABLE IF EXISTS replicated_table_for_alter2;
|
||||
DROP TABLE IF EXISTS replicated_table_for_alter1;
|
Loading…
Reference in New Issue
Block a user