Fixed failures in utils, more precise test. [#CLICKHOUSE-2910]

This commit is contained in:
Vitaliy Lyudvichenko 2018-06-20 18:21:42 +03:00
parent 310bb4116e
commit 358e4ae9bf
18 changed files with 95 additions and 56 deletions

View File

@ -24,6 +24,7 @@
#include <Parsers/IAST.h>
#include <common/ErrorHandlers.h>
#include <Common/StatusFile.h>
#include <Common/ThreadStatus.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
@ -269,6 +270,9 @@ void LocalServer::processQueries()
context->setCurrentQueryId("");
applyCmdSettings(*context);
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(*context);
bool echo_query = config().hasOption("echo") || config().hasOption("verbose");
std::exception_ptr exception;

View File

@ -212,10 +212,9 @@ void HTTPHandler::processQuery(
Context context = server.context();
context.setGlobalContext(server.context());
CurrentThread::initializeQuery();
/// It will forcibly detach query even if unexpected error ocurred and detachQuery() was not called
/// Normal detaching is happen in BlockIO callbacks
SCOPE_EXIT({CurrentThread::detachQueryIfNotDetached();});
CurrentThread::QueryScope query_scope_holder(context);
LOG_TRACE(log, "Request URI: " << request.getURI());

View File

@ -57,7 +57,7 @@ ThreadStatusPtr CurrentThread::get()
void CurrentThread::detachQuery()
{
getCurrentThreadImpl()->detachQuery();
getCurrentThreadImpl()->detachQuery(false);
}
void CurrentThread::detachQueryIfNotDetached()
@ -112,11 +112,6 @@ std::string CurrentThread::getCurrentQueryID()
return current_thread->getQueryID();
}
ThreadGroupStatusPtr CurrentThread::getGroup()
{
return getCurrentThreadImpl()->getThreadGroup();
}
void CurrentThread::attachQueryContext(Context & query_context)
{
return getCurrentThreadImpl()->attachQueryContext(query_context);
@ -127,4 +122,20 @@ void CurrentThread::finalizePerformanceCounters()
getCurrentThreadImpl()->finalizePerformanceCounters();
}
ThreadGroupStatusPtr CurrentThread::getGroup()
{
return getCurrentThreadImpl()->getThreadGroup();
}
CurrentThread::QueryScope::~QueryScope()
{
try
{
CurrentThread::detachQueryIfNotDetached();
}
catch (...)
{
tryLogCurrentException("CurrentThread", __PRETTY_FUNCTION__);
}
}
}

View File

@ -64,8 +64,20 @@ public:
/// Non-master threads call this method in destructor automatically
static void detachQuery();
static void detachQueryIfNotDetached();
/// Initializes query with current thread as master thread in constructor, and detaches it in desstructor
struct QueryScope
{
explicit QueryScope(Context & query_context)
{
CurrentThread::initializeQuery();
CurrentThread::attachQueryContext(query_context);
}
~QueryScope();
};
};
}

View File

@ -161,6 +161,8 @@
M(InvoluntaryContextSwitches) \
\
M(OSIOWaitMicroseconds) \
M(OSCPUWaitMicroseconds) \
M(OSCPUVirtualTimeMicroseconds) \
M(OSReadBytes) \
M(OSWriteBytes) \
M(OSReadChars) \

View File

@ -86,7 +86,7 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C
handler_ptr->it_client->info += "; " + client.info;
return handler_ptr;
return handler_ptr;
}
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)

View File

