diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
index ac696d99f73..cc19c0be07f 100644
--- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
+++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
@@ -697,13 +697,16 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
 		}
 		else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE)
 		{
-			/// A remote performer has gone offline.
+			/// A remote performer has gone offline or we are experiencing network problems.
 			/// Put the current distributed job on hold. Also jab the job scheduler
 			/// so that it will come accross this distributed job even if no new jobs
 			/// are submitted.
-			setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
-				ex.message());
-			dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
+			if (current_job.isCoordinated())
+			{
+				setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
+					ex.message());
+				dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
+			}
 			softCleanup();
 			wakeUpTrackerThread();
 		}
@@ -734,21 +737,24 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
 			tryLogCurrentException(__PRETTY_FUNCTION__);
 		}
 
-		LOG_ERROR(log, dumped_coordinator_state);
+		if (current_job.isCoordinated())
+			LOG_ERROR(log, dumped_coordinator_state);
 		throw;
 	}
 	catch (const std::exception & ex)
 	{
 		/// An error has occurred on this performer.
 		handle_exception("Resharding job cancelled", ex.what());
-		LOG_ERROR(log, dumped_coordinator_state);
+		if (current_job.isCoordinated())
+			LOG_ERROR(log, dumped_coordinator_state);
 		throw;
 	}
 	catch (...)
 	{
 		/// An error has occurred on this performer.
 		handle_exception("Resharding job cancelled", "An unspecified error has occurred");
-		LOG_ERROR(log, dumped_coordinator_state);
+		if (current_job.isCoordinated())
+			LOG_ERROR(log, dumped_coordinator_state);
 		throw;
 	}
 
@@ -953,6 +959,20 @@ void ReshardingWorker::publishShardedPartitions()
 			pool.schedule([j, &tasks]{ tasks[j](); });
 		}
 	}
+	catch (const Poco::TimeoutException & ex)
+	{
+		try
+		{
+			pool.wait();
+		}
+		catch (...)
+		{
+			tryLogCurrentException(__PRETTY_FUNCTION__);
+		}
+
+		throw Exception{"Sharded partition upload operation timed out",
+			ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};
+	}
 	catch (...)
 	{
 		try
@@ -1092,6 +1112,20 @@ void ReshardingWorker::commit()
 			}
 		}
 	}
+	catch (const Poco::TimeoutException & ex)
+	{
+		try
+		{
+			pool.wait();
+		}
+		catch (...)
+		{
+			tryLogCurrentException(__PRETTY_FUNCTION__);
+		}
+
+		throw Exception{"A remote operation timed out while committing",
+			ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};
+	}
 	catch (...)
 	{
 		try
@@ -1257,6 +1291,21 @@ bool ReshardingWorker::checkAttachLogRecord(LogRecord & log_record)
 			pool.schedule([i, &tasks]{ tasks[i](); });
 		}
 	}
+	catch (const Poco::TimeoutException & ex)
+	{
+		try
+		{
+			pool.wait();
+		}
+		catch (...)
+		{
+			tryLogCurrentException(__PRETTY_FUNCTION__);
+		}
+
+		throw Exception{"Part checking on remote node timed out while attempting "
+			"to fix a failed ATTACH operation",
+			ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};
+	}
 	catch (...)
 	{
 		try
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index ad0a4aad0cf..92d5bfd17fc 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -3413,13 +3413,16 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
 				/// For any reason the coordinator has disappeared. So obviously
 				/// we don't have any means to notify other nodes of an error.
 			}
 			else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
 			{
 				/// nothing here
 			}
 			else
+			{
 				handle_exception(ex.message());
+				LOG_ERROR(log, dumped_coordinator_state);
+			}
 		}
 
 		throw;
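
The three new catch (const Poco::TimeoutException &) blocks above all follow one pattern: let the tasks the pool has already accepted run to completion, log (but do not propagate) any secondary failure from that drain, then rethrow the timeout as RESHARDING_REMOTE_NODE_UNAVAILABLE so that the handler in ReshardingWorker::perform() puts the job on hold instead of cancelling it. Below is a minimal, self-contained sketch of that pattern; DemoPool, TimeoutError, RemoteNodeUnavailable and publish_all are illustrative stand-ins, not the actual ClickHouse thread pool or Poco/ErrorCodes types.

#include <functional>
#include <iostream>
#include <stdexcept>
#include <vector>

/// Stand-in for Poco::TimeoutException.
struct TimeoutError : std::runtime_error { using std::runtime_error::runtime_error; };

/// Stand-in for Exception{..., ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE}.
struct RemoteNodeUnavailable : std::runtime_error { using std::runtime_error::runtime_error; };

/// Toy pool: runs tasks inline and simulates a scheduling timeout once saturated.
struct DemoPool
{
    explicit DemoPool(size_t capacity_) : capacity(capacity_) {}

    void schedule(const std::function<void()> & task)
    {
        if (scheduled == capacity)
            throw TimeoutError{"no free worker within the allotted time"};
        ++scheduled;
        task();
    }

    /// The real pool joins the tasks that were already scheduled.
    void wait() {}

    size_t capacity;
    size_t scheduled = 0;
};

void publish_all(DemoPool & pool, const std::vector<std::function<void()>> & tasks)
{
    try
    {
        for (const auto & task : tasks)
            pool.schedule(task);
    }
    catch (const TimeoutError &)
    {
        /// Same shape as the patch: first drain the tasks that did get
        /// scheduled, logging rather than propagating any secondary failure...
        try
        {
            pool.wait();
        }
        catch (...)
        {
            std::cerr << "error while draining pool\n";
        }

        /// ...then surface the timeout as the domain-level error that the
        /// caller already maps to "put the job on hold", not "cancel it".
        throw RemoteNodeUnavailable{"sharded partition upload timed out"};
    }
}

int main()
{
    DemoPool pool{2};
    std::vector<std::function<void()>> tasks(3, []{ std::cout << "uploading part\n"; });

    try
    {
        publish_all(pool, tasks);
    }
    catch (const RemoteNodeUnavailable & e)
    {
        std::cout << "on hold: " << e.what() << '\n';
    }
}

Draining before rethrowing matters here because the scheduled lambdas in the real code capture stack-local state by reference ([j, &tasks] and [i, &tasks] above); leaving the scope before pool.wait() would let in-flight tasks touch destroyed objects.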