This commit is contained in:
Nikita Taranov 2024-05-15 20:38:35 +01:00
parent 9d909618a8
commit 78a9731734

View File

@ -167,6 +167,7 @@ public:
Stats stats;
size_t replicas_count{0};
size_t unavailable_replicas_count{0};
size_t sent_initial_requests{0};
ProgressCallback progress_callback;
explicit ImplInterface(size_t replicas_count_)
@ -177,9 +178,17 @@ public:
virtual ~ImplInterface() = default;
virtual ParallelReadResponse handleRequest(ParallelReadRequest request) = 0;
virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
virtual void doHandleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
virtual void markReplicaAsUnavailable(size_t replica_number) = 0;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
{
if (++sent_initial_requests > replicas_count)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Initiator received more initial requests than there are replicas");
doHandleInitialAllRangesAnnouncement(std::move(announcement));
}
void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); }
};
@ -215,7 +224,7 @@ public:
ParallelReadResponse handleRequest(ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
void doHandleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
void markReplicaAsUnavailable(size_t replica_number) override;
@ -223,7 +232,6 @@ private:
/// This many granules will represent a single segment of marks that will be assigned to a replica
const size_t mark_segment_size{0};
size_t sent_initial_requests{0};
bool state_initialized{false};
size_t finished_replicas{0};
@ -422,7 +430,7 @@ void DefaultCoordinator::setProgressCallback()
}
}
void DefaultCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
void DefaultCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
{
const auto replica_num = announcement.replica_num;
@ -437,7 +445,6 @@ void DefaultCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnno
++stats[replica_num].number_of_requests;
replica_status[replica_num].is_announcement_received = true;
++sent_initial_requests;
LOG_DEBUG(log, "Sent initial requests: {} Replicas count: {}", sent_initial_requests, replicas_count);
if (sent_initial_requests == replicas_count - unavailable_replicas_count)
@ -781,6 +788,11 @@ ParallelReadResponse DefaultCoordinator::handleRequest(ParallelReadRequest reque
{
/// Nobody will come to process any more data
for (const auto & part : all_parts_to_read)
if (!part.description.ranges.empty())
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Some segments were left unread for the part {}", part.description.describe());
if (!ranges_for_stealing_queue.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Some orphaned segments were left unread");
@ -818,7 +830,7 @@ public:
}
ParallelReadResponse handleRequest([[ maybe_unused ]] ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement([[ maybe_unused ]] InitialAllRangesAnnouncement announcement) override;
void doHandleInitialAllRangesAnnouncement([[maybe_unused]] InitialAllRangesAnnouncement announcement) override;
void markReplicaAsUnavailable(size_t replica_number) override;
Parts all_parts_to_read;
@ -840,7 +852,7 @@ void InOrderCoordinator<mode>::markReplicaAsUnavailable(size_t replica_number)
}
template <CoordinationMode mode>
void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
void InOrderCoordinator<mode>::doHandleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
{
LOG_TRACE(log, "Received an announcement {}", announcement.describe());
@ -1051,7 +1063,10 @@ ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t re
{
}
ParallelReplicasReadingCoordinator::~ParallelReplicasReadingCoordinator() = default;
ParallelReplicasReadingCoordinator::~ParallelReplicasReadingCoordinator()
{
chassert(pimpl);
}
void ParallelReplicasReadingCoordinator::setProgressCallback(ProgressCallback callback)
{