dbms: Code cleanup. [#METR-18510]

This commit is contained in:
Alexey Arno 2016-03-05 13:33:01 +03:00
parent f5247021b1
commit a1fd25865b
3 changed files with 10 additions and 10 deletions

View File

@ -397,7 +397,6 @@ void ReshardingWorker::perform(const Strings & job_nodes)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
zookeeper->remove(child_full_path);
throw;
}
@ -774,7 +773,6 @@ void ReshardingWorker::applyChanges()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
pool.wait();
throw;
}

View File

@ -246,6 +246,8 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
std::string coordinator_id = resharding_worker.createCoordinator(cluster);
std::atomic<bool> has_notified_error{false};
try
{
/// Создать запрос ALTER TABLE ... RESHARD PARTITION ... COORDINATE WITH ...
@ -322,11 +324,12 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
/// inside a barrier. Actually, even without this solution, we would avoid such a deadlock
/// because we would eventually timeout while trying to read blocks from these other shards.
/// Nevertheless this is not the ideal way of sorting out this issue.
auto exception_callback = [&resharding_worker, coordinator_id]()
auto exception_callback = [&resharding_worker, coordinator_id, &has_notified_error]()
{
try
{
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
has_notified_error = true;
}
catch (...)
{
@ -334,7 +337,8 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
}
};
streams[0] = new UnionBlockInputStream<>(streams, nullptr, settings.max_distributed_connections, exception_callback);
streams[0] = new UnionBlockInputStream<>(streams, nullptr, settings.max_distributed_connections,
exception_callback);
streams.resize(1);
auto stream_ptr = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]);
@ -347,15 +351,15 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
while (!stream.isCancelled() && stream.read())
;
stream.readSuffix();
if (!stream.isCancelled())
stream.readSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
try
{
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
if (!has_notified_error)
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
resharding_worker.deleteCoordinator(coordinator_id);
}
catch (...)

View File

@ -3706,8 +3706,6 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (has_coordinator)
{
try