Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-21 15:12:02 +00:00)

Merge pull request #11317 from ClickHouse/logger-fmt-2

Preparation for structured logging

Commit 9984e989c1
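
In short, this change removes the `using Poco::Logger;`, `using Poco::Message;`, `using DB::LogsLevel;` and `using DB::CurrentThread;` aliases from the logging header, fully qualifies every call site, and replaces eager `formatReadableSizeWithBinarySuffix(...)` calls in log and error messages with the new lazy `ReadableSize` wrapper. A minimal sketch of the resulting call style (the include path and the surrounding function are assumptions; the identifiers all appear in the diff below):

    #include <common/logger_useful.h>  /// assumed location of the LOG_* macros

    void example()
    {
        /// Fully qualified, as this commit enforces everywhere:
        Poco::Logger * log = &Poco::Logger::get("Example");
        size_t bytes = 123456789;
        /// ReadableSize is rendered as "117.74 MiB"-style text only if the
        /// message is actually emitted at the current log level.
        LOG_INFO(log, "Processed {} rows, {}", 42, ReadableSize(bytes));
    }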
@@ -28,7 +28,7 @@ public:
 void exception() override { logException(); }
 
 private:
-Logger * log = &Logger::get("ServerErrorHandler");
+Poco::Logger * log = &Poco::Logger::get("ServerErrorHandler");
 
 void logException()
 {
@@ -9,13 +9,6 @@
 #include <Common/CurrentThread.h>
 
 
-/// TODO Remove this.
-using Poco::Logger;
-using Poco::Message;
-using DB::LogsLevel;
-using DB::CurrentThread;
-
-
 namespace
 {
 template <typename... Ts> constexpr size_t numArgs(Ts &&...) { return sizeof...(Ts); }
@@ -31,8 +24,8 @@ namespace
 
 #define LOG_IMPL(logger, priority, PRIORITY, ...) do \
 { \
-const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && \
-(CurrentThread::getGroup()->client_logs_level >= (priority)); \
+const bool is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
+(DB::CurrentThread::getGroup()->client_logs_level >= (priority)); \
 if ((logger)->is((PRIORITY)) || is_clients_log) \
 { \
 std::string formatted_message = numArgs(__VA_ARGS__) > 1 ? fmt::format(__VA_ARGS__) : firstArg(__VA_ARGS__); \
@@ -42,7 +35,7 @@ namespace
 file_function += __FILE__; \
 file_function += "; "; \
 file_function += __PRETTY_FUNCTION__; \
-Message poco_message((logger)->name(), formatted_message, \
+Poco::Message poco_message((logger)->name(), formatted_message, \
 (PRIORITY), file_function.c_str(), __LINE__); \
 channel->log(poco_message); \
 } \
@@ -50,9 +43,18 @@ namespace
 } while (false)
 
 
-#define LOG_TRACE(logger, ...) LOG_IMPL(logger, LogsLevel::trace, Message::PRIO_TRACE, __VA_ARGS__)
-#define LOG_DEBUG(logger, ...) LOG_IMPL(logger, LogsLevel::debug, Message::PRIO_DEBUG, __VA_ARGS__)
-#define LOG_INFO(logger, ...) LOG_IMPL(logger, LogsLevel::information, Message::PRIO_INFORMATION, __VA_ARGS__)
-#define LOG_WARNING(logger, ...) LOG_IMPL(logger, LogsLevel::warning, Message::PRIO_WARNING, __VA_ARGS__)
-#define LOG_ERROR(logger, ...) LOG_IMPL(logger, LogsLevel::error, Message::PRIO_ERROR, __VA_ARGS__)
-#define LOG_FATAL(logger, ...) LOG_IMPL(logger, LogsLevel::error, Message::PRIO_FATAL, __VA_ARGS__)
+#define LOG_TRACE(logger, ...) LOG_IMPL(logger, DB::LogsLevel::trace, Poco::Message::PRIO_TRACE, __VA_ARGS__)
+#define LOG_DEBUG(logger, ...) LOG_IMPL(logger, DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG, __VA_ARGS__)
+#define LOG_INFO(logger, ...) LOG_IMPL(logger, DB::LogsLevel::information, Poco::Message::PRIO_INFORMATION, __VA_ARGS__)
+#define LOG_WARNING(logger, ...) LOG_IMPL(logger, DB::LogsLevel::warning, Poco::Message::PRIO_WARNING, __VA_ARGS__)
+#define LOG_ERROR(logger, ...) LOG_IMPL(logger, DB::LogsLevel::error, Poco::Message::PRIO_ERROR, __VA_ARGS__)
+#define LOG_FATAL(logger, ...) LOG_IMPL(logger, DB::LogsLevel::error, Poco::Message::PRIO_FATAL, __VA_ARGS__)
+
+
+/// Compatibility for external projects.
+#if defined(ARCADIA_BUILD)
+using Poco::Logger;
+using Poco::Message;
+using DB::LogsLevel;
+using DB::CurrentThread;
+#endif
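A note on the macro above: `numArgs(__VA_ARGS__)` counts the variadic arguments at compile time, so a call with only a message string bypasses `fmt::format` and is taken verbatim via `firstArg` (referenced here but defined outside this hunk). A sketch of the observable behavior, inferred from the macro and from the test added later in this commit:

    /// One argument: the string is logged as-is, stray '{}' included.
    LOG_INFO(log, "Hello {} World");        /// -> "Hello {} World"

    /// Two or more arguments: fmt::format is applied.
    LOG_INFO(log, "Hello {} World", 42);    /// -> "Hello 42 World"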
@@ -124,7 +124,7 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
 const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
 const StackTrace stack_trace(signal_context);
 
-StringRef query_id = CurrentThread::getQueryId(); /// This is signal safe.
+StringRef query_id = DB::CurrentThread::getQueryId(); /// This is signal safe.
 query_id.size = std::min(query_id.size, max_query_id_size);
 
 DB::writeBinary(sig, out);
@@ -162,7 +162,7 @@ public:
 };
 
 explicit SignalListener(BaseDaemon & daemon_)
-: log(&Logger::get("BaseDaemon"))
+: log(&Poco::Logger::get("BaseDaemon"))
 , daemon(daemon_)
 {
 }
@@ -231,7 +231,7 @@ public:
 }
 
 private:
-Logger * log;
+Poco::Logger * log;
 BaseDaemon & daemon;
 
 void onTerminate(const std::string & message, UInt32 thread_num) const
@@ -288,9 +288,9 @@ extern "C" void __sanitizer_set_death_callback(void (*)());
 
 static void sanitizerDeathCallback()
 {
-Logger * log = &Logger::get("BaseDaemon");
+Poco::Logger * log = &Poco::Logger::get("BaseDaemon");
 
-StringRef query_id = CurrentThread::getQueryId(); /// This is signal safe.
+StringRef query_id = DB::CurrentThread::getQueryId(); /// This is signal safe.
 
 {
 std::stringstream message;
@@ -498,10 +498,10 @@ void debugIncreaseOOMScore()
 }
 catch (const Poco::Exception & e)
 {
-LOG_WARNING(&Logger::root(), "Failed to adjust OOM score: '{}'.", e.displayText());
+LOG_WARNING(&Poco::Logger::root(), "Failed to adjust OOM score: '{}'.", e.displayText());
 return;
 }
-LOG_INFO(&Logger::root(), "Set OOM score adjustment to {}", new_score);
+LOG_INFO(&Poco::Logger::root(), "Set OOM score adjustment to {}", new_score);
 }
 #else
 void debugIncreaseOOMScore() {}
@@ -715,7 +715,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
 
 void BaseDaemon::logRevision() const
 {
-Logger::root().information("Starting " + std::string{VERSION_FULL}
+Poco::Logger::root().information("Starting " + std::string{VERSION_FULL}
 + " with revision " + std::to_string(ClickHouseRevision::get())
 + ", PID " + std::to_string(getpid()));
 }
@@ -732,7 +732,7 @@ void BaseDaemon::handleNotification(Poco::TaskFailedNotification *_tfn)
 {
 task_failed = true;
 Poco::AutoPtr<Poco::TaskFailedNotification> fn(_tfn);
-Logger *lg = &(logger());
+Poco::Logger * lg = &(logger());
 LOG_ERROR(lg, "Task '{}' failed. Daemon is shutting down. Reason - {}", fn->task()->name(), fn->reason().displayText());
 ServerApplication::terminate();
 }
@@ -135,7 +135,7 @@ static void attachSystemTables(const Context & context)
 int LocalServer::main(const std::vector<std::string> & /*args*/)
 try
 {
-Logger * log = &logger();
+Poco::Logger * log = &logger();
 ThreadStatus thread_status;
 UseSSL use_ssl;
 
@@ -25,7 +25,7 @@ ODBCBlockInputStream::ODBCBlockInputStream(
 , result{statement}
 , iterator{result.begin()}
 , max_block_size{max_block_size_}
-, log(&Logger::get("ODBCBlockInputStream"))
+, log(&Poco::Logger::get("ODBCBlockInputStream"))
 {
 if (sample_block.columns() != result.columnCount())
 throw Exception{"RecordSet contains " + toString(result.columnCount()) + " columns while " + toString(sample_block.columns())
@@ -94,7 +94,7 @@ ODBCBlockOutputStream::ODBCBlockOutputStream(Poco::Data::Session && session_,
 , table_name(remote_table_name_)
 , sample_block(sample_block_)
 , quoting(quoting_)
-, log(&Logger::get("ODBCBlockOutputStream"))
+, log(&Poco::Logger::get("ODBCBlockOutputStream"))
 {
 description.init(sample_block);
 }
@@ -89,7 +89,7 @@ namespace CurrentMetrics
 namespace
 {
 
-void setupTmpPath(Logger * log, const std::string & path)
+void setupTmpPath(Poco::Logger * log, const std::string & path)
 {
 LOG_DEBUG(log, "Setting up {} to store temporary data in it", path);
 
@@ -212,7 +212,7 @@ void Server::defineOptions(Poco::Util::OptionSet & options)
 
 int Server::main(const std::vector<std::string> & /*args*/)
 {
-Logger * log = &logger();
+Poco::Logger * log = &logger();
 UseSSL use_ssl;
 
 ThreadStatus thread_status;
@@ -309,7 +309,7 @@ bool AllowedClientHosts::contains(const IPAddress & client_address) const
 throw;
 /// Try to ignore DNS errors: if host cannot be resolved, skip it and try next.
 LOG_WARNING(
-&Logger::get("AddressPatterns"),
+&Poco::Logger::get("AddressPatterns"),
 "Failed to check if the allowed client hosts contain address {}. {}, code = {}",
 client_address.toString(), e.displayText(), e.code());
 return false;
@@ -342,7 +342,7 @@ bool AllowedClientHosts::contains(const IPAddress & client_address) const
 throw;
 /// Try to ignore DNS errors: if host cannot be resolved, skip it and try next.
 LOG_WARNING(
-&Logger::get("AddressPatterns"),
+&Poco::Logger::get("AddressPatterns"),
 "Failed to check if the allowed client hosts contain address {}. {}, code = {}",
 client_address.toString(), e.displayText(), e.code());
 return false;
@@ -508,18 +508,18 @@ void Connection::sendScalarsData(Scalars & data)
 "Sent data for {} scalars, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), compressed {} times to {} ({}/sec.)",
 data.size(), rows, elapsed,
 static_cast<size_t>(rows / watch.elapsedSeconds()),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()),
+ReadableSize(maybe_compressed_out_bytes),
+ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()),
 static_cast<double>(maybe_compressed_out_bytes) / out_bytes,
-formatReadableSizeWithBinarySuffix(out_bytes),
-formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()));
+ReadableSize(out_bytes),
+ReadableSize(out_bytes / watch.elapsedSeconds()));
 else
 LOG_DEBUG(log_wrapper.get(),
 "Sent data for {} scalars, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), no compression.",
 data.size(), rows, elapsed,
 static_cast<size_t>(rows / watch.elapsedSeconds()),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()));
+ReadableSize(maybe_compressed_out_bytes),
+ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()));
 }
 
 namespace
@@ -612,18 +612,18 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
 "Sent data for {} external tables, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), compressed {} times to {} ({}/sec.)",
 data.size(), rows, elapsed,
 static_cast<size_t>(rows / watch.elapsedSeconds()),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()),
+ReadableSize(maybe_compressed_out_bytes),
+ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()),
 static_cast<double>(maybe_compressed_out_bytes) / out_bytes,
-formatReadableSizeWithBinarySuffix(out_bytes),
-formatReadableSizeWithBinarySuffix(out_bytes / watch.elapsedSeconds()));
+ReadableSize(out_bytes),
+ReadableSize(out_bytes / watch.elapsedSeconds()));
 else
 LOG_DEBUG(log_wrapper.get(),
 "Sent data for {} external tables, total {} rows in {} sec., {} rows/sec., {} ({}/sec.), no compression.",
 data.size(), rows, elapsed,
 static_cast<size_t>(rows / watch.elapsedSeconds()),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes),
-formatReadableSizeWithBinarySuffix(maybe_compressed_out_bytes / watch.elapsedSeconds()));
+ReadableSize(maybe_compressed_out_bytes),
+ReadableSize(maybe_compressed_out_bytes / watch.elapsedSeconds()));
 }
 
 std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const
@@ -249,16 +249,16 @@ private:
 {
 }
 
-Logger * get()
+Poco::Logger * get()
 {
 if (!log)
-log = &Logger::get("Connection (" + parent.getDescription() + ")");
+log = &Poco::Logger::get("Connection (" + parent.getDescription() + ")");
 
 return log;
 }
 
 private:
-std::atomic<Logger *> log;
+std::atomic<Poco::Logger *> log;
 Connection & parent;
 };
 
@@ -56,7 +56,7 @@ public:
 Protocol::Compression compression_ = Protocol::Compression::Enable,
 Protocol::Secure secure_ = Protocol::Secure::Disable)
 : Base(max_connections_,
-&Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
+&Poco::Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
 host(host_),
 port(port_),
 default_database(default_database_),
@@ -35,7 +35,7 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover(
 LoadBalancing load_balancing,
 time_t decrease_error_period_,
 size_t max_error_cap_)
-: Base(std::move(nested_pools_), decrease_error_period_, max_error_cap_, &Logger::get("ConnectionPoolWithFailover"))
+: Base(std::move(nested_pools_), decrease_error_period_, max_error_cap_, &Poco::Logger::get("ConnectionPoolWithFailover"))
 , default_load_balancing(load_balancing)
 {
 const std::string & local_hostname = getFQDNOrHostName();
@@ -35,7 +35,7 @@ TimeoutSetter::~TimeoutSetter()
 catch (std::exception & e)
 {
 // Sometimes catched on macos
-LOG_ERROR(&Logger::get("Client"), "TimeoutSetter: Can't reset timeouts: {}", e.what());
+LOG_ERROR(&Poco::Logger::get("Client"), "TimeoutSetter: Can't reset timeouts: {}", e.what());
 }
 }
 }
@@ -18,8 +18,8 @@ void AlignedBuffer::alloc(size_t size, size_t alignment)
 void * new_buf;
 int res = ::posix_memalign(&new_buf, std::max(alignment, sizeof(void*)), size);
 if (0 != res)
-throwFromErrno("Cannot allocate memory (posix_memalign), size: "
-+ formatReadableSizeWithBinarySuffix(size) + ", alignment: " + formatReadableSizeWithBinarySuffix(alignment) + ".",
+throwFromErrno(fmt::format("Cannot allocate memory (posix_memalign), size: {}, alignment: {}.",
+ReadableSize(size), ReadableSize(alignment)),
 ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
 buf = new_buf;
 }
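The allocator hunks that follow all apply the same pattern as the change above: error texts handed to `throwFromErrno` are built with `fmt::format` plus `ReadableSize` instead of `operator+` chains, which is both shorter and avoids a temporary string per concatenated fragment. The shape of every converted call site (copied from this diff):

    DB::throwFromErrno(
        fmt::format("Cannot allocate memory (posix_memalign), size: {}, alignment: {}.",
                    ReadableSize(size), ReadableSize(alignment)),
        DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);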
@@ -129,7 +129,7 @@ public:
 
 void * new_buf = ::realloc(buf, new_size);
 if (nullptr == new_buf)
-DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
+DB::throwFromErrno(fmt::format("Allocator: Cannot realloc from {} to {}.", ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
 
 buf = new_buf;
 if constexpr (clear_memory)
@@ -145,7 +145,8 @@ public:
 buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE,
 PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
 if (MAP_FAILED == buf)
-DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
+DB::throwFromErrno(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.",
+ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_MREMAP);
 
 /// No need for zero-fill, because mmap guarantees it.
 }
@@ -201,13 +202,13 @@ private:
 if (size >= MMAP_THRESHOLD)
 {
 if (alignment > MMAP_MIN_ALIGNMENT)
-throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating "
-+ formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS);
+throw DB::Exception(fmt::format("Too large alignment {}: more than page size when allocating {}.",
+ReadableSize(alignment), ReadableSize(size)), DB::ErrorCodes::BAD_ARGUMENTS);
 
 buf = mmap(getMmapHint(), size, PROT_READ | PROT_WRITE,
 mmap_flags, -1, 0);
 if (MAP_FAILED == buf)
-DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
+DB::throwFromErrno(fmt::format("Allocator: Cannot mmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
 
 /// No need for zero-fill, because mmap guarantees it.
 }
@@ -221,7 +222,7 @@ private:
 buf = ::malloc(size);
 
 if (nullptr == buf)
-DB::throwFromErrno("Allocator: Cannot malloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
+DB::throwFromErrno(fmt::format("Allocator: Cannot malloc {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
 }
 else
 {
@@ -229,7 +230,8 @@ private:
 int res = posix_memalign(&buf, alignment, size);
 
 if (0 != res)
-DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
+DB::throwFromErrno(fmt::format("Cannot allocate memory (posix_memalign) {}.", ReadableSize(size)),
+DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
 
 if constexpr (clear_memory)
 memset(buf, 0, size);
@@ -243,7 +245,7 @@ private:
 if (size >= MMAP_THRESHOLD)
 {
 if (0 != munmap(buf, size))
-DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
+DB::throwFromErrno(fmt::format("Allocator: Cannot munmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_MUNMAP);
 }
 else
 {
@@ -177,13 +177,13 @@ private:
 {
 ptr = mmap(address_hint, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
 if (MAP_FAILED == ptr)
-DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
+DB::throwFromErrno(fmt::format("Allocator: Cannot mmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
 }
 
 ~Chunk()
 {
 if (ptr && 0 != munmap(ptr, size))
-DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
+DB::throwFromErrno(fmt::format("Allocator: Cannot munmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_MUNMAP);
 }
 
 Chunk(Chunk && other) : ptr(other.ptr), size(other.size)
@@ -278,7 +278,7 @@ private:
 void * new_data = nullptr;
 int res = posix_memalign(&new_data, alignment, prefix_size + new_size * sizeof(T));
 if (0 != res)
-throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(new_size) + ".",
+throwFromErrno(fmt::format("Cannot allocate memory (posix_memalign) {}.", ReadableSize(new_size)),
 ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
 
 data_ptr = static_cast<char *>(new_data);
@@ -66,21 +66,21 @@ ConfigProcessor::ConfigProcessor(
 , name_pool(new Poco::XML::NamePool(65521))
 , dom_parser(name_pool)
 {
-if (log_to_console && !Logger::has("ConfigProcessor"))
+if (log_to_console && !Poco::Logger::has("ConfigProcessor"))
 {
 channel_ptr = new Poco::ConsoleChannel;
-log = &Logger::create("ConfigProcessor", channel_ptr.get(), Poco::Message::PRIO_TRACE);
+log = &Poco::Logger::create("ConfigProcessor", channel_ptr.get(), Poco::Message::PRIO_TRACE);
 }
 else
 {
-log = &Logger::get("ConfigProcessor");
+log = &Poco::Logger::get("ConfigProcessor");
 }
 }
 
 ConfigProcessor::~ConfigProcessor()
 {
 if (channel_ptr) /// This means we have created a new console logger in the constructor.
-Logger::destroy("ConfigProcessor");
+Poco::Logger::destroy("ConfigProcessor");
 }
 
 
@@ -116,7 +116,7 @@ private:
 
 bool throw_on_bad_incl;
 
-Logger * log;
+Poco::Logger * log;
 Poco::AutoPtr<Poco::Channel> channel_ptr;
 
 Substitutions substitutions;
@@ -69,7 +69,7 @@ private:
 
 static constexpr auto reload_interval = std::chrono::seconds(2);
 
-Poco::Logger * log = &Logger::get("ConfigReloader");
+Poco::Logger * log = &Poco::Logger::get("ConfigReloader");
 
 std::string path;
 std::string include_from_path;
@@ -202,7 +202,7 @@ bool DNSResolver::updateCache()
 }
 
 if (!lost_hosts.empty())
-LOG_INFO(&Logger::get("DNSResolver"), "Cached hosts not found: {}", lost_hosts);
+LOG_INFO(&Poco::Logger::get("DNSResolver"), "Cached hosts not found: {}", lost_hosts);
 
 return updated;
 }
@@ -122,7 +122,7 @@ void throwFromErrnoWithPath(const std::string & s, const std::string & path, int
 
 void tryLogCurrentException(const char * log_name, const std::string & start_of_message)
 {
-tryLogCurrentException(&Logger::get(log_name), start_of_message);
+tryLogCurrentException(&Poco::Logger::get(log_name), start_of_message);
 }
 
 void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_message)
@@ -148,13 +148,16 @@ static void getNoSpaceLeftInfoMessage(std::filesystem::path path, std::string &
 path = path.parent_path();
 
 auto fs = getStatVFS(path);
-msg += "\nTotal space: " + formatReadableSizeWithBinarySuffix(fs.f_blocks * fs.f_bsize)
-+ "\nAvailable space: " + formatReadableSizeWithBinarySuffix(fs.f_bavail * fs.f_bsize)
-+ "\nTotal inodes: " + formatReadableQuantity(fs.f_files)
-+ "\nAvailable inodes: " + formatReadableQuantity(fs.f_favail);
 
 auto mount_point = getMountPoint(path).string();
-msg += "\nMount point: " + mount_point;
+fmt::format_to(std::back_inserter(msg),
+"\nTotal space: {}\nAvailable space: {}\nTotal inodes: {}\nAvailable inodes: {}\nMount point: {}",
+ReadableSize(fs.f_blocks * fs.f_bsize),
+ReadableSize(fs.f_bavail * fs.f_bsize),
+formatReadableQuantity(fs.f_files),
+formatReadableQuantity(fs.f_favail),
+mount_point);
+
 #if defined(__linux__)
 msg += "\nFilesystem: " + getFilesystemName(mount_point);
 #endif
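The hunk above also shows the appending form of fmt: `fmt::format_to` with `std::back_inserter` writes formatted text onto an existing `std::string` in place of repeated `msg += ...` concatenations. A self-contained sketch (the values are made up):

    #include <string>
    #include <fmt/format.h>

    int main()
    {
        std::string msg = "Cannot reserve space.";
        /// Appends directly into msg; no temporary string per fragment.
        fmt::format_to(std::back_inserter(msg),
                       "\nTotal space: {}\nMount point: {}",
                       "42.00 GiB", "/var/lib/clickhouse");
    }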
@@ -37,7 +37,7 @@ private:
 Map map;
 bool initialized = false;
 
-Logger * log = &Logger::get("FileChecker");
+Poco::Logger * log = &Poco::Logger::get("FileChecker");
 };
 
 }
@@ -306,7 +306,7 @@ private:
 auto it = cells.find(key);
 if (it == cells.end())
 {
-LOG_ERROR(&Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
+LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
 abort();
 }
 
@@ -324,7 +324,7 @@ private:
 
 if (current_size > (1ull << 63))
 {
-LOG_ERROR(&Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
+LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
 abort();
 }
 }
@@ -50,13 +50,13 @@ MemoryTracker::~MemoryTracker()
 void MemoryTracker::logPeakMemoryUsage() const
 {
 const auto * description = description_ptr.load(std::memory_order_relaxed);
-LOG_DEBUG(&Logger::get("MemoryTracker"), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak));
+LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(peak));
 }
 
 void MemoryTracker::logMemoryUsage(Int64 current) const
 {
 const auto * description = description_ptr.load(std::memory_order_relaxed);
-LOG_DEBUG(&Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(current));
+LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current));
 }
 
 
@@ -102,7 +102,7 @@ void LazyPipeFDs::tryIncreaseSize(int desired_size)
 if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
 throwFromErrno("Cannot increase pipe capacity to " + std::to_string(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
 
-LOG_TRACE(log, "Pipe capacity is {}", formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size)));
+LOG_TRACE(log, "Pipe capacity is {}", ReadableSize(std::min(pipe_size, desired_size)));
 }
 #else
 (void)desired_size;
@@ -152,9 +152,9 @@ private:
 
 protected:
 
-Logger * log;
+Poco::Logger * log;
 
-PoolBase(unsigned max_items_, Logger * log_)
+PoolBase(unsigned max_items_, Poco::Logger * log_)
 : max_items(max_items_), log(log_)
 {
 items.reserve(max_items);
@@ -57,7 +57,7 @@ public:
 NestedPools nested_pools_,
 time_t decrease_error_period_,
 size_t max_error_cap_,
-Logger * log_)
+Poco::Logger * log_)
 : nested_pools(std::move(nested_pools_))
 , decrease_error_period(decrease_error_period_)
 , max_error_cap(max_error_cap_)
@@ -134,7 +134,7 @@ protected:
 /// The time when error counts were last decreased.
 time_t last_error_decrease_time = 0;
 
-Logger * log;
+Poco::Logger * log;
 };
 
 template <typename TNestedPool>
@@ -79,7 +79,7 @@ namespace ErrorCodes
 
 template <typename ProfilerImpl>
 QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const int clock_type, UInt32 period, const int pause_signal_)
-: log(&Logger::get("QueryProfiler"))
+: log(&Poco::Logger::get("QueryProfiler"))
 , pause_signal(pause_signal_)
 {
 #if USE_UNWIND
@@ -102,7 +102,7 @@ SensitiveDataMasker::SensitiveDataMasker(const Poco::Util::AbstractConfiguration
 {
 Poco::Util::AbstractConfiguration::Keys keys;
 config.keys(config_prefix, keys);
-Logger * logger = &Logger::get("SensitiveDataMaskerConfigRead");
+Poco::Logger * logger = &Poco::Logger::get("SensitiveDataMaskerConfigRead");
 
 std::set<std::string> used_names;
 
@@ -43,9 +43,9 @@ StatusFile::StatusFile(const std::string & path_)
 }
 
 if (!contents.empty())
-LOG_INFO(&Logger::get("StatusFile"), "Status file {} already exists - unclean restart. Contents:\n{}", path, contents);
+LOG_INFO(&Poco::Logger::get("StatusFile"), "Status file {} already exists - unclean restart. Contents:\n{}", path, contents);
 else
-LOG_INFO(&Logger::get("StatusFile"), "Status file {} already exists and is empty - probably unclean hardware restart.", path);
+LOG_INFO(&Poco::Logger::get("StatusFile"), "Status file {} already exists and is empty - probably unclean hardware restart.", path);
 }
 
 fd = ::open(path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
@@ -90,10 +90,10 @@ StatusFile::StatusFile(const std::string & path_)
 StatusFile::~StatusFile()
 {
 if (0 != close(fd))
-LOG_ERROR(&Logger::get("StatusFile"), "Cannot close file {}, {}", path, errnoToString(ErrorCodes::CANNOT_CLOSE_FILE));
+LOG_ERROR(&Poco::Logger::get("StatusFile"), "Cannot close file {}, {}", path, errnoToString(ErrorCodes::CANNOT_CLOSE_FILE));
 
 if (0 != unlink(path.c_str()))
-LOG_ERROR(&Logger::get("StatusFile"), "Cannot unlink file {}, {}", path, errnoToString(ErrorCodes::CANNOT_CLOSE_FILE));
+LOG_ERROR(&Poco::Logger::get("StatusFile"), "Cannot unlink file {}, {}", path, errnoToString(ErrorCodes::CANNOT_CLOSE_FILE));
 }
 
 }
@@ -43,7 +43,7 @@ public:
 private:
 zkutil::ZooKeeperHolderPtr zookeeper_holder;
 std::string path;
-Logger * log = &Logger::get("zkutil::Increment");
+Poco::Logger * log = &Poco::Logger::get("zkutil::Increment");
 };
 
 }
@@ -39,7 +39,7 @@ public:
 LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
 : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
 , log_name("LeaderElection (" + path + ")")
-, log(&Logger::get(log_name))
+, log(&Poco::Logger::get(log_name))
 {
 task = pool.createTask(log_name, [this] { threadFunction(); });
 createNode();
@@ -67,7 +67,7 @@ private:
 LeadershipHandler handler;
 std::string identifier;
 std::string log_name;
-Logger * log;
+Poco::Logger * log;
 
 EphemeralNodeHolderPtr node;
 std::string node_name;
@@ -21,7 +21,7 @@ namespace zkutil
 zookeeper_holder(zookeeper_holder_),
 lock_path(lock_prefix_ + "/" + lock_name_),
 lock_message(lock_message_),
-log(&Logger::get("zkutil::Lock"))
+log(&Poco::Logger::get("zkutil::Lock"))
 {
 auto zookeeper = zookeeper_holder->getZooKeeper();
 if (create_parent_path_)
@@ -72,7 +72,7 @@ namespace zkutil
 
 std::string lock_path;
 std::string lock_message;
-Logger * log;
+Poco::Logger * log;
 
 };
 }
@@ -48,7 +48,7 @@ static void check(int32_t code, const std::string & path)
 void ZooKeeper::init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
 int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_)
 {
-log = &Logger::get("ZooKeeper");
+log = &Poco::Logger::get("ZooKeeper");
 hosts = hosts_;
 identity = identity_;
 session_timeout_ms = session_timeout_ms_;
@@ -269,7 +269,7 @@ private:
 
 std::mutex mutex;
 
-Logger * log = nullptr;
+Poco::Logger * log = nullptr;
 };
 
 
@@ -70,7 +70,7 @@ private:
 mutable std::mutex mutex;
 ZooKeeper::Ptr ptr;
 
-Logger * log = &Logger::get("ZooKeeperHolder");
+Poco::Logger * log = &Poco::Logger::get("ZooKeeperHolder");
 
 static std::string nullptr_exception_message;
 };
@@ -20,8 +20,8 @@ int main(int argc, char ** argv)
 }
 
 Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
-Logger::root().setChannel(channel);
-Logger::root().setLevel("trace");
+Poco::Logger::root().setChannel(channel);
+Poco::Logger::root().setLevel("trace");
 
 zkutil::ZooKeeper zk(argv[1]);
 std::string unused;
@@ -1,6 +1,8 @@
 #pragma once
 
 #include <string>
+#include <fmt/format.h>
 
 
 namespace DB
 {
@@ -20,3 +22,35 @@ std::string formatReadableSizeWithDecimalSuffix(double value, int precision = 2)
 /// Prints the number as 123.45 billion.
 void formatReadableQuantity(double value, DB::WriteBuffer & out, int precision = 2);
 std::string formatReadableQuantity(double value, int precision = 2);
+
+
+/// Wrapper around value. If used with fmt library (e.g. for log messages),
+/// value is automatically formatted as size with binary suffix.
+struct ReadableSize
+{
+double value;
+explicit ReadableSize(double value_) : value(value_) {}
+};
+
+/// See https://fmt.dev/latest/api.html#formatting-user-defined-types
+template <>
+struct fmt::formatter<ReadableSize>
+{
+constexpr auto parse(format_parse_context & ctx)
+{
+auto it = ctx.begin();
+auto end = ctx.end();
+
+/// Only support {}.
+if (it != end && *it != '}')
+throw format_error("invalid format");
+
+return it;
+}
+
+template <typename FormatContext>
+auto format(const ReadableSize & size, FormatContext & ctx)
+{
+return format_to(ctx.out(), "{}", formatReadableSizeWithBinarySuffix(size.value));
+}
+};
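With the `fmt::formatter<ReadableSize>` specialization added above, a `ReadableSize` value can be passed directly as a fmt argument anywhere, not only inside the LOG_* macros. A usage sketch (the exact suffix text depends on `formatReadableSizeWithBinarySuffix`, so the comment is illustrative):

    /// Assuming the ReadableSize definitions from this header are visible:
    std::string s = fmt::format("Peak memory usage: {}", ReadableSize(1.5 * 1024 * 1024));
    /// s is something like "Peak memory usage: 1.50 MiB"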
@@ -12,7 +12,7 @@ TEST(Logger, Log)
 {
 Poco::Logger::root().setLevel("none");
 Poco::Logger::root().setChannel(Poco::AutoPtr<Poco::NullChannel>(new Poco::NullChannel()));
-Logger * log = &Logger::get("Log");
+Poco::Logger * log = &Poco::Logger::get("Log");
 
 /// This test checks that we don't pass this string to fmtlib, because it is the only argument.
 EXPECT_NO_THROW(LOG_INFO(log, "Hello {} World"));
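The test above depends on the single-argument short-circuit in LOG_IMPL: had "Hello {} World" been handed to fmtlib with no further arguments, the unmatched '{}' in a runtime format string would make fmt throw, which is exactly what the guard prevents. An illustrative sketch (not part of the commit):

    try
    {
        std::string s = fmt::format("Hello {} World");  /// no argument for {}
    }
    catch (const fmt::format_error &)
    {
        /// fmt reports the missing argument for a runtime format string.
    }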
|
@ -111,7 +111,7 @@ void BackgroundSchedulePoolTaskInfo::execute()
|
|||||||
static const int32_t slow_execution_threshold_ms = 200;
|
static const int32_t slow_execution_threshold_ms = 200;
|
||||||
|
|
||||||
if (milliseconds >= slow_execution_threshold_ms)
|
if (milliseconds >= slow_execution_threshold_ms)
|
||||||
LOG_TRACE(&Logger::get(log_name), "Execution took {} ms.", milliseconds);
|
LOG_TRACE(&Poco::Logger::get(log_name), "Execution took {} ms.", milliseconds);
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock_schedule(schedule_mutex);
|
std::lock_guard lock_schedule(schedule_mutex);
|
||||||
@ -156,7 +156,7 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
|
|||||||
, memory_metric(memory_metric_)
|
, memory_metric(memory_metric_)
|
||||||
, thread_name(thread_name_)
|
, thread_name(thread_name_)
|
||||||
{
|
{
|
||||||
LOG_INFO(&Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size);
|
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size);
|
||||||
|
|
||||||
threads.resize(size);
|
threads.resize(size);
|
||||||
for (auto & thread : threads)
|
for (auto & thread : threads)
|
||||||
@ -179,7 +179,7 @@ BackgroundSchedulePool::~BackgroundSchedulePool()
|
|||||||
queue.wakeUpAll();
|
queue.wakeUpAll();
|
||||||
delayed_thread.join();
|
delayed_thread.join();
|
||||||
|
|
||||||
LOG_TRACE(&Logger::get("BackgroundSchedulePool/" + thread_name), "Waiting for threads to finish.");
|
LOG_TRACE(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Waiting for threads to finish.");
|
||||||
for (auto & thread : threads)
|
for (auto & thread : threads)
|
||||||
thread.join();
|
thread.join();
|
||||||
}
|
}
|
||||||
|
@ -994,7 +994,7 @@ private:
|
|||||||
class Sha256Password : public IPlugin
|
class Sha256Password : public IPlugin
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Sha256Password(RSA & public_key_, RSA & private_key_, Logger * log_)
|
Sha256Password(RSA & public_key_, RSA & private_key_, Poco::Logger * log_)
|
||||||
: public_key(public_key_)
|
: public_key(public_key_)
|
||||||
, private_key(private_key_)
|
, private_key(private_key_)
|
||||||
, log(log_)
|
, log(log_)
|
||||||
@ -1130,7 +1130,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
RSA & public_key;
|
RSA & public_key;
|
||||||
RSA & private_key;
|
RSA & private_key;
|
||||||
Logger * log;
|
Poco::Logger * log;
|
||||||
String scramble;
|
String scramble;
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
@ -598,7 +598,7 @@ namespace details
|
|||||||
|
|
||||||
void SettingsCollectionUtils::warningNameNotFound(const StringRef & name)
|
void SettingsCollectionUtils::warningNameNotFound(const StringRef & name)
|
||||||
{
|
{
|
||||||
static auto * log = &Logger::get("Settings");
|
static auto * log = &Poco::Logger::get("Settings");
|
||||||
LOG_WARNING(log, "Unknown setting {}, skipping", name);
|
LOG_WARNING(log, "Unknown setting {}, skipping", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ Block AggregatingBlockInputStream::readImpl()
|
|||||||
input_streams.emplace_back(temporary_inputs.back()->block_in);
|
input_streams.emplace_back(temporary_inputs.back()->block_in);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), formatReadableSizeWithBinarySuffix(files.sum_size_compressed), formatReadableSizeWithBinarySuffix(files.sum_size_uncompressed));
|
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
|
||||||
|
|
||||||
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1);
|
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1);
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,7 @@ protected:
|
|||||||
/** From here we will get the completed blocks after the aggregation. */
|
/** From here we will get the completed blocks after the aggregation. */
|
||||||
std::unique_ptr<IBlockInputStream> impl;
|
std::unique_ptr<IBlockInputStream> impl;
|
||||||
|
|
||||||
Logger * log = &Logger::get("AggregatingBlockInputStream");
|
Poco::Logger * log = &Poco::Logger::get("AggregatingBlockInputStream");
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -168,7 +168,7 @@ private:
|
|||||||
const SortDescription description;
|
const SortDescription description;
|
||||||
String sign_column_name;
|
String sign_column_name;
|
||||||
|
|
||||||
Logger * log = &Logger::get("CollapsingFinalBlockInputStream");
|
Poco::Logger * log = &Poco::Logger::get("CollapsingFinalBlockInputStream");
|
||||||
|
|
||||||
bool first = true;
|
bool first = true;
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ ColumnGathererStream::ColumnGathererStream(
|
|||||||
const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
|
const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
|
||||||
size_t block_preferred_size_)
|
size_t block_preferred_size_)
|
||||||
: column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
|
: column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
|
||||||
, block_preferred_size(block_preferred_size_), log(&Logger::get("ColumnGathererStream"))
|
, block_preferred_size(block_preferred_size_), log(&Poco::Logger::get("ColumnGathererStream"))
|
||||||
{
|
{
|
||||||
if (source_streams.empty())
|
if (source_streams.empty())
|
||||||
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
|
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
|
||||||
@ -105,7 +105,7 @@ void ColumnGathererStream::readSuffixImpl()
|
|||||||
else
|
else
|
||||||
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
|
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
|
||||||
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
|
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
|
||||||
profile_info.rows / seconds, formatReadableSizeWithBinarySuffix(profile_info.bytes / seconds));
|
profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -44,7 +44,7 @@ private:
|
|||||||
size_t bytes_to_transfer = 0;
|
size_t bytes_to_transfer = 0;
|
||||||
|
|
||||||
using Logger = Poco::Logger;
|
using Logger = Poco::Logger;
|
||||||
Logger * log = &Logger::get("CreatingSetsBlockInputStream");
|
Poco::Logger * log = &Poco::Logger::get("CreatingSetsBlockInputStream");
|
||||||
|
|
||||||
void createAll();
|
void createAll();
|
||||||
void createOne(SubqueryForSet & subquery);
|
void createOne(SubqueryForSet & subquery);
|
||||||
|
@ -264,7 +264,7 @@ void MergeSortingBlockInputStream::remerge()
|
|||||||
}
|
}
|
||||||
merger.readSuffix();
|
merger.readSuffix();
|
||||||
|
|
||||||
LOG_DEBUG(log, "Memory usage is lowered from {} to {}", formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks), formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
|
LOG_DEBUG(log, "Memory usage is lowered from {} to {}", ReadableSize(sum_bytes_in_blocks), ReadableSize(new_sum_bytes_in_blocks));
|
||||||
|
|
||||||
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
|
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
|
||||||
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
|
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
|
||||||
|
@ -104,7 +104,7 @@ private:
|
|||||||
String codec;
|
String codec;
|
||||||
size_t min_free_disk_space;
|
size_t min_free_disk_space;
|
||||||
|
|
||||||
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
Poco::Logger * log = &Poco::Logger::get("MergeSortingBlockInputStream");
|
||||||
|
|
||||||
Blocks blocks;
|
Blocks blocks;
|
||||||
size_t sum_rows_in_blocks = 0;
|
size_t sum_rows_in_blocks = 0;
|
||||||
|
@ -555,7 +555,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
|
|||||||
/// Not yet partitioned (splitted to buckets) block. Will partition it and place result to 'splitted_blocks'.
|
/// Not yet partitioned (splitted to buckets) block. Will partition it and place result to 'splitted_blocks'.
|
||||||
if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty())
|
if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty())
|
||||||
{
|
{
|
||||||
LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");
|
LOG_TRACE(&Poco::Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");
|
||||||
|
|
||||||
input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
|
input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
|
||||||
input.block = Block();
|
input.block = Block();
|
||||||
|
@ -96,7 +96,7 @@ private:
|
|||||||
std::atomic<bool> has_overflows {false};
|
std::atomic<bool> has_overflows {false};
|
||||||
int current_bucket_num = -1;
|
int current_bucket_num = -1;
|
||||||
|
|
||||||
Logger * log = &Logger::get("MergingAggregatedMemoryEfficientBlockInputStream");
|
Poco::Logger * log = &Poco::Logger::get("MergingAggregatedMemoryEfficientBlockInputStream");
|
||||||
|
|
||||||
|
|
||||||
struct Input
|
struct Input
|
||||||
|
@ -23,7 +23,7 @@ MergingSortedBlockInputStream::MergingSortedBlockInputStream(
|
|||||||
: description(std::move(description_)), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
|
: description(std::move(description_)), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
|
||||||
, source_blocks(inputs_.size())
|
, source_blocks(inputs_.size())
|
||||||
, cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
|
, cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
|
||||||
, log(&Logger::get("MergingSortedBlockInputStream"))
|
, log(&Poco::Logger::get("MergingSortedBlockInputStream"))
|
||||||
{
|
{
|
||||||
children.insert(children.end(), inputs_.begin(), inputs_.end());
|
children.insert(children.end(), inputs_.begin(), inputs_.end());
|
||||||
header = children.at(0)->getHeader();
|
header = children.at(0)->getHeader();
|
||||||
@ -269,7 +269,7 @@ void MergingSortedBlockInputStream::readSuffixImpl()
|
|||||||
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
|
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
|
||||||
profile_info.blocks, profile_info.rows, seconds,
|
profile_info.blocks, profile_info.rows, seconds,
|
||||||
profile_info.rows / seconds,
|
profile_info.rows / seconds,
|
||||||
formatReadableSizeWithBinarySuffix(profile_info.bytes / seconds));
|
ReadableSize(profile_info.bytes / seconds));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -82,7 +82,7 @@ Block ParallelAggregatingBlockInputStream::readImpl()
            input_streams.emplace_back(temporary_inputs.back()->block_in);
        }

-        LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), formatReadableSizeWithBinarySuffix(files.sum_size_compressed), formatReadableSizeWithBinarySuffix(files.sum_size_uncompressed));
+        LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));

        impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
            input_streams, params, final, temporary_data_merge_threads, temporary_data_merge_threads);

@@ -178,16 +178,16 @@ void ParallelAggregatingBlockInputStream::execute()
    {
        size_t rows = many_data[i]->size();
        LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
-            threads_data[i].src_rows, rows, formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes),
+            threads_data[i].src_rows, rows, ReadableSize(threads_data[i].src_bytes),
            elapsed_seconds, threads_data[i].src_rows / elapsed_seconds,
-            formatReadableSizeWithBinarySuffix(threads_data[i].src_bytes / elapsed_seconds));
+            ReadableSize(threads_data[i].src_bytes / elapsed_seconds));

        total_src_rows += threads_data[i].src_rows;
        total_src_bytes += threads_data[i].src_bytes;
    }
    LOG_TRACE(log, "Total aggregated. {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
-        total_src_rows, formatReadableSizeWithBinarySuffix(total_src_bytes), elapsed_seconds,
-        total_src_rows / elapsed_seconds, formatReadableSizeWithBinarySuffix(total_src_bytes / elapsed_seconds));
+        total_src_rows, ReadableSize(total_src_bytes), elapsed_seconds,
+        total_src_rows / elapsed_seconds, ReadableSize(total_src_bytes / elapsed_seconds));

    /// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
    /// To do this, we pass a block with zero rows to aggregate.
@@ -60,7 +60,7 @@ private:
    std::atomic<bool> executed {false};
    std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;

-    Logger * log = &Logger::get("ParallelAggregatingBlockInputStream");
+    Poco::Logger * log = &Poco::Logger::get("ParallelAggregatingBlockInputStream");


    ManyAggregatedDataVariants many_data;

@@ -359,7 +359,7 @@ private:
    /// Wait for the completion of all threads.
    std::atomic<bool> joined_threads { false };

-    Logger * log = &Logger::get("ParallelInputsProcessor");
+    Poco::Logger * log = &Poco::Logger::get("ParallelInputsProcessor");
};


@@ -151,7 +151,7 @@ private:
    PoolMode pool_mode = PoolMode::GET_MANY;
    StorageID main_table = StorageID::createEmpty();

-    Logger * log = &Logger::get("RemoteBlockInputStream");
+    Poco::Logger * log = &Poco::Logger::get("RemoteBlockInputStream");
};

}

@@ -16,8 +16,8 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int too_man
            + ", current rows: " + formatReadableQuantity(rows), too_many_rows_exception_code);

    if (max_bytes && bytes > max_bytes)
-        throw Exception("Limit for " + std::string(what) + " exceeded, max bytes: " + formatReadableSizeWithBinarySuffix(max_bytes)
-            + ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), too_many_bytes_exception_code);
+        throw Exception(fmt::format("Limit for {} exceeded, max bytes: {}, current bytes: {}",
+            std::string(what), ReadableSize(max_bytes), ReadableSize(bytes)), too_many_bytes_exception_code);

    return true;
}
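The SizeLimits hunk above shows the second recurring rewrite in this PR: ad-hoc `operator+` message concatenation replaced by a single `fmt::format` call. A hedged, self-contained sketch of the same transformation (the function and its parameters are illustrative, not ClickHouse's):

```cpp
#include <fmt/format.h>

#include <string>

/// Builds the message the way the new code does: one format string, one {}
/// placeholder per value, arguments consumed left to right.
std::string limitMessage(const std::string & what, const std::string & max_bytes, const std::string & bytes)
{
    /// Old style: "Limit for " + what + " exceeded, max bytes: " + max_bytes + ...
    return fmt::format("Limit for {} exceeded, max bytes: {}, current bytes: {}",
                       what, max_bytes, bytes);
}
```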
@@ -28,7 +28,7 @@ TTLBlockInputStream::TTLBlockInputStream(
    , current_time(current_time_)
    , force(force_)
    , old_ttl_infos(data_part->ttl_infos)
-    , log(&Logger::get(storage.getLogName() + " (TTLBlockInputStream)"))
+    , log(&Poco::Logger::get(storage.getLogName() + " (TTLBlockInputStream)"))
    , date_lut(DateLUT::instance())
{
    children.push_back(input_);

@@ -52,7 +52,7 @@ private:
    NameSet empty_columns;

    size_t rows_removed = 0;
-    Logger * log;
+    Poco::Logger * log;
    const DateLUTImpl & date_lut;

    /// TODO rewrite defaults logic to evaluteMissingDefaults

@@ -253,7 +253,7 @@ private:
    bool started = false;
    bool all_read = false;

-    Logger * log = &Logger::get("UnionBlockInputStream");
+    Poco::Logger * log = &Poco::Logger::get("UnionBlockInputStream");
};

}

@@ -20,8 +20,8 @@ try
    using namespace DB;

    Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
-    Logger::root().setChannel(channel);
-    Logger::root().setLevel("trace");
+    Poco::Logger::root().setChannel(channel);
+    Poco::Logger::root().setLevel("trace");

    Block block1;

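For the standalone test above, the only change is qualifying `Logger::root()`. For reference, a minimal complete program that wires Poco's root logger to stderr the same way looks roughly like this (a sketch; the real test does much more than log):

```cpp
#include <Poco/AutoPtr.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>

#include <iostream>

int main()
{
    /// Route all Poco loggers to stderr and let every severity through.
    Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
    Poco::Logger::root().setChannel(channel);
    Poco::Logger::root().setLevel("trace");

    Poco::Logger::root().information("logging is set up");
    return 0;
}
```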
@@ -32,7 +32,7 @@ namespace

DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & global_context_)
    : IDatabase(name_)
-    , log(&Logger::get("DatabaseDictionary(" + database_name + ")"))
+    , log(&Poco::Logger::get("DatabaseDictionary(" + database_name + ")"))
    , global_context(global_context_.getGlobalContext())
{
}

@@ -20,7 +20,7 @@ namespace ErrorCodes
}

DatabaseWithOwnTablesBase::DatabaseWithOwnTablesBase(const String & name_, const String & logger, const Context & context)
-    : IDatabase(name_), log(&Logger::get(logger)), global_context(context.getGlobalContext())
+    : IDatabase(name_), log(&Poco::Logger::get(logger)), global_context(context.getGlobalContext())
{
}


@@ -85,7 +85,7 @@ CacheDictionary::CacheDictionary(
    , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
    , query_wait_timeout_milliseconds(query_wait_timeout_milliseconds_)
    , max_threads_for_updates(max_threads_for_updates_)
-    , log(&Logger::get("ExternalDictionaries"))
+    , log(&Poco::Logger::get("ExternalDictionaries"))
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}

@@ -314,7 +314,7 @@ private:
    const size_t query_wait_timeout_milliseconds;
    const size_t max_threads_for_updates;

-    Logger * const log;
+    Poco::Logger * log;

    mutable std::shared_mutex rw_lock;


@@ -6,7 +6,7 @@

RegionsHierarchies::RegionsHierarchies(IRegionsHierarchiesDataProviderPtr data_provider)
{
-    Logger * log = &Logger::get("RegionsHierarchies");
+    Poco::Logger * log = &Poco::Logger::get("RegionsHierarchies");

    LOG_DEBUG(log, "Adding default regions hierarchy");
    data.emplace("", data_provider->getDefaultHierarchySource());

@@ -23,7 +23,7 @@ RegionsHierarchy::RegionsHierarchy(IRegionsHierarchyDataSourcePtr data_source_)

void RegionsHierarchy::reload()
{
-    Logger * log = &Logger::get("RegionsHierarchy");
+    Poco::Logger * log = &Poco::Logger::get("RegionsHierarchy");

    if (!data_source->isModified())
        return;

@@ -42,7 +42,7 @@ std::string RegionsNames::dumpSupportedLanguagesNames()

void RegionsNames::reload()
{
-    Logger * log = &Logger::get("RegionsNames");
+    Poco::Logger * log = &Poco::Logger::get("RegionsNames");
    LOG_DEBUG(log, "Reloading regions names");

    RegionID max_region_id = 0;
@@ -53,7 +53,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(
    const std::string & config_prefix,
    Block & sample_block_,
    const Context & context_)
-    : log(&Logger::get("ExecutableDictionarySource"))
+    : log(&Poco::Logger::get("ExecutableDictionarySource"))
    , dict_struct{dict_struct_}
    , command{config.getString(config_prefix + ".command")}
    , update_field{config.getString(config_prefix + ".update_field", "")}

@@ -64,7 +64,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(
}

ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
-    : log(&Logger::get("ExecutableDictionarySource"))
+    : log(&Poco::Logger::get("ExecutableDictionarySource"))
    , update_time{other.update_time}
    , dict_struct{other.dict_struct}
    , command{other.command}

@@ -31,7 +31,7 @@ HTTPDictionarySource::HTTPDictionarySource(
    Block & sample_block_,
    const Context & context_,
    bool check_config)
-    : log(&Logger::get("HTTPDictionarySource"))
+    : log(&Poco::Logger::get("HTTPDictionarySource"))
    , update_time{std::chrono::system_clock::from_time_t(0)}
    , dict_struct{dict_struct_}
    , url{config.getString(config_prefix + ".url", "")}

@@ -71,7 +71,7 @@ HTTPDictionarySource::HTTPDictionarySource(
}

HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
-    : log(&Logger::get("HTTPDictionarySource"))
+    : log(&Poco::Logger::get("HTTPDictionarySource"))
    , update_time{other.update_time}
    , dict_struct{other.dict_struct}
    , url{other.url}

@@ -125,7 +125,7 @@ LibraryDictionarySource::LibraryDictionarySource(
    Block & sample_block_,
    const Context & context,
    bool check_config)
-    : log(&Logger::get("LibraryDictionarySource"))
+    : log(&Poco::Logger::get("LibraryDictionarySource"))
    , dict_struct{dict_struct_}
    , config_prefix{config_prefix_}
    , path{config.getString(config_prefix + ".path", "")}

@@ -157,7 +157,7 @@ LibraryDictionarySource::LibraryDictionarySource(
}

LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource & other)
-    : log(&Logger::get("LibraryDictionarySource"))
+    : log(&Poco::Logger::get("LibraryDictionarySource"))
    , dict_struct{other.dict_struct}
    , config_prefix{other.config_prefix}
    , path{other.path}
|
|||||||
{
|
{
|
||||||
using ClickHouseLibrary::LogLevel;
|
using ClickHouseLibrary::LogLevel;
|
||||||
|
|
||||||
auto & logger = Logger::get(DICT_LOGGER_NAME);
|
auto & logger = Poco::Logger::get(DICT_LOGGER_NAME);
|
||||||
switch (level)
|
switch (level)
|
||||||
{
|
{
|
||||||
case LogLevel::TRACE:
|
case LogLevel::TRACE:
|
||||||
|
@ -58,7 +58,7 @@ MySQLDictionarySource::MySQLDictionarySource(
|
|||||||
const Poco::Util::AbstractConfiguration & config,
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
const std::string & config_prefix,
|
const std::string & config_prefix,
|
||||||
const Block & sample_block_)
|
const Block & sample_block_)
|
||||||
: log(&Logger::get("MySQLDictionarySource"))
|
: log(&Poco::Logger::get("MySQLDictionarySource"))
|
||||||
, update_time{std::chrono::system_clock::from_time_t(0)}
|
, update_time{std::chrono::system_clock::from_time_t(0)}
|
||||||
, dict_struct{dict_struct_}
|
, dict_struct{dict_struct_}
|
||||||
, db{config.getString(config_prefix + ".db", "")}
|
, db{config.getString(config_prefix + ".db", "")}
|
||||||
@ -77,7 +77,7 @@ MySQLDictionarySource::MySQLDictionarySource(
|
|||||||
|
|
||||||
/// copy-constructor is provided in order to support cloneability
|
/// copy-constructor is provided in order to support cloneability
|
||||||
MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other)
|
MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other)
|
||||||
: log(&Logger::get("MySQLDictionarySource"))
|
: log(&Poco::Logger::get("MySQLDictionarySource"))
|
||||||
, update_time{other.update_time}
|
, update_time{other.update_time}
|
||||||
, dict_struct{other.dict_struct}
|
, dict_struct{other.dict_struct}
|
||||||
, db{other.db}
|
, db{other.db}
|
||||||
|
@ -253,7 +253,7 @@ private:
|
|||||||
|
|
||||||
std::exception_ptr creation_exception;
|
std::exception_ptr creation_exception;
|
||||||
|
|
||||||
Logger * logger;
|
Poco::Logger * logger;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ XDBCDictionarySource::XDBCDictionarySource(
|
|||||||
const Block & sample_block_,
|
const Block & sample_block_,
|
||||||
const Context & context_,
|
const Context & context_,
|
||||||
const BridgeHelperPtr bridge_)
|
const BridgeHelperPtr bridge_)
|
||||||
: log(&Logger::get(bridge_->getName() + "DictionarySource"))
|
: log(&Poco::Logger::get(bridge_->getName() + "DictionarySource"))
|
||||||
, update_time{std::chrono::system_clock::from_time_t(0)}
|
, update_time{std::chrono::system_clock::from_time_t(0)}
|
||||||
, dict_struct{dict_struct_}
|
, dict_struct{dict_struct_}
|
||||||
, db{config_.getString(config_prefix_ + ".db", "")}
|
, db{config_.getString(config_prefix_ + ".db", "")}
|
||||||
@ -96,7 +96,7 @@ XDBCDictionarySource::XDBCDictionarySource(
|
|||||||
|
|
||||||
/// copy-constructor is provided in order to support cloneability
|
/// copy-constructor is provided in order to support cloneability
|
||||||
XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
|
XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
|
||||||
: log(&Logger::get(other.bridge_helper->getName() + "DictionarySource"))
|
: log(&Poco::Logger::get(other.bridge_helper->getName() + "DictionarySource"))
|
||||||
, update_time{other.update_time}
|
, update_time{other.update_time}
|
||||||
, dict_struct{other.dict_struct}
|
, dict_struct{other.dict_struct}
|
||||||
, db{other.db}
|
, db{other.db}
|
||||||
|
@ -91,7 +91,7 @@ bool DiskLocal::tryReserve(UInt64 bytes)
|
|||||||
std::lock_guard lock(DiskLocal::reservation_mutex);
|
std::lock_guard lock(DiskLocal::reservation_mutex);
|
||||||
if (bytes == 0)
|
if (bytes == 0)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Logger::get("DiskLocal"), "Reserving 0 bytes on disk {}", backQuote(name));
|
LOG_DEBUG(&Poco::Logger::get("DiskLocal"), "Reserving 0 bytes on disk {}", backQuote(name));
|
||||||
++reservation_count;
|
++reservation_count;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -100,8 +100,8 @@ bool DiskLocal::tryReserve(UInt64 bytes)
|
|||||||
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
|
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
|
||||||
if (unreserved_space >= bytes)
|
if (unreserved_space >= bytes)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Logger::get("DiskLocal"), "Reserving {} on disk {}, having unreserved {}.",
|
LOG_DEBUG(&Poco::Logger::get("DiskLocal"), "Reserving {} on disk {}, having unreserved {}.",
|
||||||
formatReadableSizeWithBinarySuffix(bytes), backQuote(name), formatReadableSizeWithBinarySuffix(unreserved_space));
|
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
|
||||||
++reservation_count;
|
++reservation_count;
|
||||||
reserved_bytes += bytes;
|
reserved_bytes += bytes;
|
||||||
return true;
|
return true;
|
||||||
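Most of the `formatReadableSizeWithBinarySuffix(...)` → `ReadableSize(...)` replacements in this diff sit inside `LOG_*` calls. A plausible motivation: the old helper returns a `std::string` that is built even when the message is filtered out by log level, while a small wrapper type can defer formatting to fmt. The sketch below is an assumption about that idea, not ClickHouse's actual `ReadableSize`:

```cpp
#include <fmt/format.h>

/// Hypothetical stand-in for ReadableSize: stores the raw byte count and is
/// rendered only when fmt actually formats the log message.
struct ReadableSizeSketch
{
    double value;
    explicit ReadableSizeSketch(double value_) : value(value_) {}
};

template <>
struct fmt::formatter<ReadableSizeSketch>
{
    constexpr auto parse(fmt::format_parse_context & ctx) { return ctx.begin(); }

    template <typename FormatContext>
    auto format(const ReadableSizeSketch & size, FormatContext & ctx)
    {
        static const char * units[] = {"B", "KiB", "MiB", "GiB", "TiB", "PiB"};
        double v = size.value;
        size_t unit = 0;
        while (v >= 1024.0 && unit + 1 < sizeof(units) / sizeof(units[0]))
        {
            v /= 1024.0;
            ++unit;
        }
        return fmt::format_to(ctx.out(), "{:.2f} {}", v, units[unit]);
    }
};

/// Usage: fmt::format("Reserving {} on disk {}", ReadableSizeSketch(bytes), name);
```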
@@ -310,7 +310,7 @@ DiskLocalReservation::~DiskLocalReservation()
        if (disk->reserved_bytes < size)
        {
            disk->reserved_bytes = 0;
-            LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
+            LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
        }
        else
        {

@@ -318,7 +318,7 @@ DiskLocalReservation::~DiskLocalReservation()
        }

        if (disk->reservation_count == 0)
-            LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
+            LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
        else
            --disk->reservation_count;
    }

@@ -99,7 +99,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
        }

        writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning);
-        LOG_WARNING(&Logger::get("DiskSelector"), warning.str());
+        LOG_WARNING(&Poco::Logger::get("DiskSelector"), warning.str());
    }

    return result;

@@ -515,7 +515,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
{
    Metadata metadata(metadata_path, path);

-    LOG_DEBUG(&Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}",
+    LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}",
        backQuote(metadata_path + path), metadata.s3_objects.size());

    return std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);

@@ -536,7 +536,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
        /// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
        metadata.save();

-        LOG_DEBUG(&Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_path);
+        LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_path);

        return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, min_upload_part_size, buf_size);
    }

@@ -544,7 +544,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
    {
        Metadata metadata(metadata_path, path);

-        LOG_DEBUG(&Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
+        LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
            backQuote(metadata_path + path), s3_path, metadata.s3_objects.size());

        return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, min_upload_part_size, buf_size);

@@ -553,7 +553,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,

void DiskS3::remove(const String & path)
{
-    LOG_DEBUG(&Logger::get("DiskS3"), "Remove file by path: {}", backQuote(metadata_path + path));
+    LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Remove file by path: {}", backQuote(metadata_path + path));

    Poco::File file(metadata_path + path);
    if (file.isFile())

@@ -607,7 +607,7 @@ bool DiskS3::tryReserve(UInt64 bytes)
    std::lock_guard lock(reservation_mutex);
    if (bytes == 0)
    {
-        LOG_DEBUG(&Logger::get("DiskS3"), "Reserving 0 bytes on s3 disk {}", backQuote(name));
+        LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Reserving 0 bytes on s3 disk {}", backQuote(name));
        ++reservation_count;
        return true;
    }

@@ -616,8 +616,8 @@ bool DiskS3::tryReserve(UInt64 bytes)
    UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
    if (unreserved_space >= bytes)
    {
-        LOG_DEBUG(&Logger::get("DiskS3"), "Reserving {} on disk {}, having unreserved {}.",
-            formatReadableSizeWithBinarySuffix(bytes), backQuote(name), formatReadableSizeWithBinarySuffix(unreserved_space));
+        LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Reserving {} on disk {}, having unreserved {}.",
+            ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
        ++reservation_count;
        reserved_bytes += bytes;
        return true;

@@ -672,7 +672,7 @@ DiskS3Reservation::~DiskS3Reservation()
        if (disk->reserved_bytes < size)
        {
            disk->reserved_bytes = 0;
-            LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
+            LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
        }
        else
        {

@@ -680,7 +680,7 @@ DiskS3Reservation::~DiskS3Reservation()
        }

        if (disk->reservation_count == 0)
-            LOG_ERROR(&Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
+            LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
        else
            --disk->reservation_count;
    }
@@ -20,7 +20,7 @@ Aws::Client::ClientConfigurationPerRequest ProxyListConfiguration::getConfigurat
    cfg.proxyHost = proxies[index].getHost();
    cfg.proxyPort = proxies[index].getPort();

-    LOG_DEBUG(&Logger::get("AWSClient"), "Use proxy: {}", proxies[index].toString());
+    LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}", proxies[index].toString());

    return cfg;
}

@@ -21,7 +21,7 @@ ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoin

Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
{
-    LOG_DEBUG(&Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString());
+    LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString());

    /// 1 second is enough for now.
    /// TODO: Make timeouts configurable.

@@ -49,7 +49,7 @@ Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfig
    /// Read proxy host as string from response body.
    Poco::StreamCopier::copyToString(response_body_stream, proxy_host);

-    LOG_DEBUG(&Logger::get("AWSClient"), "Use proxy: {}://{}:{}", proxy_scheme, proxy_host, proxy_port);
+    LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}://{}:{}", proxy_scheme, proxy_host, proxy_port);

    cfg.proxyScheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str());
    cfg.proxyHost = proxy_host;

@@ -46,7 +46,7 @@ namespace
        throw Exception("Only HTTP/HTTPS schemas allowed in proxy resolver config: " + proxy_scheme, ErrorCodes::BAD_ARGUMENTS);
    auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port");

-    LOG_DEBUG(&Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}", endpoint.toString(), proxy_scheme, proxy_port);
+    LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}", endpoint.toString(), proxy_scheme, proxy_port);

    return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port);
}

@@ -70,7 +70,7 @@ namespace

        proxies.push_back(proxy_uri);

-        LOG_DEBUG(&Logger::get("DiskS3"), "Configured proxy: {}", proxy_uri.toString());
+        LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy: {}", proxy_uri.toString());
    }

    if (!proxies.empty())

@@ -245,7 +245,7 @@ StoragePolicySelector::StoragePolicySelector(
                "StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);

        policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
-        LOG_INFO(&Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name));
+        LOG_INFO(&Poco::Logger::get("StoragePolicySelector"), "Storage policy {} loaded", backQuote(name));
    }

    constexpr auto default_storage_policy_name = "default";

@@ -20,7 +20,7 @@ VolumeJBOD::VolumeJBOD(
    DiskSelectorPtr disk_selector
) : IVolume(name_, config, config_prefix, disk_selector)
{
-    Logger * logger = &Logger::get("StorageConfiguration");
+    Poco::Logger * logger = &Poco::Logger::get("StorageConfiguration");

    auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes");
    auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio");

@@ -48,11 +48,11 @@ VolumeJBOD::VolumeJBOD(
        max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
        for (size_t i = 0; i < disks.size(); ++i)
            if (sizes[i] < max_data_part_size)
-                LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})", backQuote(disks[i]->getName()), backQuote(config_prefix), formatReadableSizeWithBinarySuffix(sizes[i]), formatReadableSizeWithBinarySuffix(max_data_part_size));
+                LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})", backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
    }
    static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
    if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
-        LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), formatReadableSizeWithBinarySuffix(max_data_part_size), formatReadableSizeWithBinarySuffix(MIN_PART_SIZE));
+        LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
}

DiskPtr VolumeJBOD::getNextDisk()

@@ -173,7 +173,7 @@ namespace
        auto msg = Poco::AnyCast<std::string>(session_data);
        if (!msg.empty())
        {
-            LOG_TRACE((&Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg);
+            LOG_TRACE((&Poco::Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg);
            /// Host can change IP
            const auto ip = DNSResolver::instance().resolveHost(host).toString();
            if (ip != session->getHost())

@@ -34,7 +34,7 @@ void MMapReadBufferFromFileDescriptor::init(int fd_, size_t offset, size_t lengt
    {
        void * buf = mmap(nullptr, length, PROT_READ, MAP_PRIVATE, fd, offset);
        if (MAP_FAILED == buf)
-            throwFromErrno("MMapReadBufferFromFileDescriptor: Cannot mmap " + formatReadableSizeWithBinarySuffix(length) + ".",
+            throwFromErrno(fmt::format("MMapReadBufferFromFileDescriptor: Cannot mmap {}.", ReadableSize(length)),
                ErrorCodes::CANNOT_ALLOCATE_MEMORY);

        BufferBase::set(static_cast<char *>(buf), length, 0);

@@ -84,7 +84,7 @@ MMapReadBufferFromFileDescriptor::~MMapReadBufferFromFileDescriptor()
void MMapReadBufferFromFileDescriptor::finish()
{
    if (0 != munmap(internalBuffer().begin(), length))
-        throwFromErrno("MMapReadBufferFromFileDescriptor: Cannot munmap " + formatReadableSizeWithBinarySuffix(length) + ".",
+        throwFromErrno(fmt::format("MMapReadBufferFromFileDescriptor: Cannot munmap {}.", ReadableSize(length)),
            ErrorCodes::CANNOT_MUNMAP);

    length = 0;
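The two `mmap`/`munmap` hunks above move error-text construction into `fmt::format` as well. `throwFromErrno` is called with the finished message plus an error code; a rough, self-contained equivalent of the pattern (standard exceptions standing in for ClickHouse's own):

```cpp
#include <fmt/format.h>

#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>

/// Illustrative only: appends the current errno description to the caller's
/// message, the way a throwFromErrno-style helper is used above.
[[noreturn]] void throwFromErrnoSketch(const std::string & message, int error_code)
{
    throw std::runtime_error(
        fmt::format("{} errno: {}, strerror: {} (code {})",
                    message, errno, std::strerror(errno), error_code));
}
```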
@@ -33,7 +33,7 @@ private:
    Aws::S3::Model::GetObjectResult read_result;
    std::unique_ptr<ReadBuffer> impl;

-    Logger * log = &Logger::get("ReadBufferFromS3");
+    Poco::Logger * log = &Poco::Logger::get("ReadBufferFromS3");

public:
    explicit ReadBufferFromS3(

@@ -127,7 +127,7 @@ namespace detail
        if (!credentials.getUsername().empty())
            credentials.authenticate(request);

-        LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to {}", uri.toString());
+        LOG_TRACE((&Poco::Logger::get("ReadWriteBufferFromHTTP")), "Sending request to {}", uri.toString());

        auto sess = session->getSession();


@@ -16,16 +16,17 @@

namespace
{
-    const std::pair<LogsLevel, Message::Priority> & convertLogLevel(Aws::Utils::Logging::LogLevel log_level)
+    const std::pair<DB::LogsLevel, Poco::Message::Priority> & convertLogLevel(Aws::Utils::Logging::LogLevel log_level)
    {
-        static const std::unordered_map<Aws::Utils::Logging::LogLevel, std::pair<LogsLevel, Message::Priority>> mapping = {
-            {Aws::Utils::Logging::LogLevel::Off, {LogsLevel::none, Message::PRIO_FATAL}},
-            {Aws::Utils::Logging::LogLevel::Fatal, {LogsLevel::error, Message::PRIO_FATAL}},
-            {Aws::Utils::Logging::LogLevel::Error, {LogsLevel::error, Message::PRIO_ERROR}},
-            {Aws::Utils::Logging::LogLevel::Warn, {LogsLevel::warning, Message::PRIO_WARNING}},
-            {Aws::Utils::Logging::LogLevel::Info, {LogsLevel::information, Message::PRIO_INFORMATION}},
-            {Aws::Utils::Logging::LogLevel::Debug, {LogsLevel::debug, Message::PRIO_DEBUG}},
-            {Aws::Utils::Logging::LogLevel::Trace, {LogsLevel::trace, Message::PRIO_TRACE}},
+        static const std::unordered_map<Aws::Utils::Logging::LogLevel, std::pair<DB::LogsLevel, Poco::Message::Priority>> mapping =
+        {
+            {Aws::Utils::Logging::LogLevel::Off, {DB::LogsLevel::none, Poco::Message::PRIO_FATAL}},
+            {Aws::Utils::Logging::LogLevel::Fatal, {DB::LogsLevel::error, Poco::Message::PRIO_FATAL}},
+            {Aws::Utils::Logging::LogLevel::Error, {DB::LogsLevel::error, Poco::Message::PRIO_ERROR}},
+            {Aws::Utils::Logging::LogLevel::Warn, {DB::LogsLevel::warning, Poco::Message::PRIO_WARNING}},
+            {Aws::Utils::Logging::LogLevel::Info, {DB::LogsLevel::information, Poco::Message::PRIO_INFORMATION}},
+            {Aws::Utils::Logging::LogLevel::Debug, {DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG}},
+            {Aws::Utils::Logging::LogLevel::Trace, {DB::LogsLevel::trace, Poco::Message::PRIO_TRACE}},
        };
        return mapping.at(log_level);
    }
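The `convertLogLevel` hunk above is a typical lookup-table translation between two logging-level enums. A self-contained analogue (the enums below are stand-ins, not the real Aws or DB types) shows the shape, including the `.at()` call, which throws `std::out_of_range` if a level is ever missing from the table:

```cpp
#include <unordered_map>
#include <utility>

enum class SdkLogLevel { Off, Fatal, Error, Warn, Info, Debug, Trace };
enum class AppLogLevel { None, Error, Warning, Information, Debug, Trace };

const std::pair<AppLogLevel, int> & convertLogLevelSketch(SdkLogLevel level)
{
    /// Built once on first call, reused afterwards; unmapped levels throw
    /// std::out_of_range from .at() instead of returning garbage.
    static const std::unordered_map<SdkLogLevel, std::pair<AppLogLevel, int>> mapping =
    {
        {SdkLogLevel::Off,   {AppLogLevel::None, 0}},
        {SdkLogLevel::Fatal, {AppLogLevel::Error, 1}},
        {SdkLogLevel::Error, {AppLogLevel::Error, 2}},
        {SdkLogLevel::Warn,  {AppLogLevel::Warning, 3}},
        {SdkLogLevel::Info,  {AppLogLevel::Information, 4}},
        {SdkLogLevel::Debug, {AppLogLevel::Debug, 5}},
        {SdkLogLevel::Trace, {AppLogLevel::Trace, 6}},
    };
    return mapping.at(level);
}
```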
@@ -15,7 +15,7 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
    request.setHost(uri.getHost());
    request.setChunkedTransferEncoding(true);

-    LOG_TRACE((&Logger::get("WriteBufferToHTTP")), "Sending request to {}", uri.toString());
+    LOG_TRACE((&Poco::Logger::get("WriteBufferToHTTP")), "Sending request to {}", uri.toString());

    ostr = &session->sendRequest(request);
}

@@ -36,7 +36,7 @@ private:
    String upload_id;
    std::vector<String> part_tags;

-    Logger * log = &Logger::get("WriteBufferFromS3");
+    Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");

public:
    explicit WriteBufferFromS3(

@@ -56,7 +56,8 @@ void NO_INLINE loop(ReadBuffer & in, WriteBuffer & out)
    }

    watch.stop();
-    out << "Read in " << watch.elapsedSeconds() << " sec, " << formatReadableSizeWithBinarySuffix(in.count() / watch.elapsedSeconds()) << "/sec, result = " << sum << "\n";
+    out << "Read in " << watch.elapsedSeconds() << " sec, "
+        << formatReadableSizeWithBinarySuffix(in.count() / watch.elapsedSeconds()) << "/sec, result = " << sum << "\n";
}

@@ -768,14 +768,14 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
        " ({} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
        elapsed_seconds,
        rows,
-        formatReadableSizeWithBinarySuffix(uncompressed_bytes),
-        formatReadableSizeWithBinarySuffix(compressed_bytes),
+        ReadableSize(uncompressed_bytes),
+        ReadableSize(compressed_bytes),
        uncompressed_bytes / rows,
        compressed_bytes / rows,
        uncompressed_bytes / compressed_bytes,
        rows / elapsed_seconds,
-        formatReadableSizeWithBinarySuffix(uncompressed_bytes / elapsed_seconds),
-        formatReadableSizeWithBinarySuffix(compressed_bytes / elapsed_seconds));
+        ReadableSize(uncompressed_bytes / elapsed_seconds),
+        ReadableSize(compressed_bytes / elapsed_seconds));
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{

@@ -871,7 +871,7 @@ void Aggregator::writeToTemporaryFileImpl(
    /// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
    data_variants.aggregator = nullptr;

-    LOG_TRACE(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, formatReadableSizeWithBinarySuffix(max_temporary_block_size_bytes));
+    LOG_TRACE(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
}


@@ -943,9 +943,9 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
    size_t rows = result.sizeWithoutOverflowRow();

    LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
-        src_rows, rows, formatReadableSizeWithBinarySuffix(src_bytes),
+        src_rows, rows, ReadableSize(src_bytes),
        elapsed_seconds, src_rows / elapsed_seconds,
-        formatReadableSizeWithBinarySuffix(src_bytes / elapsed_seconds));
+        ReadableSize(src_bytes / elapsed_seconds));
}


@@ -1315,9 +1315,9 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log,
        "Converted aggregated data to blocks. {} rows, {} in {} sec. ({} rows/sec., {}/sec.)",
-        rows, formatReadableSizeWithBinarySuffix(bytes),
+        rows, ReadableSize(bytes),
        elapsed_seconds, rows / elapsed_seconds,
-        formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds));
+        ReadableSize(bytes / elapsed_seconds));

    return blocks;
}

@@ -1567,7 +1567,7 @@ public:

    ~MergingAndConvertingBlockInputStream() override
    {
-        LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");
+        LOG_TRACE(&Poco::Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");

        /// We need to wait for threads to finish before destructor of 'parallel_merge_data',
        /// because the threads access 'parallel_merge_data'.

@@ -2186,9 +2186,9 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
    size_t bytes = block.bytes();
    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
-        rows, formatReadableSizeWithBinarySuffix(bytes),
+        rows, ReadableSize(bytes),
        elapsed_seconds, rows / elapsed_seconds,
-        formatReadableSizeWithBinarySuffix(bytes / elapsed_seconds));
+        ReadableSize(bytes / elapsed_seconds));

    if (isCancelled())
        return {};

@@ -1048,7 +1048,7 @@ protected:
    std::mutex mutex;


-    Logger * log = &Logger::get("Aggregator");
+    Poco::Logger * log = &Poco::Logger::get("Aggregator");

    /// Returns true if you can abort the current task.
    CancellationHook isCancelled;
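All of the Aggregator messages above follow the same fmt-style call-site discipline: each `{}` placeholder consumes one trailing argument in order, so placeholders and arguments must stay in sync when these messages are edited. With {fmt}'s runtime format strings, too few arguments raise `fmt::format_error` when the message is rendered, while extra trailing arguments are ignored. A small sketch of the shape (the function is illustrative, not from the codebase):

```cpp
#include <fmt/format.h>

#include <string>

std::string formatAggregateTrace(size_t src_rows, size_t rows, double elapsed_seconds)
{
    /// Four placeholders, four arguments, consumed left to right.
    return fmt::format("Aggregated. {} to {} rows in {} sec. ({} rows/sec.)",
                       src_rows, rows, elapsed_seconds, src_rows / elapsed_seconds);
}
```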
@@ -180,7 +180,7 @@ void SelectStreamFactory::createForShard(
        ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
        if (shard_info.hasRemoteConnections())
        {
-            LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"), "There is no table {} on local replica of shard {}, will try remote replicas.", main_table.getNameForLogs(), shard_info.shard_num);
+            LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"), "There is no table {} on local replica of shard {}, will try remote replicas.", main_table.getNameForLogs(), shard_info.shard_num);
            emplace_remote_stream();
        }
        else

@@ -216,7 +216,7 @@ void SelectStreamFactory::createForShard(

    /// If we reached this point, local replica is stale.
    ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
-    LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"), "Local replica of shard {} is stale (delay: {}s.)", shard_info.shard_num, local_delay);
+    LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"), "Local replica of shard {} is stale (delay: {}s.)", shard_info.shard_num, local_delay);

    if (!settings.fallback_to_stale_replicas_for_distributed_queries)
    {

@@ -264,7 +264,7 @@ void SelectStreamFactory::createForShard(
        catch (const Exception & ex)
        {
            if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
-                LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"), "Connections to remote replicas of local shard {} failed, will use stale local replica", shard_num);
+                LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"), "Connections to remote replicas of local shard {} failed, will use stale local replica", shard_num);
            else
                throw;
        }

@@ -287,7 +287,7 @@ void NamedSession::release()
*/
struct ContextShared
{
-    Logger * log = &Logger::get("Context");
+    Poco::Logger * log = &Poco::Logger::get("Context");

    /// For access of most of shared objects. Recursive mutex.
    mutable std::recursive_mutex mutex;

@@ -220,7 +220,7 @@ static bool isSupportedAlterType(int type)


DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix)
-    : context(context_), log(&Logger::get("DDLWorker"))
+    : context(context_), log(&Poco::Logger::get("DDLWorker"))
{
    queue_dir = zk_root_dir;
    if (queue_dir.back() == '/')

@@ -1073,7 +1073,7 @@ class DDLQueryStatusInputStream : public IBlockInputStream
public:

    DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_)
-        : node_path(zk_node_path), context(context_), watch(CLOCK_MONOTONIC_COARSE), log(&Logger::get("DDLQueryStatusInputStream"))
+        : node_path(zk_node_path), context(context_), watch(CLOCK_MONOTONIC_COARSE), log(&Poco::Logger::get("DDLQueryStatusInputStream"))
    {
        sample = Block{
            {std::make_shared<DataTypeString>(), "host"},

@@ -1235,7 +1235,7 @@ private:
    String node_path;
    const Context & context;
    Stopwatch watch;
-    Logger * log;
+    Poco::Logger * log;

    Block sample;


@@ -101,7 +101,7 @@ private:

private:
    Context & context;
-    Logger * log;
+    Poco::Logger * log;
    std::unique_ptr<Context> current_context;

    std::string host_fqdn; /// current host domain name

@@ -124,7 +124,7 @@ EmbeddedDictionaries::EmbeddedDictionaries(
    std::unique_ptr<GeoDictionariesLoader> geo_dictionaries_loader_,
    Context & context_,
    const bool throw_on_error)
-    : log(&Logger::get("EmbeddedDictionaries"))
+    : log(&Poco::Logger::get("EmbeddedDictionaries"))
    , context(context_)
    , geo_dictionaries_loader(std::move(geo_dictionaries_loader_))
    , reload_period(context_.getConfigRef().getInt("builtin_dictionaries_reload_interval", 3600))

@@ -15,7 +15,7 @@ namespace DB

/// Must not acquire Context lock in constructor to avoid possibility of deadlocks.
ExternalDictionariesLoader::ExternalDictionariesLoader(Context & context_)
-    : ExternalLoader("external dictionary", &Logger::get("ExternalDictionariesLoader"))
+    : ExternalLoader("external dictionary", &Poco::Logger::get("ExternalDictionariesLoader"))
    , context(context_)
{
    setConfigSettings({"dictionary", "name", "database"});
Some files were not shown because too many files have changed in this diff