diff --git a/dbms/src/Server/ClusterCopier.cpp b/dbms/src/Server/ClusterCopier.cpp
index ec89ff2c99c..98106efe4a4 100644
--- a/dbms/src/Server/ClusterCopier.cpp
+++ b/dbms/src/Server/ClusterCopier.cpp
@@ -27,7 +27,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
@@ -62,6 +61,7 @@
 #include
 #include
 #include
+#include
 
 
 namespace DB
@@ -157,12 +157,12 @@ struct TaskTable;
 struct TaskCluster;
 struct ClusterPartition;
 
-using TasksPartition = std::map<String, ShardPartition>;
+using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
 using ShardInfo = Cluster::ShardInfo;
 using TaskShardPtr = std::shared_ptr<TaskShard>;
 using TasksShard = std::vector<TaskShardPtr>;
 using TasksTable = std::list<TaskTable>;
-using ClusterPartitions = std::map<String, ClusterPartition>;
+using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
 
 
 /// Just destination partition of a shard
@@ -207,6 +207,7 @@ struct TaskShard
     UInt32 indexInCluster() const { return info.shard_num - 1; }
 
     String getDescription() const;
+    String getHostNameExample() const;
 
     /// Used to sort clusters by their proximity
     ShardPriority priority;
@@ -452,11 +453,19 @@ String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
 
 String DB::TaskShard::getDescription() const
 {
-    return "№" + toString(numberInCluster())
-           + " of pull table " + getDatabaseDotTable(task_table.table_pull)
-           + " of cluster " + task_table.cluster_pull_name;
+    std::stringstream ss;
+    ss << "N" << numberInCluster()
+       << " (with a replica " << getHostNameExample() << ","
+       << " of pull table " + getDatabaseDotTable(task_table.table_pull)
+       << " of cluster " + task_table.cluster_pull_name << ")";
+    return ss.str();
 }
 
+String DB::TaskShard::getHostNameExample() const
+{
+    auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
+    return replicas.at(0).readableString();
+}
 
 
 static bool isExtendedDefinitionStorage(const ASTPtr & storage_ast)
@@ -743,93 +752,148 @@ public:
             task_table.initShards(task_cluster->random_engine);
         }
 
-        LOG_DEBUG(log, "Loaded " << task_cluster->table_tasks.size() << " table tasks");
+        LOG_DEBUG(log, "Will process " << task_cluster->table_tasks.size() << " table tasks");
 
-        /// Compute set of partitions, assume set of partitions aren't changed during the processing
-        for (auto & task_table : task_cluster->table_tasks)
-        {
-            LOG_DEBUG(log, "Set up table task " << task_table.table_id);
-
-            for (const TaskShardPtr & task_shard : task_table.all_shards)
-            {
-                if (task_shard->info.pool == nullptr)
-                {
-                    throw Exception("It is impossible to have only local shards, at least port number must be different",
-                                    ErrorCodes::LOGICAL_ERROR);
-                }
-
-                LOG_DEBUG(log, "Set up shard " << task_shard->getDescription());
-                LOG_DEBUG(log, "There are " << task_table.all_shards.size() << " shards, " << task_table.local_shards.size() << " of them are local ones");
-
-                auto existing_partitions_names = getShardPartitions(*task_shard);
-                Strings filtered_partitions_names;
-
-                /// Check that user specified correct partition names
-                auto check_partition_format = [&] (const String & partition_text_quoted)
-                {
-                    const DataTypePtr & type = task_shard->partition_key_column.type;
-                    MutableColumnPtr column_dummy = type->createColumn();
-                    ReadBufferFromString rb(partition_text_quoted);
-
-                    try
-                    {
-                        type->deserializeTextQuoted(*column_dummy, rb);
-                    }
-                    catch (Exception & e)
-                    {
-                        throw Exception("Partition " + partition_text_quoted + " has incorrect format. "
" + e.displayText(), ErrorCodes::BAD_ARGUMENTS); - } - }; - - if (task_table.has_enabled_partitions) - { - /// Process partition in order specified by - for (const String & partition_name : task_table.enabled_partitions) - { - check_partition_format(partition_name); - auto it = existing_partitions_names.find(partition_name); - - /// Do not process partition if it is not in enabled_partitions list - if (it == existing_partitions_names.end()) - { - LOG_WARNING(log, "There is no enabled " << partition_name << " specified in enabled_partitions in shard " - << task_shard->getDescription()); - continue; - } - - filtered_partitions_names.emplace_back(*it); - } - - for (const String & partition_name : existing_partitions_names) - { - if (!task_table.enabled_partitions_set.count(partition_name)) - { - LOG_DEBUG(log, "Partition " << partition_name << " will not be processed, since it is not in " - << "enabled_partitions of " << task_table.table_id); - } - } - } - else - { - for (const String & partition_name : existing_partitions_names) - filtered_partitions_names.emplace_back(partition_name); - } - - for (const String & partition_name : filtered_partitions_names) - { - task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name)); - - ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name]; - cluster_partition.shards.emplace_back(task_shard); - } - - LOG_DEBUG(log, "Will copy " << task_shard->partition_tasks.size() << " partitions from shard " << task_shard->getDescription()); - } - } + /// Do not initialize tables, will make deferred initialization in process() getZooKeeper()->createAncestors(getWorkersPath() + "/"); } + /// Compute set of partitions, assume set of partitions aren't changed during the processing + void initTaskTable(TaskTable & task_table) + { + LOG_INFO(log, "Set up table task " << task_table.table_id); + LOG_DEBUG(log, "There are " << task_table.all_shards.size() << " shards, " << task_table.local_shards.size() << " of them are local ones"); + + /// Check that user specified correct partition names + auto check_partition_format = [] (const DataTypePtr & type, const String & partition_text_quoted) + { + MutableColumnPtr column_dummy = type->createColumn(); + ReadBufferFromString rb(partition_text_quoted); + + try + { + type->deserializeTextQuoted(*column_dummy, rb); + } + catch (Exception & e) + { + throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS); + } + }; + + auto retry = [this] (auto && func, size_t max_tries) + { + std::exception_ptr exception; + + for (size_t try_number = 1; try_number <= max_tries; ++try_number) + { + try + { + return func(); + } + catch (...) 
+                {
+                    exception = std::current_exception();
+                    if (try_number < max_tries)
+                    {
+                        tryLogCurrentException(log, "Will retry");
+                        std::this_thread::sleep_for(default_sleep_time);
+                    }
+                }
+            }
+
+            std::rethrow_exception(exception);
+        };
+
+        /// Fetch the list of partitions from a shard
+        auto process_shard = [this, check_partition_format, retry] (const TaskShardPtr & task_shard)
+        {
+            TaskTable & task_table = task_shard->task_table;
+
+            LOG_INFO(log, "Set up shard " << task_shard->getDescription());
+
+            auto get_partitions = [&] () { return getShardPartitions(*task_shard); };
+
+            auto existing_partitions_names = retry(get_partitions, 60);
+            Strings filtered_partitions_names;
+            Strings missing_partitions;
+
+            if (task_table.has_enabled_partitions)
+            {
+                /// Process partitions in the order specified in enabled_partitions
+                for (const String & partition_name : task_table.enabled_partitions)
+                {
+                    /// Check that user specified correct partition names
+                    check_partition_format(task_shard->partition_key_column.type, partition_name);
+
+                    auto it = existing_partitions_names.find(partition_name);
+
+                    /// Do not process partition if it is not in enabled_partitions list
+                    if (it == existing_partitions_names.end())
+                    {
+                        missing_partitions.emplace_back(partition_name);
+                        continue;
+                    }
+
+                    filtered_partitions_names.emplace_back(*it);
+                }
+
+                for (const String & partition_name : existing_partitions_names)
+                {
+                    if (!task_table.enabled_partitions_set.count(partition_name))
+                    {
+                        LOG_DEBUG(log, "Partition " << partition_name << " will not be processed, since it is not in "
+                                       << "enabled_partitions of " << task_table.table_id);
+                    }
+                }
+            }
+            else
+            {
+                for (const String & partition_name : existing_partitions_names)
+                    filtered_partitions_names.emplace_back(partition_name);
+            }
+
+            for (const String & partition_name : filtered_partitions_names)
+            {
+                task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name));
+            }
+
+            if (!missing_partitions.empty())
+            {
+                std::stringstream ss;
+                for (const String & missing_partition : missing_partitions)
+                    ss << " " << missing_partition;
+
+                LOG_WARNING(log, missing_partitions.size() << " partitions from enabled_partitions were not found in shard "
+                                 << task_shard->getDescription() << ":" << ss.str());
+            }
+
+            LOG_DEBUG(log, "Will copy " << task_shard->partition_tasks.size() << " partitions from shard " << task_shard->getDescription());
+        };
+
+        {
+            ThreadPool thread_pool(2 * getNumberOfPhysicalCPUCores());
+
+            for (const TaskShardPtr & task_shard : task_table.all_shards)
+                thread_pool.schedule([task_shard, process_shard]() { process_shard(task_shard); });
+
+            LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
+            thread_pool.wait();
+        }
+
+        /// After partitions of each shard are initialized, initialize cluster partitions
+        for (const TaskShardPtr & task_shard : task_table.all_shards)
+        {
+            for (const auto & partition : task_shard->partition_tasks)
+            {
+                const String & partition_name = partition.first;
+
+                ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
+                cluster_partition.shards.emplace_back(task_shard);
+            }
+        }
+    }
+
     void reloadTaskDescription()
     {
         String task_config_str;
@@ -973,6 +1037,9 @@ public:
             if (task_table.all_shards.empty())
                 continue;
 
+            /// Deferred initialization
+            initTaskTable(task_table);
+
             task_table.watch.restart();
 
             bool table_is_done = false;
@@ -1441,7 +1508,8 @@ protected:
         // Select all fields
         ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", inject_fault ? "1" : "");
"1" : ""); - LOG_DEBUG(log, "Executing SELECT query: " << queryToString(query_select_ast)); + LOG_DEBUG(log, "Executing SELECT query and pull from " << task_shard.getDescription() + << " : " << queryToString(query_select_ast)); ASTPtr query_insert_ast; { @@ -1609,7 +1677,7 @@ protected: return parseQuery(parser_create_query, create_query_pull_str); } - void createShardInternalTables(TaskShard & task_shard) + void createShardInternalTables(TaskShard & task_shard, bool create_split = true) { TaskTable & task_table = task_shard.task_table; @@ -1635,17 +1703,16 @@ protected: auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast); auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_split_shard, storage_split_ast); - //LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast)); dropAndCreateLocalTable(create_table_pull_ast); - //LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast)); - dropAndCreateLocalTable(create_table_split_ast); + if (create_split) + dropAndCreateLocalTable(create_table_split_ast); } std::set getShardPartitions(TaskShard & task_shard) { - createShardInternalTables(task_shard); + createShardInternalTables(task_shard, false); TaskTable & task_table = task_shard.task_table; @@ -1663,6 +1730,7 @@ protected: ASTPtr query_ast = parseQuery(parser_query, query); Context local_context = context; + local_context.setSettings(task_cluster->settings_pull); Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in); std::set res; @@ -1911,8 +1979,8 @@ void ClusterCopierApp::setupLogging() console_channel->open(); } - Poco::AutoPtr formatter(new Poco::PatternFormatter); - formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i <%p> %s: %t"); + Poco::AutoPtr formatter = new OwnPatternFormatter(nullptr); + formatter->setProperty("times", "local"); Poco::AutoPtr formatting_channel(new Poco::FormattingChannel(formatter)); formatting_channel->setChannel(split_channel); split_channel->open(); diff --git a/dbms/src/Server/ClusterCopier.h b/dbms/src/Server/ClusterCopier.h index b24b08be0e7..347a1d8f645 100644 --- a/dbms/src/Server/ClusterCopier.h +++ b/dbms/src/Server/ClusterCopier.h @@ -24,7 +24,7 @@ * /server_fqdn#PID_timestamp - cluster-copier worker ID * ... * /tables - directory with table tasks - * /cluster.db.table - directory of table_hits task + * /cluster.db.table1 - directory of table_hits task * /partition1 - directory for partition1 * /shards - directory for source cluster shards * /1 - worker job for the first shard of partition1 of table test.hits @@ -45,7 +45,7 @@ * During this procedure a single 'cleaner' worker is selected. The worker waits for stopping all partition * workers, removes /shards node, executes DROP PARTITION on each destination node and removes /is_dirty node. * /cleaner- An ephemeral node used to select 'cleaner' worker. Contains ID of the worker. - * /test_visits + * /cluster.db.table2 * ... 
 */
diff --git a/dbms/tests/integration/test_cluster_copier/configs/users.xml b/dbms/tests/integration/test_cluster_copier/configs/users.xml
index 3c739cc2cc4..e742d4f05a6 100644
--- a/dbms/tests/integration/test_cluster_copier/configs/users.xml
+++ b/dbms/tests/integration/test_cluster_copier/configs/users.xml
@@ -3,6 +3,8 @@
             1
+
+            5
diff --git a/dbms/tests/integration/test_cluster_copier/task0_description.xml b/dbms/tests/integration/test_cluster_copier/task0_description.xml
index c54c07cddae..b5ded21ea67 100644
--- a/dbms/tests/integration/test_cluster_copier/task0_description.xml
+++ b/dbms/tests/integration/test_cluster_copier/task0_description.xml
@@ -10,6 +10,7 @@
+            0
diff --git a/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml b/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml
index fe2d4a71596..5e7c614d2b7 100644
--- a/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml
+++ b/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml
@@ -8,6 +8,10 @@
             1
+
+            0
+
+
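
A note on the comparator change in TasksPartition and ClusterPartitions above: ordering the maps with std::greater<> makes iteration visit keys in descending lexicographic order, so date-named partitions (e.g. 201804) come out newest-first. A minimal illustration of the ordering, independent of the copier code:

    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>

    int main()
    {
        /// Same shape as TasksPartition: keys are partition names,
        /// std::greater<> reverses the usual ascending iteration order.
        std::map<std::string, int, std::greater<>> partitions;
        partitions.emplace("201802", 0);
        partitions.emplace("201804", 0);
        partitions.emplace("201803", 0);

        for (const auto & partition : partitions)
            std::cout << partition.first << '\n';   /// prints 201804, 201803, 201802
    }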
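The retry lambda in initTaskTable() is a generic call-with-retries pattern. Below is a standalone sketch of the same idea; the fixed 100 ms delay and the stderr message are placeholder assumptions standing in for the copier's default_sleep_time and tryLogCurrentException, which are not reproduced here:

    #include <chrono>
    #include <cstddef>
    #include <exception>
    #include <iostream>
    #include <thread>

    /// Call func() up to max_tries times; on failure remember the exception,
    /// sleep, and try again. If every attempt fails, rethrow the last exception.
    template <typename F>
    auto retry(F && func, size_t max_tries = 3)
    {
        std::exception_ptr exception;

        for (size_t try_number = 1; try_number <= max_tries; ++try_number)
        {
            try
            {
                return func();
            }
            catch (...)
            {
                exception = std::current_exception();
                if (try_number < max_tries)
                {
                    std::cerr << "Attempt " << try_number << " failed, will retry\n";
                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
                }
            }
        }

        std::rethrow_exception(exception);
    }

In the diff it is invoked as retry(get_partitions, 60), i.e. up to 60 attempts to read the partition list from a shard before the error propagates to the caller.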
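The per-shard set-up fan-out uses ClickHouse's ThreadPool with 2 * getNumberOfPhysicalCPUCores() threads and a final wait(). The same fan-out/join shape can be sketched with the standard library alone; processShardsInParallel, task_shards and process_shard below are hypothetical stand-ins, not the copier's API:

    #include <future>
    #include <vector>

    /// Launch one asynchronous job per shard and join them all.
    /// Any exception thrown by a worker is rethrown by the matching get().
    template <typename Shards, typename F>
    void processShardsInParallel(const Shards & task_shards, F process_shard)
    {
        std::vector<std::future<void>> jobs;
        jobs.reserve(task_shards.size());

        for (const auto & task_shard : task_shards)
            jobs.emplace_back(std::async(std::launch::async,
                                         [&process_shard, task_shard] { process_shard(task_shard); }));

        for (auto & job : jobs)
            job.get();
    }

Unlike a fixed-size pool, std::async may spawn one thread per shard, so for large clusters the bounded pool used in the diff is the safer choice.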