diff --git a/dbms/src/Server/ClusterCopier.cpp b/dbms/src/Server/ClusterCopier.cpp index b07874261e2..89ea0ba39b6 100644 --- a/dbms/src/Server/ClusterCopier.cpp +++ b/dbms/src/Server/ClusterCopier.cpp @@ -279,7 +279,12 @@ struct TaskTable struct TaskCluster { - TaskCluster(const String & task_zookeeper_path_, const Poco::Util::AbstractConfiguration & config, const String & base_key, const String & default_local_database_); + TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_) + : task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {} + + void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = ""); + + void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = ""); /// Base node for all tasks. Its structure: /// workers/ - directory with active workers (amount of them is less or equal max_workers) @@ -287,6 +292,9 @@ struct TaskCluster /// table_table1/ - directories with per-partition copying status String task_zookeeper_path; + /// Database used to create temporary Distributed tables + String default_local_database; + /// Limits number of simultaneous workers size_t max_workers = 0; @@ -297,15 +305,11 @@ struct TaskCluster /// Settings used to insert data Settings settings_push; + String clusters_prefix; + /// Subtasks TasksTable table_tasks; - /// Database used to create temporary Distributed tables - String default_local_database; - - /// Path to remote_servers in task config - String clusters_prefix; - std::random_device random_device; pcg64 random_engine; }; @@ -565,32 +569,12 @@ void TaskTable::initShards(RandomEngine && random_engine) local_shards.assign(all_shards.begin(), it_first_remote); } -TaskCluster::TaskCluster(const String & task_zookeeper_path_, const Poco::Util::AbstractConfiguration & config, const String & base_key, - const String & default_local_database_) + +void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key) { String prefix = base_key.empty() ? "" : base_key + "."; - task_zookeeper_path = task_zookeeper_path_; - - default_local_database = default_local_database_; - - max_workers = config.getUInt64(prefix + "max_workers"); - - if (config.has(prefix + "settings")) - { - settings_common.loadSettingsFromConfig(prefix + "settings", config); - settings_pull = settings_common; - settings_push = settings_common; - } - - if (config.has(prefix + "settings_pull")) - settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config); - - if (config.has(prefix + "settings_push")) - settings_push.loadSettingsFromConfig(prefix + "settings_push", config); - clusters_prefix = prefix + "remote_servers"; - if (!config.has(clusters_prefix)) throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS); @@ -603,6 +587,36 @@ TaskCluster::TaskCluster(const String & task_zookeeper_path_, const Poco::Util:: } } +void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key) +{ + String prefix = base_key.empty() ? "" : base_key + "."; + + max_workers = config.getUInt64(prefix + "max_workers"); + + settings_common = Settings(); + if (config.has(prefix + "settings")) + settings_common.loadSettingsFromConfig(prefix + "settings", config); + + settings_pull = settings_common; + if (config.has(prefix + "settings_pull")) + settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config); + + settings_push = settings_common; + if (config.has(prefix + "settings_push")) + settings_push.loadSettingsFromConfig(prefix + "settings_push", config); + + /// Override important settings + settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME; + settings_pull.limits.readonly = 1; + settings_pull.max_threads = 1; + settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value); + settings_pull.preferred_block_size_bytes = 0; + + settings_push.insert_distributed_timeout = 0; + settings_push.insert_distributed_sync = 1; +} + + } // end of an anonymous namespace @@ -628,27 +642,22 @@ public: void init() { - String description_path = task_zookeeper_path + "/description"; - String task_config_str = getZooKeeper()->get(description_path); + auto zookeeper = getZooKeeper(); - task_cluster_config = getConfigurationFromXMLString(task_config_str); - task_cluster = std::make_unique(task_zookeeper_path, *task_cluster_config, "", working_database_name); + task_description_watch_callback = [this] (zkutil::ZooKeeper &, int, int, const char *) + { + UInt64 version = ++task_descprtion_version; + LOG_DEBUG(log, "Task description should be updated, local version " << version); + }; - /// Override important settings - Settings & settings_pull = task_cluster->settings_pull; - settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME; - settings_pull.limits.readonly = 1; - settings_pull.max_threads = 1; - settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value); - settings_pull.preferred_block_size_bytes = 0; + task_description_path = task_zookeeper_path + "/description"; + task_cluster = std::make_unique(task_zookeeper_path, working_database_name); - Settings & settings_push = task_cluster->settings_push; - settings_push.insert_distributed_timeout = 0; - settings_push.insert_distributed_sync = 1; + reloadTaskDescription(); + task_cluster_initial_config = task_cluster_current_config; - /// Set up clusters - context.getSettingsRef() = task_cluster->settings_common; - context.setClustersConfig(task_cluster_config, task_cluster->clusters_prefix); + task_cluster->loadTasks(*task_cluster_initial_config); + context.setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix); /// Set up shards and their priority task_cluster->random_engine.seed(task_cluster->random_device()); @@ -705,13 +714,41 @@ public: } } - auto zookeeper = getZooKeeper(); - zookeeper->createAncestors(getWorkersPath() + "/"); + getZooKeeper()->createAncestors(getWorkersPath() + "/"); } + void reloadTaskDescription() + { + String task_config_str; + zkutil::Stat stat; + int code; + + getZooKeeper()->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code); + if (code != ZOK) + throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS); + + LOG_DEBUG(log, "Loading description, zxid=" << task_descprtion_current_stat.czxid); + auto config = getConfigurationFromXMLString(task_config_str); + + /// Setup settings + task_cluster->reloadSettings(*config); + context.getSettingsRef() = task_cluster->settings_common; + + task_cluster_current_config = config; + task_descprtion_current_stat = stat; + } + + void updateConfigIfNeeded() + { + UInt64 version_to_update = task_descprtion_version; + if (task_descprtion_current_version == version_to_update) + return; + + task_descprtion_current_version = version_to_update; + } static constexpr size_t max_table_tries = 1000; - static constexpr size_t max_partition_tries = 3; + static constexpr size_t max_partition_tries = 1; bool tryProcessTable(TaskTable & task_table) { @@ -934,7 +971,8 @@ protected: return getWorkersPath() + "/" + host_id; } - zkutil::EphemeralNodeHolder::Ptr createWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper, const String & task_description) + zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper, + const String & description) { while (true) { @@ -944,12 +982,12 @@ protected: if (static_cast(stat.numChildren) >= task_cluster->max_workers) { LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")" - << ". Postpone processing " << task_description); + << ". Postpone processing " << description); std::this_thread::sleep_for(default_sleep_time); } else { - return std::make_shared(getCurrentWorkerNodePath(), *zookeeper, true, false, task_description); + return std::make_shared(getCurrentWorkerNodePath(), *zookeeper, true, false, description); } } } @@ -1072,17 +1110,32 @@ protected: return true; } + bool processPartitionTask(TaskPartition & task_partition) { + bool res; + try { - return processPartitionTaskImpl(task_partition); + res = processPartitionTaskImpl(task_partition); } catch (...) { tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name); return false; } + + /// At the end of each task check if the config is updated + try + { + updateConfigIfNeeded(); + } + catch (...) + { + tryLogCurrentException(log, "An error occurred while updating the config"); + } + + return res; } bool processPartitionTaskImpl(TaskPartition & task_partition) @@ -1125,7 +1178,7 @@ protected: /// Load balancing - auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path); + auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path); LOG_DEBUG(log, "Processing " << current_task_status_path); @@ -1187,14 +1240,14 @@ protected: zookeeper->createAncestors(current_task_status_path); /// We need to update table definitions for each part, it could be changed after ALTER - ASTPtr create_query_pull_ast; + ASTPtr query_create_pull_table; { /// Fetch and parse (possibly) new definition auto connection_entry = task_shard.info.pool->get(&task_cluster->settings_pull); String create_query_pull_str = getRemoteCreateTable(task_table.table_pull, *connection_entry, &task_cluster->settings_pull); ParserCreateQuery parser_create_query; - create_query_pull_ast = parseQuery(parser_create_query, create_query_pull_str); + query_create_pull_table = parseQuery(parser_create_query, create_query_pull_str); } /// Create local Distributed tables: @@ -1210,7 +1263,7 @@ protected: auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second); const auto & storage_split_ast = task_table.engine_split_ast; - auto create_query_ast = removeAliasColumnsFromCreateQuery(create_query_pull_ast); + auto create_query_ast = removeAliasColumnsFromCreateQuery(query_create_pull_table); auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, table_shard, storage_shard_ast); auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, table_split, storage_split_ast); @@ -1278,12 +1331,15 @@ protected: /// Try create table (if not exists) on each shard { - auto create_query_push_ast = rewriteCreateQueryStorage(create_query_pull_ast, task_table.table_push, task_table.engine_push_ast); + auto create_query_push_ast = rewriteCreateQueryStorage(query_create_pull_table, task_table.table_push, task_table.engine_push_ast); typeid_cast(*create_query_push_ast).if_not_exists = true; String query = queryToString(create_query_push_ast); LOG_DEBUG(log, "Create remote push tables. Query: " << query); - executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push, PoolMode::GET_MANY); + size_t shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push, + PoolMode::GET_MANY); + LOG_DEBUG(log, "Remote push tables have been created on " << shards << " shards of " + << task_table.cluster_push->getShardCount()); } /// Do the copying @@ -1545,28 +1601,28 @@ protected: /// Will try to make as many as possible queries if (shard.hasRemoteConnections()) { - Settings current_settings = *settings; + Settings current_settings = settings ? *settings : task_cluster->settings_common; current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1; std::vector connections = shard.pool->getMany(¤t_settings, pool_mode); for (auto & connection : connections) { - if (!connection.isNull()) - { - try - { - RemoteBlockInputStream stream(*connection, query, {}, context, ¤t_settings); - NullBlockOutputStream output; - copyData(stream, output); + if (connection.isNull()) + continue; - if (increment_and_check_exit()) - return; - } - catch (const Exception & e) - { - LOG_INFO(log, getCurrentExceptionMessage(false, true)); - } + try + { + RemoteBlockInputStream stream(*connection, query, context, ¤t_settings); + NullBlockOutputStream output; + copyData(stream, output); + + if (increment_and_check_exit()) + return; + } + catch (const Exception & e) + { + LOG_INFO(log, getCurrentExceptionMessage(false, true)); } } } @@ -1604,17 +1660,25 @@ protected: private: ConfigurationPtr zookeeper_config; String task_zookeeper_path; + String task_description_path; String host_id; String working_database_name; - bool is_safe_mode = false; - double copy_fault_probability = 0.0; + UInt64 task_descprtion_current_version = 1; + std::atomic task_descprtion_version{1}; + zkutil::WatchCallback task_description_watch_callback; + + ConfigurationPtr task_cluster_initial_config; + ConfigurationPtr task_cluster_current_config; + zkutil::Stat task_descprtion_current_stat; - ConfigurationPtr task_cluster_config; std::unique_ptr task_cluster; zkutil::ZooKeeperPtr current_zookeeper; + bool is_safe_mode = false; + double copy_fault_probability = 0.0; + Context & context; Poco::Logger * log;