diff --git a/dbms/src/DataStreams/copyData.cpp b/dbms/src/DataStreams/copyData.cpp
index 106f2f5033e..5730ed52c4e 100644
--- a/dbms/src/DataStreams/copyData.cpp
+++ b/dbms/src/DataStreams/copyData.cpp
@@ -1,6 +1,9 @@
 #include <DataStreams/IBlockInputStream.h>
 #include <DataStreams/IBlockOutputStream.h>
 #include <DataStreams/copyData.h>
+#include <Common/Stopwatch.h>
+#include <common/logger_useful.h>
+#include <iomanip>
 
 
 namespace DB
@@ -22,8 +25,29 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
     from.readPrefix();
     to.writePrefix();
 
-    while (Block block = from.read())
+    size_t num_blocks = 0;
+    double total_blocks_time = 0;
+    size_t slowest_block_number = 0;
+    double slowest_block_time = 0;
+
+    while (true)
     {
+        Stopwatch watch;
+        Block block = from.read();
+        double elapsed = watch.elapsedSeconds();
+
+        if (num_blocks == 0 || elapsed > slowest_block_time)
+        {
+            slowest_block_number = num_blocks;
+            slowest_block_time = elapsed;
+        }
+
+        total_blocks_time += elapsed;
+        ++num_blocks;
+
+        if (!block)
+            break;
+
         if (is_cancelled())
             break;
 
@@ -47,8 +71,28 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
     if (is_cancelled())
         return;
 
-    from.readSuffix();
-    to.writeSuffix();
+    auto log = &Poco::Logger::get("copyData");
+    bool print_dbg = num_blocks > 2;
+
+    if (print_dbg)
+    {
+        LOG_DEBUG(log, "Read " << num_blocks << " blocks. It took " << std::fixed << total_blocks_time << " seconds, average "
+            << std::fixed << total_blocks_time / num_blocks * 1000 << " ms, the slowest block #" << slowest_block_number
+            << " was read for " << std::fixed << slowest_block_time * 1000 << " ms");
+    }
+
+    {
+        Stopwatch watch;
+        to.writeSuffix();
+        if (num_blocks > 1)
+            LOG_DEBUG(log, "It took " << std::fixed << watch.elapsedSeconds() << " seconds for writeSuffix()");
+    }
+    {
+        Stopwatch watch;
+        from.readSuffix();
+        if (num_blocks > 1)
+            LOG_DEBUG(log, "It took " << std::fixed << watch.elapsedSeconds() << " seconds for readSuffix()");
+    }
 }
diff --git a/dbms/src/Server/ClusterCopier.cpp b/dbms/src/Server/ClusterCopier.cpp
index c0d9c95b427..5699465e860 100644
--- a/dbms/src/Server/ClusterCopier.cpp
+++ b/dbms/src/Server/ClusterCopier.cpp
@@ -699,15 +699,19 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c
     if (config.has(prefix + "settings_push"))
         settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
 
-    /// Override important settings
-    settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME;
-    settings_pull.readonly = 1;
-    settings_pull.max_threads = 1;
-    settings_pull.max_block_size = settings_pull.max_block_size.changed ? settings_pull.max_block_size.value : 8192UL;
-    settings_pull.preferred_block_size_bytes = 0;
+    auto set_default_value = [] (auto && setting, auto && default_value)
+    {
+        setting = setting.changed ? setting.value : default_value;
+    };
 
-    settings_push.insert_distributed_timeout = 0;
+    /// Override important settings
+    settings_pull.readonly = 1;
     settings_push.insert_distributed_sync = 1;
+    set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
+    set_default_value(settings_pull.max_threads, 1);
+    set_default_value(settings_pull.max_block_size, 8192UL);
+    set_default_value(settings_pull.preferred_block_size_bytes, 0);
+    set_default_value(settings_push.insert_distributed_timeout, 0);
 }
 
 
@@ -1097,22 +1101,27 @@ protected:
             status_paths.emplace_back(task_shard_partition.getShardStatusPath());
         }
 
