Merge remote-tracking branch 'upstream/master' into fix25

proller 2019-07-23 12:16:32 +03:00
commit d80f5167cb
84 changed files with 1071 additions and 248 deletions

contrib/libunwind vendored

@ -1 +1 @@
Subproject commit ec86b1c6a2c6b8ba316f429db9a6d4122dd12710
Subproject commit 17a48fbfa7913ee889960a698516bd3ba51d63ee

View File

@ -231,6 +231,8 @@ target_link_libraries(clickhouse_common_io
Threads::Threads
PRIVATE
${CMAKE_DL_LIBS}
PRIVATE
rt
PUBLIC
roaring
)

View File

@ -1,11 +1,11 @@
# These strings are autochanged from release_lib.sh:
set(VERSION_REVISION 54424)
set(VERSION_REVISION 54425)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 12)
set(VERSION_MINOR 13)
set(VERSION_PATCH 1)
set(VERSION_GITHASH a584f0ca6cb5df9b0d9baf1e2e1eaa7d12a20a44)
set(VERSION_DESCRIBE v19.12.1.1-prestable)
set(VERSION_STRING 19.12.1.1)
set(VERSION_GITHASH adfc36917222bdb03eba069f0cad0f4f5b8f1c94)
set(VERSION_DESCRIBE v19.13.1.1-prestable)
set(VERSION_STRING 19.13.1.1)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -508,6 +508,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
LOG_DEBUG(log, "Loaded metadata.");
/// Init trace collector only after trace_log system table was created
global_context->initializeTraceCollector();
global_context->setCurrentDatabase(default_database);
if (has_zookeeper && config().has("distributed_ddl"))

View File

@ -294,6 +294,16 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_log>
<!-- Trace log. Stores stack traces collected by query profilers.
See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. -->
<trace_log>
<database>system</database>
<table>trace_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</trace_log>
<!-- Query thread log. Has information about all threads participated in query execution.
Used only for queries with setting log_query_threads = 1. -->
<query_thread_log>
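
For reference, once this <trace_log> section is enabled, the collected samples land in the system.trace_log table whose columns are defined further down in this diff (event_date, event_time, timer_type, query_id, trace). A minimal sketch of inspecting the freshest samples; the ordering and LIMIT are illustrative only:

SELECT event_date, event_time, timer_type, query_id, length(trace) AS frames
FROM system.trace_log
ORDER BY event_time DESC
LIMIT 10; -- trace itself is an Array(UInt64) of raw frame addresses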

View File

@ -3,6 +3,7 @@
#include <memory>
#include <string>
#include <common/likely.h>
#include <common/StringRef.h>
#include <Common/ThreadStatus.h>
@ -72,7 +73,12 @@ public:
static void finalizePerformanceCounters();
/// Returns a non-empty string if the thread is attached to a query
static StringRef getQueryId();
static StringRef getQueryId()
{
if (unlikely(!current_thread))
return {};
return current_thread->getQueryId();
}
/// Non-master threads call this method in destructor automatically
static void detachQuery();

View File

@ -434,6 +434,9 @@ namespace ErrorCodes
extern const int BAD_QUERY_PARAMETER = 457;
extern const int CANNOT_UNLINK = 458;
extern const int CANNOT_SET_THREAD_PRIORITY = 459;
extern const int CANNOT_CREATE_TIMER = 460;
extern const int CANNOT_SET_TIMER_PERIOD = 461;
extern const int CANNOT_DELETE_TIMER = 462;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,134 @@
#include "QueryProfiler.h"
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
namespace DB
{
extern LazyPipe trace_pipe;
namespace
{
/// Normally query_id is a UUID (a string with a fixed length), but the user can provide a custom query_id.
/// Thus an upper bound on query_id length should be introduced to avoid buffer overflow in the signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(StackTrace) + // collected stack trace
sizeof(TimerType); // timer type
char buffer[buf_size];
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
writeChar(false, out);
writeStringBinary(query_id, out);
writePODBinary(stack_trace, out);
writePODBinary(timer_type, out);
out.next();
}
const UInt32 TIMER_PRECISION = 1e9;
}
namespace ErrorCodes
{
extern const int CANNOT_MANIPULATE_SIGSET;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_CREATE_TIMER;
extern const int CANNOT_SET_TIMER_PERIOD;
extern const int CANNOT_DELETE_TIMER;
}
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal)
: log(&Logger::get("QueryProfiler"))
, pause_signal(pause_signal)
{
struct sigaction sa{};
sa.sa_sigaction = ProfilerImpl::signalHandler;
sa.sa_flags = SA_SIGINFO | SA_RESTART;
if (sigemptyset(&sa.sa_mask))
throwFromErrno("Failed to clean signal mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaddset(&sa.sa_mask, pause_signal))
throwFromErrno("Failed to add signal to mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaction(pause_signal, &sa, previous_handler))
throwFromErrno("Failed to setup signal handler for query profiler", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
try
{
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;
sev._sigev_un._tid = thread_id;
if (timer_create(clock_type, &sev, &timer_id))
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION};
struct itimerspec timer_spec = {.it_interval = interval, .it_value = interval};
if (timer_settime(timer_id, 0, &timer_spec, nullptr))
throwFromErrno("Failed to set thread timer period", ErrorCodes::CANNOT_SET_TIMER_PERIOD);
}
catch (...)
{
tryCleanup();
throw;
}
}
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::~QueryProfilerBase()
{
tryCleanup();
}
template <typename ProfilerImpl>
void QueryProfilerBase<ProfilerImpl>::tryCleanup()
{
if (timer_id != nullptr && timer_delete(timer_id))
LOG_ERROR(log, "Failed to delete query profiler timer " + errnoToString(ErrorCodes::CANNOT_DELETE_TIMER));
if (previous_handler != nullptr && sigaction(pause_signal, previous_handler, nullptr))
LOG_ERROR(log, "Failed to restore signal handler after query profiler " + errnoToString(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER));
}
template class QueryProfilerBase<QueryProfilerReal>;
template class QueryProfilerBase<QueryProfilerCpu>;
QueryProfilerReal::QueryProfilerReal(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_REALTIME, period, SIGUSR1)
{}
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Real, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2)
{}
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Cpu, sig, info, context);
}
}

View File

@ -0,0 +1,74 @@
#pragma once
#include <Core/Types.h>
#include <signal.h>
#include <time.h>
namespace Poco
{
class Logger;
}
namespace DB
{
enum class TimerType : UInt8
{
Real,
Cpu,
};
/**
* Query profiler implementation for selected thread.
*
* This class installs timer and signal handler on creation to:
* 1. periodically pause given thread
* 2. collect thread's current stack trace
* 3. write collected stack trace to trace_pipe for TraceCollector
*
* The destructor tries to unset the timer and restore the previous signal handler.
* Note that the signal handler implementation is defined by the template parameter. See QueryProfilerReal and QueryProfilerCpu.
*/
template <typename ProfilerImpl>
class QueryProfilerBase
{
public:
QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal = SIGALRM);
~QueryProfilerBase();
private:
void tryCleanup();
Poco::Logger * log;
/// Timer id from timer_create(2)
timer_t timer_id = nullptr;
/// Pause signal to interrupt threads to get traces
int pause_signal;
/// Previous signal handler to restore after query profiler exits
struct sigaction * previous_handler = nullptr;
};
/// Query profiler with timer based on real clock
class QueryProfilerReal : public QueryProfilerBase<QueryProfilerReal>
{
public:
QueryProfilerReal(const Int32 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};
/// Query profiler with timer based on CPU clock
class QueryProfilerCpu : public QueryProfilerBase<QueryProfilerCpu>
{
public:
QueryProfilerCpu(const Int32 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};
}

View File

@ -8,52 +8,10 @@
#include <IO/WriteHelpers.h>
#include <port/unistd.h>
#include <csignal>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PIPE;
extern const int CANNOT_DLSYM;
extern const int CANNOT_FORK;
extern const int CANNOT_WAITPID;
extern const int CHILD_WAS_NOT_EXITED_NORMALLY;
extern const int CANNOT_CREATE_CHILD_PROCESS;
}
}
#include <common/Pipe.h>
namespace
{
struct Pipe
{
int fds_rw[2];
Pipe()
{
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
#else
if (0 != pipe(fds_rw))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
#endif
}
~Pipe()
{
if (fds_rw[0] >= 0)
close(fds_rw[0]);
if (fds_rw[1] >= 0)
close(fds_rw[1]);
}
};
/// By these return codes from the child process, we learn (for sure) about errors when creating it.
enum class ReturnCodes : int
{
@ -64,10 +22,18 @@ namespace
};
}
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_DLSYM;
extern const int CANNOT_FORK;
extern const int CANNOT_WAITPID;
extern const int CHILD_WAS_NOT_EXITED_NORMALLY;
extern const int CANNOT_CREATE_CHILD_PROCESS;
}
ShellCommand::ShellCommand(pid_t pid, int in_fd, int out_fd, int err_fd, bool terminate_in_destructor_)
: pid(pid)
, terminate_in_destructor(terminate_in_destructor_)

View File

@ -4,6 +4,7 @@
#include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h>
#include <Poco/Logger.h>

View File

@ -28,6 +28,8 @@ namespace DB
class Context;
class QueryStatus;
class ThreadStatus;
class QueryProfilerReal;
class QueryProfilerCpu;
class QueryThreadLog;
struct TasksStatsCounters;
struct RUsageCounters;
@ -123,7 +125,10 @@ public:
return thread_state.load(std::memory_order_relaxed);
}
StringRef getQueryId() const;
StringRef getQueryId() const
{
return query_id;
}
/// Starts new query and create new thread group for it, current thread becomes master thread of the query
void initializeQuery();
@ -155,6 +160,10 @@ public:
protected:
void initPerformanceCounters();
void initQueryProfiler();
void finalizeQueryProfiler();
void logToQueryThreadLog(QueryThreadLog & thread_log);
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr);
@ -178,6 +187,10 @@ protected:
time_t query_start_time = 0;
size_t queries_started = 0;
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
Poco::Logger * log = nullptr;
friend class CurrentThread;

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <common/sleep.h>
#include <IO/WriteHelpers.h>
#include <port/clock.h>
@ -76,12 +77,7 @@ public:
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
::timespec sleep_ts;
sleep_ts.tv_sec = sleep_ns / 1000000000;
sleep_ts.tv_nsec = sleep_ns % 1000000000;
/// NOTE: Returns early in case of a signal. This is considered normal.
::nanosleep(&sleep_ts, nullptr);
sleepForNanoseconds(sleep_ns);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL);
}

View File

@ -0,0 +1,100 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Common/Exception.h>
#include <Interpreters/TraceLog.h>
namespace DB
{
LazyPipe trace_pipe;
namespace ErrorCodes
{
extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log)
{
if (trace_log == nullptr)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open();
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
else
{
TraceCollector::notifyToStop();
thread.join();
}
trace_pipe.close();
}
/**
* Sends a stop message to TraceCollector.
*
* Each sequence of data for the TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends a flag with the value true to stop TraceCollector gracefully.
*
* NOTE: TraceCollector will NOT stop immediately, as there may be some data left in the pipe
* before the stop message.
*/
void TraceCollector::notifyToStop()
{
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
writeChar(true, out);
out.next();
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
while (true)
{
char is_last;
readChar(is_last, in);
if (is_last)
break;
std::string query_id;
StackTrace stack_trace(NoCapture{});
TimerType timer_type;
readStringBinary(query_id, in);
readPODBinary(stack_trace, in);
readPODBinary(timer_type, in);
const auto size = stack_trace.getSize();
const auto & frames = stack_trace.getFrames();
Array trace;
trace.reserve(size);
for (size_t i = 0; i < size; i++)
trace.emplace_back(UInt64(reinterpret_cast<uintptr_t>(frames[i])));
TraceLogElement element{std::time(nullptr), timer_type, query_id, trace};
trace_log->add(element);
}
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Common/ThreadPool.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class TraceLog;
class TraceCollector
{
private:
Poco::Logger * log;
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
void run();
static void notifyToStop();
public:
TraceCollector(std::shared_ptr<TraceLog> & trace_log);
~TraceCollector();
};
}

View File

@ -221,6 +221,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to execute distributed DDL queries.") \
M(SettingUInt64, odbc_max_field_size, 1024, "Max size of field that can be read from ODBC dictionary. Long strings are truncated.") \
M(SettingUInt64, query_profiler_real_time_period_ns, 0, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \
\
\
/** Limits during query execution are part of the settings. \
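
A usage sketch for the two new settings above, assuming a session on a server where the trace collector is running; the 10 ms period and the system.numbers_mt query are arbitrary examples, not recommendations:

SET query_profiler_real_time_period_ns = 10000000; -- sample wall-clock time roughly every 10 ms
SET query_profiler_cpu_time_period_ns = 10000000;  -- sample CPU time roughly every 10 ms
SELECT count() FROM system.numbers_mt WHERE NOT ignore(rand()); -- any sufficiently long query will produce samples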

View File

@ -3,7 +3,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>
#include <Common/CurrentThread.h>
#include <common/sleep.h>
namespace ProfileEvents
{
@ -255,13 +255,7 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i
if (desired_microseconds > total_elapsed_microseconds)
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
::timespec sleep_ts;
sleep_ts.tv_sec = sleep_microseconds / 1000000;
sleep_ts.tv_nsec = sleep_microseconds % 1000000 * 1000;
/// NOTE: Returns early in case of a signal. This is considered normal.
/// NOTE: It's worth noting that this behavior affects kill of queries.
::nanosleep(&sleep_ts, nullptr);
sleepForMicroseconds(sleep_microseconds);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
}

View File

@ -9,6 +9,7 @@
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <sstream>
@ -68,6 +69,13 @@ std::pair<String, StoragePtr> createTableFromDefinition(
ast_create_query.attach = true;
ast_create_query.database = database_name;
if (ast_create_query.as_table_function)
{
const auto & table_function = ast_create_query.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
return {ast_create_query.table, storage};
}
/// We do not directly use `InterpreterCreateQuery::execute`, because
/// - the database has not been created yet;
/// - the code is simpler, since the query is already brought to a suitable form.
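
This branch re-attaches tables that were defined as CREATE TABLE ... AS <table function> when the server loads metadata; a minimal sketch of such a definition, using numbers() because the new test at the end of this diff does the same:

CREATE TABLE t_from_tf AS numbers(100);
SELECT count() FROM t_from_tf WHERE number > 74; -- on restart the stored AS numbers(100) definition is executed again by the code above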

View File

@ -0,0 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsIntrospection.h>
namespace DB
{
void registerFunctionsIntrospection(FunctionFactory & factory)
{
factory.registerFunction<FunctionSymbolizeTrace>();
}
}

View File

@ -0,0 +1,107 @@
#pragma once
#include <common/StackTrace.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
class FunctionSymbolizeTrace : public IFunction
{
public:
static constexpr auto name = "symbolizeTrace";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionSymbolizeTrace>();
}
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override
{
return 1;
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1)
throw Exception("Function " + getName() + " needs exactly one argument; passed "
+ toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto array_type = checkAndGetDataType<DataTypeArray>(arguments[0].type.get());
if (!array_type)
throw Exception("The only argument for function " + getName() + " must be array. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
DataTypePtr nested_type = array_type->getNestedType();
if (!WhichDataType(nested_type).isUInt64())
throw Exception("The only argument for function " + getName() + " must be array of UInt64. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeString>();
}
bool useDefaultImplementationForConstants() const override
{
return true;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
{
const ColumnPtr column = block.getByPosition(arguments[0]).column;
const ColumnArray * column_array = checkAndGetColumn<ColumnArray>(column.get());
if (!column_array)
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
const ColumnPtr data_ptr = column_array->getDataPtr();
const ColumnVector<UInt64> * data_vector = checkAndGetColumn<ColumnVector<UInt64>>(&*data_ptr);
const typename ColumnVector<UInt64>::Container & data = data_vector->getData();
const ColumnArray::Offsets & offsets = column_array->getOffsets();
auto result_column = ColumnString::create();
StackTrace::Frames frames;
size_t current_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
size_t current_size = 0;
for (; current_size < frames.size() && current_offset + current_size < offsets[i]; ++current_size)
{
frames[current_size] = reinterpret_cast<void *>(data[current_offset + current_size]);
}
std::string backtrace = StackTrace(frames.begin(), frames.begin() + current_size).toString();
result_column->insertDataWithTerminatingZero(backtrace.c_str(), backtrace.length() + 1);
current_offset = offsets[i];
}
block.getByPosition(result).column = std::move(result_column);
}
};
}
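
A hedged example of calling the new symbolizeTrace function on collected traces; it assumes system.trace_log already contains rows (see the trace_log config and TraceLog changes elsewhere in this commit):

SELECT query_id, symbolizeTrace(trace) AS stack
FROM system.trace_log
LIMIT 1
FORMAT Vertical; -- the function returns the whole backtrace as a single String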

View File

@ -36,6 +36,7 @@ void registerFunctionsURL(FunctionFactory &);
void registerFunctionsVisitParam(FunctionFactory &);
void registerFunctionsMath(FunctionFactory &);
void registerFunctionsGeo(FunctionFactory &);
void registerFunctionsIntrospection(FunctionFactory &);
void registerFunctionsNull(FunctionFactory &);
void registerFunctionsFindCluster(FunctionFactory &);
void registerFunctionsJSON(FunctionFactory &);
@ -74,6 +75,7 @@ void registerFunctions()
registerFunctionsVisitParam(factory);
registerFunctionsMath(factory);
registerFunctionsGeo(factory);
registerFunctionsIntrospection(factory);
registerFunctionsNull(factory);
registerFunctionsFindCluster(factory);
registerFunctionsJSON(factory);

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/FieldVisitors.h>
#include <common/sleep.h>
#include <IO/WriteHelpers.h>
@ -86,8 +87,8 @@ public:
if (seconds > 3.0) /// The choice is arbitrary
throw Exception("The maximum sleep time is 3 seconds. Requested: " + toString(seconds), ErrorCodes::TOO_SLOW);
UInt64 useconds = seconds * (variant == FunctionSleepVariant::PerBlock ? 1 : size) * 1e6;
::usleep(useconds);
UInt64 microseconds = seconds * (variant == FunctionSleepVariant::PerBlock ? 1 : size) * 1e6;
sleepForMicroseconds(microseconds);
}
/// convertToFullColumn needed, because otherwise (constant expression case) function will not get called on each block.

View File

@ -99,7 +99,8 @@ void SelectStreamFactory::createForShard(
if (table_func_ptr)
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);

View File

@ -41,6 +41,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Common/DNSResolver.h>
@ -53,6 +54,7 @@
#include <Common/Config/ConfigProcessor.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
@ -153,6 +155,8 @@ struct ContextShared
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
std::optional<SystemLogs> system_logs; /// Used to log queries and operations on parts
std::unique_ptr<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
class SessionKeyHash
@ -285,6 +289,22 @@ struct ContextShared
background_pool.reset();
schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
trace_collector.reset();
}
bool hasTraceCollector()
{
return trace_collector != nullptr;
}
void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
{
if (trace_log == nullptr)
return;
trace_collector = std::make_unique<TraceCollector>(trace_log);
}
private:
@ -497,7 +517,6 @@ DatabasePtr Context::tryGetDatabase(const String & database_name)
return it->second;
}
String Context::getPath() const
{
auto lock = getLock();
@ -963,7 +982,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);
/// Run it and remember the result
res = table_function_ptr->execute(table_expression, *this);
res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
}
return res;
@ -1623,6 +1642,16 @@ void Context::initializeSystemLogs()
shared->system_logs.emplace(*global_context, getConfigRef());
}
bool Context::hasTraceCollector()
{
return shared->hasTraceCollector();
}
void Context::initializeTraceCollector()
{
shared->initializeTraceCollector(getTraceLog());
}
std::shared_ptr<QueryLog> Context::getQueryLog()
{
@ -1663,6 +1692,16 @@ std::shared_ptr<PartLog> Context::getPartLog(const String & part_database)
return shared->system_logs->part_log;
}
std::shared_ptr<TraceLog> Context::getTraceLog()
{
auto lock = getLock();
if (!shared->system_logs || !shared->system_logs->trace_log)
return nullptr;
return shared->system_logs->trace_log;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{

View File

@ -62,6 +62,7 @@ class Clusters;
class QueryLog;
class QueryThreadLog;
class PartLog;
class TraceLog;
struct MergeTreeSettings;
class IDatabase;
class DDLGuard;
@ -420,10 +421,15 @@ public:
/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();
void initializeTraceCollector();
bool hasTraceCollector();
/// Nullptr if the query log is not ready for this moment.
std::shared_ptr<QueryLog> getQueryLog();
std::shared_ptr<QueryThreadLog> getQueryThreadLog();
std::shared_ptr<TraceLog> getTraceLog();
/// Returns an object used to log operations with parts if it is possible.
/// Provide the table name to make the required checks.
std::shared_ptr<PartLog> getPartLog(const String & part_database);

View File

@ -22,6 +22,7 @@
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <common/sleep.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
@ -953,7 +954,7 @@ void DDLWorker::runMainThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Avoid busy loop when ZooKeeper is not available.
::sleep(1);
sleepForSeconds(1);
}
}
catch (...)

View File

@ -46,6 +46,8 @@
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
@ -518,34 +520,45 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
}
/// Set and retrieve list of columns.
ColumnsDescription columns = setColumns(create, as_select_sample, as_storage);
ColumnsDescription columns;
StoragePtr res;
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types)
if (create.as_table_function)
{
for (const auto & name_and_type_pair : columns.getAllPhysical())
const auto & table_function = create.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table);
}
else
{
/// Set and retrieve list of columns.
columns = setColumns(create, as_select_sample, as_storage);
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types)
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
for (const auto & name_and_type_pair : columns.getAllPhysical())
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
StoragePtr res;
{
std::unique_ptr<DDLGuard> guard;
@ -585,15 +598,18 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
create.attach,
false);
if (!create.as_table_function)
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
create.attach,
false);
}
if (create.temporary)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);

View File

@ -79,7 +79,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
const auto & table_function = table_expression.table_function->as<ASTFunction &>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function.name, context);
/// Run the table function and remember the result
table = table_function_ptr->execute(table_expression.table_function, context);
table = table_function_ptr->execute(table_expression.table_function, context, table_function_ptr->getName());
}
else
{

View File

@ -48,7 +48,8 @@ StoragePtr InterpreterInsertQuery::getTable(const ASTInsertQuery & query)
{
const auto * table_function = query.table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
return factory.get(table_function->name, context)->execute(query.table_function, context);
TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
}
/// Into what table to write.

View File

@ -14,6 +14,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/TraceLog.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -221,7 +222,8 @@ BlockIO InterpreterSystemQuery::execute()
executeCommandsAndThrowIfError(
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); }
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:
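
Since trace_log now participates in SYSTEM FLUSH LOGS, a sketch for forcing freshly collected samples to disk before querying them (otherwise they appear only after the configured flush_interval_milliseconds):

SYSTEM FLUSH LOGS;
SELECT count() FROM system.trace_log WHERE event_date = today();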

View File

@ -2,6 +2,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -44,6 +45,7 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");
query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system", "query_thread_log", config, "query_thread_log");
part_log = createSystemLog<PartLog>(global_context, "system", "part_log", config, "part_log");
trace_log = createSystemLog<TraceLog>(global_context, "system", "trace_log", config, "trace_log");
part_log_database = config.getString("part_log.database", "system");
}
@ -63,6 +65,8 @@ void SystemLogs::shutdown()
query_thread_log->shutdown();
if (part_log)
part_log->shutdown();
if (trace_log)
trace_log->shutdown();
}
}

View File

@ -59,6 +59,7 @@ class Context;
class QueryLog;
class QueryThreadLog;
class PartLog;
class TraceLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
@ -73,6 +74,7 @@ struct SystemLogs
std::shared_ptr<QueryLog> query_log; /// Used to log queries.
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
String part_log_database;
};

View File

@ -2,6 +2,7 @@
#include <Common/CurrentThread.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/Exception.h>
#include <Common/QueryProfiler.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/ProcessList.h>
@ -40,11 +41,8 @@ void ThreadStatus::attachQueryContext(Context & query_context_)
if (!thread_group->global_context)
thread_group->global_context = global_context;
}
}
StringRef ThreadStatus::getQueryId() const
{
return query_id;
initQueryProfiler();
}
void CurrentThread::defaultThreadDeleter()
@ -126,6 +124,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
#endif
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
}
@ -153,6 +152,33 @@ void ThreadStatus::finalizePerformanceCounters()
}
}
void ThreadStatus::initQueryProfiler()
{
/// query profilers are useless without trace collector
if (!global_context->hasTraceCollector())
return;
const auto & settings = query_context->getSettingsRef();
if (settings.query_profiler_real_time_period_ns > 0)
query_profiler_real = std::make_unique<QueryProfilerReal>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns)
);
if (settings.query_profiler_cpu_time_period_ns > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCpu>(
/* thread_id */ os_thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns)
);
}
void ThreadStatus::finalizeQueryProfiler()
{
query_profiler_real.reset();
query_profiler_cpu.reset();
}
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
@ -162,6 +188,8 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
}
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
finalizeQueryProfiler();
finalizePerformanceCounters();
/// Detach from thread group
@ -260,13 +288,6 @@ void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group
current_thread->deleter = CurrentThread::defaultThreadDeleter;
}
StringRef CurrentThread::getQueryId()
{
if (unlikely(!current_thread))
return {};
return current_thread->getQueryId();
}
void CurrentThread::attachQueryContext(Context & query_context)
{
if (unlikely(!current_thread))

View File

@ -0,0 +1,42 @@
#include <Interpreters/TraceLog.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
using namespace DB;
using TimerDataType = TraceLogElement::TimerDataType;
const TimerDataType::Values TraceLogElement::timer_values = {
{"Real", static_cast<UInt8>(TimerType::Real)},
{"CPU", static_cast<UInt8>(TimerType::Cpu)}
};
Block TraceLogElement::createBlock()
{
return
{
{std::make_shared<DataTypeDate>(), "event_date"},
{std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<TimerDataType>(timer_values), "timer_type"},
{std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"}
};
}
void TraceLogElement::appendToBlock(Block &block) const
{
MutableColumns columns = block.mutateColumns();
size_t i = 0;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time));
columns[i++]->insert(event_time);
columns[i++]->insert(static_cast<UInt8>(timer_type));
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insert(trace);
block.setColumns(std::move(columns));
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Core/Field.h>
#include <Common/QueryProfiler.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/SystemLog.h>
namespace DB
{
struct TraceLogElement
{
using TimerDataType = DataTypeEnum8;
static const TimerDataType::Values timer_values;
time_t event_time{};
TimerType timer_type;
String query_id{};
Array trace{};
static std::string name() { return "TraceLog"; }
static Block createBlock();
void appendToBlock(Block & block) const;
};
class TraceLog : public SystemLog<TraceLogElement>
{
using SystemLog<TraceLogElement>::SystemLog;
};
}

View File

@ -216,7 +216,11 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
formatOnCluster(settings);
}
if (as_table_function)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
as_table_function->formatImpl(settings, state, frame);
}
if (!to_table.empty())
{
settings.ostr

View File

@ -63,6 +63,7 @@ public:
ASTStorage * storage = nullptr;
String as_database;
String as_table;
ASTPtr as_table_function;
ASTSelectWithUnionQuery * select = nullptr;
/** Get the text that identifies this element. */

View File

@ -319,6 +319,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserIdentifier name_p;
ParserColumnsOrIndicesDeclarationList columns_or_indices_p;
ParserSelectWithUnionQuery select_p;
ParserFunction table_function_p;
ASTPtr database;
ASTPtr table;
@ -328,6 +329,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr storage;
ASTPtr as_database;
ASTPtr as_table;
ASTPtr as_table_function;
ASTPtr select;
String cluster_str;
bool attach = false;
@ -407,22 +409,25 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!s_as.ignore(pos, expected))
return false;
if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
if (!table_function_p.parse(pos, as_table_function, expected))
{
/// AS [db.]table
if (!name_p.parse(pos, as_table, expected))
return false;
if (s_dot.ignore(pos, expected))
if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
{
as_database = as_table;
/// AS [db.]table
if (!name_p.parse(pos, as_table, expected))
return false;
}
/// Optional - ENGINE can be specified.
if (!storage)
storage_p.parse(pos, storage, expected);
if (s_dot.ignore(pos, expected))
{
as_database = as_table;
if (!name_p.parse(pos, as_table, expected))
return false;
}
/// Optional - ENGINE can be specified.
if (!storage)
storage_p.parse(pos, storage, expected);
}
}
}
}
@ -526,6 +531,9 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto query = std::make_shared<ASTCreateQuery>();
node = query;
if (as_table_function)
query->as_table_function = as_table_function;
query->attach = attach;
query->if_not_exists = if_not_exists;
query->is_view = is_view;

View File

@ -55,6 +55,22 @@ void ReadBufferFromKafkaConsumer::commit()
void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
{
{
String message = "Subscribed to topics:";
for (const auto & topic : consumer->get_subscription())
message += " " + topic;
LOG_TRACE(log, message);
}
{
String message = "Assigned to topics:";
for (const auto & toppar : consumer->get_assignment())
message += " " + toppar.get_topic();
LOG_TRACE(log, message);
}
consumer->resume();
// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select, it's better to get something after a wait than nothing immediately.
if (consumer->get_subscription().empty())

View File

@ -24,7 +24,7 @@ public:
void subscribe(const Names & topics); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
auto pollTimeout() { return poll_timeout; }
auto pollTimeout() const { return poll_timeout; }
// Return values for the message that's being read.
String currentTopic() const { return current[-1].get_topic(); }

View File

@ -40,7 +40,8 @@ ColumnsDescription getStructureOfRemoteTable(
if (shard_info.isLocal())
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
return table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName())->getColumns();
}
auto table_func_name = queryToString(table_func_ptr);

View File

@ -10,10 +10,10 @@ namespace ProfileEvents
namespace DB
{
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
return executeImpl(ast_function, context);
return executeImpl(ast_function, context, table_name);
}
}

View File

@ -32,12 +32,12 @@ public:
virtual std::string getName() const = 0;
/// Create storage according to the query.
StoragePtr execute(const ASTPtr & ast_function, const Context & context) const;
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const;
virtual ~ITableFunction() {}
private:
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const = 0;
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const = 0;
};
using TableFunctionPtr = std::shared_ptr<ITableFunction>;

View File

@ -19,7 +19,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
// Parse args
ASTs & args_func = ast_function->children;
@ -60,7 +60,7 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
}
// Create table
StoragePtr storage = getStorage(filename, format, sample_block, const_cast<Context &>(context));
StoragePtr storage = getStorage(filename, format, sample_block, const_cast<Context &>(context), table_name);
storage->startup();

View File

@ -12,8 +12,8 @@ namespace DB
class ITableFunctionFileLike : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
virtual StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const = 0;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const = 0;
};
}

View File

@ -27,7 +27,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
const auto & args_func = ast_function->as<ASTFunction &>();
@ -45,18 +45,18 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
std::string connection_string;
std::string schema_name;
std::string table_name;
std::string remote_table_name;
if (args.size() == 3)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
schema_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else if (args.size() == 2)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
/* Infer external table structure */
@ -68,7 +68,7 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
columns_info_uri.addQueryParameter("connection_string", connection_string);
if (!schema_name.empty())
columns_info_uri.addQueryParameter("schema", schema_name);
columns_info_uri.addQueryParameter("table", table_name);
columns_info_uri.addQueryParameter("table", remote_table_name);
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, nullptr);
@ -76,7 +76,7 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
readStringBinary(columns_info, buf);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_info);
auto result = std::make_shared<StorageXDBC>(getDatabaseName(), table_name, schema_name, table_name, ColumnsDescription{columns}, context, helper);
auto result = std::make_shared<StorageXDBC>(getDatabaseName(), table_name, schema_name, remote_table_name, ColumnsDescription{columns}, context, helper);
if (!result)
throw Exception("Failed to instantiate storage from table function " + getName(), ErrorCodes::UNKNOWN_EXCEPTION);

View File

@ -15,7 +15,7 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(Context & context,

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
}
StoragePtr TableFunctionCatBoostPool::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionCatBoostPool::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ASTs & args_func = ast_function->children;
@ -45,7 +45,7 @@ StoragePtr TableFunctionCatBoostPool::executeImpl(const ASTPtr & ast_function, c
String column_descriptions_file = getStringLiteral(*args[0], "Column descriptions file");
String dataset_description_file = getStringLiteral(*args[1], "Dataset description file");
return StorageCatBoostPool::create(getDatabaseName(), getName(), context, column_descriptions_file, dataset_description_file);
return StorageCatBoostPool::create(getDatabaseName(), table_name, context, column_descriptions_file, dataset_description_file);
}
void registerTableFunctionCatBoostPool(TableFunctionFactory & factory)

View File

@ -15,7 +15,7 @@ public:
static constexpr auto name = "catBoostPool";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
};
}

View File

@ -5,13 +5,13 @@
namespace DB
{
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const
{
return StorageFile::create(source,
-1,
global_context.getUserFilesPath(),
getDatabaseName(),
getName(),
table_name,
format,
ColumnsDescription{sample_block.getNamesAndTypesList()},
global_context);

View File

@ -24,6 +24,6 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const override;
};
}

View File

@ -8,11 +8,11 @@
namespace DB
{
StoragePtr TableFunctionHDFS::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const
{
return StorageHDFS::create(source,
getDatabaseName(),
getName(),
table_name,
format,
ColumnsDescription{sample_block.getNamesAndTypesList()},
global_context);

View File

@ -25,7 +25,7 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const override;
};
}

View File

@ -47,7 +47,7 @@ static NamesAndTypesList chooseColumns(const String & source_database, const Str
}
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ASTs & args_func = ast_function->children;
@ -71,7 +71,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Co
auto res = StorageMerge::create(
getDatabaseName(),
getName(),
table_name,
ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)},
source_database,
table_name_regexp,

View File

@ -16,7 +16,7 @@ public:
static constexpr auto name = "merge";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
};

View File

@ -37,7 +37,7 @@ namespace ErrorCodes
}
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
const auto & args_func = ast_function->as<ASTFunction &>();
@ -54,8 +54,8 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
std::string host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::string database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::string table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
std::string remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::string remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
std::string user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
std::string password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
@ -74,7 +74,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
/// 3306 is the default MySQL port number
auto parsed_host_port = parseAddress(host_port, 3306);
mysqlxx::Pool pool(database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
mysqlxx::Pool pool(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
/// Determine table definition by running a query to INFORMATION_SCHEMA.
@ -95,8 +95,8 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << database_name
<< " AND TABLE_NAME = " << quote << table_name
" WHERE TABLE_SCHEMA = " << quote << remote_database_name
<< " AND TABLE_NAME = " << quote << remote_table_name
<< " ORDER BY ORDINAL_POSITION";
NamesAndTypesList columns;
@ -116,14 +116,14 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
}
if (columns.empty())
throw Exception("MySQL table " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
auto res = StorageMySQL::create(
getDatabaseName(),
table_name,
std::move(pool),
database_name,
table_name,
remote_database_name,
remote_table_name,
replace_query,
on_duplicate_clause,
ColumnsDescription{columns},
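
The mysql() table function call itself is unaffected by the remote_database_name/remote_table_name rename; a usage sketch with placeholder connection details (the host, database, table, user and password below are all made up):

SELECT * FROM mysql('127.0.0.1:3306', 'test_db', 'visits', 'mysql_user', 'mysql_password') LIMIT 10;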

View File

@ -19,7 +19,7 @@ public:
return name;
}
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
};
}

View File

@ -17,7 +17,7 @@ namespace ErrorCodes
}
StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
if (const auto * function = ast_function->as<ASTFunction>())
{
@ -30,7 +30,7 @@ StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
auto res = StorageSystemNumbers::create(getName(), false, length, offset);
auto res = StorageSystemNumbers::create(table_name, false, length, offset);
res->startup();
return res;
}

View File

@ -17,7 +17,7 @@ public:
static constexpr auto name = "numbers";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
};

View File

@ -25,7 +25,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ASTs & args_func = ast_function->children;
@ -162,13 +162,13 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
StoragePtr res = remote_table_function_ptr
? StorageDistributed::createWithOwnCluster(
getName(),
table_name,
structure_remote_table,
remote_table_function_ptr,
cluster,
context)
: StorageDistributed::createWithOwnCluster(
getName(),
table_name,
structure_remote_table,
remote_database,
remote_table,

View File

@ -21,7 +21,7 @@ public:
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
std::string name;
bool is_cluster_function;


@ -6,10 +6,10 @@
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const
{
Poco::URI uri(source);
return StorageURL::create(uri, getDatabaseName(), getName(), format, ColumnsDescription{sample_block.getNamesAndTypesList()}, global_context);
return StorageURL::create(uri, getDatabaseName(), table_name, format, ColumnsDescription{sample_block.getNamesAndTypesList()}, global_context);
}
void registerTableFunctionURL(TableFunctionFactory & factory)


@ -20,6 +20,6 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const override;
};
}


@ -0,0 +1,27 @@
1
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25


@ -0,0 +1,21 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
DROP TABLE IF EXISTS t4;
CREATE TABLE t1 AS remote('127.0.0.1', system.one);
SELECT count() FROM t1;
CREATE TABLE t2 AS remote('127.0.0.1', system.numbers);
SELECT * FROM t2 LIMIT 18;
CREATE TABLE t3 AS remote('127.0.0.1', numbers(100));
SELECT * FROM t3 where number > 17 and number < 25;
CREATE TABLE t4 AS numbers(100);
SELECT count() FROM t4 where number > 74;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
DROP TABLE t4;

4
debian/changelog vendored

@ -1,5 +1,5 @@
clickhouse (19.12.1.1) unstable; urgency=low
clickhouse (19.13.1.1) unstable; urgency=low
* Modified source code
-- clickhouse-release <clickhouse-release@yandex-team.ru> Wed, 10 Jul 2019 22:57:50 +0300
-- clickhouse-release <clickhouse-release@yandex-team.ru> Tue, 23 Jul 2019 11:20:49 +0300


@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.12.1.*
ARG version=19.13.1.*
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \


@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.12.1.*
ARG version=19.13.1.*
ARG gosu_ver=1.10
RUN apt-get update \


@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=19.12.1.*
ARG version=19.13.1.*
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \


@ -322,8 +322,8 @@ Writing to the syslog is also supported. Config example:
Keys:
- user_syslog — Required setting if you want to write to the syslog.
- address — The host[:порт] of syslogd. If omitted, the local daemon is used.
- use_syslog — Required setting if you want to write to the syslog.
- address — The host[:port] of syslogd. If omitted, the local daemon is used.
- hostname — Optional. The name of the host that logs are sent from.
- facility — [The syslog facility keyword](https://en.wikipedia.org/wiki/Syslog#Facility) in uppercase letters with the "LOG_" prefix: (``LOG_USER``, ``LOG_DAEMON``, ``LOG_LOCAL3``, and so on).
- Default value: ``LOG_USER`` if ``address`` is specified, ``LOG_DAEMON`` otherwise.


@ -1,6 +1,6 @@
# MergeTree {#table_engines-mergetree}
The `MergeTree` engine and other engines of this family (`*MergeTree`) are the most robust ClickHousе table engines.
The `MergeTree` engine and other engines of this family (`*MergeTree`) are the most robust ClickHouse table engines.
The basic idea behind the `MergeTree` family of engines is the following: when you have a tremendous amount of data to insert into a table, you write it quickly part by part and then merge the parts in the background according to certain rules. This method is much more efficient than constantly rewriting the data in storage on insert.


@ -52,6 +52,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name AS [db2.]name2 [ENGINE = engine]
Creates a table with the same structure as another table. You can specify a different engine for the table. If the engine is not specified, the same engine will be used as for the `db2.name2` table.
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name AS table_function()
```
Creates a table with the same structure and data as the value returned by the table function.
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name ENGINE = engine AS SELECT ...
```


@ -694,7 +694,7 @@ The query `SELECT sum(x), y FROM t_null_big GROUP BY y` results in:
└────────┴──────┘
```
You can see that `GROUP BY` for `У = NULL` summed up `x`, as if `NULL` is this value.
You can see that `GROUP BY` for `y = NULL` summed up `x`, as if `NULL` is this value.
If you pass several keys to `GROUP BY`, the result will give you all the combinations of the selection, as if `NULL` were a specific value.


@ -319,7 +319,7 @@ ClickHouse проверит условия `min_part_size` и `min_part_size_rat
```
Keys:
- user_syslog - required setting if you need to write to the syslog
- use_syslog - required setting if you need to write to the syslog
- address - the host[:port] of the syslogd daemon. If omitted, the local daemon is used
- hostname - optional, the name of the host that logs are sent from
- facility - [the syslog facility keyword](https://en.wikipedia.org/wiki/Syslog#Facility),


@ -34,6 +34,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name AS [db2.]name2 [ENGINE = engine]
Creates a table with the same structure as another table. You can specify a different engine for the table. If the engine is not specified, the same engine as for the `db2.name2` table will be used.
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name AS table_function()
```
Creates a table with the same structure and data as the result of the corresponding table function.
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name ENGINE = engine AS SELECT ...
```


@ -10,7 +10,7 @@ if (DEFINED APPLE_HAVE_CLOCK_GETTIME)
target_compile_definitions(apple_rt PUBLIC -DAPPLE_HAVE_CLOCK_GETTIME=${APPLE_HAVE_CLOCK_GETTIME})
endif ()
add_library(common
add_library (common
src/DateLUT.cpp
src/DateLUTImpl.cpp
src/preciseExp10.c
@ -21,8 +21,10 @@ add_library(common
src/demangle.cpp
src/setTerminalEcho.cpp
src/getThreadNumber.cpp
src/sleep.cpp
src/argsToConfig.cpp
src/StackTrace.cpp
src/Pipe.cpp
include/common/SimpleCache.h
include/common/StackTrace.h
@ -45,7 +47,10 @@ add_library(common
include/common/setTerminalEcho.h
include/common/find_symbols.h
include/common/constexpr_helpers.h
include/common/Pipe.h
include/common/getThreadNumber.h
include/common/sleep.h
include/common/SimpleCache.h
include/ext/bit_cast.h
include/ext/collection_cast.h
@ -59,8 +64,7 @@ add_library(common
include/ext/unlock_guard.h
include/ext/singleton.h
${CONFIG_COMMON}
)
${CONFIG_COMMON})
if (USE_UNWIND)
target_compile_definitions (common PRIVATE USE_UNWIND=1)


@ -0,0 +1,34 @@
#pragma once
#include <unistd.h>
#include <fcntl.h>
#include <stdexcept>
/**
* Struct containing a pipe with lazy initialization.
* Use the `open` and `close` methods to manipulate the pipe and the `fds_rw` field
* to access the pipe's file descriptors.
*/
struct LazyPipe
{
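/// fds_rw[0] is the read end and fds_rw[1] is the write end, as returned by pipe(2).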
int fds_rw[2] = {-1, -1};
LazyPipe() = default;
void open();
void close();
virtual ~LazyPipe() = default;
};
/**
* Struct that opens a new pipe on creation and closes it on destruction.
* Use the `fds_rw` field to access the pipe's file descriptors.
*/
struct Pipe : public LazyPipe
{
Pipe();
~Pipe();
};
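A minimal usage sketch of the two structs above (not part of this commit); the `common/Pipe.h` include path mirrors the one used in Pipe.cpp below, and the pipe name and printf are illustrative only:
```cpp
#include <common/Pipe.h>
#include <unistd.h>
#include <cstdio>

// LazyPipe is convenient for globals: nothing happens until open() is called explicitly.
static LazyPipe notification_pipe;

int main()
{
    notification_pipe.open();

    // Pipe is the RAII variant: opened in the constructor, closed in the destructor.
    Pipe scratch;
    (void) scratch;

    char byte = 'x';
    ::write(notification_pipe.fds_rw[1], &byte, 1);    // fds_rw[1] is the write end
    char received = 0;
    ::read(notification_pipe.fds_rw[0], &received, 1); // fds_rw[0] is the read end
    std::printf("received %c through the pipe\n", received);

    notification_pipe.close(); // LazyPipe's destructor does not close the descriptors
    return 0;
}
```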


@ -35,7 +35,14 @@ public:
StackTrace(NoCapture);
/// Fills stack trace frames with provided sequence
StackTrace(const std::vector<void *> & source_frames);
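/// Copies at most `capacity` frame pointers from the given iterator range.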
template <typename Iterator>
StackTrace(Iterator it, Iterator end)
{
while (size < capacity && it != end)
{
frames[size++] = *(it++);
}
}
size_t getSize() const;
const Frames & getFrames() const;


@ -0,0 +1,16 @@
#pragma once
#include <cstdint>
/**
* Sleep functions tolerant to signal interruptions (which can happen,
* for example, when the query profiler is turned on).
*/
void sleepForNanoseconds(uint64_t nanoseconds);
void sleepForMicroseconds(uint64_t microseconds);
void sleepForMilliseconds(uint64_t milliseconds);
void sleepForSeconds(uint64_t seconds);


@ -0,0 +1,45 @@
#include "common/Pipe.h"
void LazyPipe::open()
{
for (int & fd : fds_rw)
{
if (fd >= 0)
{
throw std::logic_error("Pipe is already opened");
}
}
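/// pipe2() sets O_CLOEXEC atomically with pipe creation; macOS has no pipe2(), so FD_CLOEXEC is set afterwards via fcntl().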
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
throw std::runtime_error("Cannot create pipe");
#else
if (0 != pipe(fds_rw))
throw std::runtime_error("Cannot create pipe");
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for read end of pipe");
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
throw std::runtime_error("Cannot setup auto-close on exec for write end of pipe");
#endif
}
void LazyPipe::close()
{
for (int fd : fds_rw)
{
if (fd >= 0)
{
::close(fd);
}
}
}
Pipe::Pipe()
{
open();
}
Pipe::~Pipe()
{
close();
}


@ -195,12 +195,6 @@ StackTrace::StackTrace(NoCapture)
{
}
StackTrace::StackTrace(const std::vector<void *> & source_frames)
{
for (size = 0; size < std::min(source_frames.size(), capacity); ++size)
frames[size] = source_frames[size];
}
void StackTrace::tryCapture()
{
size = 0;

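With the `std::vector<void *>` overload removed in favor of the iterator-range template constructor, call sites can pass any pointer range directly. A minimal sketch of the new calling convention, assuming the header is available as `common/StackTrace.h`; the null frame values are placeholders:
```cpp
#include <cstdio>
#include <vector>
#include <common/StackTrace.h>

int main()
{
    // Frames would normally come from a signal context or an unwinder;
    // null placeholders are enough to demonstrate the constructor shape.
    std::vector<void *> frames(3, nullptr);

    // Previously: StackTrace trace(frames);  (vector overload, removed above)
    // Now the iterator-range constructor copies at most `capacity` frames.
    StackTrace trace(frames.begin(), frames.end());
    std::printf("captured %zu frames\n", trace.getSize());
    return 0;
}
```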

@ -0,0 +1,47 @@
#include "common/sleep.h"
#include <time.h>
#include <errno.h>
/**
* Sleep with nanosecond precision. Tolerant to signal interruptions.
*
* When the query profiler is turned on, all threads spawned for
* query execution are repeatedly interrupted by signals from the timer.
* Functions for relative sleep (sleep(3), nanosleep(2), etc.) are
* problematic in this setup, and the man page for nanosleep(2) suggests
* using absolute deadlines instead, e.g. clock_nanosleep(2).
*/
void sleepForNanoseconds(uint64_t nanoseconds)
{
constexpr auto clock_type = CLOCK_MONOTONIC;
struct timespec current_time;
clock_gettime(clock_type, &current_time);
constexpr uint64_t resolution = 1'000'000'000;
struct timespec finish_time = current_time;
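/// Add the sub-second remainder first; any tv_nsec overflow is carried into tv_sec together with the whole seconds below.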
finish_time.tv_nsec += nanoseconds % resolution;
const uint64_t extra_second = finish_time.tv_nsec / resolution;
finish_time.tv_nsec %= resolution;
finish_time.tv_sec += (nanoseconds / resolution) + extra_second;
while (clock_nanosleep(clock_type, TIMER_ABSTIME, &finish_time, nullptr) == EINTR);
}
void sleepForMicroseconds(uint64_t microseconds)
{
sleepForNanoseconds(microseconds * 1000);
}
void sleepForMilliseconds(uint64_t milliseconds)
{
sleepForMicroseconds(milliseconds * 1000);
}
void sleepForSeconds(uint64_t seconds)
{
sleepForMilliseconds(seconds * 1000);
}
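A small sketch of why the absolute-deadline loop above matters, assuming a process that receives frequent timer signals; the SIGALRM/`setitimer` setup is illustrative and not how the query profiler installs its timers:
```cpp
#include <signal.h>
#include <sys/time.h>
#include <time.h>
#include <cstdio>
#include <common/sleep.h>

static void on_alarm(int) {} // handler exists only so SIGALRM interrupts syscalls

int main()
{
    // Deliver SIGALRM every 10 ms so that a plain relative nanosleep(2)
    // would keep returning early with EINTR.
    struct sigaction sa = {};
    sa.sa_handler = on_alarm;
    sigaction(SIGALRM, &sa, nullptr);

    itimerval timer = {};
    timer.it_interval.tv_usec = 10'000;
    timer.it_value.tv_usec = 10'000;
    setitimer(ITIMER_REAL, &timer, nullptr);

    timespec start{};
    clock_gettime(CLOCK_MONOTONIC, &start);

    sleepForMilliseconds(500); // still sleeps the full ~500 ms despite the signals

    timespec end{};
    clock_gettime(CLOCK_MONOTONIC, &end);
    long elapsed_ms = (end.tv_sec - start.tv_sec) * 1000 + (end.tv_nsec - start.tv_nsec) / 1'000'000;
    std::printf("elapsed: %ld ms\n", elapsed_ms);
    return 0;
}
```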


@ -1,5 +1,4 @@
#include <daemon/BaseDaemon.h>
#include <Common/Config/ConfigProcessor.h>
#include <sys/stat.h>
#include <sys/types.h>
@ -15,6 +14,7 @@
#include <typeinfo>
#include <common/logger_useful.h>
#include <common/ErrorHandlers.h>
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <sys/time.h>
#include <sys/resource.h>
@ -53,55 +53,6 @@
#include <ucontext.h>
/** For transferring information from signal handler to a separate thread.
* If you need to do something non-trivial in case of a signal (for example, write a message to the log),
* then sending the information to a separate thread through a pipe and doing all the work there asynchronously
* is probably the only safe method for doing it
* (because only reentrant functions can safely be called from signal handlers).
*/
struct Pipe
{
union
{
int fds[2];
struct
{
int read_fd;
int write_fd;
};
};
Pipe()
{
read_fd = -1;
write_fd = -1;
if (0 != pipe(fds))
DB::throwFromErrno("Cannot create pipe", 0);
}
void close()
{
if (-1 != read_fd)
{
::close(read_fd);
read_fd = -1;
}
if (-1 != write_fd)
{
::close(write_fd);
write_fd = -1;
}
}
~Pipe()
{
close();
}
};
Pipe signal_pipe;
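A condensed, self-contained sketch of the pattern described in the comment above: the signal handler only write(2)s a small payload to the pipe, and a separate thread does the non-signal-safe work. It uses raw read/write instead of the WriteBuffer classes and SIGUSR1 instead of the fault signals, purely for illustration:
```cpp
#include <signal.h>
#include <unistd.h>
#include <cstdio>
#include <thread>
#include <common/Pipe.h>

static Pipe example_signal_pipe;

// Only async-signal-safe calls here: a single write(2) to the pipe's write end.
static void handler(int sig)
{
    ::write(example_signal_pipe.fds_rw[1], &sig, sizeof(sig));
}

int main()
{
    struct sigaction sa = {};
    sa.sa_handler = handler;
    sigaction(SIGUSR1, &sa, nullptr);

    // The listener thread reads signal numbers from the pipe and can safely log, allocate, etc.
    std::thread listener([]
    {
        int sig = 0;
        if (::read(example_signal_pipe.fds_rw[0], &sig, sizeof(sig)) == sizeof(sig))
            std::printf("got signal %d\n", sig);
    });

    raise(SIGUSR1);
    listener.join();
    return 0;
}
```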
@ -123,7 +74,7 @@ using signal_function = void(int, siginfo_t*, void*);
static void writeSignalIDtoSignalPipe(int sig)
{
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf);
DB::WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], buf_size, buf);
DB::writeBinary(sig, out);
out.next();
}
@ -151,7 +102,7 @@ static void faultSignalHandler(int sig, siginfo_t * info, void * context)
already_signal_handled = true;
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf);
DB::WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], buf_size, buf);
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
@ -194,7 +145,7 @@ public:
void run()
{
char buf[buf_size];
DB::ReadBufferFromFileDescriptor in(signal_pipe.read_fd, buf_size, buf);
DB::ReadBufferFromFileDescriptor in(signal_pipe.fds_rw[0], buf_size, buf);
while (!in.eof())
{
@ -296,7 +247,7 @@ static void terminate_handler()
log_message.resize(buf_size - 16);
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf);
DB::WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], buf_size, buf);
DB::writeBinary(static_cast<int>(SignalListener::StdTerminate), out);
DB::writeBinary(getThreadNumber(), out);
@ -532,7 +483,7 @@ void BaseDaemon::closeFDs()
for (const auto & fd_str : fds)
{
int fd = DB::parse<int>(fd_str);
if (fd > 2 && fd != signal_pipe.read_fd && fd != signal_pipe.write_fd)
if (fd > 2 && fd != signal_pipe.fds_rw[0] && fd != signal_pipe.fds_rw[1])
::close(fd);
}
}
@ -545,7 +496,7 @@ void BaseDaemon::closeFDs()
#endif
max_fd = 256; /// bad fallback
for (int fd = 3; fd < max_fd; ++fd)
if (fd != signal_pipe.read_fd && fd != signal_pipe.write_fd)
if (fd != signal_pipe.fds_rw[0] && fd != signal_pipe.fds_rw[1])
::close(fd);
}
}


@ -8,6 +8,8 @@
#include <mysqlxx/Pool.h>
#include <common/sleep.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
@ -133,7 +135,7 @@ Pool::Entry Pool::Get()
}
lock.unlock();
::sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
}
}
@ -193,7 +195,7 @@ void Pool::Entry::forceConnected() const
if (first)
first = false;
else
::sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
app.logger().information("MYSQL: Reconnecting to " + pool->description);
data->conn.connect(