Resolve conflicts with master, support reset setting

This commit is contained in:
kssenii 2024-10-24 20:17:47 +02:00
parent c83c282dda
commit 0eddccbcc5
9 changed files with 200 additions and 52 deletions

View File

@ -292,12 +292,12 @@ void ObjectStorageQueueMetadata::alterSettings(const SettingsChanges & changes)
}
new_table_metadata.tracked_files_limit = value;
}
else if (endsWith(change.name, "tracked_files_ttl_sec"))
else if (endsWith(change.name, "tracked_file_ttl_sec"))
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.tracked_files_ttl_sec == value)
{
LOG_TRACE(log, "Setting `tracked_files_ttl_sec` already equals {}. "
LOG_TRACE(log, "Setting `tracked_file_ttl_sec` already equals {}. "
"Will do nothing", value);
return;
}

View File

@ -23,15 +23,15 @@ namespace ErrorCodes
0) \
DECLARE(ObjectStorageQueueAction, after_processing, ObjectStorageQueueAction::KEEP, "Delete or keep file in after successful processing", 0) \
DECLARE(String, keeper_path, "", "Zookeeper node path", 0) \
DECLARE(UInt32, loading_retries, 10, "Retry loading up to specified number of times", 0) \
DECLARE(UInt32, processing_threads_num, 1, "Number of processing threads", 0) \
DECLARE(UInt64, loading_retries, 10, "Retry loading up to specified number of times", 0) \
DECLARE(UInt64, processing_threads_num, 1, "Number of processing threads", 0) \
DECLARE(UInt32, enable_logging_to_queue_log, 1, "Enable logging to system table system.(s3/azure_)queue_log", 0) \
DECLARE(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
DECLARE(UInt32, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
DECLARE(UInt32, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
DECLARE(UInt32, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
DECLARE(UInt32, polling_backoff_ms, 1000, "Polling backoff", 0) \
DECLARE(UInt32, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
@ -112,6 +112,11 @@ ObjectStorageQueueSettings::~ObjectStorageQueueSettings() = default;
OBJECT_STORAGE_QUEUE_SETTINGS_SUPPORTED_TYPES(ObjectStorageQueueSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
/// Applies the given setting changes by handing them over to the underlying settings implementation.
void ObjectStorageQueueSettings::applyChanges(const SettingsChanges & changes)
{
    auto & settings_impl = *impl;
    settings_impl.applyChanges(changes);
}
void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
@ -156,4 +161,9 @@ void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
}
}
/// Looks up the setting called `name` in the underlying settings implementation
/// and returns what it reports as a Field.
Field ObjectStorageQueueSettings::get(const std::string & name)
{
    auto & settings_impl = *impl;
    return settings_impl.get(name);
}
}

View File

@ -12,6 +12,7 @@ class ASTStorage;
struct ObjectStorageQueueSettingsImpl;
struct MutableColumnsAndConstraints;
class StorageObjectStorageQueue;
class SettingsChanges;
/// List of available types supported in ObjectStorageQueueSettings object
#define OBJECT_STORAGE_QUEUE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
@ -61,6 +62,10 @@ struct ObjectStorageQueueSettings
void loadFromQuery(ASTStorage & storage_def);
void applyChanges(const SettingsChanges & changes);
Field get(const std::string & name);
private:
std::unique_ptr<ObjectStorageQueueSettingsImpl> impl;
};

View File

@ -657,7 +657,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
{
if (files_metadata->getTableMetadata().after_processing == "delete")
if (files_metadata->getTableMetadata().after_processing == ObjectStorageQueueAction::DELETE)
{
object_storage->removeObject(StoredObject(path));
}

View File

@ -17,11 +17,11 @@ namespace ObjectStorageQueueSetting
extern const ObjectStorageQueueSettingsObjectStorageQueueAction after_processing;
extern const ObjectStorageQueueSettingsUInt32 buckets;
extern const ObjectStorageQueueSettingsString last_processed_path;
extern const ObjectStorageQueueSettingsUInt32 loading_retries;
extern const ObjectStorageQueueSettingsObjectStorageQueueMode mode;
extern const ObjectStorageQueueSettingsUInt32 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt32 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt32 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt64 loading_retries;
extern const ObjectStorageQueueSettingsUInt64 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt64 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt64 tracked_file_ttl_sec;
}
@ -56,13 +56,13 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
const std::string & format_)
: format_name(format_)
, columns(columns_.toString())
, mode(engine_settings.mode.toString())
, buckets(engine_settings.buckets)
, last_processed_path(engine_settings.last_processed_path)
, after_processing(engine_settings.after_processing)
, loading_retries(engine_settings.loading_retries)
, tracked_files_limit(engine_settings.tracked_files_limit)
, tracked_files_ttl_sec(engine_settings.tracked_file_ttl_sec)
, mode(engine_settings[ObjectStorageQueueSetting::mode].toString())
, buckets(engine_settings[ObjectStorageQueueSetting::buckets])
, last_processed_path(engine_settings[ObjectStorageQueueSetting::last_processed_path])
, after_processing(engine_settings[ObjectStorageQueueSetting::after_processing])
, loading_retries(engine_settings[ObjectStorageQueueSetting::loading_retries])
, tracked_files_limit(engine_settings[ObjectStorageQueueSetting::tracked_files_limit])
, tracked_files_ttl_sec(engine_settings[ObjectStorageQueueSetting::tracked_file_ttl_sec])
{
processing_threads_num_changed = engine_settings[ObjectStorageQueueSetting::processing_threads_num].changed;
if (!processing_threads_num_changed && engine_settings[ObjectStorageQueueSetting::processing_threads_num] <= 1)

View File

@ -23,14 +23,14 @@ struct ObjectStorageQueueTableMetadata
const String format_name;
const String columns;
const String mode;
const UInt64 buckets;
const UInt32 buckets;
const String last_processed_path;
/// Changeable settings.
std::atomic<ObjectStorageQueueAction> after_processing;
std::atomic<UInt32> loading_retries;
std::atomic<UInt32> processing_threads_num;
std::atomic<UInt32> tracked_files_limit;
std::atomic<UInt32> tracked_files_ttl_sec;
std::atomic<UInt64> loading_retries;
std::atomic<UInt64> processing_threads_num;
std::atomic<UInt64> tracked_files_limit;
std::atomic<UInt64> tracked_files_ttl_sec;
bool processing_threads_num_changed = false;

View File

@ -55,12 +55,12 @@ namespace ObjectStorageQueueSetting
extern const ObjectStorageQueueSettingsUInt32 polling_min_timeout_ms;
extern const ObjectStorageQueueSettingsUInt32 polling_max_timeout_ms;
extern const ObjectStorageQueueSettingsUInt32 polling_backoff_ms;
extern const ObjectStorageQueueSettingsUInt32 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt64 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt32 buckets;
extern const ObjectStorageQueueSettingsUInt32 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt32 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt64 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt64 tracked_files_limit;
extern const ObjectStorageQueueSettingsString last_processed_path;
extern const ObjectStorageQueueSettingsUInt32 loading_retries;
extern const ObjectStorageQueueSettingsUInt64 loading_retries;
extern const ObjectStorageQueueSettingsObjectStorageQueueAction after_processing;
}
@ -356,10 +356,10 @@ void ReadFromObjectStorageQueue::initializePipeline(QueryPipelineBuilder & pipel
{
Pipes pipes;
size_t adjusted_num_stream = storage->getTableMetadata().processing_threads_num.load();
size_t processing_threads_num = storage->getTableMetadata().processing_threads_num;
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
for (size_t i = 0; i < processing_threads_num; ++i)
pipes.emplace_back(storage->createSource(
i/* processor_id */,
info,
@ -490,12 +490,6 @@ bool StorageObjectStorageQueue::streamToViews()
LOG_TEST(log, "Using {} processing threads", processing_threads_num);
size_t adjusted_num_streams;
{
std::lock_guard lock(changeable_settings_mutex);
adjusted_num_streams = queue_settings->processing_threads_num;
}
while (!shutdown_called && !file_iterator->isFinished())
{
InterpreterInsertQuery interpreter(
@ -515,10 +509,10 @@ bool StorageObjectStorageQueue::streamToViews()
Pipes pipes;
std::vector<std::shared_ptr<ObjectStorageQueueSource>> sources;
pipes.reserve(adjusted_num_streams);
sources.reserve(adjusted_num_streams);
pipes.reserve(processing_threads_num);
sources.reserve(processing_threads_num);
for (size_t i = 0; i < adjusted_num_streams; ++i)
for (size_t i = 0; i < processing_threads_num; ++i)
{
auto source = createSource(
i/* processor_id */,
@ -534,7 +528,7 @@ bool StorageObjectStorageQueue::streamToViews()
auto pipe = Pipe::unitePipes(std::move(pipes));
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setNumThreads(adjusted_num_streams);
block_io.pipeline.setNumThreads(processing_threads_num);
block_io.pipeline.setConcurrencyControl(queue_context->getSettingsRef()[Setting::use_concurrency_control]);
std::atomic_size_t rows = 0;
@ -570,13 +564,13 @@ static const std::unordered_set<std::string_view> changeable_settings_unordered_
"loading_retries",
"after_processing",
"tracked_files_limit",
"tracked_files_ttl_sec",
"tracked_file_ttl_sec",
/// For compatibility.
"s3queue_processing_threads_num",
"s3queue_loading_retries",
"s3queue_after_processing",
"s3queue_tracked_files_limit",
"s3queue_tracked_files_ttl_sec",
"s3queue_tracked_file_ttl_sec",
};
static const std::unordered_set<std::string_view> changeable_settings_ordered_mode
@ -600,7 +594,7 @@ void StorageObjectStorageQueue::checkAlterIsPossible(const AlterCommands & comma
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::MODIFY_SETTING)
/// Both MODIFY SETTING and RESET SETTING are accepted; every other alter type is rejected.
/// The error text names both allowed commands so it matches the condition above it.
if (command.type != AlterCommand::MODIFY_SETTING && command.type != AlterCommand::RESET_SETTING)
    throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only MODIFY SETTING and RESET SETTING alters are allowed for {}", getName());
}
@ -613,6 +607,7 @@ void StorageObjectStorageQueue::checkAlterIsPossible(const AlterCommands & comma
const auto & new_changes = new_metadata.settings_changes->as<const ASTSetQuery &>().changes;
const auto & old_changes = old_metadata.settings_changes->as<const ASTSetQuery &>().changes;
const auto mode = getTableMetadata().getMode();
for (const auto & changed_setting : new_changes)
{
auto it = std::find_if(
@ -623,12 +618,12 @@ void StorageObjectStorageQueue::checkAlterIsPossible(const AlterCommands & comma
if (setting_changed)
{
if (!isSettingChangeable(changed_setting.name, queue_settings->mode))
if (!isSettingChangeable(changed_setting.name, mode))
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Changing setting {} is not allowed for {} mode of {}",
changed_setting.name, magic_enum::enum_name(queue_settings->mode.value), getName());
changed_setting.name, magic_enum::enum_name(mode), getName());
}
}
}
@ -648,9 +643,24 @@ void StorageObjectStorageQueue::alter(
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
commands.apply(new_metadata, local_context);
const auto & new_settings = new_metadata.settings_changes->as<ASTSetQuery &>().changes;
auto new_settings = new_metadata.settings_changes->as<ASTSetQuery &>().changes;
ObjectStorageQueueSettings default_settings;
for (const auto & setting : old_settings)
{
auto it = std::find_if(
new_settings.begin(), new_settings.end(),
[&](const SettingChange & change) { return change.name == setting.name; });
if (it == new_settings.end())
{
/// Setting was reset.
new_settings.push_back(SettingChange(setting.name, default_settings.get(setting.name)));
}
}
SettingsChanges changed_settings;
const auto mode = getTableMetadata().getMode();
for (const auto & setting : new_settings)
{
auto it = std::find_if(
@ -661,18 +671,23 @@ void StorageObjectStorageQueue::alter(
if (!setting_changed)
continue;
if (!isSettingChangeable(setting.name, queue_settings->mode))
if (!isSettingChangeable(setting.name, mode))
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Changing setting {} is not allowed for {} mode of {}",
setting.name, magic_enum::enum_name(queue_settings->mode.value), getName());
setting.name, magic_enum::enum_name(mode), getName());
}
changed_settings.push_back(setting);
}
files_metadata->alterSettings(changed_settings);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
metadata.setSettingsChanges(new_metadata.settings_changes);
setInMemoryMetadata(metadata);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
}
}

View File

@ -70,10 +70,6 @@ private:
ObjectStorageType type;
const std::string engine_name;
const std::unique_ptr<ObjectStorageQueueSettings> queue_settings;
std::mutex changeable_settings_mutex;
const fs::path zk_path;
const bool enable_logging_to_queue_log;
const UInt32 polling_min_timeout_ms;

View File

@ -1652,7 +1652,7 @@ def test_processed_file_setting(started_cluster, processing_threads):
values_csv = (
"\n".join((",".join(map(str, row)) for row in correct_values)) + "\n"
).encode()
file_path = f"{files_path}/99.csv"
file_path = f"{files_path}/test_99.csv"
put_s3_file_content(started_cluster, file_path, values_csv)
expected_rows += 1
@ -2118,3 +2118,125 @@ def test_processing_threads(started_cluster):
assert node.contains_in_log(
f"StorageS3Queue (default.{table_name}): Using 16 processing threads"
)
def test_alter_settings(started_cluster):
    """
    Check ALTER ... MODIFY SETTING and ALTER ... RESET SETTING on a queue table.

    The table is created in unordered mode inside a Replicated database on
    two nodes. After each step the test reads the rows of `system.zookeeper`
    at the table's keeper path and checks that the expected setting values
    appear there: after creation, after MODIFY SETTING, after a server
    restart, and after RESET SETTING. It also checks that all generated
    files were delivered to the destination table and that the row count is
    unchanged after the final restart.
    """
    node1 = started_cluster.instances["node1"]
    node2 = started_cluster.instances["node2"]

    table_name = f"test_alter_settings_{uuid.uuid4().hex[:8]}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 1000

    node1.query("DROP DATABASE IF EXISTS r")
    node2.query("DROP DATABASE IF EXISTS r")

    node1.query(
        f"CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/{table_name}', 'shard1', 'node1')"
    )
    node2.query(
        f"CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/{table_name}', 'shard1', 'node2')"
    )

    create_table(
        started_cluster,
        node1,
        table_name,
        "unordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "processing_threads_num": 10,
            "loading_retries": 20,
        },
        database_name="r",
    )

    def assert_keeper_metadata_has(*fragments):
        # Query the keeper path once and look for every expected fragment
        # in that single result.
        metadata = node1.query(
            f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
        )
        for fragment in fragments:
            assert fragment in metadata, f"{fragment} not found in: {metadata}"

    def get_count():
        return int(
            node1.query(
                f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})"
            )
        )

    # Values passed at creation time; after_processing was not passed and is
    # expected to show up as "keep".
    assert_keeper_metadata_has(
        '"processing_threads_num":10',
        '"loading_retries":20',
        '"after_processing":"keep"',
    )

    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    create_mv(node1, f"r.{table_name}", dst_table_name)
    create_mv(node2, f"r.{table_name}", dst_table_name)

    # Each generated file holds one row (row_num=1), so the expected row
    # count equals the number of files.
    expected_rows = files_to_generate
    for _ in range(20):
        if expected_rows == get_count():
            break
        time.sleep(1)
    assert expected_rows == get_count()

    node1.query(
        f"""
        ALTER TABLE r.{table_name}
        MODIFY SETTING processing_threads_num=5, loading_retries=10, after_processing='delete', tracked_files_limit=50, tracked_file_ttl_sec=10000
        """
    )

    modified_fragments = (
        '"processing_threads_num":5',
        '"loading_retries":10',
        '"after_processing":"delete"',
    )
    assert_keeper_metadata_has(*modified_fragments)

    # The altered values must still be there after a restart.
    node1.restart_clickhouse()
    assert_keeper_metadata_has(*modified_fragments)

    node1.query(
        f"""
        ALTER TABLE r.{table_name} RESET SETTING after_processing
        """
    )

    # Only after_processing goes back to "keep"; the other altered settings
    # keep their modified values.
    assert_keeper_metadata_has(
        '"processing_threads_num":5',
        '"loading_retries":10',
        '"after_processing":"keep"',
    )

    node1.restart_clickhouse()
    assert expected_rows == get_count()