mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Preparation for structured logging
This commit is contained in:
parent
a4b2daae0d
commit
5aff138956
@ -508,18 +508,18 @@ void Connection::sendScalarsData(Scalars & data)
|
||||
"Sent data for {} scalars, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), compressed {} times to {} ({}/sec.)",
|
||||
data.size(), rows, elapsed,
|
||||
static_cast<size_t>(rows / watch.elapsedSeconds()),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()),
|
||||
ReadableSize(maybe_compressed_out_bytes),
|
||||
ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()),
|
||||
static_cast<double>(maybe_compressed_out_bytes) / out_bytes,
|
||||
formatReadableSizeWithBinarySuffix(out_bytes),
|
||||
formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()));
|
||||
ReadableSize(out_bytes),
|
||||
ReadableSize(out_bytes / watch.elapsedSeconds()));
|
||||
else
|
||||
LOG_DEBUG(log_wrapper.get(),
|
||||
"Sent data for {} scalars, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), no compression.",
|
||||
data.size(), rows, elapsed,
|
||||
static_cast<size_t>(rows / watch.elapsedSeconds()),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()));
|
||||
ReadableSize(maybe_compressed_out_bytes),
|
||||
ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()));
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -612,18 +612,18 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
|
||||
"Sent data for {} external tables, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), compressed {} times to {} ({}/sec.)",
|
||||
data.size(), rows, elapsed,
|
||||
static_cast<size_t>(rows / watch.elapsedSeconds()),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()),
|
||||
ReadableSize(maybe_compressed_out_bytes),
|
||||
ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()),
|
||||
static_cast<double>(maybe_compressed_out_bytes) / out_bytes,
|
||||
formatReadableSizeWithBinarySuffix(out_bytes),
|
||||
formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()));
|
||||
ReadableSize(out_bytes),
|
||||
ReadableSize(out_bytes / watch.elapsedSeconds()));
|
||||
else
|
||||
LOG_DEBUG(log_wrapper.get(),
|
||||
"Sent data for {} external tables, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), no compression.",
|
||||
data.size(), rows, elapsed,
|
||||
static_cast<size_t>(rows / watch.elapsedSeconds()),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
|
||||
formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()));
|
||||
ReadableSize(maybe_compressed_out_bytes),
|
||||
ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()));
|
||||
}
|
||||
|
||||
std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const
|
||||
|
@ -18,8 +18,8 @@ void AlignedBuffer::alloc(size_t size, size_t alignment)
|
||||
void * new_buf;
|
||||
int res = ::posix_memalign(&new_buf, std::max(alignment, sizeof(void*)), size);
|
||||
if (0 != res)
|
||||
throwFromErrno("Cannot allocate memory (posix_memalign), size: "
|
||||
+ formatReadableSizeWithBinarySuffix(size) + ", alignment: " + formatReadableSizeWithBinarySuffix(alignment) + ".",
|
||||
throwFromErrno(fmt::format("Cannot allocate memory (posix_memalign), size: {}, alignment: {}.",
|
||||
ReadableSize(size), ReadableSize(alignment)),
|
||||
ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
|
||||
buf = new_buf;
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ public:
|
||||
|
||||
void * new_buf = ::realloc(buf, new_size);
|
||||
if (nullptr == new_buf)
|
||||
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno(fmt::format("Allocator: Cannot realloc from {} to {}.", ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
|
||||
buf = new_buf;
|
||||
if constexpr (clear_memory)
|
||||
@ -145,7 +145,8 @@ public:
|
||||
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE,
|
||||
PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
|
||||
if (MAP_FAILED == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
|
||||
DB::throwFromErrno(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.",
|
||||
ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_MREMAP);
|
||||
|
||||
/// No need for zero-fill, because mmap guarantees it.
|
||||
}
|
||||
@ -201,13 +202,13 @@ private:
|
||||
if (size >= MMAP_THRESHOLD)
|
||||
{
|
||||
if (alignment > MMAP_MIN_ALIGNMENT)
|
||||
throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating "
|
||||
+ formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS);
|
||||
throw DB::Exception(fmt::format("Too large alignment {}: more than page size when allocating {}.",
|
||||
ReadableSize(alignment), ReadableSize(size)), DB::ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
buf = mmap(getMmapHint(), size, PROT_READ | PROT_WRITE,
|
||||
mmap_flags, -1, 0);
|
||||
if (MAP_FAILED == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno(fmt::format("Allocator: Cannot mmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
|
||||
/// No need for zero-fill, because mmap guarantees it.
|
||||
}
|
||||
@ -221,7 +222,7 @@ private:
|
||||
buf = ::malloc(size);
|
||||
|
||||
if (nullptr == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot malloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno(fmt::format("Allocator: Cannot malloc {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -229,7 +230,8 @@ private:
|
||||
int res = posix_memalign(&buf, alignment, size);
|
||||
|
||||
if (0 != res)
|
||||
DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
|
||||
DB::throwFromErrno(fmt::format("Cannot allocate memory (posix_memalign) {}.", ReadableSize(size)),
|
||||
DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
|
||||
|
||||
if constexpr (clear_memory)
|
||||
memset(buf, 0, size);
|
||||
@ -243,7 +245,7 @@ private:
|
||||
if (size >= MMAP_THRESHOLD)
|
||||
{
|
||||
if (0 != munmap(buf, size))
|
||||
DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
DB::throwFromErrno(fmt::format("Allocator: Cannot munmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -177,13 +177,13 @@ private:
|
||||
{
|
||||
ptr = mmap(address_hint, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
||||
if (MAP_FAILED == ptr)
|
||||
DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno(fmt::format("Allocator: Cannot mmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
|
||||
~Chunk()
|
||||
{
|
||||
if (ptr && 0 != munmap(ptr, size))
|
||||
DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
DB::throwFromErrno(fmt::format("Allocator: Cannot munmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
}
|
||||
|
||||
Chunk(Chunk && other) : ptr(other.ptr), size(other.size)
|
||||
|
@ -278,7 +278,7 @@ private:
|
||||
void * new_data = nullptr;
|
||||
int res = posix_memalign(&new_data, alignment, prefix_size + new_size * sizeof(T));
|
||||
if (0 != res)
|
||||
throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(new_size) + ".",
|
||||
throwFromErrno(fmt::format("Cannot allocate memory (posix_memalign) {}.", ReadableSize(new_size)),
|
||||
ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
|
||||
|
||||
data_ptr = static_cast<char *>(new_data);
|
||||
|
@ -144,13 +144,16 @@ static void getNoSpaceLeftInfoMessage(std::filesystem::path path, std::string &
|
||||
path = path.parent_path();
|
||||
|
||||
auto fs = getStatVFS(path);
|
||||
msg += "\nTotal space: " + formatReadableSizeWithBinarySuffix(fs.f_blocks * fs.f_bsize)
|
||||
+ "\nAvailable space: " + formatReadableSizeWithBinarySuffix(fs.f_bavail * fs.f_bsize)
|
||||
+ "\nTotal inodes: " + formatReadableQuantity(fs.f_files)
|
||||
+ "\nAvailable inodes: " + formatReadableQuantity(fs.f_favail);
|
||||
|
||||
auto mount_point = getMountPoint(path).string();
|
||||
msg += "\nMount point: " + mount_point;
|
||||
|
||||
fmt::format_to(std::back_inserter(msg),
|
||||
"\nTotal space: {}\nAvailable space: {}\nTotal inodes: {}\nAvailable inodes: {}\nMount point: {}",
|
||||
ReadableSize(fs.f_blocks * fs.f_bsize),
|
||||
ReadableSize(fs.f_bavail * fs.f_bsize),
|
||||
formatReadableQuantity(fs.f_files),
|
||||
formatReadableQuantity(fs.f_favail),
|
||||
mount_point);
|
||||
|
||||
#if defined(__linux__)
|
||||
msg += "\nFilesystem: " + getFilesystemName(mount_point);
|
||||
#endif
|
||||
|
@ -50,13 +50,13 @@ MemoryTracker::~MemoryTracker()
|
||||
void MemoryTracker::logPeakMemoryUsage() const
|
||||
{
|
||||
const auto * description = description_ptr.load(std::memory_order_relaxed);
|
||||
LOG_DEBUG(&Logger::get("MemoryTracker"), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak));
|
||||
LOG_DEBUG(&Logger::get("MemoryTracker"), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(peak));
|
||||
}
|
||||
|
||||
void MemoryTracker::logMemoryUsage(Int64 current) const
|
||||
{
|
||||
const auto * description = description_ptr.load(std::memory_order_relaxed);
|
||||
LOG_DEBUG(&Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(current));
|
||||
LOG_DEBUG(&Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current));
|
||||
}
|
||||
|
||||
|
||||
|
@ -102,7 +102,7 @@ void LazyPipeFDs::tryIncreaseSize(int desired_size)
|
||||
if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
|
||||
throwFromErrno("Cannot increase pipe capacity to " + std::to_string(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
|
||||
|
||||
LOG_TRACE(log, "Pipe capacity is {}", formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size)));
|
||||
LOG_TRACE(log, "Pipe capacity is {}", ReadableSize(std::min(pipe_size, desired_size)));
|
||||
}
|
||||
#else
|
||||
(void)desired_size;
|
||||
|
@ -1,6 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <fmt/format.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -20,3 +22,35 @@ std::string formatReadableSizeWithDecimalSuffix(double value, int precision = 2)
|
||||
/// Prints the number as 123.45 billion.
|
||||
void formatReadableQuantity(double value, DB::WriteBuffer & out, int precision = 2);
|
||||
std::string formatReadableQuantity(double value, int precision = 2);
|
||||
|
||||
|
||||
/// Wrapper around value. If used with fmt library (e.g. for log messages),
|
||||
/// value is automatically formatted as size with binary suffix.
|
||||
struct ReadableSize
|
||||
{
|
||||
double value;
|
||||
explicit ReadableSize(double value_) : value(value_) {}
|
||||
};
|
||||
|
||||
/// See https://fmt.dev/latest/api.html#formatting-user-defined-types
|
||||
template <>
|
||||
struct fmt::formatter<ReadableSize>
|
||||
{
|
||||
constexpr auto parse(format_parse_context & ctx)
|
||||
{
|
||||
auto it = ctx.begin();
|
||||
auto end = ctx.end();
|
||||
|
||||
/// Only support {}.
|
||||
if (it != end && *it != '}')
|
||||
throw format_error("invalid format");
|
||||
|
||||
return it;
|
||||
}
|
||||
|
||||
template <typename FormatContext>
|
||||
auto format(const ReadableSize & size, FormatContext & ctx)
|
||||
{
|
||||
return format_to(ctx.out(), "{}", formatReadableSizeWithBinarySuffix(size.value));
|
||||
}
|
||||
};
|
||||
|
@ -60,7 +60,7 @@ Block AggregatingBlockInputStream::readImpl()
|
||||
input_streams.emplace_back(temporary_inputs.back()->block_in);
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), formatReadableSizeWithBinarySuffix(files.sum_size_compressed), formatReadableSizeWithBinarySuffix(files.sum_size_uncompressed));
|
||||
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
|
||||
|
||||
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1);
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ void ColumnGathererStream::readSuffixImpl()
|
||||
else
|
||||
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
|
||||
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
|
||||
profile_info.rows / seconds, formatReadableSizeWithBinarySuffix(profile_info.bytes / seconds));
|
||||
profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -264,7 +264,7 @@ void MergeSortingBlockInputStream::remerge()
|
||||
}
|
||||
merger.readSuffix();
|
||||
|
||||
LOG_DEBUG(log, "Memory usage is lowered from {} to {}", formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks), formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
|
||||
LOG_DEBUG(log, "Memory usage is lowered from {} to {}", ReadableSize(sum_bytes_in_blocks), ReadableSize(new_sum_bytes_in_blocks));
|
||||
|
||||
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
|
||||
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
|
||||
|
@ -269,7 +269,7 @@ void MergingSortedBlockInputStream::readSuffixImpl()
|
||||
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
|
||||
profile_info.blocks, profile_info.rows, seconds,
|
||||
profile_info.rows / seconds,
|
||||
formatReadableSizeWithBinarySuffix(profile_info.bytes / seconds));
|
||||
ReadableSize(profile_info.bytes / seconds));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ Block ParallelAggregatingBlockInputStream::readImpl()
|
||||
input_streams.emplace_back(temporary_inputs.back()->block_in);
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), formatReadableSizeWithBinarySuffix(files.sum_size_compressed), formatReadableSizeWithBinarySuffix(files.sum_size_uncompressed));
|
||||
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
|
||||
|
||||
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
|
||||
input_streams, params, final, temporary_data_merge_threads, temporary_data_merge_threads);
|
||||
@ -178,16 +178,16 @@ void ParallelAggregatingBlockInputStream::execute()
|
||||
{
|
||||
size_t rows = many_data[i]->size();
|
||||
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
|
||||
threads_data[i].src_rows, rows, formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes),
|
||||
threads_data[i].src_rows, rows, ReadableSize(threads_data[i].src_bytes),
|
||||
elapsed_seconds, threads_data[i].src_rows / elapsed_seconds,
|
||||
formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes / elapsed_seconds));
|
||||
ReadableSize(threads_data[i].src_bytes / elapsed_seconds));
|
||||
|
||||
total_src_rows += threads_data[i].src_rows;
|
||||
total_src_bytes += threads_data[i].src_bytes;
|
||||
}
|
||||
LOG_TRACE(log, "Total aggregated. {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
|
||||
total_src_rows, formatReadableSizeWithBinarySuffix(total_src_bytes), elapsed_seconds,
|
||||
total_src_rows / elapsed_seconds, formatReadableSizeWithBinarySuffix(total_src_bytes / elapsed_seconds));
|
||||
total_src_rows, ReadableSize(total_src_bytes), elapsed_seconds,
|
||||
total_src_rows / elapsed_seconds, ReadableSize(total_src_bytes / elapsed_seconds));
|
||||
|
||||
/// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
|
||||
/// To do this, we pass a block with zero rows to aggregate.
|
||||
|
@ -16,8 +16,8 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int too_man
|
||||
+ ", current rows: " + formatReadableQuantity(rows), too_many_rows_exception_code);
|
||||
|
||||
if (max_bytes && bytes > max_bytes)
|
||||
throw Exception("Limit for " + std::string(what) + " exceeded, max bytes: " + formatReadableSizeWithBinarySuffix(max_bytes)
|
||||
+ ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), too_many_bytes_exception_code);
|
||||
throw Exception(fmt::format("Limit for {} exceeded, max bytes: {}, current bytes: {}",
|
||||
std::string(what), ReadableSize(max_bytes), ReadableSize(bytes)), too_many_bytes_exception_code);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ bool DiskLocal::tryReserve(UInt64 bytes)
|
||||
if (unreserved_space >= bytes)
|
||||
{
|
||||
LOG_DEBUG(&Logger::get("DiskLocal"), "Reserving {} on disk {}, having unreserved {}.",
|
||||
formatReadableSizeWithBinarySuffix(bytes), backQuote(name), formatReadableSizeWithBinarySuffix(unreserved_space));
|
||||
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
|
||||
++reservation_count;
|
||||
reserved_bytes += bytes;
|
||||
return true;
|
||||
|
@ -617,7 +617,7 @@ bool DiskS3::tryReserve(UInt64 bytes)
|
||||
if (unreserved_space >= bytes)
|
||||
{
|
||||
LOG_DEBUG(&Logger::get("DiskS3"), "Reserving {} on disk {}, having unreserved {}.",
|
||||
formatReadableSizeWithBinarySuffix(bytes), backQuote(name), formatReadableSizeWithBinarySuffix(unreserved_space));
|
||||
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
|
||||
++reservation_count;
|
||||
reserved_bytes += bytes;
|
||||
return true;
|
||||
|
@ -48,11 +48,11 @@ VolumeJBOD::VolumeJBOD(
|
||||
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
|
||||
for (size_t i = 0; i < disks.size(); ++i)
|
||||
if (sizes[i] < max_data_part_size)
|
||||
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})", backQuote(disks[i]->getName()), backQuote(config_prefix), formatReadableSizeWithBinarySuffix(sizes[i]), formatReadableSizeWithBinarySuffix(max_data_part_size));
|
||||
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})", backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
|
||||
}
|
||||
static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
|
||||
if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
|
||||
LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), formatReadableSizeWithBinarySuffix(max_data_part_size), formatReadableSizeWithBinarySuffix(MIN_PART_SIZE));
|
||||
LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
|
||||
}
|
||||
|
||||
DiskPtr VolumeJBOD::getNextDisk()
|
||||
|
@ -34,7 +34,7 @@ void MMapReadBufferFromFileDescriptor::init(int fd_, size_t offset, size_t lengt
|
||||
{
|
||||
void * buf = mmap(nullptr, length, PROT_READ, MAP_PRIVATE, fd, offset);
|
||||
if (MAP_FAILED == buf)
|
||||
throwFromErrno("MMapReadBufferFromFileDescriptor: Cannot mmap " + formatReadableSizeWithBinarySuffix(length) + ".",
|
||||
throwFromErrno(fmt::format("MMapReadBufferFromFileDescriptor: Cannot mmap {}.", ReadableSize(length)),
|
||||
ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
|
||||
BufferBase::set(static_cast<char *>(buf), length, 0);
|
||||
@ -84,7 +84,7 @@ MMapReadBufferFromFileDescriptor::~MMapReadBufferFromFileDescriptor()
|
||||
void MMapReadBufferFromFileDescriptor::finish()
|
||||
{
|
||||
if (0 != munmap(internalBuffer().begin(), length))
|
||||
throwFromErrno("MMapReadBufferFromFileDescriptor: Cannot munmap " + formatReadableSizeWithBinarySuffix(length) + ".",
|
||||
throwFromErrno(fmt::format("MMapReadBufferFromFileDescriptor: Cannot munmap {}.", ReadableSize(length)),
|
||||
ErrorCodes::CANNOT_MUNMAP);
|
||||
|
||||
length = 0;
|
||||
|
@ -56,7 +56,8 @@ void NO_INLINE loop(ReadBuffer & in, WriteBuffer & out)
|
||||
}
|
||||
|
||||
watch.stop();
|
||||
out << "Read in " << watch.elapsedSeconds() << " sec, " << formatReadableSizeWithBinarySuffix(in.count() / watch.elapsedSeconds()) << "/sec, result = " << sum << "\n";
|
||||
out << "Read in " << watch.elapsedSeconds() << " sec, "
|
||||
<< formatReadableSizeWithBinarySuffix(in.count() / watch.elapsedSeconds()) << "/sec, result = " << sum << "\n";
|
||||
}
|
||||
|
||||
|
||||
|
@ -768,14 +768,14 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
|
||||
" ({} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
|
||||
elapsed_seconds,
|
||||
rows,
|
||||
formatReadableSizeWithBinarySuffix(uncompressed_bytes),
|
||||
formatReadableSizeWithBinarySuffix(compressed_bytes),
|
||||
ReadableSize(uncompressed_bytes),
|
||||
ReadableSize(compressed_bytes),
|
||||
uncompressed_bytes / rows,
|
||||
compressed_bytes / rows,
|
||||
uncompressed_bytes / compressed_bytes,
|
||||
rows / elapsed_seconds,
|
||||
formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds),
|
||||
formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds));
|
||||
ReadableSize(uncompressed_bytes / elapsed_seconds),
|
||||
ReadableSize(compressed_bytes / elapsed_seconds));
|
||||
}
|
||||
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
|
||||
{
|
||||
@ -871,7 +871,7 @@ void Aggregator::writeToTemporaryFileImpl(
|
||||
/// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
|
||||
data_variants.aggregator = nullptr;
|
||||
|
||||
LOG_TRACE(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, formatReadableSizeWithBinarySuffix(max_temporary_block_size_bytes));
|
||||
LOG_TRACE(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
|
||||
}
|
||||
|
||||
|
||||
@ -943,9 +943,9 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
|
||||
size_t rows = result.sizeWithoutOverflowRow();
|
||||
|
||||
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
|
||||
src_rows, rows, formatReadableSizeWithBinarySuffix(src_bytes),
|
||||
src_rows, rows, ReadableSize(src_bytes),
|
||||
elapsed_seconds, src_rows / elapsed_seconds,
|
||||
formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds));
|
||||
ReadableSize(src_bytes / elapsed_seconds));
|
||||
}
|
||||
|
||||
|
||||
@ -1313,9 +1313,9 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
|
||||
double elapsed_seconds = watch.elapsedSeconds();
|
||||
LOG_TRACE(log,
|
||||
"Converted aggregated data to blocks. {} rows, {} in {} sec. ({} rows/sec., {}/sec.)",
|
||||
rows, formatReadableSizeWithBinarySuffix(bytes),
|
||||
rows, ReadableSize(bytes),
|
||||
elapsed_seconds, rows / elapsed_seconds,
|
||||
formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds));
|
||||
ReadableSize(bytes / elapsed_seconds));
|
||||
|
||||
return blocks;
|
||||
}
|
||||
@ -2178,9 +2178,9 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
|
||||
size_t bytes = block.bytes();
|
||||
double elapsed_seconds = watch.elapsedSeconds();
|
||||
LOG_TRACE(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
|
||||
rows, formatReadableSizeWithBinarySuffix(bytes),
|
||||
rows, ReadableSize(bytes),
|
||||
elapsed_seconds, rows / elapsed_seconds,
|
||||
formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds));
|
||||
ReadableSize(bytes / elapsed_seconds));
|
||||
|
||||
if (isCancelled())
|
||||
return {};
|
||||
|
@ -506,9 +506,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
if (elem.read_rows != 0)
|
||||
{
|
||||
LOG_INFO(&Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
|
||||
elem.read_rows, formatReadableSizeWithBinarySuffix(elem.read_bytes), elapsed_seconds,
|
||||
elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
|
||||
static_cast<size_t>(elem.read_rows / elapsed_seconds),
|
||||
formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds));
|
||||
ReadableSize(elem.read_bytes / elapsed_seconds));
|
||||
}
|
||||
|
||||
elem.thread_ids = std::move(info.thread_ids);
|
||||
|
@ -60,24 +60,25 @@ void MySQLOutputFormat::consume(Chunk chunk)
|
||||
void MySQLOutputFormat::finalize()
|
||||
{
|
||||
size_t affected_rows = 0;
|
||||
std::stringstream human_readable_info;
|
||||
std::string human_readable_info;
|
||||
if (QueryStatus * process_list_elem = context->getProcessListElement())
|
||||
{
|
||||
CurrentThread::finalizePerformanceCounters();
|
||||
QueryStatusInfo info = process_list_elem->getInfo();
|
||||
affected_rows = info.written_rows;
|
||||
human_readable_info << std::fixed << std::setprecision(3)
|
||||
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
|
||||
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
|
||||
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
|
||||
human_readable_info = fmt::format(
|
||||
"Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
|
||||
info.read_rows, ReadableSize(info.read_bytes), info.elapsed_seconds,
|
||||
static_cast<size_t>(info.read_rows / info.elapsed_seconds),
|
||||
ReadableSize(info.read_bytes / info.elapsed_seconds));
|
||||
}
|
||||
|
||||
const auto & header = getPort(PortKind::Main).getHeader();
|
||||
if (header.columns() == 0)
|
||||
packet_sender->sendPacket(OK_Packet(0x0, context->mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
|
||||
packet_sender->sendPacket(OK_Packet(0x0, context->mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
|
||||
else
|
||||
if (context->mysql.client_capabilities & CLIENT_DEPRECATE_EOF)
|
||||
packet_sender->sendPacket(OK_Packet(0xfe, context->mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
|
||||
packet_sender->sendPacket(OK_Packet(0xfe, context->mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
|
||||
else
|
||||
packet_sender->sendPacket(EOF_Packet(0, 0), true);
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ void MergingSortedTransform::onFinish()
|
||||
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
|
||||
merged_data.totalChunks(), merged_data.totalMergedRows(), seconds,
|
||||
merged_data.totalMergedRows() / seconds,
|
||||
formatReadableSizeWithBinarySuffix(merged_data.totalAllocatedBytes() / seconds));
|
||||
ReadableSize(merged_data.totalAllocatedBytes() / seconds));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -542,9 +542,9 @@ void AggregatingTransform::initGenerate()
|
||||
size_t rows = variants.sizeWithoutOverflowRow();
|
||||
|
||||
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
|
||||
src_rows, rows, formatReadableSizeWithBinarySuffix(src_bytes),
|
||||
src_rows, rows, ReadableSize(src_bytes),
|
||||
elapsed_seconds, src_rows / elapsed_seconds,
|
||||
formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds));
|
||||
ReadableSize(src_bytes / elapsed_seconds));
|
||||
|
||||
if (params->aggregator.hasTemporaryFiles())
|
||||
{
|
||||
@ -593,7 +593,7 @@ void AggregatingTransform::initGenerate()
|
||||
for (const auto & file : files.files)
|
||||
processors.emplace_back(std::make_unique<SourceFromNativeStream>(header, file->path()));
|
||||
|
||||
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), formatReadableSizeWithBinarySuffix(files.sum_size_compressed), formatReadableSizeWithBinarySuffix(files.sum_size_uncompressed));
|
||||
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
|
||||
|
||||
auto pipe = createMergingAggregatedMemoryEfficientPipe(
|
||||
header, params, files.files.size(), temporary_data_merge_threads);
|
||||
|
@ -267,7 +267,7 @@ void MergeSortingTransform::remerge()
|
||||
new_chunks.emplace_back(std::move(chunk));
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Memory usage is lowered from {} to {}", formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks), formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
|
||||
LOG_DEBUG(log, "Memory usage is lowered from {} to {}", ReadableSize(sum_bytes_in_blocks), ReadableSize(new_sum_bytes_in_blocks));
|
||||
|
||||
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
|
||||
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
|
||||
|
@ -2820,8 +2820,7 @@ inline ReservationPtr checkAndReturnReservation(UInt64 expected_size, Reservatio
|
||||
if (reservation)
|
||||
return reservation;
|
||||
|
||||
throw Exception("Cannot reserve " + formatReadableSizeWithBinarySuffix(expected_size) + ", not enough space",
|
||||
ErrorCodes::NOT_ENOUGH_SPACE);
|
||||
throw Exception(fmt::format("Cannot reserve {}, not enough space", ReadableSize(expected_size)), ErrorCodes::NOT_ENOUGH_SPACE);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -371,14 +371,13 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
|
||||
", {} required now (+{}% on overhead); suppressing similar warnings for the next hour",
|
||||
parts.front()->name,
|
||||
(*prev_it)->name,
|
||||
formatReadableSizeWithBinarySuffix(available_disk_space),
|
||||
formatReadableSizeWithBinarySuffix(sum_bytes),
|
||||
ReadableSize(available_disk_space),
|
||||
ReadableSize(sum_bytes),
|
||||
static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100));
|
||||
}
|
||||
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "Insufficient available disk space, required " +
|
||||
formatReadableSizeWithDecimalSuffix(required_disk_space);
|
||||
*out_disable_reason = fmt::format("Insufficient available disk space, required {}", ReadableSize(required_disk_space));
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -949,7 +948,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
/// Print overall profiling info. NOTE: it may duplicates previous messages
|
||||
{
|
||||
double elapsed_seconds = merge_entry->watch.elapsedSeconds();
|
||||
LOG_DEBUG(log, "Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.", merge_entry->rows_read, all_column_names.size(), merging_column_names.size(), gathering_column_names.size(), elapsed_seconds, merge_entry->rows_read / elapsed_seconds, formatReadableSizeWithBinarySuffix(merge_entry->bytes_read_uncompressed / elapsed_seconds));
|
||||
LOG_DEBUG(log,
|
||||
"Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.",
|
||||
merge_entry->rows_read,
|
||||
all_column_names.size(),
|
||||
merging_column_names.size(),
|
||||
gathering_column_names.size(),
|
||||
elapsed_seconds,
|
||||
merge_entry->rows_read / elapsed_seconds,
|
||||
ReadableSize(merge_entry->bytes_read_uncompressed / elapsed_seconds));
|
||||
}
|
||||
|
||||
if (merge_alg != MergeAlgorithm::Vertical)
|
||||
|
@ -179,7 +179,7 @@ bool MergeTreePartsMover::selectPartsForMove(
|
||||
|
||||
if (!parts_to_move.empty())
|
||||
{
|
||||
LOG_TRACE(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, formatReadableSizeWithBinarySuffix(parts_to_move_total_size_bytes));
|
||||
LOG_TRACE(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, ReadableSize(parts_to_move_total_size_bytes));
|
||||
return true;
|
||||
}
|
||||
else
|
||||
|
@ -169,7 +169,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
|
||||
ProfileEvents::increment(ProfileEvents::SlowRead);
|
||||
LOG_DEBUG(log, "Slow read, event №{}: read {} bytes in {} sec., {}/s.",
|
||||
backoff_state.num_events, info.bytes_read, info.nanoseconds / 1e9,
|
||||
formatReadableSizeWithBinarySuffix(throughput));
|
||||
ReadableSize(throughput));
|
||||
|
||||
if (backoff_state.num_events < backoff_settings.min_events)
|
||||
return;
|
||||
|
@ -1030,11 +1030,18 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
||||
|
||||
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
|
||||
{
|
||||
String reason = "Not executing log entry " + entry.typeToString() + " for part " + entry.new_part_name
|
||||
+ " because source parts size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes)
|
||||
+ ") is greater than the current maximum (" + formatReadableSizeWithBinarySuffix(max_source_parts_size) + ").";
|
||||
LOG_DEBUG(log, reason);
|
||||
out_postpone_reason = reason;
|
||||
const char * format_str = "Not executing log entry {} for part {}"
|
||||
" because source parts size ({}) is greater than the current maximum ({}).";
|
||||
|
||||
LOG_DEBUG(log, format_str,
|
||||
entry.typeToString(), entry.new_part_name,
|
||||
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
|
||||
|
||||
/// Copy-paste of above because we need structured logging (instead of already formatted message).
|
||||
out_postpone_reason = fmt::format(format_str,
|
||||
entry.typeToString(), entry.new_part_name,
|
||||
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -192,7 +192,7 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
|
||||
|
||||
/// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
|
||||
LOG_INFO(&Logger::get("StorageSetOrJoinBase"), "Loaded from backup file {}. {} rows, {}. State has {} unique rows.",
|
||||
file_path, backup_stream.getProfileInfo().rows, formatReadableSizeWithBinarySuffix(backup_stream.getProfileInfo().bytes), getSize());
|
||||
file_path, backup_stream.getProfileInfo().rows, ReadableSize(backup_stream.getProfileInfo().bytes), getSize());
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user