Use only local snapshot for in order coordinator

This commit is contained in:
Igor Nikonov 2024-06-30 20:33:43 +00:00
parent a471716a7f
commit 37fbf905dd

View File

@ -836,6 +836,7 @@ public:
Parts all_parts_to_read;
size_t total_rows_to_read = 0;
bool state_initialized{false};
LoggerPtr log = getLogger(fmt::format("{}{}", magic_enum::enum_name(mode), "Coordinator"));
};
@ -857,6 +858,11 @@ void InOrderCoordinator<mode>::doHandleInitialAllRangesAnnouncement(InitialAllRa
{
LOG_TRACE(log, "Received an announcement {}", announcement.describe());
++stats[announcement.replica_num].number_of_requests;
if (state_initialized)
return;
size_t new_rows_to_read = 0;
/// To get rid of duplicates
@ -886,7 +892,7 @@ void InOrderCoordinator<mode>::doHandleInitialAllRangesAnnouncement(InitialAllRa
std::sort(ranges.begin(), ranges.end());
}
++stats[announcement.replica_num].number_of_requests;
state_initialized = true;
// progress_callback is not set when local plan is used for initiator
if (progress_callback && new_rows_to_read > 0)
@ -923,8 +929,15 @@ ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest
if (global_part_it == all_parts_to_read.end())
continue;
if (global_part_it->replicas.empty())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Part {} requested by replica {} is not registered in working set",
part.info.getPartNameV1(),
request.replica_num);
if (!global_part_it->replicas.contains(request.replica_num))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} doesn't exist on replica {} according to the global state", part.info.getPartNameV1(), request.replica_num);
continue;
size_t current_mark_size = 0;