Fix error reporting while copying to Azure Blob Storage.

This commit is contained in:
Vitaly Baranov 2024-07-09 22:49:41 +02:00
parent b53e58c501
commit 5f302e539d

View File

@ -49,7 +49,7 @@ namespace
const String & dest_blob_, const String & dest_blob_,
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_, std::shared_ptr<const AzureBlobStorage::RequestSettings> settings_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_, ThreadPoolCallbackRunnerUnsafe<void> schedule_,
const Poco::Logger * log_) LoggerPtr log_)
: create_read_buffer(create_read_buffer_) : create_read_buffer(create_read_buffer_)
, client(client_) , client(client_)
, offset (offset_) , offset (offset_)
@ -74,7 +74,7 @@ namespace
const String & dest_blob; const String & dest_blob;
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings; std::shared_ptr<const AzureBlobStorage::RequestSettings> settings;
ThreadPoolCallbackRunnerUnsafe<void> schedule; ThreadPoolCallbackRunnerUnsafe<void> schedule;
const Poco::Logger * log; const LoggerPtr log;
size_t max_single_part_upload_size; size_t max_single_part_upload_size;
struct UploadPartTask struct UploadPartTask
@ -83,7 +83,6 @@ namespace
size_t part_size; size_t part_size;
std::vector<std::string> block_ids; std::vector<std::string> block_ids;
bool is_finished = false; bool is_finished = false;
std::exception_ptr exception;
}; };
size_t normal_part_size; size_t normal_part_size;
@ -92,6 +91,7 @@ namespace
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks; std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
std::exception_ptr bg_exception TSA_GUARDED_BY(bg_tasks_mutex);
std::mutex bg_tasks_mutex; std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar; std::condition_variable bg_tasks_condvar;
@ -186,7 +186,7 @@ namespace
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(log, fmt::format("While performing multipart upload of blob {} in container {}", dest_blob, dest_container_for_logging));
waitForAllBackgroundTasks(); waitForAllBackgroundTasks();
throw; throw;
} }
@ -242,7 +242,12 @@ namespace
} }
catch (...) catch (...)
{ {
task->exception = std::current_exception(); std::lock_guard lock(bg_tasks_mutex);
if (!bg_exception)
{
tryLogCurrentException(log, "While writing part");
bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working.
}
} }
task_finish_notify(); task_finish_notify();
}, Priority{}); }, Priority{});
@ -299,13 +304,13 @@ namespace
/// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock
bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); }); bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks); auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception);
for (auto & task : tasks) if (exception)
{ std::rethrow_exception(exception);
if (task.exception)
std::rethrow_exception(task.exception); const auto & tasks = TSA_SUPPRESS_WARNING_FOR_READ(bg_tasks);
for (const auto & task : tasks)
block_ids.insert(block_ids.end(),task.block_ids.begin(), task.block_ids.end()); block_ids.insert(block_ids.end(),task.block_ids.begin(), task.block_ids.end());
}
} }
}; };
} }
@ -321,7 +326,8 @@ void copyDataToAzureBlobStorageFile(
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings, std::shared_ptr<const AzureBlobStorage::RequestSettings> settings,
ThreadPoolCallbackRunnerUnsafe<void> schedule) ThreadPoolCallbackRunnerUnsafe<void> schedule)
{ {
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyDataToAzureBlobStorageFile")}; auto log = getLogger("copyDataToAzureBlobStorageFile");
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log};
helper.performCopy(); helper.performCopy();
} }
@ -339,9 +345,11 @@ void copyAzureBlobStorageFile(
const ReadSettings & read_settings, const ReadSettings & read_settings,
ThreadPoolCallbackRunnerUnsafe<void> schedule) ThreadPoolCallbackRunnerUnsafe<void> schedule)
{ {
auto log = getLogger("copyAzureBlobStorageFile");
if (settings->use_native_copy) if (settings->use_native_copy)
{ {
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob); LOG_TRACE(log, "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
ProfileEvents::increment(ProfileEvents::AzureCopyObject); ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (dest_client->GetClickhouseOptions().IsClientForDisk) if (dest_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject); ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
@ -352,7 +360,7 @@ void copyAzureBlobStorageFile(
if (size < settings->max_single_part_copy_size) if (size < settings->max_single_part_copy_size)
{ {
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy blob sync {} -> {}", src_blob, dest_blob); LOG_TRACE(log, "Copy blob sync {} -> {}", src_blob, dest_blob);
block_blob_client_dest.CopyFromUri(source_uri); block_blob_client_dest.CopyFromUri(source_uri);
} }
else else
@ -368,7 +376,7 @@ void copyAzureBlobStorageFile(
if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success) if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success)
{ {
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob); LOG_TRACE(log, "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob);
} }
else else
{ {
@ -382,14 +390,14 @@ void copyAzureBlobStorageFile(
} }
else else
{ {
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob); LOG_TRACE(log, "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
auto create_read_buffer = [&] auto create_read_buffer = [&]
{ {
return std::make_unique<ReadBufferFromAzureBlobStorage>( return std::make_unique<ReadBufferFromAzureBlobStorage>(
src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries); src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries);
}; };
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")}; UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, log};
helper.performCopy(); helper.performCopy();
} }
} }