Fix possible stuck on error in HashedDictionaryParallelLoader

This commit is contained in:
vdimir 2024-03-06 12:04:57 +00:00
parent 083a251951
commit 70750cb108
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862

View File

@ -93,8 +93,21 @@ public:
for (size_t shard = 0; shard < shards; ++shard)
{
if (!shards_queues[shard]->push(std::move(shards_blocks[shard])))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to shards queue #{}", shard);
const auto & current_block = shards_blocks[shard];
while (!shards_queues[shard]->tryPush(current_block, /* milliseconds= */ 100))
{
if (shards_queues[shard]->isFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to finished shards queue #{}, dictionary {}", shard, dictionary_name);
/// We need to check if some workers failed
if (pool.active() != shards)
{
LOG_DEBUG(dictionary.log, "Some workers for dictionary {} failed, stopping all workers", dictionary_name);
stop_all_workers = true;
pool.wait(); /// We expect exception to be thrown from the failed worker thread
throw Exception(ErrorCodes::LOGICAL_ERROR, "Worker threads for dictionary {} are not active", dictionary_name);
}
}
}
}
@ -130,6 +143,7 @@ private:
String dictionary_name;
const size_t shards;
ThreadPool pool;
std::atomic_bool stop_all_workers{false};
std::vector<std::optional<ConcurrentBoundedQueue<Block>>> shards_queues;
std::vector<UInt64> shards_slots;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
@ -147,8 +161,17 @@ private:
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder_;
auto & shard_queue = *shards_queues[shard];
while (shard_queue.pop(block))
while (true)
{
if (!shard_queue.tryPop(block, /* milliseconds= */ 100))
{
/// Check if we need to stop
if (stop_all_workers || shard_queue.isFinished())
break;
/// Timeout expired, but the queue is not finished yet, try again
continue;
}
Stopwatch watch;
dictionary.blockToAttributes(block, arena_holder_, shard);
UInt64 elapsed_ms = watch.elapsedMilliseconds();
@ -160,9 +183,6 @@ private:
if (elapsed_ms > 1'000)
LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {})", shard, elapsed_ms, block.rows());
}
if (!shard_queue.isFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pull non finished shards queue #{}", shard);
}
/// Split block to shards smaller block, using 'selector'.