From cce970e40cdf1eba81a1d34c6e692ec883d544e2 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 8 Sep 2020 02:09:03 +0300 Subject: [PATCH] Use join() instead of detach() for loading threads in ExternalLoader. --- src/Interpreters/ExternalLoader.cpp | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/Interpreters/ExternalLoader.cpp b/src/Interpreters/ExternalLoader.cpp index e8df205760a..dcef36de175 100644 --- a/src/Interpreters/ExternalLoader.cpp +++ b/src/Interpreters/ExternalLoader.cpp @@ -893,6 +893,8 @@ private: cancelLoading(info); } + putBackFinishedThreadsToPool(); + /// All loadings have unique loading IDs. size_t loading_id = next_id_counter++; info.loading_id = loading_id; @@ -914,6 +916,21 @@ private: } } + void putBackFinishedThreadsToPool() + { + for (auto loading_id : recently_finished_loadings) + { + auto it = loading_threads.find(loading_id); + if (it != loading_threads.end()) + { + auto thread = std::move(it->second); + loading_threads.erase(it); + thread.join(); /// It's very likely that `thread` has already finished. + } + } + recently_finished_loadings.clear(); + } + static void cancelLoading(Info & info) { if (!info.isLoading()) @@ -1095,12 +1112,11 @@ private: } min_id_to_finish_loading_dependencies.erase(std::this_thread::get_id()); - auto it = loading_threads.find(loading_id); - if (it != loading_threads.end()) - { - it->second.detach(); - loading_threads.erase(it); - } + /// Add `loading_id` to the list of recently finished loadings. + /// This list is used to later put the threads which finished loading back to the thread pool. + /// (We can't put the loading thread back to the thread pool immediately here because at this point + /// the loading thread is about to finish but it's not finished yet right now.) + recently_finished_loadings.push_back(loading_id); } /// Calculate next update time for loaded_object. Can be called without mutex locking, @@ -1158,6 +1174,7 @@ private: bool always_load_everything = false; std::atomic enable_async_loading = false; std::unordered_map loading_threads; + std::vector recently_finished_loadings; std::unordered_map min_id_to_finish_loading_dependencies; size_t next_id_counter = 1; /// should always be > 0 mutable pcg64 rnd_engine{randomSeed()};