This commit is contained in:
kssenii 2022-04-04 20:44:39 +02:00
parent 6310ad7cc7
commit 79627798c4
2 changed files with 47 additions and 8 deletions

View File

@ -52,6 +52,7 @@ struct WriteBufferFromS3::PutObjectTask
Aws::S3::Model::PutObjectRequest req; Aws::S3::Model::PutObjectRequest req;
bool is_finised = false; bool is_finised = false;
std::exception_ptr exception; std::exception_ptr exception;
std::optional<FileSegmentsHolder> cache_files;
}; };
WriteBufferFromS3::WriteBufferFromS3( WriteBufferFromS3::WriteBufferFromS3(
@ -279,6 +280,13 @@ void WriteBufferFromS3::writePart()
} }
fillUploadRequest(task->req, part_number); fillUploadRequest(task->req, part_number);
if (file_segments_holder)
{
task->cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
schedule([this, task]() schedule([this, task]()
{ {
try try
@ -290,6 +298,15 @@ void WriteBufferFromS3::writePart()
task->exception = std::current_exception(); task->exception = std::current_exception();
} }
try
{
finalizeCacheIfNeeded(task->cache_files);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{ {
std::lock_guard lock(bg_tasks_mutex); std::lock_guard lock(bg_tasks_mutex);
task->is_finised = true; task->is_finised = true;
@ -300,17 +317,20 @@ void WriteBufferFromS3::writePart()
/// Releasing lock and condvar notification. /// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one(); bg_tasks_condvar.notify_one();
} }
finalizeCacheIfNeeded();
}, query_context); }, query_context);
} }
else else
{ {
UploadPartTask task; UploadPartTask task;
fillUploadRequest(task.req, part_tags.size() + 1); fillUploadRequest(task.req, part_tags.size() + 1);
if (file_segments_holder)
{
task.cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
processUploadRequest(task); processUploadRequest(task);
part_tags.push_back(task.tag); part_tags.push_back(task.tag);
finalizeCacheIfNeeded(); finalizeCacheIfNeeded(task.cache_files);
} }
} }
@ -397,7 +417,14 @@ void WriteBufferFromS3::makeSinglepartUpload()
if (schedule) if (schedule)
{ {
put_object_task = std::make_unique<PutObjectTask>(); put_object_task = std::make_unique<PutObjectTask>();
fillPutRequest(put_object_task->req); fillPutRequest(put_object_task->req);
if (file_segments_holder)
{
put_object_task->cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
schedule([this]() schedule([this]()
{ {
try try
@ -409,6 +436,15 @@ void WriteBufferFromS3::makeSinglepartUpload()
put_object_task->exception = std::current_exception(); put_object_task->exception = std::current_exception();
} }
try
{
finalizeCacheIfNeeded(put_object_task->cache_files);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{ {
std::lock_guard lock(bg_tasks_mutex); std::lock_guard lock(bg_tasks_mutex);
put_object_task->is_finised = true; put_object_task->is_finised = true;
@ -418,16 +454,19 @@ void WriteBufferFromS3::makeSinglepartUpload()
/// Releasing lock and condvar notification. /// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one(); bg_tasks_condvar.notify_one();
} }
finalizeCacheIfNeeded();
}, query_context); }, query_context);
} }
else else
{ {
PutObjectTask task; PutObjectTask task;
fillPutRequest(task.req); fillPutRequest(task.req);
if (file_segments_holder)
{
task.cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
processPutRequest(task); processPutRequest(task);
finalizeCacheIfNeeded(); finalizeCacheIfNeeded(task.cache_files);
} }
} }
@ -455,7 +494,7 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
} }
void WriteBufferFromS3::finalizeCacheIfNeeded() void WriteBufferFromS3::finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> & file_segments_holder)
{ {
if (!file_segments_holder) if (!file_segments_holder)
return; return;

View File

@ -127,7 +127,7 @@ private:
FileCachePtr cache; FileCachePtr cache;
size_t current_download_offset = 0; size_t current_download_offset = 0;
std::optional<FileSegmentsHolder> file_segments_holder; std::optional<FileSegmentsHolder> file_segments_holder;
void finalizeCacheIfNeeded(); static void finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> &);
ContextMutablePtr shared_query_context; ContextMutablePtr shared_query_context;
ContextPtr query_context; ContextPtr query_context;
}; };