Intermediate version

This commit is contained in:
alesapin 2023-07-04 22:19:53 +02:00
parent f1aaea129d
commit 1ea5261012
3 changed files with 123 additions and 1 deletions

View File

@ -203,6 +203,8 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
sendPartFromMemory(part, out, send_projections);
else
sendPartFromDisk(part, out, client_protocol_version, false, send_projections);
data.addLastSentPart(part->name);
}
catch (const NetException &)
{

View File

@ -3928,6 +3928,111 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
return {};
}
void StorageReplicatedMergeTree::addLastSentPart(const MergeTreePartInfo & info)
{
{
std::lock_guard lock(last_sent_parts_mutex);
last_sent_parts.emplace_back(info);
while (last_sent_parts.size() > LAST_SENT_PARS_WINDOW_SIZE)
last_sent_parts.pop_front();
}
last_sent_parts_cv.notify_all();
}
void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(size_t wait_ms)
{
if (wait_ms == 0)
{
LOG_INFO(log, "Will not wait for unique parts to be fetched by other replicas because wait time is zero");
return;
}
auto zookeeper = getZooKeeper();
auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper);
if (unique_parts_set.empty())
{
LOG_INFO(log, "Will not wait for unique parts to be fetched because we don't have any unique parts");
return;
}
auto wait_predicate = [&] () -> void
{
bool all_fetched = true;
for (const auto & part : unique_parts_set)
{
bool found = false;
for (const auto & sent_part : last_sent_parts)
{
if (sent_part.contains(part))
{
found = true;
break;
}
}
if (!found)
{
all_fetched = false;
break;
}
}
return all_fetched;
};
std::unique_lock lock(last_sent_parts_mutex);
if (!last_sent_parts_cv.wait_for(last_sent_parts_cv, std::chrono::duration_cast<std::chrono::milliseconds>(wait_ms), wait_predicate))
LOG_WARNING(log, "Failed to wait for unqiue parts to be fetched in {} ms, {} parts can be left on this replica", wait_ms, unqiue_parts_set.size());
}
std::vector<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_)
{
if (zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica_name_ / "is_active"))
return {};
Strings replicas = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas");
Strings our_parts;
std::vector<ActiveDataPartSet> data_parts_on_replicas;
for (const String & replica : replicas)
{
if (!zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica / "is_active"))
continue;
Strings parts = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas" / replica / "parts");
if (replica == replica_name_)
{
our_parts = parts;
}
else
{
data_parts_on_replicas.emplace_back(format_version_);
for (const auto & part : parts)
{
if (!data_parts_on_replicas.back().getContainingPart(part).empty())
data_parts_on_replicas.back().add(part);
}
}
}
NameSet our_unique_parts;
for (const auto & part : our_parts)
{
bool found = false;
for (const auto & active_parts_set : data_parts_on_replicas)
{
if (!active_parts_set.getContainingPart(part).empty())
{
found = true;
break;
}
}
if (!found)
our_unique_parts.insert(MergeTreePartInfo::fromPartName(part, format_version));
}
return our_unique_parts;
}
String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entry, bool active)
{
auto zookeeper = getZooKeeper();

View File

@ -340,6 +340,15 @@ public:
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
void addLastSentPart(const MergeTreePartInfo & info);
std::deque<MergeTreePartInfo> getLastSentParts() const
{
std::lock_guard lock(last_sent_parts_mutex);
return last_sent_parts;
}
void waitForUniquePartsToBeFetchedByOtherReplicas(size_t wait_ms);
private:
std::atomic_bool are_restoring_replica {false};
@ -444,9 +453,14 @@ private:
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
static constexpr size_t LAST_SENT_PARS_WINDOW_SIZE = 1000;
std::mutex last_sent_parts_mutex;
std::condition_variable last_sent_parts_cv;
std::deque<MergeTreePartInfo> last_sent_parts;
/// Threads.
///
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
@ -697,6 +711,7 @@ private:
*/
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
static std::vector<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_);
/** Download the specified part from the specified replica.
* If `to_detached`, the part is placed in the `detached` directory.