Lowered the number of syscalls in AsynchronousMetrics

This commit is contained in:
Alexey Milovidov 2021-07-12 04:12:34 +03:00
parent e0effad4d1
commit 822cc0fec3
5 changed files with 102 additions and 45 deletions

View File

@ -46,4 +46,18 @@ public:
}
};
/** Similar to ReadBufferFromFile but it is using 'pread' instead of 'read'.
*/
class ReadBufferFromFilePRead : public ReadBufferFromFile
{
public:
ReadBufferFromFilePRead(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFile(file_name_, buf_size, flags, existing_memory, alignment)
{
use_pread = true;
}
};
}

View File

@ -59,7 +59,11 @@ bool ReadBufferFromFileDescriptor::nextImpl()
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
if (use_pread)
res = ::pread(fd, internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end);
else
res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
}
if (!res)
break;
@ -133,7 +137,8 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos < file_offset_of_buffer_end)
{
/// Position is still inside buffer.
/// Position is still inside the buffer.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end());
@ -142,41 +147,61 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
}
else
{
size_t seek_pos = required_alignment > 1
/// Position is out of the buffer, we need to do real seek.
off_t seek_pos = required_alignment > 1
? new_pos / required_alignment * required_alignment
: new_pos;
size_t offset_after_seek_pos = new_pos - seek_pos;
ProfileEvents::increment(ProfileEvents::Seek);
Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC);
off_t offset_after_seek_pos = new_pos - seek_pos;
/// First put position at the end of the buffer so the next read will fetch new data to the buffer.
pos = working_buffer.end();
off_t res = ::lseek(fd, seek_pos, SEEK_SET);
if (-1 == res)
throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
/// In case of using 'pread' we just update the info about the next position in file.
/// In case of using 'read' we call 'lseek'.
/// We account both cases as seek event as it leads to non-contiguous reads from file.
ProfileEvents::increment(ProfileEvents::Seek);
if (!use_pread)
{
Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC);
off_t res = ::lseek(fd, seek_pos, SEEK_SET);
if (-1 == res)
throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
/// Also note that seeking past the file size is not allowed.
if (res != seek_pos)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
"The 'lseek' syscall returned value ({}) that is not expected ({})", res, seek_pos);
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
}
file_offset_of_buffer_end = seek_pos;
if (offset_after_seek_pos > 0)
ignore(offset_after_seek_pos);
return res;
return seek_pos;
}
}
void ReadBufferFromFileDescriptor::rewind()
{
ProfileEvents::increment(ProfileEvents::Seek);
off_t res = ::lseek(fd, 0, SEEK_SET);
if (-1 == res)
throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (!use_pread)
{
ProfileEvents::increment(ProfileEvents::Seek);
off_t res = ::lseek(fd, 0, SEEK_SET);
if (-1 == res)
throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
/// In case of pread, the ProfileEvents::Seek is not accounted, but it's Ok.
/// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'.
working_buffer.resize(0);

View File

@ -14,8 +14,10 @@ namespace DB
class ReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned.
bool use_pread = false; /// To access one fd from multiple threads, use 'pread' syscall instead of 'read'.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
int fd;
bool nextImpl() override;
@ -25,7 +27,9 @@ protected:
public:
ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), required_alignment(alignment), fd(fd_) {}
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), required_alignment(alignment), fd(fd_)
{
}
int getFD() const
{
@ -46,9 +50,23 @@ public:
off_t size();
void setProgressCallback(ContextPtr context);
private:
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool poll(size_t timeout_microseconds);
};
/** Similar to ReadBufferFromFileDescriptor but it is using 'pread' allowing multiple concurrent reads from the same fd.
*/
class ReadBufferFromFileDescriptorPRead : public ReadBufferFromFileDescriptor
{
public:
ReadBufferFromFileDescriptorPRead(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment)
{
use_pread = true;
}
};
}

View File

