Fix data race

This commit is contained in:
kssenii 2023-09-29 16:02:51 +02:00
parent d644992192
commit 1ef21bab3d
5 changed files with 71 additions and 46 deletions

View File

@ -208,6 +208,9 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
/// Processing state is cached only if processing is being done by current clickhouse server
/// (because If another server is doing the processing,
/// we cannot know if state changes without checking with zookeeper so there is no point in cache here).
{
std::lock_guard lock(file_status->metadata_lock);
switch (file_status->state)
{
case FileStatus::State::Processing: [[fallthrough]];
@ -239,11 +242,12 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
break;
}
}
}
/// Another thread could already be trying to set file as processing.
/// So there is no need to attempt the same, better to continue with the next file.
std::unique_lock lock(file_status->processing_lock, std::defer_lock);
if (!lock.try_lock())
std::unique_lock processing_lock(file_status->processing_lock, std::defer_lock);
if (!processing_lock.try_lock())
{
return nullptr;
}
@ -272,6 +276,7 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
{
case SetFileProcessingResult::Success:
{
std::lock_guard lock(file_status->metadata_lock);
file_status->state = FileStatus::State::Processing;
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get());
@ -284,11 +289,13 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
}
case SetFileProcessingResult::AlreadyProcessed:
{
std::lock_guard lock(file_status->metadata_lock);
file_status->state = FileStatus::State::Processed;
break;
}
case SetFileProcessingResult::AlreadyFailed:
{
std::lock_guard lock(file_status->metadata_lock);
file_status->state = FileStatus::State::Failed;
break;
}
@ -408,12 +415,16 @@ void S3QueueFilesMetadata::setFileProcessed(ProcessingNodeHolderPtr holder)
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessedMicroseconds);
const auto & path = holder->path;
SCOPE_EXIT({
auto file_status = local_file_statuses.get(path, /* create */false);
{
std::lock_guard lock(file_status->metadata_lock);
file_status->state = FileStatus::State::Processed;
file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
}
SCOPE_EXIT({
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessedMicroseconds, timer.get());
timer.cancel();
file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
});
switch (mode)
@ -524,9 +535,12 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
const auto & path = holder->path;
auto file_status = local_file_statuses.get(path, /* create */false);
{
std::lock_guard lock(file_status->metadata_lock);
file_status->state = FileStatus::State::Failed;
file_status->last_exception = exception_message;
file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
}
SCOPE_EXIT({
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get());
@ -583,6 +597,8 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
if (zk_client->tryGet(zookeeper_failed_path / node_name_with_retriable_suffix, res, &stat))
{
node_metadata.retries = failed_node_metadata.retries + 1;
std::lock_guard lock(file_status->metadata_lock);
file_status->retries = node_metadata.retries;
}
@ -771,6 +787,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
LOG_TEST(log, "Cleanup is already being executed by another node");
return;
}
/// TODO because of this lock we might not update local file statuses on time on one of the nodes.
struct Node
{

View File

@ -59,7 +59,7 @@ public:
};
State state = State::None;
size_t processed_rows = 0;
std::atomic<size_t> processed_rows = 0;
time_t processing_start_time = 0;
time_t processing_end_time = 0;
size_t retries = 0;
@ -67,6 +67,7 @@ public:
ProfileEvents::Counters profile_counters;
std::mutex processing_lock;
std::mutex metadata_lock;
};
using FileStatuses = std::unordered_map<std::string, std::shared_ptr<FileStatus>>;

View File

@ -201,12 +201,15 @@ void StorageS3QueueSource::applyActionAfterProcessing(const String & path)
}
}
void StorageS3QueueSource::appendLogElement(const std::string & filename, const S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed)
void StorageS3QueueSource::appendLogElement(const std::string & filename, S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed)
{
if (!s3_queue_log)
return;
S3QueueLogElement elem
S3QueueLogElement elem{};
{
std::lock_guard lock(file_status_.metadata_lock);
elem = S3QueueLogElement
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.file_name = filename,
@ -217,6 +220,7 @@ void StorageS3QueueSource::appendLogElement(const std::string & filename, const
.processing_end_time = file_status_.processing_end_time,
.exception = file_status_.last_exception,
};
}
s3_queue_log->add(std::move(elem));
}

View File

@ -91,7 +91,7 @@ private:
std::shared_ptr<S3QueueFilesMetadata::FileStatus> file_status;
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, const S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void appendLogElement(const std::string & filename, S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
};
}

View File

@ -47,7 +47,10 @@ void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, co
size_t i = 0;
res_columns[i++]->insert(zookeeper_path);
res_columns[i++]->insert(file_name);
res_columns[i++]->insert(file_status->processed_rows);
std::lock_guard lock(file_status->metadata_lock);
res_columns[i++]->insert(file_status->processed_rows.load());
res_columns[i++]->insert(magic_enum::enum_name(file_status->state));
if (file_status->processing_start_time)