This commit is contained in:
Alexey Milovidov 2016-04-15 05:34:48 +03:00
commit 02ceda2ba4
2 changed files with 59 additions and 8 deletions

View File

@ -697,13 +697,16 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE)
{
/// A remote performer has gone offline.
/// A remote performer has gone offline or we are experiencing network problems.
/// Put the current distributed job on hold. Also jab the job scheduler
/// so that it will come accross this distributed job even if no new jobs
/// are submitted.
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
ex.message());
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
if (current_job.isCoordinated())
{
setStatus(current_job.coordinator_id, getFQDNOrHostName(), STATUS_ON_HOLD,
ex.message());
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
}
softCleanup();
wakeUpTrackerThread();
}
@ -734,21 +737,24 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
tryLogCurrentException(__PRETTY_FUNCTION__);
}
LOG_ERROR(log, dumped_coordinator_state);
if (current_job.isCoordinated())
LOG_ERROR(log, dumped_coordinator_state);
throw;
}
catch (const std::exception & ex)
{
/// An error has occurred on this performer.
handle_exception("Resharding job cancelled", ex.what());
LOG_ERROR(log, dumped_coordinator_state);
if (current_job.isCoordinated())
LOG_ERROR(log, dumped_coordinator_state);
throw;
}
catch (...)
{
/// An error has occurred on this performer.
handle_exception("Resharding job cancelled", "An unspecified error has occurred");
LOG_ERROR(log, dumped_coordinator_state);
if (current_job.isCoordinated())
LOG_ERROR(log, dumped_coordinator_state);
throw;
}
@ -953,6 +959,20 @@ void ReshardingWorker::publishShardedPartitions()
pool.schedule([j, &tasks]{ tasks[j](); });
}
}
catch (const Poco::TimeoutException & ex)
{
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw Exception{"Sharded partition upload operation timed out",
ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};
}
catch (...)
{
try
@ -1092,6 +1112,20 @@ void ReshardingWorker::commit()
}
}
}
catch (const Poco::TimeoutException & ex)
{
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw Exception{"A remote operation timed out while committing",
ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};
}
catch (...)
{
try
@ -1257,6 +1291,21 @@ bool ReshardingWorker::checkAttachLogRecord(LogRecord & log_record)
pool.schedule([i, &tasks]{ tasks[i](); });
}
}
catch (const Poco::TimeoutException & ex)
{
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw Exception{"Part checking on remote node timed out while attempting "
"to fix a failed ATTACH operation",
ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE};
}
catch (...)
{
try

View File

@ -3413,13 +3413,15 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
/// For any reason the coordinator has disappeared. So obviously
/// we don't have any means to notify other nodes of an error.
}
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
{
/// nothing here
}
else
{
handle_exception(ex.message());
LOG_ERROR(log, dumped_coordinator_state);
}
}
throw;