diff --git a/src/Dictionaries/HashedDictionaryParallelLoader.h b/src/Dictionaries/HashedDictionaryParallelLoader.h index 1b8b7b7f555..c0b4aa73adb 100644 --- a/src/Dictionaries/HashedDictionaryParallelLoader.h +++ b/src/Dictionaries/HashedDictionaryParallelLoader.h @@ -93,8 +93,21 @@ public: for (size_t shard = 0; shard < shards; ++shard) { - if (!shards_queues[shard]->push(std::move(shards_blocks[shard]))) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to shards queue #{}", shard); + const auto & current_block = shards_blocks[shard]; + while (!shards_queues[shard]->tryPush(current_block, /* milliseconds= */ 100)) + { + if (shards_queues[shard]->isFinished()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to finished shards queue #{}, dictionary {}", shard, dictionary_name); + + /// We need to check if some workers failed + if (pool.active() != shards) + { + LOG_DEBUG(dictionary.log, "Some workers for dictionary {} failed, stopping all workers", dictionary_name); + stop_all_workers = true; + pool.wait(); /// We expect exception to be thrown from the failed worker thread + throw Exception(ErrorCodes::LOGICAL_ERROR, "Worker threads for dictionary {} are not active", dictionary_name); + } + } } } @@ -130,6 +143,7 @@ private: String dictionary_name; const size_t shards; ThreadPool pool; + std::atomic_bool stop_all_workers{false}; std::vector>> shards_queues; std::vector shards_slots; DictionaryKeysArenaHolder arena_holder; @@ -147,8 +161,17 @@ private: DictionaryKeysArenaHolder arena_holder_; auto & shard_queue = *shards_queues[shard]; - while (shard_queue.pop(block)) + while (true) { + if (!shard_queue.tryPop(block, /* milliseconds= */ 100)) + { + /// Check if we need to stop + if (stop_all_workers || shard_queue.isFinished()) + break; + /// Timeout expired, but the queue is not finished yet, try again + continue; + } + Stopwatch watch; dictionary.blockToAttributes(block, arena_holder_, shard); UInt64 elapsed_ms = watch.elapsedMilliseconds(); @@ -160,9 +183,6 @@ private: if (elapsed_ms > 1'000) LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {})", shard, elapsed_ms, block.rows()); } - - if (!shard_queue.isFinished()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pull non finished shards queue #{}", shard); } /// Split block to shards smaller block, using 'selector'.