Refactor to avoid too much TSA_GUARD clutter

Also remove the LOGICAL_ERROR check for a condition that should never happen.
It has not fired on CI so far and seems unnecessary.
Pablo Marcos 2024-11-12 11:02:46 +00:00
parent 516300e733
commit d4e288a8c1
2 changed files with 27 additions and 29 deletions
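The refactor below collapses one TSA_GUARDED_BY annotation per member into a single annotation on a nested struct. As a rough standalone illustration of the pattern (simplified names, not ClickHouse code; the GUARDED_BY macro here is assumed to mirror what TSA_GUARDED_BY wraps, namely Clang's guarded_by attribute):

    #if defined(__clang__)
    #  define GUARDED_BY(m) __attribute__((guarded_by(m)))
    #else
    #  define GUARDED_BY(m)   // thread-safety analysis is Clang-only
    #endif

    #include <mutex>

    // Before the refactor: one annotation per guarded member.
    struct StatusBefore
    {
        std::mutex mutex;
        int interval_milliseconds GUARDED_BY(mutex);
        int last_collect_time GUARDED_BY(mutex);
        int next_collect_time GUARDED_BY(mutex);
    };

    // After: the members move into a plain struct, and only the
    // aggregate needs the annotation.
    struct Info
    {
        int interval_milliseconds;
        int last_collect_time;
        int next_collect_time;
    };

    struct StatusAfter
    {
        std::mutex mutex;
        Info info GUARDED_BY(mutex);
    };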


@@ -150,12 +150,13 @@ void QueryMetricLog::collectMetric(const ProcessList & process_list, String quer
 void QueryMetricLog::startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds) TSA_NO_THREAD_SAFETY_ANALYSIS
 {
     QueryMetricLogStatus query_status;
-    query_status.interval_milliseconds = interval_milliseconds;
-    query_status.next_collect_time = start_time;
+    QueryMetricLogStatusInfo & info = query_status.info;
+    info.interval_milliseconds = interval_milliseconds;
+    info.next_collect_time = start_time;

     auto context = getContext();
     const auto & process_list = context->getProcessList();
-    query_status.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
+    info.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
         collectMetric(process_list, query_id);
     });
@@ -199,10 +200,10 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
     /// that order.
     {
         /// Take ownership of the task so that we can destroy it in this scope after unlocking `queries_mutex`.
-        auto task = std::move(query_status.task);
+        auto task = std::move(query_status.info.task);

         /// Build an empty task for the old task to make sure it does not lock any mutex on its destruction.
-        query_status.task = {};
+        query_status.info.task = {};

         query_lock.unlock();
         global_lock.lock();
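The two comments in this hunk describe the underlying pattern: take ownership of the task under the lock, leave an explicitly empty holder behind, unlock, and only then let the destructor run, so it can never deadlock on a mutex the current thread still holds. A minimal sketch of that pattern with hypothetical types (Task, TaskHolder, and finishQuerySketch are illustrative, not the BackgroundSchedulePool API):

    #include <memory>
    #include <mutex>

    struct Task
    {
        // Stand-in for task deactivation, which may take scheduler locks.
        ~Task() { /* deactivate, possibly locking other mutexes */ }
    };

    using TaskHolder = std::unique_ptr<Task>;

    std::mutex queries_mutex;
    TaskHolder guarded_task;   // conceptually guarded by queries_mutex

    void finishQuerySketch()
    {
        std::unique_lock lock(queries_mutex);

        // Take ownership of the task so it survives past the critical section.
        TaskHolder task = std::move(guarded_task);
        // Leave an explicitly empty holder behind, mirroring `query_status.info.task = {};`.
        guarded_task = {};

        lock.unlock();

        // `task` is destroyed here, after unlocking, so ~Task() may take its
        // own locks without deadlocking against queries_mutex.
    }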
@@ -216,18 +217,18 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
 void QueryMetricLogStatus::scheduleNext(String query_id)
 {
-    next_collect_time += std::chrono::milliseconds(interval_milliseconds);
+    info.next_collect_time += std::chrono::milliseconds(info.interval_milliseconds);

     const auto now = std::chrono::system_clock::now();
-    if (next_collect_time > now)
+    if (info.next_collect_time > now)
     {
-        const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(next_collect_time - now).count();
-        task->scheduleAfter(wait_time);
+        const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(info.next_collect_time - now).count();
+        info.task->scheduleAfter(wait_time);
     }
     else
     {
         LOG_TRACE(logger, "The next collecting task for query {} should have already run at {}. Scheduling it right now",
-            query_id, timePointToString(next_collect_time));
-        task->schedule();
+            query_id, timePointToString(info.next_collect_time));
+        info.task->schedule();
     }
 }
@@ -235,25 +236,17 @@ std::optional<QueryMetricLogElement> QueryMetricLogStatus::createLogMetricElemen
 {
     /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
     LOG_TRACE(logger, "Collecting query_metric_log for query {} and interval {} ms with QueryStatusInfo from {}. Next collection time: {}",
-        query_id, interval_milliseconds, timePointToString(query_info_time),
-        schedule_next ? timePointToString(next_collect_time + std::chrono::milliseconds(interval_milliseconds)) : "finished");
+        query_id, info.interval_milliseconds, timePointToString(query_info_time),
+        schedule_next ? timePointToString(info.next_collect_time + std::chrono::milliseconds(info.interval_milliseconds)) : "finished");

-    if (query_info_time <= last_collect_time)
+    if (query_info_time <= info.last_collect_time)
     {
         /// TODO: remove trace before 24.11 release after checking everything is fine on the CI
         LOG_TRACE(logger, "Query {} has a more recent metrics collected. Skipping this one", query_id);
         return {};
     }

-    /// Leave some margin because task->scheduleAfter takes a value in milliseconds.
-    /// So, we can expect up to 1ms of drift since BackgroundSchedulePool will compare
-    /// time points in milliseconds.
-    static auto error_margin = std::chrono::milliseconds(1);
-    if (schedule_next && query_info_time + error_margin < next_collect_time)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Task to collect metric for query {} scheduled at {} but run at {}",
-            query_id, timePointToString(next_collect_time), timePointToString(query_info_time));
-
-    last_collect_time = query_info_time;
+    info.last_collect_time = query_info_time;

     QueryMetricLogElement elem;
     elem.event_time = timeInSeconds(query_info_time);
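For context on the deleted error-margin check: duration_cast to milliseconds truncates, so a wait derived from a finer-grained deadline can end up nearly 1 ms short, letting the task fire just before next_collect_time. A minimal standalone demonstration of the truncation (nothing here is ClickHouse code):

    #include <chrono>
    #include <iostream>

    int main()
    {
        using namespace std::chrono;
        // A deadline 1999 us away truncates to a 1 ms wait, so a task
        // scheduled with millisecond precision can wake up to ~1 ms
        // before the original time_point.
        auto wait = duration_cast<milliseconds>(microseconds(1999));
        std::cout << wait.count() << " ms\n";   // prints: 1
    }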
@@ -267,7 +260,7 @@ std::optional<QueryMetricLogElement> QueryMetricLogStatus::createLogMetricElemen
     for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
     {
         const auto & new_value = (*(query_info.profile_counters))[i];
-        auto & old_value = last_profile_events[i];
+        auto & old_value = info.last_profile_events[i];

         /// Profile event counters are supposed to be monotonic. However, at least the `NetworkReceiveBytes` can be inaccurate.
         /// So, since in the future the counter should always have a bigger value than in the past, we skip this event.
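The comment above is why the delta has to be computed defensively: counters are expected to be monotonic, but NetworkReceiveBytes, for one, can regress, and an unsigned new - old would underflow. A sketch of the defensive delta with hypothetical names (counterDeltas is illustrative, not the ClickHouse function):

    #include <cstdint>
    #include <vector>

    // Per-event increments between two counter snapshots (assumed equal size).
    // Non-monotonic samples are skipped and reported as 0 instead of letting
    // the unsigned subtraction underflow.
    std::vector<uint64_t> counterDeltas(const std::vector<uint64_t> & old_values,
                                        const std::vector<uint64_t> & new_values)
    {
        std::vector<uint64_t> deltas(new_values.size(), 0);
        for (size_t i = 0; i < new_values.size(); ++i)
        {
            if (new_values[i] >= old_values[i])   // monotonic, as expected
                deltas[i] = new_values[i] - old_values[i];
        }
        return deltas;
    }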


@@ -41,16 +41,21 @@ struct QueryMetricLogElement
     void appendToBlock(MutableColumns & columns) const;
 };

+struct QueryMetricLogStatusInfo
+{
+    UInt64 interval_milliseconds;
+    std::chrono::system_clock::time_point last_collect_time;
+    std::chrono::system_clock::time_point next_collect_time;
+    std::vector<ProfileEvents::Count> last_profile_events = std::vector<ProfileEvents::Count>(ProfileEvents::end());
+    BackgroundSchedulePool::TaskHolder task;
+};
+
 struct QueryMetricLogStatus
 {
     using TimePoint = std::chrono::system_clock::time_point;
     using Mutex = std::mutex;

-    UInt64 interval_milliseconds;
-    std::chrono::system_clock::time_point last_collect_time TSA_GUARDED_BY(getMutex());
-    std::chrono::system_clock::time_point next_collect_time TSA_GUARDED_BY(getMutex());
-    std::vector<ProfileEvents::Count> last_profile_events TSA_GUARDED_BY(getMutex()) = std::vector<ProfileEvents::Count>(ProfileEvents::end());
-    BackgroundSchedulePool::TaskHolder task TSA_GUARDED_BY(getMutex());
+    QueryMetricLogStatusInfo info TSA_GUARDED_BY(getMutex());

     /// We need to be able to move it for the hash map, so we need to add an indirection here.
     std::unique_ptr<Mutex> mutex = std::make_unique<Mutex>();
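On that last comment: std::mutex is neither copyable nor movable, so a struct holding one by value could not be moved into a hash map of per-query statuses; keeping the mutex behind std::unique_ptr restores movability. A minimal sketch:

    #include <memory>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    struct Status
    {
        // std::mutex can't be moved; holding it behind unique_ptr keeps
        // Status move-constructible, so it can live in a map by value.
        std::unique_ptr<std::mutex> mutex = std::make_unique<std::mutex>();
    };

    int main()
    {
        std::unordered_map<std::string, Status> queries;
        queries.emplace("query-1", Status{});   // moves the Status into the map
    }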