diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 4445d821a31..1b93667cb92 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -201,9 +201,12 @@ void WriteBufferFromS3::writePart() std::lock_guard lock(bg_tasks_mutex); task->is_finised = true; ++num_finished_bg_tasks; - } - bg_tasks_condvar.notify_one(); + /// Notification under mutex is important here. + /// Othervies, WriteBuffer could be destroyed in between + /// Releasing lock and condvar notification. + bg_tasks_condvar.notify_one(); + } }); } else @@ -305,9 +308,13 @@ void WriteBufferFromS3::makeSinglepartUpload() { std::lock_guard lock(bg_tasks_mutex); put_object_task->is_finised = true; + + /// Notification under mutex is important here. + /// Othervies, WriteBuffer could be destroyed in between + /// Releasing lock and condvar notification. + bg_tasks_condvar.notify_one(); } - bg_tasks_condvar.notify_one(); }); } else diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 62d53e58373..a3eec3e54bc 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -412,6 +412,11 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai String escaped_name = escapeForFileName(name); String mrk_path = part_path + escaped_name + marks_file_extension; String bin_path = part_path + escaped_name + DATA_FILE_EXTENSION; + + /// Some columns may be removed because of ttl. Skip them. + if (!disk->exists(mrk_path)) + return; + auto mrk_in = disk->readFile(mrk_path); DB::CompressedReadBufferFromFile bin_in(disk->readFile(bin_path)); bool must_be_last = false; diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index f8ed4d1806a..e262217d17e 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -66,10 +66,9 @@ struct MergedBlockOutputStream::Finalizer::Impl void MergedBlockOutputStream::Finalizer::finish() { - if (impl) - impl->finish(); - - impl.reset(); + std::unique_ptr to_finish = std::move(impl); + if (to_finish) + to_finish->finish(); } void MergedBlockOutputStream::Finalizer::Impl::finish()