Merge pull request #65839 from ClickHouse/fix-s3-queue-memory-usage

Fix s3queue memory usage
This commit is contained in:
Kseniia Sumarokova 2024-07-02 16:19:33 +00:00 committed by GitHub
commit bdc7157670
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 0 additions and 13 deletions

View File

@ -36,7 +36,6 @@ ColumnsDescription ObjectStorageQueueLogElement::getColumnsDescription()
{"status", status_datatype, "Status of the processing file"},
{"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the start of processing the file"},
{"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the end of processing the file"},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>()), "Profile events collected while loading this file"},
{"exception", std::make_shared<DataTypeString>(), "Exception message if happened"},
};
}
@ -64,8 +63,6 @@ void ObjectStorageQueueLogElement::appendToBlock(MutableColumns & columns) const
else
columns[i++]->insertDefault();
ProfileEvents::dumpToMapColumn(counters_snapshot, columns[i++].get(), true);
columns[i++]->insert(exception);
}

View File

@ -26,7 +26,6 @@ struct ObjectStorageQueueLogElement
Failed,
};
ObjectStorageQueueStatus status;
ProfileEvents::Counters::Snapshot counters_snapshot;
time_t processing_start_time;
time_t processing_end_time;
std::string exception;

View File

@ -34,7 +34,6 @@ public:
std::atomic<time_t> processing_start_time = 0;
std::atomic<time_t> processing_end_time = 0;
std::atomic<size_t> retries = 0;
ProfileEvents::Counters profile_counters;
private:
mutable std::mutex last_exception_mutex;

View File

@ -509,10 +509,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
path, processed_rows_from_file);
}
auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters);
SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); });
/// FIXME: if files are compressed, profile counters update does not work fully (object storage related counters are not saved). Why?
try
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueuePullMicroseconds);
@ -714,7 +710,6 @@ void ObjectStorageQueueSource::appendLogElement(
.file_name = filename,
.rows_processed = processed_rows,
.status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed,
.counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(),
.processing_start_time = file_status_.processing_start_time,
.processing_end_time = file_status_.processing_end_time,
.exception = file_status_.getException(),

View File

@ -32,7 +32,6 @@ ColumnsDescription StorageSystemS3Queue::getColumnsDescription()
{"status", std::make_shared<DataTypeString>(), "Status of processing: Processed, Processing, Failed"},
{"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time at which processing of the file started"},
{"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time at which processing of the file ended"},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>()), "Profile events collected during processing of the file"},
{"exception", std::make_shared<DataTypeString>(), "Exception which happened during processing"},
};
}
@ -65,8 +64,6 @@ void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, co
else
res_columns[i++]->insertDefault();
ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true);
res_columns[i++]->insert(file_status->getException());
}
}