Autoupdate of ClusterCopier settings. [#CLICKHOUSE-3346]

Vitaliy Lyudvichenko 2018-02-13 21:42:59 +03:00
parent 2e2c70edb5
commit f6a63c4d0c


@@ -279,7 +279,12 @@ struct TaskTable
 struct TaskCluster
 {
-    TaskCluster(const String & task_zookeeper_path_, const Poco::Util::AbstractConfiguration & config, const String & base_key, const String & default_local_database_);
+    TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
+    : task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {}
+
+    void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
+    void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");

     /// Base node for all tasks. Its structure:
     ///  workers/ - directory with active workers (amount of them is less or equal max_workers)
@@ -287,6 +292,9 @@ struct TaskCluster
     ///  table_table1/ - directories with per-partition copying status
     String task_zookeeper_path;

+    /// Database used to create temporary Distributed tables
+    String default_local_database;
+
     /// Limits number of simultaneous workers
     size_t max_workers = 0;
@@ -297,15 +305,11 @@ struct TaskCluster
     /// Settings used to insert data
     Settings settings_push;

+    String clusters_prefix;
+
     /// Subtasks
     TasksTable table_tasks;

-    /// Database used to create temporary Distributed tables
-    String default_local_database;
-
-    /// Path to remote_servers in task config
-    String clusters_prefix;
-
     std::random_device random_device;
     pcg64 random_engine;
 };
@@ -565,32 +569,12 @@ void TaskTable::initShards(RandomEngine && random_engine)
     local_shards.assign(all_shards.begin(), it_first_remote);
 }

-TaskCluster::TaskCluster(const String & task_zookeeper_path_, const Poco::Util::AbstractConfiguration & config, const String & base_key,
-                         const String & default_local_database_)
+void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key)
 {
     String prefix = base_key.empty() ? "" : base_key + ".";

-    task_zookeeper_path = task_zookeeper_path_;
-    default_local_database = default_local_database_;
-
-    max_workers = config.getUInt64(prefix + "max_workers");
-
-    if (config.has(prefix + "settings"))
-    {
-        settings_common.loadSettingsFromConfig(prefix + "settings", config);
-        settings_pull = settings_common;
-        settings_push = settings_common;
-    }
-
-    if (config.has(prefix + "settings_pull"))
-        settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
-
-    if (config.has(prefix + "settings_push"))
-        settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
-
     clusters_prefix = prefix + "remote_servers";
     if (!config.has(clusters_prefix))
         throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS);
@@ -603,6 +587,36 @@ TaskCluster::TaskCluster(const String & task_zookeeper_path_, const Poco::Util::
     }
 }

+void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key)
+{
+    String prefix = base_key.empty() ? "" : base_key + ".";
+
+    max_workers = config.getUInt64(prefix + "max_workers");
+
+    settings_common = Settings();
+    if (config.has(prefix + "settings"))
+        settings_common.loadSettingsFromConfig(prefix + "settings", config);
+
+    settings_pull = settings_common;
+    if (config.has(prefix + "settings_pull"))
+        settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
+
+    settings_push = settings_common;
+    if (config.has(prefix + "settings_push"))
+        settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
+
+    /// Override important settings
+    settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME;
+    settings_pull.limits.readonly = 1;
+    settings_pull.max_threads = 1;
+    settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value);
+    settings_pull.preferred_block_size_bytes = 0;
+
+    settings_push.insert_distributed_timeout = 0;
+    settings_push.insert_distributed_sync = 1;
+}
+
 } // end of an anonymous namespace
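
For orientation, a minimal sketch of a task description that the reworked loadTasks()/reloadSettings() pair could consume. Only the keys looked up in the hunks above (max_workers, settings, settings_pull, settings_push, remote_servers) are taken from the diff; the concrete values, the tables section, and the yandex root element are illustrative assumptions, not part of this commit.

#include <iostream>
#include <sstream>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>

/// Hypothetical task description: only the keys that loadTasks()/reloadSettings()
/// read above are meaningful, the values are made up for illustration.
static const char * task_description_xml = R"(<yandex>
    <max_workers>4</max_workers>
    <settings>
        <connect_timeout>3</connect_timeout>
    </settings>
    <settings_pull>
        <max_execution_time>600</max_execution_time>
    </settings_pull>
    <settings_push>
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings_push>
    <remote_servers>
        <source_cluster><!-- shards of the cluster to read from --></source_cluster>
        <destination_cluster><!-- shards of the cluster to write to --></destination_cluster>
    </remote_servers>
    <tables><!-- per-table copying tasks --></tables>
</yandex>)";

int main()
{
    std::istringstream stream(task_description_xml);
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(stream));

    /// The same lookups the copier performs (with an empty base_key, so prefix == ""):
    std::cout << "max_workers: " << config->getUInt64("max_workers") << "\n";
    std::cout << "has settings_pull: " << config->has("settings_pull") << "\n";
    std::cout << "has remote_servers: " << config->has("remote_servers") << "\n";

    /// In the copier such a configuration would be passed to loadTasks() once and to
    /// reloadSettings() every time the ZooKeeper description node changes.
}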
@@ -628,27 +642,22 @@ public:
     void init()
     {
-        String description_path = task_zookeeper_path + "/description";
-        String task_config_str = getZooKeeper()->get(description_path);
-
-        task_cluster_config = getConfigurationFromXMLString(task_config_str);
-        task_cluster = std::make_unique<TaskCluster>(task_zookeeper_path, *task_cluster_config, "", working_database_name);
-
-        /// Override important settings
-        Settings & settings_pull = task_cluster->settings_pull;
-        settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME;
-        settings_pull.limits.readonly = 1;
-        settings_pull.max_threads = 1;
-        settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value);
-        settings_pull.preferred_block_size_bytes = 0;
-
-        Settings & settings_push = task_cluster->settings_push;
-        settings_push.insert_distributed_timeout = 0;
-        settings_push.insert_distributed_sync = 1;
-
-        /// Set up clusters
-        context.getSettingsRef() = task_cluster->settings_common;
-        context.setClustersConfig(task_cluster_config, task_cluster->clusters_prefix);
+        auto zookeeper = getZooKeeper();
+
+        task_description_watch_callback = [this] (zkutil::ZooKeeper &, int, int, const char *)
+        {
+            UInt64 version = ++task_descprtion_version;
+            LOG_DEBUG(log, "Task description should be updated, local version " << version);
+        };
+
+        task_description_path = task_zookeeper_path + "/description";
+        task_cluster = std::make_unique<TaskCluster>(task_zookeeper_path, working_database_name);
+
+        reloadTaskDescription();
+        task_cluster_initial_config = task_cluster_current_config;
+
+        task_cluster->loadTasks(*task_cluster_initial_config);
+        context.setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);

         /// Set up shards and their priority
         task_cluster->random_engine.seed(task_cluster->random_device());
@@ -705,13 +714,41 @@ public:
             }
         }

-        auto zookeeper = getZooKeeper();
-        zookeeper->createAncestors(getWorkersPath() + "/");
+        getZooKeeper()->createAncestors(getWorkersPath() + "/");
     }

+    void reloadTaskDescription()
+    {
+        String task_config_str;
+        zkutil::Stat stat;
+        int code;
+
+        getZooKeeper()->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
+        if (code != ZOK)
+            throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
+
+        LOG_DEBUG(log, "Loading description, zxid=" << task_descprtion_current_stat.czxid);
+        auto config = getConfigurationFromXMLString(task_config_str);
+
+        /// Setup settings
+        task_cluster->reloadSettings(*config);
+        context.getSettingsRef() = task_cluster->settings_common;
+
+        task_cluster_current_config = config;
+        task_descprtion_current_stat = stat;
+    }
+
+    void updateConfigIfNeeded()
+    {
+        UInt64 version_to_update = task_descprtion_version;
+        if (task_descprtion_current_version == version_to_update)
+            return;
+
+        task_descprtion_current_version = version_to_update;
+    }
+
     static constexpr size_t max_table_tries = 1000;
-    static constexpr size_t max_partition_tries = 3;
+    static constexpr size_t max_partition_tries = 1;

     bool tryProcessTable(TaskTable & task_table)
     {
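
The watch callback installed in init() only bumps an atomic counter; the actual re-read happens later on the worker's own thread. Below is a self-contained sketch of that version-counter pattern in plain C++ with no ZooKeeper: the reload lambda stands in for reloadTaskDescription(), and calling it from updateIfNeeded() is an assumption about the part of updateConfigIfNeeded() that is not fully visible in this hunk.

#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>

struct ConfigReloader
{
    std::atomic<uint64_t> published_version{1};  /// bumped by the (cheap) watch callback
    uint64_t applied_version = 1;                /// touched only by the worker thread
    std::function<void()> reload;                /// stands in for reloadTaskDescription()

    /// What the ZooKeeper watch would call: no I/O, no exceptions, just a counter bump.
    void onDescriptionChanged() { ++published_version; }

    /// Mirrors updateConfigIfNeeded(): skip the expensive re-read if nothing changed.
    void updateIfNeeded()
    {
        uint64_t version_to_update = published_version.load();
        if (applied_version == version_to_update)
            return;

        reload();
        applied_version = version_to_update;
    }
};

int main()
{
    ConfigReloader reloader;
    reloader.reload = [] { std::cout << "re-reading task description\n"; };

    reloader.updateIfNeeded();        /// versions match: nothing happens
    reloader.onDescriptionChanged();  /// the watch fired
    reloader.updateIfNeeded();        /// now the description is re-read once
}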
@@ -934,7 +971,8 @@ protected:
         return getWorkersPath() + "/" + host_id;
     }

-    zkutil::EphemeralNodeHolder::Ptr createWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper, const String & task_description)
+    zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper,
+                                                                       const String & description)
     {
         while (true)
         {
@@ -944,12 +982,12 @@ protected:
             if (static_cast<size_t>(stat.numChildren) >= task_cluster->max_workers)
             {
                 LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
-                    << ". Postpone processing " << task_description);
+                    << ". Postpone processing " << description);

                 std::this_thread::sleep_for(default_sleep_time);
             }
             else
             {
-                return std::make_shared<zkutil::EphemeralNodeHolder>(getCurrentWorkerNodePath(), *zookeeper, true, false, task_description);
+                return std::make_shared<zkutil::EphemeralNodeHolder>(getCurrentWorkerNodePath(), *zookeeper, true, false, description);
             }
         }
     }
@@ -1072,17 +1110,32 @@ protected:
         return true;
     }

     bool processPartitionTask(TaskPartition & task_partition)
     {
+        bool res;
+
         try
         {
-            return processPartitionTaskImpl(task_partition);
+            res = processPartitionTaskImpl(task_partition);
         }
         catch (...)
         {
             tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name);
             return false;
         }
+
+        /// At the end of each task check if the config is updated
+        try
+        {
+            updateConfigIfNeeded();
+        }
+        catch (...)
+        {
+            tryLogCurrentException(log, "An error occurred while updating the config");
+        }
+
+        return res;
     }

     bool processPartitionTaskImpl(TaskPartition & task_partition)
@@ -1125,7 +1178,7 @@ protected:
         /// Load balancing
-        auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path);
+        auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path);

         LOG_DEBUG(log, "Processing " << current_task_status_path);
@@ -1187,14 +1240,14 @@ protected:
         zookeeper->createAncestors(current_task_status_path);

         /// We need to update table definitions for each part, it could be changed after ALTER
-        ASTPtr create_query_pull_ast;
+        ASTPtr query_create_pull_table;
         {
             /// Fetch and parse (possibly) new definition
             auto connection_entry = task_shard.info.pool->get(&task_cluster->settings_pull);
             String create_query_pull_str = getRemoteCreateTable(task_table.table_pull, *connection_entry, &task_cluster->settings_pull);

             ParserCreateQuery parser_create_query;
-            create_query_pull_ast = parseQuery(parser_create_query, create_query_pull_str);
+            query_create_pull_table = parseQuery(parser_create_query, create_query_pull_str);
         }

         /// Create local Distributed tables:
@@ -1210,7 +1263,7 @@ protected:
             auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
             const auto & storage_split_ast = task_table.engine_split_ast;

-            auto create_query_ast = removeAliasColumnsFromCreateQuery(create_query_pull_ast);
+            auto create_query_ast = removeAliasColumnsFromCreateQuery(query_create_pull_table);
             auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, table_shard, storage_shard_ast);
             auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, table_split, storage_split_ast);
@@ -1278,12 +1331,15 @@ protected:
         /// Try create table (if not exists) on each shard
         {
-            auto create_query_push_ast = rewriteCreateQueryStorage(create_query_pull_ast, task_table.table_push, task_table.engine_push_ast);
+            auto create_query_push_ast = rewriteCreateQueryStorage(query_create_pull_table, task_table.table_push, task_table.engine_push_ast);
             typeid_cast<ASTCreateQuery &>(*create_query_push_ast).if_not_exists = true;
             String query = queryToString(create_query_push_ast);

             LOG_DEBUG(log, "Create remote push tables. Query: " << query);
-            executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push, PoolMode::GET_MANY);
+            size_t shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push,
+                                                  PoolMode::GET_MANY);
+            LOG_DEBUG(log, "Remote push tables have been created on " << shards << " shards of "
+                << task_table.cluster_push->getShardCount());
         }

         /// Do the copying
@@ -1545,28 +1601,28 @@ protected:
             /// Will try to make as many as possible queries
             if (shard.hasRemoteConnections())
             {
-                Settings current_settings = *settings;
+                Settings current_settings = settings ? *settings : task_cluster->settings_common;
                 current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;

                 std::vector<IConnectionPool::Entry> connections = shard.pool->getMany(&current_settings, pool_mode);
                 for (auto & connection : connections)
                 {
-                    if (!connection.isNull())
-                    {
-                        try
-                        {
-                            RemoteBlockInputStream stream(*connection, query, {}, context, &current_settings);
-                            NullBlockOutputStream output;
-                            copyData(stream, output);
-
-                            if (increment_and_check_exit())
-                                return;
-                        }
-                        catch (const Exception & e)
-                        {
-                            LOG_INFO(log, getCurrentExceptionMessage(false, true));
-                        }
+                    if (connection.isNull())
+                        continue;
+
+                    try
+                    {
+                        RemoteBlockInputStream stream(*connection, query, context, &current_settings);
+                        NullBlockOutputStream output;
+                        copyData(stream, output);
+
+                        if (increment_and_check_exit())
+                            return;
+                    }
+                    catch (const Exception & e)
+                    {
+                        LOG_INFO(log, getCurrentExceptionMessage(false, true));
                     }
                 }
             }
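
The first change in this hunk makes the settings argument optional: a null pointer now falls back to the task-wide defaults instead of being dereferenced. A tiny stand-alone illustration of that fallback follows; FakeSettings and the function names are made up for the example and are not the copier's real types.

#include <iostream>
#include <string>

/// Stand-in for the copier's Settings; only exists to show the fallback.
struct FakeSettings
{
    unsigned max_parallel_replicas = 1;
    std::string name = "common";
};

/// Same shape as: Settings current_settings = settings ? *settings : task_cluster->settings_common;
static void runOnShard(const FakeSettings * settings, const FakeSettings & common_defaults)
{
    FakeSettings current_settings = settings ? *settings : common_defaults;
    std::cout << "running with '" << current_settings.name << "' settings, "
              << current_settings.max_parallel_replicas << " parallel replica(s)\n";
}

int main()
{
    FakeSettings defaults;              /// plays the role of task_cluster->settings_common

    FakeSettings pull = defaults;       /// plays the role of an explicit settings_pull
    pull.name = "pull";
    pull.max_parallel_replicas = 4;

    runOnShard(&pull, defaults);        /// explicit settings win
    runOnShard(nullptr, defaults);      /// nullptr now means "use the common defaults"
}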
@@ -1604,17 +1660,25 @@ protected:
 private:
     ConfigurationPtr zookeeper_config;
     String task_zookeeper_path;
+    String task_description_path;
     String host_id;
     String working_database_name;

-    bool is_safe_mode = false;
-    double copy_fault_probability = 0.0;
+    UInt64 task_descprtion_current_version = 1;
+    std::atomic<UInt64> task_descprtion_version{1};
+    zkutil::WatchCallback task_description_watch_callback;
+
+    ConfigurationPtr task_cluster_initial_config;
+    ConfigurationPtr task_cluster_current_config;
+    zkutil::Stat task_descprtion_current_stat;

-    ConfigurationPtr task_cluster_config;
     std::unique_ptr<TaskCluster> task_cluster;

     zkutil::ZooKeeperPtr current_zookeeper;

+    bool is_safe_mode = false;
+    double copy_fault_probability = 0.0;
+
     Context & context;
     Poco::Logger * log;