Review fixes + enable in CI

Alexander Sapin 2023-07-06 17:15:26 +02:00
parent 30bcc73c63
commit 9ae0dc730c
7 changed files with 66 additions and 63 deletions

View File

@@ -61,6 +61,7 @@ configure
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
start
@@ -90,6 +91,7 @@ configure
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
start

View File

@@ -329,8 +329,8 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shutdown)
{
setReadonly(part_of_full_shutdown);
storage.partialShutdown(part_of_full_shutdown);
setReadonly(/* on_shutdown = */ part_of_full_shutdown);
storage.partialShutdown();
}
@@ -341,8 +341,7 @@ void ReplicatedMergeTreeRestartingThread::shutdown(bool part_of_full_shutdown)
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Stop other tasks.
partialShutdown(part_of_full_shutdown);
setReadonly(part_of_full_shutdown);
}
void ReplicatedMergeTreeRestartingThread::setReadonly(bool on_shutdown)

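Taken together, the hunks above decouple the readonly flip from the storage-level partial shutdown: partialShutdown() forwards part_of_full_shutdown only to setReadonly() and calls storage.partialShutdown() without arguments, while shutdown() now flips readonly directly instead of routing through partialShutdown(). Below is a minimal sketch of the resulting call order, with placeholder types and logging standing in for the real classes; it is an illustration, not ClickHouse's implementation.

#include <iostream>

// Stand-in for StorageReplicatedMergeTree: after this change,
// partialShutdown() no longer needs to know whether it is part
// of a full shutdown.
struct Storage
{
    void partialShutdown() { std::cout << "storage: partial shutdown\n"; }
};

struct RestartingThread
{
    Storage & storage;

    // Flip readonly first, then shut the storage down.
    void partialShutdown(bool part_of_full_shutdown)
    {
        setReadonly(/* on_shutdown = */ part_of_full_shutdown);
        storage.partialShutdown();
    }

    // shutdown() only sets readonly now; the storage-level partial
    // shutdown is triggered separately by the table's own shutdown().
    void shutdown(bool part_of_full_shutdown)
    {
        setReadonly(part_of_full_shutdown);
    }

    void setReadonly(bool on_shutdown)
    {
        std::cout << "readonly set (on_shutdown=" << on_shutdown << ")\n";
    }
};

int main()
{
    Storage s;
    RestartingThread t{s};
    t.shutdown(/* part_of_full_shutdown */ true);
}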
View File

@@ -36,6 +36,7 @@ public:
void shutdown(bool part_of_full_shutdown);
void run();
private:
StorageReplicatedMergeTree & storage;
String log_name;

View File

@@ -3942,17 +3942,26 @@ void StorageReplicatedMergeTree::addLastSentPart(const MergeTreePartInfo & info)
last_sent_parts_cv.notify_all();
}
void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(size_t wait_ms)
void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(StorageReplicatedMergeTree::ShutdownDeadline shutdown_deadline_)
{
if (!shutdown_called.load())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Called waitForUniquePartsToBeFetchedByOtherReplicas before shutdown, it's a bug");
auto settings_ptr = getSettings();
auto wait_ms = settings_ptr->wait_for_unique_parts_send_before_shutdown_ms.totalMilliseconds();
if (wait_ms == 0)
{
LOG_INFO(log, "Will not wait for unique parts to be fetched by other replicas because wait time is zero");
return;
}
if (shutdown_deadline_ <= std::chrono::system_clock::now())
{
LOG_INFO(log, "Will not wait for unique parts to be fetched by other replicas because shutdown_deadline already passed");
return;
}
auto zookeeper = getZooKeeperIfTableShutDown();
auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper, log);
@@ -3968,7 +3977,6 @@ void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(si
auto wait_predicate = [&] () -> bool
{
bool all_fetched = true;
for (auto it = unique_parts_set.begin(); it != unique_parts_set.end();)
{
const auto & part = *it;
@@ -3985,22 +3993,19 @@ void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(si
}
}
if (!found)
{
all_fetched = false;
break;
}
}
return all_fetched;
return unique_parts_set.empty();
};
std::unique_lock lock(last_sent_parts_mutex);
if (!last_sent_parts_cv.wait_for(lock, std::chrono::milliseconds(wait_ms), wait_predicate))
if (!last_sent_parts_cv.wait_until(lock, shutdown_deadline_, wait_predicate))
LOG_WARNING(log, "Failed to wait for unique parts to be fetched in {} ms, {} parts can be left on this replica", wait_ms, unique_parts_set.size());
else
LOG_INFO(log, "Successfully waited all the parts");
}
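The important switch in this function is from a relative wait_for(wait_ms) to an absolute wait_until(shutdown_deadline_): the deadline is computed once during shutdown preparation, so repeated notifications on last_sent_parts_cv and spurious wakeups cannot stretch the total wait beyond it. A self-contained sketch of the pattern follows; the names and the hard-coded part are illustrative, not the real ClickHouse types.

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>
#include <string>

std::mutex mtx;
std::condition_variable cv;
std::set<std::string> unique_parts{"all_0_0_0"};  // parts only we have

// Returns true only if the predicate became true before the deadline,
// no matter how many times the condition variable was notified.
bool waitForPartsUntil(std::chrono::system_clock::time_point deadline)
{
    std::unique_lock lock(mtx);
    return cv.wait_until(lock, deadline, [] { return unique_parts.empty(); });
}

int main()
{
    auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50);
    // Nobody erases the part here, so this returns false after ~50 ms.
    return waitForPartsUntil(deadline) ? 0 : 1;
}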
std::vector<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_)
std::set<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_)
{
if (!zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica_name_ / "is_active"))
{
@@ -4027,26 +4032,25 @@ std::vector<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniquePart
}
else
{
LOG_TRACE(log_, "Fetching parts for replica {}", replica);
data_parts_on_replicas.emplace_back(format_version_);
for (const auto & part : parts)
{
if (data_parts_on_replicas.back().getContainingPart(part).empty())
data_parts_on_replicas.back().add(part);
}
LOG_TRACE(log_, "Fetching parts for replica {}: [{}]", replica, fmt::join(parts, ", "));
data_parts_on_replicas.emplace_back(format_version_, parts);
}
}
std::vector<MergeTreePartInfo> our_unique_parts;
if (data_parts_on_replicas.empty())
{
LOG_TRACE(log_, "Has no active replicas, will no try to wait for fetch");
return {};
}
std::set<MergeTreePartInfo> our_unique_parts;
for (const auto & part : our_parts)
{
LOG_TRACE(log_, "Looking for part {}", part);
bool found = false;
for (const auto & active_parts_set : data_parts_on_replicas)
{
if (!active_parts_set.getContainingPart(part).empty())
{
LOG_TRACE(log_, "Part {} found", part);
found = true;
break;
}
@@ -4054,8 +4058,8 @@ std::vector<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniquePart
if (!found)
{
LOG_TRACE(log_, "Part not {} found", part);
our_unique_parts.emplace_back(MergeTreePartInfo::fromPartName(part, format_version_));
LOG_TRACE(log_, "Part not {} found on other replicas", part);
our_unique_parts.emplace(MergeTreePartInfo::fromPartName(part, format_version_));
}
}
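With unique_parts_set now a std::set, the wait predicate drains it in place as parts show up on other replicas and succeeds once the set is empty, instead of tracking a separate all_fetched flag. A small sketch of the erase-while-iterating idiom this relies on, using plain part-name strings as a stand-in for MergeTreePartInfo:

#include <set>
#include <string>

// std::set::erase(it) returns the iterator following the removed
// element, which keeps the loop valid while mutating the set.
bool allFetched(std::set<std::string> & unique_parts_set,
                const std::set<std::string> & fetched_by_others)
{
    for (auto it = unique_parts_set.begin(); it != unique_parts_set.end();)
    {
        if (fetched_by_others.count(*it))
            it = unique_parts_set.erase(it);  // fetched elsewhere: drop it
        else
            ++it;                             // still unique to this replica
    }
    return unique_parts_set.empty();
}

int main()
{
    std::set<std::string> mine{"all_0_0_0", "all_1_1_0"};
    std::set<std::string> others{"all_0_0_0"};
    return allFetched(mine, others) ? 0 : 1;  // all_1_1_0 is still unique
}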
@@ -4836,9 +4840,7 @@ void StorageReplicatedMergeTree::flushAndPrepareForShutdown()
if (shutdown_prepared_called.exchange(true))
return;
session_expired_callback_handler.reset();
stopOutdatedDataPartsLoadingTask();
auto settings_ptr = getSettings();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
@@ -4850,39 +4852,17 @@ void StorageReplicatedMergeTree::flushAndPrepareForShutdown()
attach_thread->shutdown();
restarting_thread.shutdown(/* part_of_full_shutdown */true);
background_operations_assignee.finish();
part_moves_between_shards_orchestrator.shutdown();
{
auto lock = queue.lockQueue();
/// Cancel logs pulling after background tasks were cancelled. It's still
/// required because we can trigger pullLogsToQueue during manual OPTIMIZE,
/// MUTATE, etc. queries.
queue.pull_log_blocker.cancelForever();
}
background_moves_assignee.finish();
shutdown_deadline.emplace(std::chrono::system_clock::now() + std::chrono::milliseconds(settings_ptr->wait_for_unique_parts_send_before_shutdown_ms.totalMilliseconds()));
}
void StorageReplicatedMergeTree::partialShutdown(bool part_of_full_shutdown)
void StorageReplicatedMergeTree::partialShutdown()
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
partial_shutdown_called = true;
partial_shutdown_event.set();
queue.notifySubscribersOnPartialShutdown();
if (!part_of_full_shutdown)
{
/// If we are going to completely shutdown table we allow other
/// replicas to fetch parts which are unique for our replica.
///
/// Replicas try to fetch part only in case the source replica is active,
/// so don't reset handler here.
LOG_DEBUG(log, "Reset active node, replica will be inactive");
replica_is_active_node = nullptr;
}
else
LOG_DEBUG(log, "Will not reset active node, it will be reset completely during full shutdown");
replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
merge_selecting_task->deactivate();
@@ -4914,10 +4894,27 @@ void StorageReplicatedMergeTree::shutdown()
flushAndPrepareForShutdown();
auto settings_ptr = getSettings();
LOG_DEBUG(log, "Data parts exchange still exists {}", data_parts_exchange_endpoint != nullptr);
waitForUniquePartsToBeFetchedByOtherReplicas(settings_ptr->wait_for_unique_parts_send_before_shutdown_ms.totalMilliseconds());
if (!shutdown_deadline.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Shutdown deadline is not set in shutdown");
replica_is_active_node = nullptr;
waitForUniquePartsToBeFetchedByOtherReplicas(*shutdown_deadline);
session_expired_callback_handler.reset();
stopOutdatedDataPartsLoadingTask();
partialShutdown();
part_moves_between_shards_orchestrator.shutdown();
background_operations_assignee.finish();
{
auto lock = queue.lockQueue();
/// Cancel logs pulling after background tasks were cancelled. It's still
/// required because we can trigger pullLogsToQueue during manual OPTIMIZE,
/// MUTATE, etc. queries.
queue.pull_log_blocker.cancelForever();
}
background_moves_assignee.finish();
auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
if (data_parts_exchange_ptr)

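The net effect of the last two hunks is a two-phase shutdown handshake: flushAndPrepareForShutdown() only blocks new work and records shutdown_deadline, while the heavyweight teardown (queue, orchestrator, assignees, endpoint) moves into shutdown(), which checks that the first phase actually ran. A minimal sketch of that handshake under the same names, with std::logic_error standing in for ClickHouse's LOGICAL_ERROR exception:

#include <chrono>
#include <optional>
#include <stdexcept>

using ShutdownDeadline = std::chrono::time_point<std::chrono::system_clock>;

// Phase 1 records the deadline; phase 2 consumes it and refuses
// to proceed if phase 1 was skipped.
std::optional<ShutdownDeadline> shutdown_deadline;

void flushAndPrepareForShutdown(std::chrono::milliseconds wait_ms)
{
    // ... cancel fetches/merges, stop the restarting thread ...
    shutdown_deadline.emplace(std::chrono::system_clock::now() + wait_ms);
}

void shutdown()
{
    if (!shutdown_deadline.has_value())
        throw std::logic_error("Shutdown deadline is not set in shutdown");
    // ... waitForUniquePartsToBeFetchedByOtherReplicas(*shutdown_deadline),
    // then partialShutdown() and endpoint teardown ...
}

int main()
{
    flushAndPrepareForShutdown(std::chrono::milliseconds(1000));
    shutdown();
}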
View File

@@ -118,7 +118,7 @@ public:
/// Partial shutdown called if we lose connection to zookeeper.
/// Table can also recover after partial shutdown and continue
/// to work. This method can be called regularly.
void partialShutdown(bool part_of_full_shutdown);
void partialShutdown();
/// These two methods are called during final table shutdown (DROP/DETACH/overall server shutdown).
/// The shutdown process is split into two methods to make it softer and faster. In database shutdown()
@@ -368,15 +368,11 @@ public:
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
void addLastSentPart(const MergeTreePartInfo & info);
std::deque<MergeTreePartInfo> getLastSentParts() const
{
std::lock_guard lock(last_sent_parts_mutex);
return last_sent_parts;
}
/// Wait required amount of milliseconds to give other replicas a chance to
/// download unique parts from our replica
void waitForUniquePartsToBeFetchedByOtherReplicas(size_t wait_ms);
using ShutdownDeadline = std::chrono::time_point<std::chrono::system_clock>;
void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline);
private:
std::atomic_bool are_restoring_replica {false};
@@ -483,6 +479,8 @@ private:
std::atomic<bool> shutdown_called {false};
std::atomic<bool> shutdown_prepared_called {false};
std::optional<ShutdownDeadline> shutdown_deadline;
mutable std::mutex last_sent_parts_mutex;
std::condition_variable last_sent_parts_cv;
@@ -740,7 +738,7 @@
*/
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
static std::vector<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_);
static std::set<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_);
/** Download the specified part from the specified replica.
* If `to_detached`, the part is placed in the `detached` directory.

View File

@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<wait_for_unique_parts_send_before_shutdown_ms>1000</wait_for_unique_parts_send_before_shutdown_ms>
</merge_tree>
</clickhouse>

View File

@@ -57,6 +57,7 @@ ln -sf $SRC_PATH/config.d/display_name.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/reverse_dns_query_function.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/compressed_marks_and_index.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/disable_s3_env_credentials.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/enable_wait_for_shutdown_replicated_tables.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/backups.xml $DEST_SERVER_PATH/config.d/
# Not supported with fasttest.