@ -48,7 +48,7 @@ namespace ErrorCodes
static constexpr size_t small_buffer_size = 4096;
static void openFileIfExists(const char * filename, std::optional<ReadBufferFromFile> & out)
static void openFileIfExists(const char * filename, std::optional<ReadBufferFromFilePRead> & out)
{
/// Ignoring time of check is not time of use cases, as procfs/sysfs files are fairly persistent.
@ -57,11 +57,11 @@ static void openFileIfExists(const char * filename, std::optional<ReadBufferFrom
out.emplace(filename, small_buffer_size);
}
static std::unique_ptr<ReadBufferFromFile> openFileIfExists(const std::string & filename)
static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::string & filename)
{
std::error_code ec;
if (std::filesystem::is_regular_file(filename, ec))
return std::make_unique<ReadBufferFromFile>(filename, small_buffer_size);
return std::make_unique<ReadBufferFromFilePRead>(filename, small_buffer_size);
return {};
}
@ -89,7 +89,7 @@ AsynchronousMetrics::AsynchronousMetrics(
for (size_t thermal_device_index = 0;; ++thermal_device_index)
{
std::unique_ptr<ReadBufferFromFile> file = openFileIfExists(fmt::format("/sys/class/thermal/thermal_zone{}/temp", thermal_device_index));
std::unique_ptr<ReadBufferFromFilePRead> file = openFileIfExists(fmt::format("/sys/class/thermal/thermal_zone{}/temp", thermal_device_index));
if (!file)
{
/// Sometimes indices are from zero sometimes from one.
@ -113,7 +113,7 @@ AsynchronousMetrics::AsynchronousMetrics(
}
String hwmon_name;
ReadBufferFromFile hwmon_name_in(hwmon_name_file, small_buffer_size);
ReadBufferFromFilePRead hwmon_name_in(hwmon_name_file, small_buffer_size);
readText(hwmon_name, hwmon_name_in);
std::replace(hwmon_name.begin(), hwmon_name.end(), ' ', '_');
@ -134,14 +134,14 @@ AsynchronousMetrics::AsynchronousMetrics(
break;
}
std::unique_ptr<ReadBufferFromFile> file = openFileIfExists(sensor_value_file);
std::unique_ptr<ReadBufferFromFilePRead> file = openFileIfExists(sensor_value_file);
if (!file)
continue;
String sensor_name;
if (sensor_name_file_exists)
{
ReadBufferFromFile sensor_name_in(sensor_name_file, small_buffer_size);
ReadBufferFromFilePRead sensor_name_in(sensor_name_file, small_buffer_size);
readText(sensor_name, sensor_name_in);
std::replace(sensor_name.begin(), sensor_name.end(), ' ', '_');
}
@ -184,7 +184,7 @@ AsynchronousMetrics::AsynchronousMetrics(
if (device_name.starts_with("loop"))
continue;
std::unique_ptr<ReadBufferFromFile> file = openFileIfExists(device_dir.path() / "stat");
std::unique_ptr<ReadBufferFromFilePRead> file = openFileIfExists(device_dir.path() / "stat");
if (!file)
continue;
@ -1021,7 +1021,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
{
try
{
ReadBufferFromFile & in = *thermal[i];
ReadBufferFromFilePRead & in = *thermal[i];
in.rewind();
Int64 temperature = 0;
@ -1065,7 +1065,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
{
if (edac[i].first)
{
ReadBufferFromFile & in = *edac[i].first;
ReadBufferFromFilePRead & in = *edac[i].first;
in.rewind();
uint64_t errors = 0;
readText(errors, in);
@ -1074,7 +1074,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
if (edac[i].second)
{
ReadBufferFromFile & in = *edac[i].second;
ReadBufferFromFilePRead & in = *edac[i].second;
in.rewind();
uint64_t errors = 0;
readText(errors, in);

View File

@ -82,25 +82,25 @@ private:
#if defined(OS_LINUX)
MemoryStatisticsOS memory_stat;
std::optional<ReadBufferFromFile> meminfo;
std::optional<ReadBufferFromFile> loadavg;
std::optional<ReadBufferFromFile> proc_stat;
std::optional<ReadBufferFromFile> cpuinfo;
std::optional<ReadBufferFromFile> file_nr;
std::optional<ReadBufferFromFile> uptime;
std::optional<ReadBufferFromFile> net_dev;
std::optional<ReadBufferFromFilePRead> meminfo;
std::optional<ReadBufferFromFilePRead> loadavg;
std::optional<ReadBufferFromFilePRead> proc_stat;
std::optional<ReadBufferFromFilePRead> cpuinfo;
std::optional<ReadBufferFromFilePRead> file_nr;
std::optional<ReadBufferFromFilePRead> uptime;
std::optional<ReadBufferFromFilePRead> net_dev;
std::vector<std::unique_ptr<ReadBufferFromFile>> thermal;
std::vector<std::unique_ptr<ReadBufferFromFilePRead>> thermal;
std::unordered_map<String /* device name */,
std::unordered_map<String /* label name */,
std::unique_ptr<ReadBufferFromFile>>> hwmon_devices;
std::unique_ptr<ReadBufferFromFilePRead>>> hwmon_devices;
std::vector<std::pair<
std::unique_ptr<ReadBufferFromFile> /* correctable errors */,
std::unique_ptr<ReadBufferFromFile> /* uncorrectable errors */>> edac;
std::unique_ptr<ReadBufferFromFilePRead> /* correctable errors */,
std::unique_ptr<ReadBufferFromFilePRead> /* uncorrectable errors */>> edac;
std::unordered_map<String /* device name */, std::unique_ptr<ReadBufferFromFile>> block_devs;
std::unordered_map<String /* device name */, std::unique_ptr<ReadBufferFromFilePRead>> block_devs;
/// TODO: socket statistics.