@ -26,10 +26,12 @@ namespace ProfileEvents
extern const Event InvoluntaryContextSwitches;
extern const Event OSIOWaitMicroseconds;
extern const Event OSReadBytes;
extern const Event OSWriteBytes;
extern const Event OSCPUWaitMicroseconds;
extern const Event OSCPUVirtualTimeMicroseconds;
extern const Event OSReadChars;
extern const Event OSWriteChars;
extern const Event OSReadBytes;
extern const Event OSWriteBytes;
}
@ -99,8 +101,6 @@ struct RusageCounters
UInt64 soft_page_faults = 0;
UInt64 hard_page_faults = 0;
UInt64 voluntary_context_switches = 0;
UInt64 involuntary_context_switches = 0;
RusageCounters() = default;
RusageCounters(const ::rusage & rusage_, UInt64 real_time_)
@ -116,8 +116,6 @@ struct RusageCounters
soft_page_faults = static_cast<UInt64>(rusage.ru_minflt);
hard_page_faults = static_cast<UInt64>(rusage.ru_majflt);
voluntary_context_switches = static_cast<UInt64>(rusage.ru_nvcsw);
involuntary_context_switches = static_cast<UInt64>(rusage.ru_nivcsw);
}
static RusageCounters zeros(UInt64 real_time_ = getCurrentTimeNanoseconds())
@ -169,12 +167,21 @@ struct TasksStatsCounters
static void incrementProfileEvents(const TasksStatsCounters & prev, const TasksStatsCounters & curr, ProfileEvents::Counters & profile_events)
{
profile_events.increment(ProfileEvents::OSCPUWaitMicroseconds,
safeDiff(prev.stat.cpu_delay_total, curr.stat.cpu_delay_total) / 1000U);
profile_events.increment(ProfileEvents::OSIOWaitMicroseconds,
safeDiff(prev.stat.blkio_delay_total, curr.stat.blkio_delay_total) / 1000U);
profile_events.increment(ProfileEvents::OSReadBytes, safeDiff(prev.stat.read_bytes, curr.stat.read_bytes));
profile_events.increment(ProfileEvents::OSWriteBytes, safeDiff(prev.stat.write_bytes, curr.stat.write_bytes));
profile_events.increment(ProfileEvents::OSCPUVirtualTimeMicroseconds,
safeDiff(prev.stat.cpu_run_virtual_total, curr.stat.cpu_run_virtual_total) / 1000U);
/// Too old struct version, do not read new fields
if (curr.stat.version < TASKSTATS_VERSION)
return;
profile_events.increment(ProfileEvents::OSReadChars, safeDiff(prev.stat.read_char, curr.stat.read_char));
profile_events.increment(ProfileEvents::OSWriteChars, safeDiff(prev.stat.write_char, curr.stat.write_char));
profile_events.increment(ProfileEvents::OSReadBytes, safeDiff(prev.stat.read_bytes, curr.stat.read_bytes));
profile_events.increment(ProfileEvents::OSWriteBytes, safeDiff(prev.stat.write_bytes, curr.stat.write_bytes));
}
static void updateProfileEvents(TasksStatsCounters & last_counters, ProfileEvents::Counters & profile_events)
@ -362,7 +369,8 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
elem.read_bytes = progress_in.bytes.load(std::memory_order_relaxed);
elem.written_rows = progress_out.rows.load(std::memory_order_relaxed);
elem.written_bytes = progress_out.bytes.load(std::memory_order_relaxed);
elem.memory_usage = memory_tracker.getPeak();
elem.memory_usage = memory_tracker.get();
elem.peak_memory_usage = memory_tracker.getPeak();
elem.thread_name = getThreadName();
elem.thread_number = thread_number;

View File

@ -50,6 +50,8 @@ public:
/// Key is Poco's thread_id
using QueryThreadStatuses = std::map<UInt32, ThreadStatusPtr>;
QueryThreadStatuses thread_statuses;
/// The first thread created this thread group
ThreadStatusPtr master_thread;
String query;

View File

@ -32,7 +32,7 @@ TEST(Common, RWLockFIFO_1)
auto func = [&] (size_t threads, int round)
{
for (int i = 0; i < cycles; ++i)
for (int i = 0; i < cycles; ++i)
{
auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockFIFO::Read : RWLockFIFO::Write;
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));

View File