-        zkutil::Stat stat;
         std::vector<int64_t> zxid1, zxid2;
 
         try
         {
-            // Check that state is Finished and remember zxid
+            std::vector get_futures;
             for (const String & path : status_paths)
+                get_futures.emplace_back(zookeeper->asyncGet(path));
+
+            // Check that state is Finished and remember zxid
+            for (auto & future : get_futures)
             {
-                TaskStateWithOwner status = TaskStateWithOwner::fromString(zookeeper->get(path, &stat));
+                auto res = future.get();
+
+                TaskStateWithOwner status = TaskStateWithOwner::fromString(res.value);
 
                 if (status.state != TaskState::Finished)
                 {
-                    LOG_INFO(log, "The task " << path << " is being rewritten by " << status.owner
-                             << ". Partition will be rechecked");
+                    LOG_INFO(log, "The task " << res.value << " is being rewritten by " << status.owner << ". Partition will be rechecked");
                     return false;
                 }
-                zxid1.push_back(stat.pzxid);
+
+                zxid1.push_back(res.stat.pzxid);
             }
 
             // Check that partition is not dirty
@@ -1122,11 +1131,15 @@ protected:
                 return false;
             }
 
+            get_futures.clear();
+            for (const String & path : status_paths)
+                get_futures.emplace_back(zookeeper->asyncGet(path));
+
             // Remember zxid of states again
-            for (const auto & path : status_paths)
+            for (auto & future : get_futures)
             {
-                zookeeper->exists(path, &stat);
-                zxid2.push_back(stat.pzxid);
+                auto res = future.get();
+                zxid2.push_back(res.stat.pzxid);
             }
         }
         catch (const zkutil::KeeperException & e)
@@ -1664,7 +1677,7 @@ protected:
         BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
         BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
 
-        input = std::make_shared<AsynchronousBlockInputStream>(io_select.in);
+        input = io_select.in;
         output = io_insert.out;
     }
 
diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
index d739badba88..2ff2174c751 100644
--- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
+++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -28,6 +28,7 @@
 #include
 #include
+#include
 #include
 #include
 
@@ -353,35 +354,53 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
 
     inserted_blocks += 1;
     inserted_rows += block.rows();
+    last_block_finish_time = time(nullptr);
 }
 
 
 void DistributedBlockOutputStream::writeSuffix()
 {
+    auto log_performance = [this] ()
+    {
+        double elapsed = watch.elapsedSeconds();
+        LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks"
+            << ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second"
" << getCurrentStateDescription()); + }; + if (insert_sync && pool) { + auto format_ts = [] (time_t ts) { + WriteBufferFromOwnString wb; + writeDateTimeText(ts, wb); + return wb.str(); + }; + + LOG_DEBUG(log, "Writing suffix, the last block was at " << format_ts(last_block_finish_time)); + finished_jobs_count = 0; for (auto & shard_jobs : per_shard_jobs) for (JobReplica & job : shard_jobs.replicas_jobs) { if (job.stream) - pool->schedule([&job] () { job.stream->writeSuffix(); }); + { + pool->schedule([&job] () { + job.stream->writeSuffix(); + }); + } } try { pool->wait(); + log_performance(); } catch (Exception & exception) { + log_performance(); exception.addMessage(getCurrentStateDescription()); throw; } - - double elapsed = watch.elapsedSeconds(); - LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks" - << ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second" - << ". " << getCurrentStateDescription()); } } diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h index 6b3349eb16d..32f3a49f549 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h @@ -12,6 +12,7 @@ #include #include + namespace Poco { class Logger; @@ -93,6 +94,7 @@ private: std::optional pool; ThrottlerPtr throttler; String query_string; + time_t last_block_finish_time = 0; struct JobReplica { diff --git a/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml b/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml index 5e7c614d2b7..e212d1a3d04 100644 --- a/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml +++ b/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml @@ -6,6 +6,7 @@ 1 + 2