@ -43,7 +43,10 @@ void AsynchronousBlockInputStream::next()
{
if (first)
setThreadName("AsyncBlockInput");
CurrentThread::attachToIfDetached(thread_group);
/// AsynchronousBlockInputStream is used in Client which does not create queries and thread groups
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
}
catch (...)
{

View File

@ -12,21 +12,21 @@ namespace DB
{
InternalTextLogsQueue::InternalTextLogsQueue()
: ConcurrentBoundedQueue<MutableColumns>(std::numeric_limits<int>::max()),
max_priority(Poco::Message::Priority::PRIO_INFORMATION) {}
: ConcurrentBoundedQueue<MutableColumns>(std::numeric_limits<int>::max()),
max_priority(Poco::Message::Priority::PRIO_INFORMATION) {}
Block InternalTextLogsQueue::getSampleBlock()
{
return Block {
{std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUInt32>(), "event_time_microseconds"},
{std::make_shared<DataTypeString>(), "host_name"},
{std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeUInt32>(), "thread_number"},
{std::make_shared<DataTypeInt8>(), "priority"},
{std::make_shared<DataTypeString>(), "source"},
{std::make_shared<DataTypeString>(), "text"},
{std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUInt32>(), "event_time_microseconds"},
{std::make_shared<DataTypeString>(), "host_name"},
{std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeUInt32>(), "thread_number"},
{std::make_shared<DataTypeInt8>(), "priority"},
{std::make_shared<DataTypeString>(), "source"},
{std::make_shared<DataTypeString>(), "text"}
};
}
@ -38,11 +38,11 @@ MutableColumns InternalTextLogsQueue::getSampleColumns()
void InternalTextLogsQueue::pushBlock(Block && log_block)
{
static Block sample_block = getSampleBlock();
static Block sample_block = getSampleBlock();
if (blocksHaveEqualStructure(sample_block, log_block))
emplace(log_block.mutateColumns());
else
if (blocksHaveEqualStructure(sample_block, log_block))
emplace(log_block.mutateColumns());
else
LOG_WARNING(&Poco::Logger::get("InternalTextLogsQueue"), "Log block have different structure");
}
@ -52,14 +52,14 @@ const char * InternalTextLogsQueue::getPriorityName(int priority)
static const char * PRIORITIES [] = {
"Unknown",
"Fatal",
"Critical",
"Error",
"Warning",
"Notice",
"Information",
"Debug",
"Trace"
"Fatal",
"Critical",
"Error",
"Warning",
"Notice",
"Information",
"Debug",
"Trace"
};
return (priority >= 1 && priority <= 8) ? PRIORITIES[priority] : PRIORITIES[0];

View File

@ -11,7 +11,6 @@
#include <IO/WriteHelpers.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <common/logger_useful.h>
#include <pthread.h>
#include <chrono>
@ -164,21 +163,17 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
/// Query-level memory tracker is already set in the QueryStatus constructor
/// Actualize thread group info
if (auto thread_group = CurrentThread::getGroup())
{
auto thread_group = CurrentThread::getGroup();
std::unique_lock lock(thread_group->mutex);
std::unique_lock lock_thread_group(thread_group->mutex);
thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
thread_group->query = process_it->query;
/// Set memory trackers
/// Set query-level memory trackers
thread_group->memory_tracker.setOrRaiseLimit(process_it->max_memory_usage);
thread_group->memory_tracker.setDescription("(for query)");
if (process_it->memory_tracker_fault_probability)
thread_group->memory_tracker.setFaultProbability(process_it->memory_tracker_fault_probability);

View File

@ -31,6 +31,7 @@ Block QueryThreadLogElement::createBlock()
{std::make_shared<DataTypeUInt64>(), "written_rows"},
{std::make_shared<DataTypeUInt64>(), "written_bytes"},
{std::make_shared<DataTypeInt64>(), "memory_usage"},
{std::make_shared<DataTypeInt64>(), "peak_memory_usage"},
{std::make_shared<DataTypeString>(), "thread_name"},
{std::make_shared<DataTypeUInt32>(), "thread_number"},
@ -81,6 +82,7 @@ void QueryThreadLogElement::appendToBlock(Block & block) const
columns[i++]->insert(UInt64(written_bytes));
columns[i++]->insert(Int64(memory_usage));
columns[i++]->insert(Int64(peak_memory_usage));
columns[i++]->insertData(thread_name.data(), thread_name.size());
columns[i++]->insert(UInt64(thread_number));

View File

@ -29,6 +29,7 @@ struct QueryThreadLogElement
UInt64 written_bytes{};
Int64 memory_usage{};
Int64 peak_memory_usage{};
String thread_name;
UInt32 thread_number{};

View File

@ -351,7 +351,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (process_list_elem)
{
QueryStatusInfo info = process_list_elem->getInfo(true, settings.log_profile_events);
QueryStatusInfo info = process_list_elem->getInfo(true, settings.log_profile_events, false);
elem.query_duration_ms = info.elapsed_seconds * 1000;

View File

@ -62,9 +62,7 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "thread_group " << thread_group.get());
CurrentThread::detachQuery();
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "thread_group " << thread_group.get());
threads.resize(size);
for (auto & thread : threads)

View File

@ -39,7 +39,7 @@ $CLICKHOUSE_CLIENT $settings -q "
WITH
any(query_duration_ms*1000) AS duration,
sumIf(PV, PN = 'RealTimeMicroseconds') AS threads_realtime,
sumIf(PV, PN IN ('UserTimeMicroseconds', 'SystemTimeMicroseconds', 'OSIOWaitMicroseconds')) AS threads_time_user_system_io
sumIf(PV, PN IN ('UserTimeMicroseconds', 'SystemTimeMicroseconds', 'OSIOWaitMicroseconds', 'OSCPUWaitMicroseconds')) AS threads_time_user_system_io
SELECT
--duration, threads_realtime, threads_time_user_system_io,
threads_realtime >= 0.99 * duration,
@ -69,7 +69,7 @@ FROM
SELECT
thread_number,
sumIf(PV, PN = 'RealTimeMicroseconds') AS thread_realtime,
sumIf(PV, PN IN ('UserTimeMicroseconds', 'SystemTimeMicroseconds', 'OSIOWaitMicroseconds')) AS thread_time_user_system_io
sumIf(PV, PN IN ('UserTimeMicroseconds', 'SystemTimeMicroseconds', 'OSIOWaitMicroseconds', 'OSCPUWaitMicroseconds')) AS thread_time_user_system_io
FROM
(SELECT * FROM system.query_thread_log PREWHERE query_id='$query_id' WHERE event_date >= today()-1)
ARRAY JOIN ProfileEvents.Names AS PN, ProfileEvents.Values AS PV

View File

@ -51,6 +51,8 @@ void OwnSplitChannel::log(const Poco::Message & msg)
logs_queue->emplace(std::move(columns));
}
/// TODO: Also log to system.internal_text_log table
}
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)