Merge remote-tracking branch 'origin/master' into pr-local-plan

This commit is contained in:
Igor Nikonov 2024-07-19 08:24:54 +00:00
commit 75f7816c46
46 changed files with 1266 additions and 891 deletions

View File

@ -9,7 +9,6 @@ Columns:
- `name` ([String](../../sql-reference/data-types/string.md)) The name of the function. - `name` ([String](../../sql-reference/data-types/string.md)) The name of the function.
- `is_aggregate` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Whether the function is an aggregate function. - `is_aggregate` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Whether the function is an aggregate function.
- `is_deterministic` ([Nullable](../../sql-reference/data-types/nullable.md)([UInt8](../../sql-reference/data-types/int-uint.md))) - Whether the function is deterministic.
- `case_insensitive`, ([UInt8](../../sql-reference/data-types/int-uint.md)) - Whether the function name can be used case-insensitively. - `case_insensitive`, ([UInt8](../../sql-reference/data-types/int-uint.md)) - Whether the function name can be used case-insensitively.
- `alias_to`, ([String](../../sql-reference/data-types/string.md)) - The original function name, if the function name is an alias. - `alias_to`, ([String](../../sql-reference/data-types/string.md)) - The original function name, if the function name is an alias.
- `create_query`, ([String](../../sql-reference/data-types/enum.md)) - Unused. - `create_query`, ([String](../../sql-reference/data-types/enum.md)) - Unused.

View File

@ -21,6 +21,7 @@
#include <Common/StringUtils.h> #include <Common/StringUtils.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/NetException.h> #include <Common/NetException.h>
#include <Common/SignalHandlers.h>
#include <Common/tryGetFileNameByFileDescriptor.h> #include <Common/tryGetFileNameByFileDescriptor.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
@ -302,7 +303,13 @@ public:
}; };
ClientBase::~ClientBase() = default; ClientBase::~ClientBase()
{
writeSignalIDtoSignalPipe(SignalListener::StopThread);
signal_listener_thread.join();
HandledSignals::instance().reset();
}
ClientBase::ClientBase( ClientBase::ClientBase(
int in_fd_, int in_fd_,
int out_fd_, int out_fd_,
@ -3072,6 +3079,8 @@ void ClientBase::init(int argc, char ** argv)
("max_memory_usage_in_client", po::value<std::string>(), "Set memory limit in client/local server") ("max_memory_usage_in_client", po::value<std::string>(), "Set memory limit in client/local server")
("fuzzer-args", po::value<std::string>(), "Command line arguments for the LLVM's libFuzzer driver. Only relevant if the application is compiled with libFuzzer.") ("fuzzer-args", po::value<std::string>(), "Command line arguments for the LLVM's libFuzzer driver. Only relevant if the application is compiled with libFuzzer.")
("client_logs_file", po::value<std::string>(), "Path to a file for writing client logs. Currently we only have fatal logs (when the client crashes)")
; ;
addOptions(options_description); addOptions(options_description);
@ -3236,6 +3245,25 @@ void ClientBase::init(int argc, char ** argv)
total_memory_tracker.setDescription("(total)"); total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
} }
/// Print stacktrace in case of crash
HandledSignals::instance().setupTerminateHandler();
HandledSignals::instance().setupCommonDeadlySignalHandlers();
/// We don't setup signal handlers for SIGINT, SIGQUIT, SIGTERM because we don't
/// have an option for client to shutdown gracefully.
fatal_channel_ptr = new Poco::SplitterChannel;
fatal_console_channel_ptr = new Poco::ConsoleChannel;
fatal_channel_ptr->addChannel(fatal_console_channel_ptr);
if (options.count("client_logs_file"))
{
fatal_file_channel_ptr = new Poco::SimpleFileChannel(options["client_logs_file"].as<std::string>());
fatal_channel_ptr->addChannel(fatal_file_channel_ptr);
}
fatal_log = createLogger("ClientBase", fatal_channel_ptr.get(), Poco::Message::PRIO_FATAL);
signal_listener = std::make_unique<SignalListener>(nullptr, fatal_log);
signal_listener_thread.start(*signal_listener);
} }
} }

View File

@ -12,6 +12,9 @@
#include <Core/ExternalTable.h> #include <Core/ExternalTable.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Poco/Util/Application.h> #include <Poco/Util/Application.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/SimpleFileChannel.h>
#include <Poco/SplitterChannel.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Client/Suggest.h> #include <Client/Suggest.h>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
@ -212,6 +215,13 @@ protected:
SharedContextHolder shared_context; SharedContextHolder shared_context;
ContextMutablePtr global_context; ContextMutablePtr global_context;
LoggerPtr fatal_log;
Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;
Poco::AutoPtr<Poco::Channel> fatal_file_channel_ptr;
Poco::Thread signal_listener_thread;
std::unique_ptr<Poco::Runnable> signal_listener;
bool is_interactive = false; /// Use either interactive line editing interface or batch mode. bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool is_multiquery = false; bool is_multiquery = false;
bool delayed_interactive = false; bool delayed_interactive = false;

View File

@ -84,12 +84,18 @@ public:
return result; return result;
} }
void append(Self && other) // append items from the other instance only if there is no such item in the current instance
void appendIfUniq(Self && other)
{ {
auto middle_idx = records.size(); auto middle_idx = records.size();
std::move(other.records.begin(), other.records.end(), std::back_inserter(records)); std::move(other.records.begin(), other.records.end(), std::back_inserter(records));
// merge is stable
std::inplace_merge(records.begin(), records.begin() + middle_idx, records.end()); std::inplace_merge(records.begin(), records.begin() + middle_idx, records.end());
chassert(isUniqTypes()); // remove duplicates
records.erase(std::unique(records.begin(), records.end()), records.end());
assert(std::is_sorted(records.begin(), records.end()));
assert(isUniqTypes());
} }
template <class T> template <class T>
@ -142,7 +148,6 @@ private:
bool isUniqTypes() const bool isUniqTypes() const
{ {
auto uniq_it = std::adjacent_find(records.begin(), records.end()); auto uniq_it = std::adjacent_find(records.begin(), records.end());
return uniq_it == records.end(); return uniq_it == records.end();
} }
@ -161,8 +166,6 @@ private:
records.emplace(it, type_idx, item); records.emplace(it, type_idx, item);
chassert(isUniqTypes());
} }
Records::const_iterator getImpl(std::type_index type_idx) const Records::const_iterator getImpl(std::type_index type_idx) const

View File

@ -116,32 +116,32 @@ public:
return elements; return elements;
} }
bool exists(const std::string & path) const override bool exists(const std::string & file_name) const override
{ {
return fs::exists(getPath(path)); return fs::exists(getPath(file_name));
} }
std::string read(const std::string & path) const override std::string read(const std::string & file_name) const override
{ {
ReadBufferFromFile in(getPath(path)); ReadBufferFromFile in(getPath(file_name));
std::string data; std::string data;
readStringUntilEOF(data, in); readStringUntilEOF(data, in);
return data; return data;
} }
void write(const std::string & path, const std::string & data, bool replace) override void write(const std::string & file_name, const std::string & data, bool replace) override
{ {
if (!replace && fs::exists(path)) if (!replace && fs::exists(file_name))
{ {
throw Exception( throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS, ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists", "Metadata file {} for named collection already exists",
path); file_name);
} }
fs::create_directories(root_path); fs::create_directories(root_path);
auto tmp_path = getPath(path + ".tmp"); auto tmp_path = getPath(file_name + ".tmp");
WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL); WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(data, out); writeString(data, out);
@ -150,28 +150,32 @@ public:
out.sync(); out.sync();
out.close(); out.close();
fs::rename(tmp_path, getPath(path)); fs::rename(tmp_path, getPath(file_name));
} }
void remove(const std::string & path) override void remove(const std::string & file_name) override
{ {
if (!removeIfExists(getPath(path))) if (!removeIfExists(file_name))
{ {
throw Exception( throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST, ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove `{}`, because it doesn't exist", path); "Cannot remove `{}`, because it doesn't exist", file_name);
} }
} }
bool removeIfExists(const std::string & path) override bool removeIfExists(const std::string & file_name) override
{ {
return fs::remove(getPath(path)); return fs::remove(getPath(file_name));
} }
private: private:
std::string getPath(const std::string & path) const std::string getPath(const std::string & file_name) const
{ {
return fs::path(root_path) / path; const auto file_name_as_path = fs::path(file_name);
if (file_name_as_path.is_absolute())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filename {} cannot be an absolute path", file_name);
return fs::path(root_path) / file_name_as_path;
} }
/// Delete .tmp files. They could be left undeleted in case of /// Delete .tmp files. They could be left undeleted in case of
@ -264,49 +268,49 @@ public:
return children; return children;
} }
bool exists(const std::string & path) const override bool exists(const std::string & file_name) const override
{ {
return getClient()->exists(getPath(path)); return getClient()->exists(getPath(file_name));
} }
std::string read(const std::string & path) const override std::string read(const std::string & file_name) const override
{ {
return getClient()->get(getPath(path)); return getClient()->get(getPath(file_name));
} }
void write(const std::string & path, const std::string & data, bool replace) override void write(const std::string & file_name, const std::string & data, bool replace) override
{ {
if (replace) if (replace)
{ {
getClient()->createOrUpdate(getPath(path), data, zkutil::CreateMode::Persistent); getClient()->createOrUpdate(getPath(file_name), data, zkutil::CreateMode::Persistent);
} }
else else
{ {
auto code = getClient()->tryCreate(getPath(path), data, zkutil::CreateMode::Persistent); auto code = getClient()->tryCreate(getPath(file_name), data, zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)
{ {
throw Exception( throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS, ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists", "Metadata file {} for named collection already exists",
path); file_name);
} }
} }
} }
void remove(const std::string & path) override void remove(const std::string & file_name) override
{ {
getClient()->remove(getPath(path)); getClient()->remove(getPath(file_name));
} }
bool removeIfExists(const std::string & path) override bool removeIfExists(const std::string & file_name) override
{ {
auto code = getClient()->tryRemove(getPath(path)); auto code = getClient()->tryRemove(getPath(file_name));
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
return true; return true;
if (code == Coordination::Error::ZNONODE) if (code == Coordination::Error::ZNONODE)
return false; return false;
throw Coordination::Exception::fromPath(code, getPath(path)); throw Coordination::Exception::fromPath(code, getPath(file_name));
} }
private: private:
@ -320,9 +324,13 @@ private:
return zookeeper_client; return zookeeper_client;
} }
std::string getPath(const std::string & path) const std::string getPath(const std::string & file_name) const
{ {
return fs::path(root_path) / path; const auto file_name_as_path = fs::path(file_name);
if (file_name_as_path.is_absolute())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filename {} cannot be an absolute path", file_name);
return fs::path(root_path) / file_name_as_path;
} }
}; };

View File

@ -0,0 +1,635 @@
#include <Common/SignalHandlers.h>
#include <Common/config_version.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/CurrentThread.h>
#include <Daemon/BaseDaemon.h>
#include <Daemon/SentryWriter.h>
#include <base/sleep.h>
#include <base/getThreadId.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#pragma clang diagnostic ignored "-Wreserved-identifier"
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_SEND_SIGNAL;
}
}
using namespace DB;
/// Restores the default disposition for `sig` and then delivers `sig` to the current process.
/// Throws ErrnoException if either the disposition cannot be reset or the signal cannot be raised.
void call_default_signal_handler(int sig)
{
    const auto reset_result = signal(sig, SIG_DFL);
    if (reset_result == SIG_ERR)
        throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");

    const int raise_result = raise(sig);
    if (raise_result != 0)
        throw ErrnoException(ErrorCodes::CANNOT_SEND_SIGNAL, "Cannot send signal");
}
void writeSignalIDtoSignalPipe(int sig)
{
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[signal_pipe_buf_size];
auto & signal_pipe = HandledSignals::instance().signal_pipe;
WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
writeBinary(sig, out);
out.next();
errno = saved_errno;
}
/// Signal handler that only forwards the signal number to the signal pipe.
/// The siginfo and context arguments are ignored.
void closeLogsSignalHandler(int sig, siginfo_t * /* info */, void * /* context */)
{
    DENY_ALLOCATIONS_IN_SCOPE;

    writeSignalIDtoSignalPipe(sig);
}
/// Signal handler that only forwards the signal number to the signal pipe.
/// The siginfo and context arguments are ignored.
void terminateRequestedSignalHandler(int sig, siginfo_t * /* info */, void * /* context */)
{
    DENY_ALLOCATIONS_IN_SCOPE;

    writeSignalIDtoSignalPipe(sig);
}
/// Handler for fatal and diagnostic signals.
/// Serializes, in this order, the signal number, the siginfo, the context pointer, the captured stack trace,
/// the job frame pointers, the thread id and the `current_thread` pointer into the signal pipe;
/// SignalListener::run reads the same fields back in the same order.
/// For every signal except SIGTSTP it then waits until the report has been printed (or a timeout expires)
/// and re-raises the signal with the default disposition.
void signalHandler(int sig, siginfo_t * info, void * context)
{
/// NOTE(review): presumably a SIGSEGV raised while asynchronous stack unwinding is in progress is recovered
/// by jumping back to the unwinder's saved jump buffer - confirm against the code that sets this flag.
if (asynchronous_stack_unwinding && sig == SIGSEGV)
siglongjmp(asynchronous_stack_unwinding_signal_jump_buffer, 1);
DENY_ALLOCATIONS_IN_SCOPE;
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
/// The write buffer lives on the stack of the handler.
char buf[signal_pipe_buf_size];
auto & signal_pipe = HandledSignals::instance().signal_pipe;
WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
const ucontext_t * signal_context = reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(*signal_context);
#if USE_GWP_ASAN
if (const auto fault_address = reinterpret_cast<uintptr_t>(info->si_addr);
GWPAsan::isGWPAsanError(fault_address))
GWPAsan::printReport(fault_address);
#endif
/// The field order below is the wire format; the reader in SignalListener::run must stay in sync with it.
writeBinary(sig, out);
writePODBinary(*info, out);
/// Only the pointer value is sent here, not the ucontext_t it points to.
writePODBinary(signal_context, out);
writePODBinary(stack_trace, out);
writeVectorBinary(Exception::enable_job_stack_trace ? Exception::thread_frame_pointers : std::vector<StackTrace::FramePointers>{}, out);
writeBinary(static_cast<UInt32>(getThreadId()), out);
writePODBinary(current_thread, out);
out.next();
if (sig != SIGTSTP) /// This signal is used for debugging.
{
/// The time that is usually enough for separate thread to print info into log.
/// Under MSan full stack unwinding with DWARF info about inline functions takes 101 seconds in one case.
for (size_t i = 0; i < 300; ++i)
{
/// We will synchronize with the thread printing the messages with an atomic variable to finish earlier.
if (HandledSignals::instance().fatal_error_printed.test())
break;
/// This coarse method of synchronization is perfectly ok for fatal signals.
sleepForSeconds(1);
}
/// Wait for all logs flush operations
sleepForSeconds(3);
call_default_signal_handler(sig);
}
errno = saved_errno;
}
/// Handler for std::set_terminate.
/// Sends a StdTerminate record (marker, thread id, message) through the signal pipe and then aborts.
[[noreturn]] void terminate_handler()
{
    /// If this handler is entered again on the same thread, abort immediately.
    static thread_local bool terminating = false;
    if (terminating)
        abort();
    terminating = true;

    std::string log_message = std::current_exception()
        ? "Terminate called for uncaught exception:\n" + getCurrentExceptionMessage(true)
        : std::string{"Terminate called without an active exception"};

    /// POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be atomic - man 7 pipe
    /// And the buffer should not be too small because our exception messages can be large.
    static constexpr size_t buf_size = PIPE_BUF;

    /// Keep 16 bytes of the buffer free for the other fields written alongside the message.
    static constexpr size_t max_message_size = buf_size - 16;
    if (log_message.size() > max_message_size)
        log_message.resize(max_message_size);

    char buf[buf_size];
    WriteBufferFromFileDescriptor out(HandledSignals::instance().signal_pipe.fds_rw[1], buf_size, buf);

    writeBinary(static_cast<int>(SignalListener::StdTerminate), out);
    writeBinary(static_cast<UInt32>(getThreadId()), out);
    writeBinary(log_message, out);
    out.next();

    abort();
}
#if defined(SANITIZER)
/// Minimal wrapper that stores a copy of a value.
/// The value is set through a real constructor call rather than inline in the caller.
/// The constructor is explicit so that a plain T never silently converts into a ValueHolder<T>.
template <typename T>
struct ValueHolder
{
    explicit ValueHolder(T value_) : value(value_)
    {}

    T value;
};
extern "C" void __sanitizer_set_death_callback(void (*)());
/// Sanitizers may not expect some function calls from death callback.
/// Let's try to disable instrumentation to avoid possible issues.
/// However, this callback may call other functions that are still instrumented.
/// We can try [[clang::always_inline]] attribute for statements in future (available in clang-15)
/// See https://github.com/google/sanitizers/issues/1543 and https://github.com/google/sanitizers/issues/1549.
/// Death callback registered with the sanitizer runtime.
/// Writes a SanitizerTrap record (marker, stack trace, thread id, `current_thread` pointer) into the signal pipe
/// and then sleeps so that the listener thread has time to print it.
static DISABLE_SANITIZER_INSTRUMENTATION void sanitizerDeathCallback()
{
DENY_ALLOCATIONS_IN_SCOPE;
/// Also need to send data via pipe. Otherwise it may lead to deadlocks or failures in printing diagnostic info.
char buf[signal_pipe_buf_size];
auto & signal_pipe = HandledSignals::instance().signal_pipe;
WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
const StackTrace stack_trace;
/// Unlike signalHandler, no siginfo, context pointer or job frame pointers are written for this record.
writeBinary(SignalListener::SanitizerTrap, out);
writePODBinary(stack_trace, out);
/// We create a dummy struct with a constructor so DISABLE_SANITIZER_INSTRUMENTATION is not applied to it;
/// otherwise, Memory sanitizer can't know that values initialized inside this function are actually initialized
/// because instrumentation is disabled, leading to false positives later on.
ValueHolder<UInt32> thread_id{static_cast<UInt32>(getThreadId())};
writeBinary(thread_id.value, out);
writePODBinary(current_thread, out);
out.next();
/// The time that is usually enough for separate thread to print info into log.
sleepForSeconds(20);
}
#endif
/// Installs `handler` (as an SA_SIGINFO handler) for every signal in `signals`.
/// All of `signals` are put into the handler's signal mask.
/// When `register_signal` is true the signals are also appended to `handled_signals`.
/// Throws Poco::Exception if the mask cannot be built (non-Darwin only) or a handler cannot be installed.
void HandledSignals::addSignalHandler(const std::vector<int> & signals, signal_function handler, bool register_signal)
{
    struct sigaction action;
    memset(&action, 0, sizeof(action));
    action.sa_sigaction = handler;
    action.sa_flags = SA_SIGINFO;

#if defined(OS_DARWIN)
    /// On Darwin the results of the mask helpers are not checked.
    sigemptyset(&action.sa_mask);
    for (int signo : signals)
        sigaddset(&action.sa_mask, signo);
#else
    if (0 != sigemptyset(&action.sa_mask))
        throw Poco::Exception("Cannot set signal handler.");

    for (int signo : signals)
    {
        if (0 != sigaddset(&action.sa_mask, signo))
            throw Poco::Exception("Cannot set signal handler.");
    }
#endif

    for (int signo : signals)
    {
        if (0 != sigaction(signo, &action, nullptr))
            throw Poco::Exception("Cannot set signal handler.");
    }

    if (register_signal)
        handled_signals.insert(handled_signals.end(), signals.begin(), signals.end());
}
/// Adds every signal in `signals` to the calling thread's blocked-signal mask.
/// Throws Poco::Exception if the set cannot be built (non-Darwin only) or the mask cannot be applied.
void blockSignals(const std::vector<int> & signals)
{
    sigset_t to_block;

#if defined(OS_DARWIN)
    /// On Darwin the results of the set helpers are not checked.
    sigemptyset(&to_block);
    for (int signo : signals)
        sigaddset(&to_block, signo);
#else
    if (0 != sigemptyset(&to_block))
        throw Poco::Exception("Cannot block signal.");

    for (int signo : signals)
    {
        if (0 != sigaddset(&to_block, signo))
            throw Poco::Exception("Cannot block signal.");
    }
#endif

    if (0 != pthread_sigmask(SIG_BLOCK, &to_block, nullptr))
        throw Poco::Exception("Cannot block signal.");
}
/// Body of the listener thread.
/// Reads records from the read end of the signal pipe until EOF or until a StopThread record arrives,
/// and dispatches each record according to its leading signal number.
void SignalListener::run()
{
static_assert(PIPE_BUF >= 512);
static_assert(signal_pipe_buf_size <= PIPE_BUF, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512");
char buf[signal_pipe_buf_size];
auto & signal_pipe = HandledSignals::instance().signal_pipe;
ReadBufferFromFileDescriptor in(signal_pipe.fds_rw[0], signal_pipe_buf_size, buf);
while (!in.eof())
{
/// Every record starts with the signal number (or one of the negative pseudo-signal markers).
int sig = 0;
readBinary(sig, in);
// We may log some specific signals afterwards, with different log
// levels and more info, but for completeness we log all signals
// here at trace level.
// Don't use strsignal here, because it's not thread-safe.
LOG_TRACE(log, "Received signal {}", sig);
if (sig == StopThread)
{
LOG_INFO(log, "Stop SignalListener thread");
break;
}
else if (sig == SIGHUP)
{
/// NOTE(review): this branch uses BaseDaemon::instance() rather than the `daemon` member,
/// which may be null - confirm SIGHUP is only ever routed here when a daemon exists.
LOG_DEBUG(log, "Received signal to close logs.");
BaseDaemon::instance().closeLogs(BaseDaemon::instance().logger());
LOG_INFO(log, "Opened new log file after received signal.");
}
else if (sig == StdTerminate)
{
/// Payload written by terminate_handler: thread id, then the message.
UInt32 thread_num;
std::string message;
readBinary(thread_num, in);
readBinary(message, in);
onTerminate(message, thread_num);
}
else if (sig == SIGINT ||
sig == SIGQUIT ||
sig == SIGTERM)
{
/// Without a daemon these signals are read and dropped.
if (daemon)
daemon->handleSignal(sig);
}
else
{
/// Fault record. The read order below must mirror the write order in signalHandler
/// and, for SanitizerTrap, in sanitizerDeathCallback.
siginfo_t info{};
ucontext_t * context{};
StackTrace stack_trace(NoCapture{});
std::vector<StackTrace::FramePointers> thread_frame_pointers;
UInt32 thread_num{};
ThreadStatus * thread_ptr{};
/// SanitizerTrap records carry no siginfo, no context pointer and no job frame pointers.
if (sig != SanitizerTrap)
{
readPODBinary(info, in);
/// Only the pointer value travels through the pipe, not the ucontext_t itself.
readPODBinary(context, in);
}
readPODBinary(stack_trace, in);
if (sig != SanitizerTrap)
readVectorBinary(thread_frame_pointers, in);
readBinary(thread_num, in);
readPODBinary(thread_ptr, in);
/// This allows to receive more signals if failure happens inside onFault function.
/// Example: segfault while symbolizing stack trace.
try
{
/// Everything is captured by copy because the thread is detached and outlives this iteration.
std::thread([=, this] { onFault(sig, info, context, stack_trace, thread_frame_pointers, thread_num, thread_ptr); }).detach();
}
catch (...)
{
/// Likely cannot allocate thread
onFault(sig, info, context, stack_trace, thread_frame_pointers, thread_num, thread_ptr);
}
}
}
}
/// Logs a std::terminate report at fatal level.
/// The first line of `message` is logged together with version/build information and the thread number;
/// every following line is logged as its own fatal message.
void SignalListener::onTerminate(std::string_view message, UInt32 thread_num) const
{
    const size_t first_newline = message.find('\n');

    LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) {}",
        VERSION_STRING, VERSION_OFFICIAL, daemon ? daemon->build_id : "", daemon ? daemon->git_hash : "", thread_num, message.substr(0, first_newline));

    /// Print trace from std::terminate exception line-by-line to make it easy for grep.
    for (size_t newline = first_newline; newline != std::string_view::npos;)
    {
        const size_t line_begin = newline + 1;
        newline = message.find('\n', line_begin);

        /// The last line has no terminating newline, so take everything up to the end of the message.
        const size_t line_length = (newline == std::string_view::npos) ? std::string_view::npos : newline - line_begin;

        LOG_FATAL(log, fmt::runtime(message.substr(line_begin, line_length)));
    }
}
/// Logs a full fatal report for a fault record read from the signal pipe.
/// In order: a short summary that does not touch `thread_ptr`, the signal description and error message,
/// the bare and symbolized stack traces, the origin stack traces of scheduled jobs, the binary integrity
/// check (Linux with a daemon only), the crash log / crash handling / Sentry report, advice for the user,
/// and the changed settings of the query. Finally it sets `fatal_error_printed`, which signalHandler polls.
/// The whole body is a function-try-block: any exception is logged and not propagated.
void SignalListener::onFault(
int sig,
const siginfo_t & info,
ucontext_t * context,
const StackTrace & stack_trace,
const std::vector<StackTrace::FramePointers> & thread_frame_pointers,
UInt32 thread_num,
DB::ThreadStatus * thread_ptr) const
try
{
ThreadStatus thread_status;
/// First log those fields that are safe to access and that should not cause new fault.
/// That way we will have some duplicated info in the log but we don't lose important info
/// in case of double fault.
LOG_FATAL(log, "########## Short fault info ############");
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) Received signal {}",
VERSION_STRING, VERSION_OFFICIAL, daemon ? daemon->build_id : "", daemon ? daemon->git_hash : "",
thread_num, sig);
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "Signal description: {}", signal_description);
String error_message;
/// `context` is only dereferenced for real signals; for SanitizerTrap the caller passes a null pointer.
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
String bare_stacktrace_str;
if (stack_trace.getSize())
{
/// Write bare stack trace (addresses) just in case if we will fail to print symbolized stack trace.
/// NOTE: This still require memory allocations and mutex lock inside logger.
/// BTW we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
bare_stacktrace_str = bare_stacktrace.str();
}
/// Now try to access potentially unsafe data in thread_ptr.
String query_id;
String query;
/// Send logs from this thread to client if possible.
/// It will allow client to see failure messages directly.
if (thread_ptr)
{
query_id = thread_ptr->getQueryId();
query = thread_ptr->getQueryForLog();
if (auto logs_queue = thread_ptr->getInternalTextLogsQueue())
{
CurrentThread::attachInternalTextLogsQueue(logs_queue, LogsLevel::trace);
}
}
/// The full report starts here and repeats the summary, now including the query information.
LOG_FATAL(log, "########################################");
if (query_id.empty())
{
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) (no query) Received signal {} ({})",
VERSION_STRING, VERSION_OFFICIAL, daemon ? daemon->build_id : "", daemon ? daemon->git_hash : "",
thread_num, signal_description, sig);
}
else
{
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) (query_id: {}) (query: {}) Received signal {} ({})",
VERSION_STRING, VERSION_OFFICIAL, daemon ? daemon->build_id : "", daemon ? daemon->git_hash : "",
thread_num, query_id, query, signal_description, sig);
}
LOG_FATAL(log, fmt::runtime(error_message));
if (!bare_stacktrace_str.empty())
{
LOG_FATAL(log, fmt::runtime(bare_stacktrace_str));
}
/// Write symbolized stack trace line by line for better grep-ability.
stack_trace.toStringEveryLine([&](std::string_view s) { LOG_FATAL(log, fmt::runtime(s)); });
/// In case it's a scheduled job write all previous jobs origins call stacks
std::for_each(thread_frame_pointers.rbegin(), thread_frame_pointers.rend(),
[this](const StackTrace::FramePointers & frame_pointers)
{
/// The number of frames is the count of entries before the first null pointer; empty traces are skipped.
if (size_t size = std::ranges::find(frame_pointers, nullptr) - frame_pointers.begin())
{
LOG_FATAL(log, "========================================");
WriteBufferFromOwnString bare_stacktrace;
writeString("Job's origin stack trace:", bare_stacktrace);
std::for_each_n(frame_pointers.begin(), size,
[&bare_stacktrace](const void * ptr)
{
writeChar(' ', bare_stacktrace);
writePointerHex(ptr, bare_stacktrace);
}
);
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
StackTrace::toStringEveryLine(const_cast<void **>(frame_pointers.data()), 0, size, [this](std::string_view s) { LOG_FATAL(log, fmt::runtime(s)); });
}
}
);
#if defined(OS_LINUX)
/// Write information about binary checksum. It can be difficult to calculate, so do it only after printing stack trace.
/// Please keep the below log messages in-sync with the ones in programs/server/Server.cpp
if (daemon && daemon->stored_binary_hash.empty())
{
LOG_FATAL(log, "Integrity check of the executable skipped because the reference checksum could not be read.");
}
else if (daemon)
{
String calculated_binary_hash = getHashOfLoadedBinaryHex();
if (calculated_binary_hash == daemon->stored_binary_hash)
{
LOG_FATAL(log, "Integrity check of the executable successfully passed (checksum: {})", calculated_binary_hash);
}
else
{
LOG_FATAL(
log,
"Calculated checksum of the executable ({0}) does not correspond"
" to the reference checksum stored in the executable ({1})."
" This may indicate one of the following:"
" - the executable was changed just after startup;"
" - the executable was corrupted on disk due to faulty hardware;"
" - the loaded executable was corrupted in memory due to faulty hardware;"
" - the file was intentionally modified;"
" - a logical error in the code.",
calculated_binary_hash,
daemon->stored_binary_hash);
}
}
#endif
/// Write crash to system.crash_log table if available.
if (collectCrashLog)
collectCrashLog(sig, thread_num, query_id, stack_trace);
/// NOTE(review): the global context instance is dereferenced without a null check -
/// confirm one always exists when this runs (e.g. when `daemon` is null).
Context::getGlobalContextInstance()->handleCrash();
/// Send crash report to developers (if configured)
if (sig != SanitizerTrap)
{
if (daemon)
{
if (auto * sentry = SentryWriter::getInstance())
sentry->onSignal(sig, error_message, stack_trace.getFramePointers(), stack_trace.getOffset(), stack_trace.getSize());
}
/// Advise the user to send it manually.
if (std::string_view(VERSION_OFFICIAL).contains("official build"))
{
const auto & date_lut = DateLUT::instance();
/// Approximate support period, upper bound.
if (time(nullptr) - date_lut.makeDate(2000 + VERSION_MAJOR, VERSION_MINOR, 1) < (365 + 30) * 86400)
{
LOG_FATAL(log, "Report this error to https://github.com/ClickHouse/ClickHouse/issues");
}
else
{
LOG_FATAL(log, "ClickHouse version {} is old and should be upgraded to the latest version.", VERSION_STRING);
}
}
else
{
LOG_FATAL(log, "This ClickHouse version is not official and should be upgraded to the official build.");
}
}
/// List changed settings.
/// A non-empty query_id implies thread_ptr is not null, because query_id is only filled from thread_ptr.
if (!query_id.empty())
{
ContextPtr query_context = thread_ptr->getQueryContext();
if (query_context)
{
String changed_settings = query_context->getSettingsRef().toString();
if (changed_settings.empty())
LOG_FATAL(log, "No settings were changed");
else
LOG_FATAL(log, "Changed settings: {}", changed_settings);
}
}
/// When everything is done, we will try to send these error messages to the client.
if (thread_ptr)
thread_ptr->onFatalError();
/// Lets signalHandler stop waiting before its timeout expires.
HandledSignals::instance().fatal_error_printed.test_and_set();
}
catch (...)
{
/// onFault is called from the std::thread, and it should catch all exceptions; otherwise, you can get unrelated fatal errors.
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
LOG_FATAL(log, message);
}
/// Prepares the signal pipe: makes writes non-blocking and tries to enlarge the pipe.
HandledSignals::HandledSignals()
{
    /// Requested pipe size, 1 << 20.
    constexpr int desired_pipe_size = 1 << 20;

    signal_pipe.setNonBlockingWrite();
    signal_pipe.tryIncreaseSize(desired_pipe_size);
}
/// Restores the default disposition of every signal recorded in `handled_signals` and closes the signal pipe.
/// A failure to reset a signal is logged and does not stop the remaining work.
void HandledSignals::reset()
{
    /// Reset signals to SIG_DFL to avoid trying to write to the signal_pipe that will be closed after.
    for (const int signo : handled_signals)
    {
        if (signal(signo, SIG_DFL) != SIG_ERR)
            continue;

        /// The exception is thrown and caught right away only to log it together with errno.
        try
        {
            throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
        }
        catch (ErrnoException &)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    signal_pipe.close();
}
/// Restores default signal dispositions and closes the signal pipe via reset().
HandledSignals::~HandledSignals()
{
    reset();
}
/// Returns the single HandledSignals object, created on first call as a function-local static.
HandledSignals & HandledSignals::instance()
{
    static HandledSignals singleton;
    return singleton;
}
/// Installs terminate_handler as the std::terminate handler.
void HandledSignals::setupTerminateHandler()
{
    std::set_terminate(&terminate_handler);
}
/// Installs signalHandler for the fatal/diagnostic signals and records them in `handled_signals`.
/// In sanitizer builds it also registers the sanitizer death callback.
void HandledSignals::setupCommonDeadlySignalHandlers()
{
    /// SIGTSTP is added for debugging purposes. To output a stack trace of any running thread at anytime.
    const std::vector<int> deadly_signals{SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP, SIGTRAP};
    addSignalHandler(deadly_signals, signalHandler, /* register_signal = */ true);

#if defined(SANITIZER)
    __sanitizer_set_death_callback(sanitizerDeathCallback);
#endif
}
void HandledSignals::setupCommonTerminateRequestSignalHandlers()
{
addSignalHandler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler, true);
}

110
src/Common/SignalHandlers.h Normal file
View File

@ -0,0 +1,110 @@
#pragma once
#include <csignal>
#include <base/defines.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/ThreadStatus.h>
#include <Core/Types.h>
#include <Poco/Runnable.h>
class BaseDaemon;
/** Reset signal handler to the default and send signal to itself.
* It's called from user signal handler to write core dump.
*/
void call_default_signal_handler(int sig);
/// Size of the stack buffers used to write to and read from the signal pipe.
/// It is the sum of the sizes of the fixed-size fields of a fault record.
/// `inline constexpr` gives one definition shared by all translation units that include this header
/// and guarantees the value is usable in constant expressions (array bounds, static_assert).
inline constexpr size_t signal_pipe_buf_size =
    sizeof(int)             /// signal number
    + sizeof(siginfo_t)     /// signal info
    + sizeof(ucontext_t*)   /// pointer to the signal context
    + sizeof(StackTrace)    /// captured stack trace
    + sizeof(UInt64)        /// NOTE(review): presumably the length prefix of the job frame pointers vector - confirm
    + sizeof(UInt32)        /// thread number
    + sizeof(void*);        /// pointer to the thread status
/// Signature shared by all signal handlers declared below: signal number, signal info, opaque context pointer.
using signal_function = void(int, siginfo_t*, void*);
/// Writes the signal number `sig` to the signal pipe.
void writeSignalIDtoSignalPipe(int sig);
/** Signal handler for HUP */
void closeLogsSignalHandler(int sig, siginfo_t *, void *);
/// Signal handler for termination requests; see HandledSignals::setupCommonTerminateRequestSignalHandlers for the signals it is installed for.
void terminateRequestedSignalHandler(int sig, siginfo_t *, void *);
/** Handler for "fault" or diagnostic signals. Sends data about the fault to a separate thread, which writes it into the log.
*/
void signalHandler(int sig, siginfo_t * info, void * context);
/** To use with std::set_terminate.
* Collects slightly more info than __gnu_cxx::__verbose_terminate_handler
* and sends it to the pipe. Another thread reads this info from the pipe and asynchronously writes it to the log.
* Look at libstdc++-v3/libsupc++/vterminate.cc for an example.
* Never returns.
*/
[[noreturn]] void terminate_handler();
/// Avoid link time dependency on DB/Interpreters - will use this function only when linked.
/// Being a weak symbol, it must be checked for being non-null before it is called.
__attribute__((__weak__)) void collectCrashLog(
Int32 signal, UInt64 thread_id, const String & query_id, const StackTrace & stack_trace);
/// Blocks the given signals; NOTE(review): presumably for the calling thread only - confirm against the definition.
void blockSignals(const std::vector<int> & signals);
/** The thread that reads info about a signal or std::terminate from the pipe.
* On HUP, close log files (for new files to be opened later).
* On information about std::terminate, write it to log.
* On other signals, write info to log.
*/
class SignalListener : public Poco::Runnable
{
public:
/// Pseudo signal numbers sent through the pipe in place of a real signal number.
/// They are negative, unlike the signal numbers received by the real signal handlers.
static constexpr int StdTerminate = -1;
static constexpr int StopThread = -2;
static constexpr int SanitizerTrap = -3;
/// `daemon_` is stored as a raw, non-owning pointer.
/// NOTE(review): presumably it may be null when there is no BaseDaemon - confirm against the definition of run().
/// `log_` is the logger this listener keeps for its messages.
explicit SignalListener(BaseDaemon * daemon_, LoggerPtr log_)
: daemon(daemon_), log(log_)
{
}
/// Entry point of the listener thread (Poco::Runnable interface).
void run() override;
private:
BaseDaemon * daemon;
LoggerPtr log;
/// Reports a std::terminate message that came from thread `thread_num`.
void onTerminate(std::string_view message, UInt32 thread_num) const;
/// Reports a fault signal `sig` together with the data collected about it:
/// signal info, context, stack trace, job origin frame pointers, thread number and the ThreadStatus of the faulted thread.
void onFault(
int sig,
const siginfo_t & info,
ucontext_t * context,
const StackTrace & stack_trace,
const std::vector<StackTrace::FramePointers> & thread_frame_pointers,
UInt32 thread_num,
DB::ThreadStatus * thread_ptr) const;
};
/// Holds the state shared by the signal handling code: the list of handled signals and the signal pipe.
/// A single object is available through instance().
struct HandledSignals
{
/// Signals whose disposition is restored to SIG_DFL by reset().
std::vector<int> handled_signals;
/// Pipe used by the signal handling code; it is closed by reset().
DB::PipeFDs signal_pipe;
/// NOTE(review): presumably set once the fatal error report has been fully written to the log - confirm against the users of this flag.
std::atomic_flag fatal_error_printed;
HandledSignals();
/// Calls reset().
~HandledSignals();
/// Installs terminate_handler with std::set_terminate.
void setupTerminateHandler();
/// Installs signalHandler for SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP and SIGTRAP.
void setupCommonDeadlySignalHandlers();
/// Installs terminateRequestedSignalHandler for SIGINT, SIGQUIT and SIGTERM.
void setupCommonTerminateRequestSignalHandlers();
/// Installs `handler` for every signal in `signals`.
/// NOTE(review): `register_signal` presumably controls whether the signals are remembered in handled_signals - confirm against the definition.
void addSignalHandler(const std::vector<int> & signals, signal_function handler, bool register_signal);
/// Resets every signal in handled_signals to SIG_DFL and closes signal_pipe.
void reset();
/// Returns the single instance (function-local static).
static HandledSignals & instance();
};

View File

@ -151,7 +151,7 @@ class IColumn;
M(UInt64, max_local_write_bandwidth, 0, "The maximum speed of local writes in bytes per second.", 0) \ M(UInt64, max_local_write_bandwidth, 0, "The maximum speed of local writes in bytes per second.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams, and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \ M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams, and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \ M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\ M(Bool, dictionary_validate_primary_key_type, false, "Validate primary key type for dictionaries. By default id type for simple layouts will be implicitly converted to UInt64.", 0) \
M(Bool, distributed_insert_skip_read_only_replicas, false, "If true, INSERT into Distributed will skip read-only replicas.", 0) \ M(Bool, distributed_insert_skip_read_only_replicas, false, "If true, INSERT into Distributed will skip read-only replicas.", 0) \
M(Bool, distributed_foreground_insert, false, "If setting is enabled, insert query into distributed waits until data are sent to all nodes in a cluster. \n\nEnables or disables synchronous data insertion into a `Distributed` table.\n\nBy default, when inserting data into a Distributed table, the ClickHouse server sends data to cluster nodes in the background. When `distributed_foreground_insert` = 1, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).", 0) ALIAS(insert_distributed_sync) \ M(Bool, distributed_foreground_insert, false, "If setting is enabled, insert query into distributed waits until data are sent to all nodes in a cluster. \n\nEnables or disables synchronous data insertion into a `Distributed` table.\n\nBy default, when inserting data into a Distributed table, the ClickHouse server sends data to cluster nodes in the background. When `distributed_foreground_insert` = 1, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).", 0) ALIAS(insert_distributed_sync) \
M(UInt64, distributed_background_insert_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) ALIAS(insert_distributed_timeout) \ M(UInt64, distributed_background_insert_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) ALIAS(insert_distributed_timeout) \

View File

@ -68,6 +68,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"enable_named_columns_in_function_tuple", false, true, "Generate named tuples in function tuple() when all names are unique and can be treated as unquoted identifiers."}, {"enable_named_columns_in_function_tuple", false, true, "Generate named tuples in function tuple() when all names are unique and can be treated as unquoted identifiers."},
{"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."}, {"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."},
{"optimize_trivial_insert_select", true, false, "The optimization does not make sense in many cases."}, {"optimize_trivial_insert_select", true, false, "The optimization does not make sense in many cases."},
{"dictionary_validate_primary_key_type", false, false, "Validate primary key type for dictionaries. By default id type for simple layouts will be implicitly converted to UInt64."},
{"collect_hash_table_stats_during_joins", false, true, "New setting."}, {"collect_hash_table_stats_during_joins", false, true, "New setting."},
{"max_size_to_preallocate_for_joins", 0, 100'000'000, "New setting."}, {"max_size_to_preallocate_for_joins", 0, 100'000'000, "New setting."},
{"input_format_orc_read_use_writer_time_zone", false, false, "Whether use the writer's time zone in ORC stripe for ORC row reader, the default ORC row reader's time zone is GMT."}, {"input_format_orc_read_use_writer_time_zone", false, false, "Whether use the writer's time zone in ORC stripe for ORC row reader, the default ORC row reader's time zone is GMT."},

View File

@ -20,7 +20,6 @@
#endif #endif
#include <cerrno> #include <cerrno>
#include <cstring> #include <cstring>
#include <csignal>
#include <unistd.h> #include <unistd.h>
#include <algorithm> #include <algorithm>
@ -37,6 +36,7 @@
#include <Poco/Pipe.h> #include <Poco/Pipe.h>
#include <Common/ErrorHandlers.h> #include <Common/ErrorHandlers.h>
#include <Common/SignalHandlers.h>
#include <base/argsToConfig.h> #include <base/argsToConfig.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <base/coverage.h> #include <base/coverage.h>
@ -54,7 +54,6 @@
#include <Common/Config/ConfigProcessor.h> #include <Common/Config/ConfigProcessor.h>
#include <Common/SymbolIndex.h> #include <Common/SymbolIndex.h>
#include <Common/getExecutablePath.h> #include <Common/getExecutablePath.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/Elf.h> #include <Common/Elf.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
@ -79,8 +78,6 @@ namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_SEND_SIGNAL;
extern const int SYSTEM_ERROR; extern const int SYSTEM_ERROR;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
@ -88,114 +85,6 @@ namespace DB
using namespace DB; using namespace DB;
/// Pipe that connects the signal handlers with the SignalListener thread:
/// handlers write to fds_rw[1], the listener reads from fds_rw[0].
PipeFDs signal_pipe;
/** Restores the default handler of signal `sig` and then raises the signal again.
  * It is called from a user signal handler to write a core dump.
  * Throws ErrnoException if either step fails.
  */
static void call_default_signal_handler(int sig)
{
    const auto previous_handler = signal(sig, SIG_DFL);
    if (previous_handler == SIG_ERR)
        throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");

    const int raise_result = raise(sig);
    if (raise_result != 0)
        throw ErrnoException(ErrorCodes::CANNOT_SEND_SIGNAL, "Cannot send signal");
}
/// Size of the stack buffers used for writing to and reading from the signal pipe:
/// the sum of the sizes of the fixed-size fields sent for one signal.
static constexpr size_t signal_pipe_buf_size =
    sizeof(int)
    + sizeof(siginfo_t)
    + sizeof(ucontext_t *)
    + sizeof(StackTrace)
    + sizeof(UInt64)
    + sizeof(UInt32)
    + sizeof(void *);

/// Signature of the signal handlers installed by addSignalHandler.
using signal_function = void(int, siginfo_t *, void *);
static void writeSignalIDtoSignalPipe(int sig)
{
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[signal_pipe_buf_size];
WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
writeBinary(sig, out);
out.next();
errno = saved_errno;
}
/** Signal handler for HUP.
* It only forwards the signal number to the signal pipe; the SignalListener thread
* reacts to SIGHUP by closing the log files.
*/
static void closeLogsSignalHandler(int sig, siginfo_t *, void *)
{
DENY_ALLOCATIONS_IN_SCOPE;
writeSignalIDtoSignalPipe(sig);
}
/// Signal handler for termination requests.
/// It only forwards the signal number to the signal pipe; the SignalListener thread
/// passes SIGINT, SIGQUIT and SIGTERM on to the daemon.
static void terminateRequestedSignalHandler(int sig, siginfo_t *, void *)
{
DENY_ALLOCATIONS_IN_SCOPE;
writeSignalIDtoSignalPipe(sig);
}
/// Set by SignalListener::onFault when it has finished writing the report;
/// signalHandler polls it to stop waiting earlier.
static std::atomic_flag fatal_error_printed;
/** Handler for "fault" or diagnostic signals. Sends data about the fault to a separate thread, which writes it into the log.
* For every signal except SIGTSTP it then waits for the report to be printed and
* re-raises the signal with the default handler.
*/
static void signalHandler(int sig, siginfo_t * info, void * context)
{
/// A SIGSEGV that happens during asynchronous stack unwinding is not reported: control jumps back to the saved point.
if (asynchronous_stack_unwinding && sig == SIGSEGV)
siglongjmp(asynchronous_stack_unwinding_signal_jump_buffer, 1);
DENY_ALLOCATIONS_IN_SCOPE;
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[signal_pipe_buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
const ucontext_t * signal_context = reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(*signal_context);
#if USE_GWP_ASAN
if (const auto fault_address = reinterpret_cast<uintptr_t>(info->si_addr);
GWPAsan::isGWPAsanError(fault_address))
GWPAsan::printReport(fault_address);
#endif
/// The order of the fields written here must match the order in which SignalListener::run reads them.
writeBinary(sig, out);
writePODBinary(*info, out);
writePODBinary(signal_context, out);
writePODBinary(stack_trace, out);
writeVectorBinary(Exception::enable_job_stack_trace ? Exception::thread_frame_pointers : std::vector<StackTrace::FramePointers>{}, out);
writeBinary(static_cast<UInt32>(getThreadId()), out);
writePODBinary(current_thread, out);
out.next();
if (sig != SIGTSTP) /// This signal is used for debugging.
{
/// The time that is usually enough for separate thread to print info into log.
/// Under MSan full stack unwinding with DWARF info about inline functions takes 101 seconds in one case.
for (size_t i = 0; i < 300; ++i)
{
/// We will synchronize with the thread printing the messages with an atomic variable to finish earlier.
if (fatal_error_printed.test())
break;
/// This coarse method of synchronization is perfectly ok for fatal signals.
sleepForSeconds(1);
}
/// Wait for all logs flush operations
sleepForSeconds(3);
call_default_signal_handler(sig);
}
errno = saved_errno;
}
static bool getenvBool(const char * name) static bool getenvBool(const char * name)
{ {
@ -207,445 +96,6 @@ static bool getenvBool(const char * name)
} }
/// Avoid link time dependency on DB/Interpreters - will use this function only when linked.
/// Being a weak symbol, it is checked for being non-null before it is called.
__attribute__((__weak__)) void collectCrashLog(
Int32 signal, UInt64 thread_id, const String & query_id, const StackTrace & stack_trace);
/** The thread that reads info about a signal or std::terminate from the pipe.
* On HUP, close log files (for new files to be opened later).
* On information about std::terminate, write it to log.
* On other signals, write info to log.
*/
class SignalListener : public Poco::Runnable
{
public:
/// Pseudo signal numbers sent through the pipe in place of a real signal number:
/// StdTerminate by terminate_handler, StopThread to finish run(), SanitizerTrap by sanitizerDeathCallback.
static constexpr int StdTerminate = -1;
static constexpr int StopThread = -2;
static constexpr int SanitizerTrap = -3;
explicit SignalListener(BaseDaemon & daemon_)
: log(getLogger("BaseDaemon"))
, daemon(daemon_)
{
}
/// Reads messages from the read end of signal_pipe until EOF or until StopThread is received,
/// and dispatches each of them by its signal number.
void run() override
{
static_assert(PIPE_BUF >= 512);
static_assert(signal_pipe_buf_size <= PIPE_BUF, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512");
char buf[signal_pipe_buf_size];
ReadBufferFromFileDescriptor in(signal_pipe.fds_rw[0], signal_pipe_buf_size, buf);
while (!in.eof())
{
int sig = 0;
readBinary(sig, in);
// We may log some specific signals afterwards, with different log
// levels and more info, but for completeness we log all signals
// here at trace level.
// Don't use strsignal here, because it's not thread-safe.
LOG_TRACE(log, "Received signal {}", sig);
if (sig == StopThread)
{
LOG_INFO(log, "Stop SignalListener thread");
break;
}
else if (sig == SIGHUP)
{
LOG_DEBUG(log, "Received signal to close logs.");
BaseDaemon::instance().closeLogs(BaseDaemon::instance().logger());
LOG_INFO(log, "Opened new log file after received signal.");
}
else if (sig == StdTerminate)
{
UInt32 thread_num;
std::string message;
readBinary(thread_num, in);
readBinary(message, in);
onTerminate(message, thread_num);
}
else if (sig == SIGINT ||
sig == SIGQUIT ||
sig == SIGTERM)
{
daemon.handleSignal(sig);
}
else
{
siginfo_t info{};
ucontext_t * context{};
StackTrace stack_trace(NoCapture{});
std::vector<StackTrace::FramePointers> thread_frame_pointers;
UInt32 thread_num{};
ThreadStatus * thread_ptr{};
/// The fields are read in the order in which signalHandler writes them.
/// For SanitizerTrap, sanitizerDeathCallback does not send the signal info, the context and the job frame pointers.
if (sig != SanitizerTrap)
{
readPODBinary(info, in);
readPODBinary(context, in);
}
readPODBinary(stack_trace, in);
if (sig != SanitizerTrap)
readVectorBinary(thread_frame_pointers, in);
readBinary(thread_num, in);
readPODBinary(thread_ptr, in);
/// This allows to receive more signals if failure happens inside onFault function.
/// Example: segfault while symbolizing stack trace.
try
{
std::thread([=, this] { onFault(sig, info, context, stack_trace, thread_frame_pointers, thread_num, thread_ptr); }).detach();
}
catch (...)
{
/// Likely cannot allocate thread
onFault(sig, info, context, stack_trace, thread_frame_pointers, thread_num, thread_ptr);
}
}
}
}
private:
LoggerPtr log;
BaseDaemon & daemon;
/// Logs the first line of `message` together with the version info and the thread number,
/// then logs every following line of `message` as a separate fatal record.
void onTerminate(std::string_view message, UInt32 thread_num) const
{
size_t pos = message.find('\n');
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) {}",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id, daemon.git_hash, thread_num, message.substr(0, pos));
/// Print trace from std::terminate exception line-by-line to make it easy for grep.
while (pos != std::string_view::npos)
{
++pos;
size_t next_pos = message.find('\n', pos);
size_t size = next_pos;
if (next_pos != std::string_view::npos)
size = next_pos - pos;
LOG_FATAL(log, fmt::runtime(message.substr(pos, size)));
pos = next_pos;
}
}
/// Writes the full report about a fault to the log: short info first, then query info, stack traces,
/// the binary integrity check (Linux only), the crash log entry, the crash report and the changed settings.
/// Any exception is caught and logged at the end of the function.
void onFault(
int sig,
const siginfo_t & info,
ucontext_t * context,
const StackTrace & stack_trace,
const std::vector<StackTrace::FramePointers> & thread_frame_pointers,
UInt32 thread_num,
ThreadStatus * thread_ptr) const
try
{
ThreadStatus thread_status;
/// First log those fields that are safe to access and that should not cause new fault.
/// That way we will have some duplicated info in the log but we don't lose important info
/// in case of double fault.
LOG_FATAL(log, "########## Short fault info ############");
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) Received signal {}",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id, daemon.git_hash,
thread_num, sig);
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "Signal description: {}", signal_description);
String error_message;
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
String bare_stacktrace_str;
if (stack_trace.getSize())
{
/// Write bare stack trace (addresses) just in case if we will fail to print symbolized stack trace.
/// NOTE: This still require memory allocations and mutex lock inside logger.
/// BTW we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
bare_stacktrace_str = bare_stacktrace.str();
}
/// Now try to access potentially unsafe data in thread_ptr.
String query_id;
String query;
/// Send logs from this thread to client if possible.
/// It will allow client to see failure messages directly.
if (thread_ptr)
{
query_id = thread_ptr->getQueryId();
query = thread_ptr->getQueryForLog();
if (auto logs_queue = thread_ptr->getInternalTextLogsQueue())
{
CurrentThread::attachInternalTextLogsQueue(logs_queue, LogsLevel::trace);
}
}
LOG_FATAL(log, "########################################");
if (query_id.empty())
{
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) (no query) Received signal {} ({})",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id, daemon.git_hash,
thread_num, signal_description, sig);
}
else
{
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) (query_id: {}) (query: {}) Received signal {} ({})",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id, daemon.git_hash,
thread_num, query_id, query, signal_description, sig);
}
LOG_FATAL(log, fmt::runtime(error_message));
if (!bare_stacktrace_str.empty())
{
LOG_FATAL(log, fmt::runtime(bare_stacktrace_str));
}
/// Write symbolized stack trace line by line for better grep-ability.
stack_trace.toStringEveryLine([&](std::string_view s) { LOG_FATAL(log, fmt::runtime(s)); });
/// In case it's a scheduled job write all previous jobs origins call stacks
std::for_each(thread_frame_pointers.rbegin(), thread_frame_pointers.rend(),
[this](const StackTrace::FramePointers & frame_pointers)
{
/// The number of frames is the distance to the first null pointer; empty traces are skipped.
if (size_t size = std::ranges::find(frame_pointers, nullptr) - frame_pointers.begin())
{
LOG_FATAL(log, "========================================");
WriteBufferFromOwnString bare_stacktrace;
writeString("Job's origin stack trace:", bare_stacktrace);
std::for_each_n(frame_pointers.begin(), size,
[&bare_stacktrace](const void * ptr)
{
writeChar(' ', bare_stacktrace);
writePointerHex(ptr, bare_stacktrace);
}
);
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
StackTrace::toStringEveryLine(const_cast<void **>(frame_pointers.data()), 0, size, [this](std::string_view s) { LOG_FATAL(log, fmt::runtime(s)); });
}
}
);
#if defined(OS_LINUX)
/// Write information about binary checksum. It can be difficult to calculate, so do it only after printing stack trace.
/// Please keep the below log messages in-sync with the ones in programs/server/Server.cpp
if (daemon.stored_binary_hash.empty())
{
LOG_FATAL(log, "Integrity check of the executable skipped because the reference checksum could not be read.");
}
else
{
String calculated_binary_hash = getHashOfLoadedBinaryHex();
if (calculated_binary_hash == daemon.stored_binary_hash)
{
LOG_FATAL(log, "Integrity check of the executable successfully passed (checksum: {})", calculated_binary_hash);
}
else
{
LOG_FATAL(
log,
"Calculated checksum of the executable ({0}) does not correspond"
" to the reference checksum stored in the executable ({1})."
" This may indicate one of the following:"
" - the executable was changed just after startup;"
" - the executable was corrupted on disk due to faulty hardware;"
" - the loaded executable was corrupted in memory due to faulty hardware;"
" - the file was intentionally modified;"
" - a logical error in the code.",
calculated_binary_hash,
daemon.stored_binary_hash);
}
}
#endif
/// Write crash to system.crash_log table if available.
if (collectCrashLog)
collectCrashLog(sig, thread_num, query_id, stack_trace);
Context::getGlobalContextInstance()->handleCrash();
/// Send crash report to developers (if configured)
if (sig != SanitizerTrap)
{
if (auto * sentry = SentryWriter::getInstance())
sentry->onSignal(sig, error_message, stack_trace.getFramePointers(), stack_trace.getOffset(), stack_trace.getSize());
/// Advise the user to send it manually.
if (std::string_view(VERSION_OFFICIAL).contains("official build"))
{
const auto & date_lut = DateLUT::instance();
/// Approximate support period, upper bound.
if (time(nullptr) - date_lut.makeDate(2000 + VERSION_MAJOR, VERSION_MINOR, 1) < (365 + 30) * 86400)
{
LOG_FATAL(log, "Report this error to https://github.com/ClickHouse/ClickHouse/issues");
}
else
{
LOG_FATAL(log, "ClickHouse version {} is old and should be upgraded to the latest version.", VERSION_STRING);
}
}
else
{
LOG_FATAL(log, "This ClickHouse version is not official and should be upgraded to the official build.");
}
}
/// List changed settings.
/// A non-empty query_id can only come from thread_ptr, so thread_ptr is not null here.
if (!query_id.empty())
{
ContextPtr query_context = thread_ptr->getQueryContext();
if (query_context)
{
String changed_settings = query_context->getSettingsRef().toString();
if (changed_settings.empty())
LOG_FATAL(log, "No settings were changed");
else
LOG_FATAL(log, "Changed settings: {}", changed_settings);
}
}
/// When everything is done, we will try to send these error messages to the client.
if (thread_ptr)
thread_ptr->onFatalError();
/// Lets signalHandler stop waiting for the report.
fatal_error_printed.test_and_set();
}
catch (...)
{
/// onFault is called from the std::thread, and it should catch all exceptions; otherwise, you can get unrelated fatal errors.
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
LOG_FATAL(getLogger(__PRETTY_FUNCTION__), message);
}
};
#if defined(SANITIZER)
/// Trivial wrapper around a single value.
/// The constructor is user-provided on purpose: the value is initialized inside a separate function
/// and not inline in the code that creates the holder.
/// It is `explicit`, so a bare T is never silently converted into a ValueHolder<T>.
template <typename T>
struct ValueHolder
{
    explicit ValueHolder(T value_) : value(value_)
    {}

    T value;
};
extern "C" void __sanitizer_set_death_callback(void (*)());
/// Death callback for sanitizer builds: sends the stack trace of the current thread through the signal pipe
/// as a SanitizerTrap message and then waits for it to be logged.
/// Sanitizers may not expect some function calls from death callback.
/// Let's try to disable instrumentation to avoid possible issues.
/// However, this callback may call other functions that are still instrumented.
/// We can try [[clang::always_inline]] attribute for statements in future (available in clang-15)
/// See https://github.com/google/sanitizers/issues/1543 and https://github.com/google/sanitizers/issues/1549.
static DISABLE_SANITIZER_INSTRUMENTATION void sanitizerDeathCallback()
{
DENY_ALLOCATIONS_IN_SCOPE;
/// Also need to send data via pipe. Otherwise it may lead to deadlocks or failures in printing diagnostic info.
char buf[signal_pipe_buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
const StackTrace stack_trace;
/// Only the fields that SignalListener::run reads for SanitizerTrap are written: stack trace, thread id, thread pointer.
writeBinary(SignalListener::SanitizerTrap, out);
writePODBinary(stack_trace, out);
/// We create a dummy struct with a constructor so DISABLE_SANITIZER_INSTRUMENTATION is not applied to it;
/// otherwise, Memory sanitizer can't know that values initialized inside this function are actually initialized,
/// because instrumentations are disabled, leading to false positives later on.
ValueHolder<UInt32> thread_id{static_cast<UInt32>(getThreadId())};
writeBinary(thread_id.value, out);
writePODBinary(current_thread, out);
out.next();
/// The time that is usually enough for separate thread to print info into log.
sleepForSeconds(20);
}
#endif
/** To use with std::set_terminate.
  * Collects slightly more info than __gnu_cxx::__verbose_terminate_handler
  * and sends it to the pipe. Another thread reads this info from the pipe and asynchronously writes it to the log.
  * Look at libstdc++-v3/libsupc++/vterminate.cc for an example.
  */
[[noreturn]] static void terminate_handler()
{
    /// A second entry on the same thread aborts immediately.
    static thread_local bool terminating = false;
    if (terminating)
        abort();

    terminating = true;

    std::string log_message;

    if (std::current_exception())
        log_message = "Terminate called for uncaught exception:\n" + getCurrentExceptionMessage(true);
    else
        log_message = "Terminate called without an active exception";

    /// POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be atomic - man 7 pipe
    /// And the buffer should not be too small because our exception messages can be large.
    static constexpr size_t pipe_message_capacity = PIPE_BUF;

    /// The text is cut so that 16 bytes of the buffer stay free for the other fields of the message.
    static constexpr size_t max_text_size = pipe_message_capacity - 16;
    if (log_message.size() > max_text_size)
        log_message.resize(max_text_size);

    char buffer[pipe_message_capacity];
    WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], pipe_message_capacity, buffer);

    /// Message layout: pseudo signal number, thread id, text.
    writeBinary(static_cast<int>(SignalListener::StdTerminate), out);
    writeBinary(static_cast<UInt32>(getThreadId()), out);
    writeBinary(log_message, out);
    out.next();

    abort();
}
static std::string createDirectory(const std::string & file) static std::string createDirectory(const std::string & file)
{ {
fs::path path = fs::path(file).parent_path(); fs::path path = fs::path(file).parent_path();
@ -698,22 +148,7 @@ BaseDaemon::~BaseDaemon()
{ {
writeSignalIDtoSignalPipe(SignalListener::StopThread); writeSignalIDtoSignalPipe(SignalListener::StopThread);
signal_listener_thread.join(); signal_listener_thread.join();
/// Reset signals to SIG_DFL to avoid trying to write to the signal_pipe that will be closed after. HandledSignals::instance().reset();
for (int sig : handled_signals)
if (SIG_ERR == signal(sig, SIG_DFL))
{
try
{
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
}
catch (ErrnoException &)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
signal_pipe.close();
SentryWriter::resetInstance(); SentryWriter::resetInstance();
} }
@ -752,6 +187,8 @@ void BaseDaemon::closeFDs()
#else #else
fs::path proc_path{"/proc/self/fd"}; fs::path proc_path{"/proc/self/fd"};
#endif #endif
const auto & signal_pipe = HandledSignals::instance().signal_pipe;
if (fs::is_directory(proc_path)) /// Hooray, proc exists if (fs::is_directory(proc_path)) /// Hooray, proc exists
{ {
/// in /proc/self/fd directory filenames are numeric file descriptors. /// in /proc/self/fd directory filenames are numeric file descriptors.
@ -972,56 +409,6 @@ void BaseDaemon::initialize(Application & self)
} }
/// Installs `handler` (with SA_SIGINFO) for every signal in `signals`.
/// All signals of the list are added to the mask of the installed action.
/// If `out_handled_signals` is not null, the signals are appended to it.
/// Throws Poco::Exception when a signal set cannot be built (checked on non-Darwin systems) or sigaction fails.
static void addSignalHandler(const std::vector<int> & signals, signal_function handler, std::vector<int> * out_handled_signals)
{
    struct sigaction action;
    memset(&action, 0, sizeof(action));
    action.sa_sigaction = handler;
    action.sa_flags = SA_SIGINFO;

#if defined(OS_DARWIN)
    /// On Darwin the results of sigemptyset / sigaddset are not checked.
    sigemptyset(&action.sa_mask);
    for (const int signal_number : signals)
        sigaddset(&action.sa_mask, signal_number);
#else
    if (sigemptyset(&action.sa_mask) != 0)
        throw Poco::Exception("Cannot set signal handler.");

    for (const int signal_number : signals)
    {
        if (sigaddset(&action.sa_mask, signal_number) != 0)
            throw Poco::Exception("Cannot set signal handler.");
    }
#endif

    for (const int signal_number : signals)
    {
        if (sigaction(signal_number, &action, nullptr) != 0)
            throw Poco::Exception("Cannot set signal handler.");
    }

    if (out_handled_signals != nullptr)
        out_handled_signals->insert(out_handled_signals->end(), signals.begin(), signals.end());
}
/// Adds the given signals to the signal mask of the calling thread (pthread_sigmask with SIG_BLOCK).
/// Throws Poco::Exception when the signal set cannot be built (checked on non-Darwin systems) or the mask cannot be changed.
static void blockSignals(const std::vector<int> & signals)
{
    sigset_t signals_to_block;

#if defined(OS_DARWIN)
    /// On Darwin the results of sigemptyset / sigaddset are not checked.
    sigemptyset(&signals_to_block);
    for (const int signal_number : signals)
        sigaddset(&signals_to_block, signal_number);
#else
    if (sigemptyset(&signals_to_block) != 0)
        throw Poco::Exception("Cannot block signal.");

    for (const int signal_number : signals)
    {
        if (sigaddset(&signals_to_block, signal_number) != 0)
            throw Poco::Exception("Cannot block signal.");
    }
#endif

    if (pthread_sigmask(SIG_BLOCK, &signals_to_block, nullptr) != 0)
        throw Poco::Exception("Cannot block signal.");
}
extern const char * GIT_HASH; extern const char * GIT_HASH;
void BaseDaemon::initializeTerminationAndSignalProcessing() void BaseDaemon::initializeTerminationAndSignalProcessing()
@ -1045,29 +432,21 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
}; };
} }
} }
std::set_terminate(terminate_handler);
/// We want to avoid SIGPIPE when working with sockets and pipes, and just handle return value/errno instead. /// We want to avoid SIGPIPE when working with sockets and pipes, and just handle return value/errno instead.
blockSignals({SIGPIPE}); blockSignals({SIGPIPE});
/// Setup signal handlers. /// Setup signal handlers.
/// SIGTSTP is added for debugging purposes. To output a stack trace of any running thread at anytime. HandledSignals::instance().setupTerminateHandler();
addSignalHandler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP, SIGTRAP}, signalHandler, &handled_signals); HandledSignals::instance().setupCommonDeadlySignalHandlers();
addSignalHandler({SIGHUP}, closeLogsSignalHandler, &handled_signals); HandledSignals::instance().setupCommonTerminateRequestSignalHandlers();
addSignalHandler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler, &handled_signals); HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true);
#if defined(SANITIZER)
__sanitizer_set_death_callback(sanitizerDeathCallback);
#endif
/// Set up Poco ErrorHandler for Poco Threads. /// Set up Poco ErrorHandler for Poco Threads.
static KillingErrorHandler killing_error_handler; static KillingErrorHandler killing_error_handler;
Poco::ErrorHandler::set(&killing_error_handler); Poco::ErrorHandler::set(&killing_error_handler);
signal_pipe.setNonBlockingWrite(); signal_listener = std::make_unique<SignalListener>(this, getLogger("BaseDaemon"));
signal_pipe.tryIncreaseSize(1 << 20);
signal_listener = std::make_unique<SignalListener>(*this);
signal_listener_thread.start(*signal_listener); signal_listener_thread.start(*signal_listener);
#if defined(__ELF__) && !defined(OS_FREEBSD) #if defined(__ELF__) && !defined(OS_FREEBSD)
@ -1273,7 +652,7 @@ void BaseDaemon::setupWatchdog()
/// Forward signals to the child process. /// Forward signals to the child process.
if (forward_signals) if (forward_signals)
{ {
addSignalHandler( HandledSignals::instance().addSignalHandler(
{SIGHUP, SIGINT, SIGQUIT, SIGTERM}, {SIGHUP, SIGINT, SIGQUIT, SIGTERM},
[](int sig, siginfo_t *, void *) [](int sig, siginfo_t *, void *)
{ {
@ -1289,7 +668,7 @@ void BaseDaemon::setupWatchdog()
(void)res; (void)res;
} }
}, },
nullptr); false);
} }
else else
{ {

View File

@ -168,8 +168,6 @@ protected:
String git_hash; String git_hash;
String stored_binary_hash; String stored_binary_hash;
std::vector<int> handled_signals;
bool should_setup_watchdog = false; bool should_setup_watchdog = false;
char * argv0 = nullptr; char * argv0 = nullptr;
}; };

View File

@ -105,7 +105,7 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, Co
return {}; return {};
} }
auto names_and_types = StorageDictionary::getNamesAndTypes(ExternalDictionariesLoader::getDictionaryStructure(*load_result.config)); auto names_and_types = StorageDictionary::getNamesAndTypes(ExternalDictionariesLoader::getDictionaryStructure(*load_result.config), false);
buffer << "CREATE TABLE " << backQuoteIfNeed(getDatabaseName()) << '.' << backQuoteIfNeed(table_name) << " ("; buffer << "CREATE TABLE " << backQuoteIfNeed(getDatabaseName()) << '.' << backQuoteIfNeed(table_name) << " (";
buffer << names_and_types.toNamesAndTypesDescription(); buffer << names_and_types.toNamesAndTypesDescription();
buffer << ") Engine = Dictionary(" << backQuoteIfNeed(table_name) << ")"; buffer << ") Engine = Dictionary(" << backQuoteIfNeed(table_name) << ")";

View File

@ -58,15 +58,6 @@ std::optional<AttributeUnderlyingType> tryGetAttributeUnderlyingType(TypeIndex i
} }
DictionarySpecialAttribute::DictionarySpecialAttribute(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: name{config.getString(config_prefix + ".name", "")}, expression{config.getString(config_prefix + ".expression", "")}
{
if (name.empty() && !expression.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Element {}.name is empty", config_prefix);
}
DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{ {
std::string structure_prefix = config_prefix + ".structure"; std::string structure_prefix = config_prefix + ".structure";
@ -79,7 +70,8 @@ DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration
if (has_id) if (has_id)
{ {
id.emplace(config, structure_prefix + ".id"); static constexpr auto id_default_type = "UInt64";
id.emplace(makeDictionaryTypedSpecialAttribute(config, structure_prefix + ".id", id_default_type));
} }
else if (has_key) else if (has_key)
{ {

View File

@ -89,14 +89,6 @@ constexpr void callOnDictionaryAttributeType(AttributeUnderlyingType type, F &&
}); });
} }
struct DictionarySpecialAttribute final
{
const std::string name;
const std::string expression;
DictionarySpecialAttribute(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
};
struct DictionaryTypedSpecialAttribute final struct DictionaryTypedSpecialAttribute final
{ {
const std::string name; const std::string name;
@ -108,7 +100,7 @@ struct DictionaryTypedSpecialAttribute final
/// Name of identifier plus list of attributes /// Name of identifier plus list of attributes
struct DictionaryStructure final struct DictionaryStructure final
{ {
std::optional<DictionarySpecialAttribute> id; std::optional<DictionaryTypedSpecialAttribute> id;
std::optional<std::vector<DictionaryAttribute>> key; std::optional<std::vector<DictionaryAttribute>> key;
std::vector<DictionaryAttribute> attributes; std::vector<DictionaryAttribute> attributes;
std::unordered_map<std::string, size_t> attribute_name_to_index; std::unordered_map<std::string, size_t> attribute_name_to_index;

View File

@ -382,6 +382,15 @@ void buildPrimaryKeyConfiguration(
name_element->appendChild(name); name_element->appendChild(name);
buildAttributeExpressionIfNeeded(doc, id_element, dict_attr); buildAttributeExpressionIfNeeded(doc, id_element, dict_attr);
if (!dict_attr->type)
return;
AutoPtr<Element> type_element(doc->createElement("type"));
id_element->appendChild(type_element);
AutoPtr<Text> type(doc->createTextNode(queryToString(dict_attr->type)));
type_element->appendChild(type);
} }
else else
{ {

View File

@ -134,7 +134,7 @@ Chunk Squashing::squash(std::vector<Chunk> && input_chunks, Chunk::ChunkInfoColl
Chunk result; Chunk result;
result.setColumns(std::move(mutable_columns), rows); result.setColumns(std::move(mutable_columns), rows);
result.setChunkInfos(infos); result.setChunkInfos(infos);
result.getChunkInfos().append(std::move(input_chunks.back().getChunkInfos())); result.getChunkInfos().appendIfUniq(std::move(input_chunks.back().getChunkInfos()));
chassert(result); chassert(result);
return result; return result;

View File

@ -1,5 +1,6 @@
#include <Parsers/isUnquotedIdentifier.h> #include <Parsers/isUnquotedIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/Lexer.h> #include <Parsers/Lexer.h>
namespace DB namespace DB
@ -7,6 +8,18 @@ namespace DB
bool isUnquotedIdentifier(const String & name) bool isUnquotedIdentifier(const String & name)
{ {
auto is_keyword = [&name](Keyword keyword)
{
auto s = toStringView(keyword);
if (name.size() != s.size())
return false;
return strncasecmp(s.data(), name.data(), s.size()) == 0;
};
/// Special keywords are parsed as literals instead of identifiers.
if (is_keyword(Keyword::NULL_KEYWORD) || is_keyword(Keyword::TRUE_KEYWORD) || is_keyword(Keyword::FALSE_KEYWORD))
return false;
Lexer lexer(name.data(), name.data() + name.size()); Lexer lexer(name.data(), name.data() + name.size());
auto maybe_ident = lexer.nextToken(); auto maybe_ident = lexer.nextToken();

View File

@ -5,6 +5,14 @@
namespace DB namespace DB
{ {
/// Checks if the input string @name is a valid unquoted identifier.
///
/// Example Usage:
/// abc -> true (valid unquoted identifier)
/// 123 -> false (identifiers cannot start with digits)
/// `123` -> false (quoted identifiers are not considered)
/// `abc` -> false (quoted identifiers are not considered)
/// null -> false (reserved literal keyword)
bool isUnquotedIdentifier(const String & name); bool isUnquotedIdentifier(const String & name);
} }

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
void RestoreChunkInfosTransform::transform(Chunk & chunk) void RestoreChunkInfosTransform::transform(Chunk & chunk)
{ {
chunk.getChunkInfos().append(chunk_infos.clone()); chunk.getChunkInfos().appendIfUniq(chunk_infos.clone());
} }
namespace DeduplicationToken namespace DeduplicationToken

View File

@ -652,15 +652,12 @@ size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
return checksum->second.file_size; return checksum->second.file_size;
} }
String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(const NamesAndTypesList & available_columns) const
{ {
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns(with_subcolumns);
auto columns_list = columns_description.get(options);
std::optional<std::string> minimum_size_column; std::optional<std::string> minimum_size_column;
UInt64 minimum_size = std::numeric_limits<UInt64>::max(); UInt64 minimum_size = std::numeric_limits<UInt64>::max();
for (const auto & column : columns_list) for (const auto & column : available_columns)
{ {
if (!hasColumnFiles(column)) if (!hasColumnFiles(column))
continue; continue;

View File

@ -196,7 +196,9 @@ public:
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column. /// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const; /// We pass a list of available columns since the ones available in the current storage snapshot might be smaller
/// than the one the table has (e.g a DROP COLUMN happened) and we don't want to get a column not in the snapshot
String getColumnNameWithMinimumCompressedSize(const NamesAndTypesList & available_columns) const;
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); } bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }

View File

@ -47,7 +47,7 @@ public:
virtual std::optional<size_t> getColumnPosition(const String & column_name) const = 0; virtual std::optional<size_t> getColumnPosition(const String & column_name) const = 0;
virtual String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const = 0; virtual String getColumnNameWithMinimumCompressedSize(const NamesAndTypesList & available_columns) const = 0;
virtual const MergeTreeDataPartChecksums & getChecksums() const = 0; virtual const MergeTreeDataPartChecksums & getChecksums() const = 0;

View File

@ -36,7 +36,10 @@ public:
AlterConversionsPtr getAlterConversions() const override { return alter_conversions; } AlterConversionsPtr getAlterConversions() const override { return alter_conversions; }
String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const override { return data_part->getColumnNameWithMinimumCompressedSize(with_subcolumns); } String getColumnNameWithMinimumCompressedSize(const NamesAndTypesList & available_columns) const override
{
return data_part->getColumnNameWithMinimumCompressedSize(available_columns);
}
const MergeTreeDataPartChecksums & getChecksums() const override { return data_part->checksums; } const MergeTreeDataPartChecksums & getChecksums() const override { return data_part->checksums; }

View File

@ -127,7 +127,8 @@ NameSet injectRequiredColumns(
*/ */
if (!have_at_least_one_physical_column) if (!have_at_least_one_physical_column)
{ {
const auto minimum_size_column_name = data_part_info_for_reader.getColumnNameWithMinimumCompressedSize(with_subcolumns); auto available_columns = storage_snapshot->metadata->getColumns().get(options);
const auto minimum_size_column_name = data_part_info_for_reader.getColumnNameWithMinimumCompressedSize(available_columns);
columns.push_back(minimum_size_column_name); columns.push_back(minimum_size_column_name);
/// correctly report added column /// correctly report added column
injected_columns.insert(columns.back()); injected_columns.insert(columns.back());

View File

@ -266,10 +266,13 @@ void MergeTreeDataPartWide::doCheckConsistency(bool require_part_metadata) const
bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
{ {
auto serialization = tryGetSerialization(column.name);
if (!serialization)
return false;
auto marks_file_extension = index_granularity_info.mark_type.getFileExtension(); auto marks_file_extension = index_granularity_info.mark_type.getFileExtension();
bool res = true; bool res = true;
getSerialization(column.name)->enumerateStreams([&](const auto & substream_path) serialization->enumerateStreams([&](const auto & substream_path)
{ {
auto stream_name = getStreamNameForColumn(column, substream_path, checksums); auto stream_name = getStreamNameForColumn(column, substream_path, checksums);
if (!stream_name || !checksums.files.contains(*stream_name + marks_file_extension)) if (!stream_name || !checksums.files.contains(*stream_name + marks_file_extension))

View File

@ -315,6 +315,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
for (size_t i = 0; i < async_insert_info_with_partition.size(); ++i) for (size_t i = 0; i < async_insert_info_with_partition.size(); ++i)
{ {
if (async_insert_info_with_partition[i] == nullptr)
{
LOG_ERROR(getLogger("MergeTreeDataWriter"), "The {}th element in async_insert_info_with_partition is nullptr. There are totally {} partitions in the insert. Selector content is {}",
i, partitions_count, fmt::join(selector.begin(), selector.end(), ","));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error for async deduplicated insert, please check error logs");
}
result[i].offsets = std::move(async_insert_info_with_partition[i]->offsets); result[i].offsets = std::move(async_insert_info_with_partition[i]->offsets);
result[i].tokens = std::move(async_insert_info_with_partition[i]->tokens); result[i].tokens = std::move(async_insert_info_with_partition[i]->tokens);
} }

View File

@ -10,6 +10,7 @@
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Common/Config/ConfigHelper.h> #include <Common/Config/ConfigHelper.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Core/Settings.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h> #include <Dictionaries/getDictionaryConfigurationFromAST.h>
@ -26,13 +27,14 @@ namespace ErrorCodes
extern const int CANNOT_DETACH_DICTIONARY_AS_TABLE; extern const int CANNOT_DETACH_DICTIONARY_AS_TABLE;
extern const int DICTIONARY_ALREADY_EXISTS; extern const int DICTIONARY_ALREADY_EXISTS;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
} }
namespace namespace
{ {
void checkNamesAndTypesCompatibleWithDictionary(const String & dictionary_name, const ColumnsDescription & columns, const DictionaryStructure & dictionary_structure) void checkNamesAndTypesCompatibleWithDictionary(const String & dictionary_name, const ColumnsDescription & columns, const DictionaryStructure & dictionary_structure)
{ {
auto dictionary_names_and_types = StorageDictionary::getNamesAndTypes(dictionary_structure); auto dictionary_names_and_types = StorageDictionary::getNamesAndTypes(dictionary_structure, false);
std::set<NameAndTypePair> names_and_types_set(dictionary_names_and_types.begin(), dictionary_names_and_types.end()); std::set<NameAndTypePair> names_and_types_set(dictionary_names_and_types.begin(), dictionary_names_and_types.end());
for (const auto & column : columns.getOrdinary()) for (const auto & column : columns.getOrdinary())
@ -48,13 +50,17 @@ namespace
} }
NamesAndTypesList StorageDictionary::getNamesAndTypes(const DictionaryStructure & dictionary_structure) NamesAndTypesList StorageDictionary::getNamesAndTypes(const DictionaryStructure & dictionary_structure, bool validate_id_type)
{ {
NamesAndTypesList dictionary_names_and_types; NamesAndTypesList dictionary_names_and_types;
if (dictionary_structure.id) if (dictionary_structure.id)
dictionary_names_and_types.emplace_back(dictionary_structure.id->name, std::make_shared<DataTypeUInt64>()); {
if (validate_id_type && dictionary_structure.id->type->getTypeId() != TypeIndex::UInt64)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect type of ID column: must be UInt64, but it is {}", dictionary_structure.id->type->getFamilyName());
dictionary_names_and_types.emplace_back(dictionary_structure.id->name, std::make_shared<DataTypeUInt64>());
}
/// In old-style (XML) configuration we don't have this attributes in the /// In old-style (XML) configuration we don't have this attributes in the
/// main attribute list, so we have to add them to columns list explicitly. /// main attribute list, so we have to add them to columns list explicitly.
/// In the new configuration (DDL) we have them both in range_* nodes and /// In the new configuration (DDL) we have them both in range_* nodes and
@ -106,7 +112,7 @@ StorageDictionary::StorageDictionary(
Location location_, Location location_,
ContextPtr context_) ContextPtr context_)
: StorageDictionary( : StorageDictionary(
table_id_, dictionary_name_, ColumnsDescription{getNamesAndTypes(dictionary_structure_)}, comment, location_, context_) table_id_, dictionary_name_, ColumnsDescription{getNamesAndTypes(dictionary_structure_, context_->getSettingsRef().dictionary_validate_primary_key_type)}, comment, location_, context_)
{ {
} }

View File

@ -80,7 +80,7 @@ public:
std::shared_ptr<const IDictionary> getDictionary() const; std::shared_ptr<const IDictionary> getDictionary() const;
static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure); static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure, bool validate_id_type);
bool isDictionary() const override { return true; } bool isDictionary() const override { return true; }
void shutdown(bool is_drop) override; void shutdown(bool is_drop) override;

View File

@ -16,16 +16,6 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int DICTIONARIES_WAS_NOT_LOADED;
extern const int FUNCTION_NOT_ALLOWED;
extern const int NOT_IMPLEMENTED;
extern const int SUPPORT_IS_DISABLED;
extern const int ACCESS_DENIED;
extern const int DEPRECATED_FUNCTION;
};
enum class FunctionOrigin : int8_t enum class FunctionOrigin : int8_t
{ {
SYSTEM = 0, SYSTEM = 0,
@ -40,7 +30,6 @@ namespace
MutableColumns & res_columns, MutableColumns & res_columns,
const String & name, const String & name,
UInt64 is_aggregate, UInt64 is_aggregate,
std::optional<UInt64> is_deterministic,
const String & create_query, const String & create_query,
FunctionOrigin function_origin, FunctionOrigin function_origin,
const Factory & factory) const Factory & factory)
@ -48,58 +37,53 @@ namespace
res_columns[0]->insert(name); res_columns[0]->insert(name);
res_columns[1]->insert(is_aggregate); res_columns[1]->insert(is_aggregate);
if (!is_deterministic.has_value())
res_columns[2]->insertDefault();
else
res_columns[2]->insert(*is_deterministic);
if constexpr (std::is_same_v<Factory, UserDefinedSQLFunctionFactory> || std::is_same_v<Factory, UserDefinedExecutableFunctionFactory>) if constexpr (std::is_same_v<Factory, UserDefinedSQLFunctionFactory> || std::is_same_v<Factory, UserDefinedExecutableFunctionFactory>)
{ {
res_columns[3]->insert(false); res_columns[2]->insert(false);
res_columns[4]->insertDefault(); res_columns[3]->insertDefault();
} }
else else
{ {
res_columns[3]->insert(factory.isCaseInsensitive(name)); res_columns[2]->insert(factory.isCaseInsensitive(name));
if (factory.isAlias(name)) if (factory.isAlias(name))
res_columns[4]->insert(factory.aliasTo(name)); res_columns[3]->insert(factory.aliasTo(name));
else else
res_columns[4]->insertDefault(); res_columns[3]->insertDefault();
} }
res_columns[5]->insert(create_query); res_columns[4]->insert(create_query);
res_columns[6]->insert(static_cast<Int8>(function_origin)); res_columns[5]->insert(static_cast<Int8>(function_origin));
if constexpr (std::is_same_v<Factory, FunctionFactory>) if constexpr (std::is_same_v<Factory, FunctionFactory>)
{ {
if (factory.isAlias(name)) if (factory.isAlias(name))
{ {
res_columns[6]->insertDefault();
res_columns[7]->insertDefault(); res_columns[7]->insertDefault();
res_columns[8]->insertDefault(); res_columns[8]->insertDefault();
res_columns[9]->insertDefault(); res_columns[9]->insertDefault();
res_columns[10]->insertDefault(); res_columns[10]->insertDefault();
res_columns[11]->insertDefault(); res_columns[11]->insertDefault();
res_columns[12]->insertDefault();
} }
else else
{ {
auto documentation = factory.getDocumentation(name); auto documentation = factory.getDocumentation(name);
res_columns[7]->insert(documentation.description); res_columns[6]->insert(documentation.description);
res_columns[8]->insert(documentation.syntax); res_columns[7]->insert(documentation.syntax);
res_columns[9]->insert(documentation.argumentsAsString()); res_columns[8]->insert(documentation.argumentsAsString());
res_columns[10]->insert(documentation.returned_value); res_columns[9]->insert(documentation.returned_value);
res_columns[11]->insert(documentation.examplesAsString()); res_columns[10]->insert(documentation.examplesAsString());
res_columns[12]->insert(documentation.categoriesAsString()); res_columns[11]->insert(documentation.categoriesAsString());
} }
} }
else else
{ {
res_columns[6]->insertDefault();
res_columns[7]->insertDefault(); res_columns[7]->insertDefault();
res_columns[8]->insertDefault(); res_columns[8]->insertDefault();
res_columns[9]->insertDefault(); res_columns[9]->insertDefault();
res_columns[10]->insertDefault(); res_columns[10]->insertDefault();
res_columns[11]->insertDefault(); res_columns[11]->insertDefault();
res_columns[12]->insertDefault();
} }
} }
} }
@ -120,7 +104,6 @@ ColumnsDescription StorageSystemFunctions::getColumnsDescription()
{ {
{"name", std::make_shared<DataTypeString>(), "The name of the function."}, {"name", std::make_shared<DataTypeString>(), "The name of the function."},
{"is_aggregate", std::make_shared<DataTypeUInt8>(), "Whether the function is an aggregate function."}, {"is_aggregate", std::make_shared<DataTypeUInt8>(), "Whether the function is an aggregate function."},
{"is_deterministic", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "Whether the function is deterministic."},
{"case_insensitive", std::make_shared<DataTypeUInt8>(), "Whether the function name can be used case-insensitively."}, {"case_insensitive", std::make_shared<DataTypeUInt8>(), "Whether the function name can be used case-insensitively."},
{"alias_to", std::make_shared<DataTypeString>(), "The original function name, if the function name is an alias."}, {"alias_to", std::make_shared<DataTypeString>(), "The original function name, if the function name is an alias."},
{"create_query", std::make_shared<DataTypeString>(), "Obsolete."}, {"create_query", std::make_shared<DataTypeString>(), "Obsolete."},
@ -140,36 +123,14 @@ void StorageSystemFunctions::fillData(MutableColumns & res_columns, ContextPtr c
const auto & function_names = functions_factory.getAllRegisteredNames(); const auto & function_names = functions_factory.getAllRegisteredNames();
for (const auto & function_name : function_names) for (const auto & function_name : function_names)
{ {
std::optional<UInt64> is_deterministic; fillRow(res_columns, function_name, 0, "", FunctionOrigin::SYSTEM, functions_factory);
try
{
DO_NOT_UPDATE_ERROR_STATISTICS();
is_deterministic = functions_factory.tryGet(function_name, context)->isDeterministic();
}
catch (const Exception & e)
{
/// Some functions throw because they need special configuration or setup before use.
if (e.code() == ErrorCodes::DICTIONARIES_WAS_NOT_LOADED
|| e.code() == ErrorCodes::FUNCTION_NOT_ALLOWED
|| e.code() == ErrorCodes::NOT_IMPLEMENTED
|| e.code() == ErrorCodes::SUPPORT_IS_DISABLED
|| e.code() == ErrorCodes::ACCESS_DENIED
|| e.code() == ErrorCodes::DEPRECATED_FUNCTION)
{
/// Ignore exception, show is_deterministic = NULL.
}
else
throw;
}
fillRow(res_columns, function_name, 0, is_deterministic, "", FunctionOrigin::SYSTEM, functions_factory);
} }
const auto & aggregate_functions_factory = AggregateFunctionFactory::instance(); const auto & aggregate_functions_factory = AggregateFunctionFactory::instance();
const auto & aggregate_function_names = aggregate_functions_factory.getAllRegisteredNames(); const auto & aggregate_function_names = aggregate_functions_factory.getAllRegisteredNames();
for (const auto & function_name : aggregate_function_names) for (const auto & function_name : aggregate_function_names)
{ {
fillRow(res_columns, function_name, 1, {1}, "", FunctionOrigin::SYSTEM, aggregate_functions_factory); fillRow(res_columns, function_name, 1, "", FunctionOrigin::SYSTEM, aggregate_functions_factory);
} }
const auto & user_defined_sql_functions_factory = UserDefinedSQLFunctionFactory::instance(); const auto & user_defined_sql_functions_factory = UserDefinedSQLFunctionFactory::instance();
@ -177,14 +138,14 @@ void StorageSystemFunctions::fillData(MutableColumns & res_columns, ContextPtr c
for (const auto & function_name : user_defined_sql_functions_names) for (const auto & function_name : user_defined_sql_functions_names)
{ {
auto create_query = queryToString(user_defined_sql_functions_factory.get(function_name)); auto create_query = queryToString(user_defined_sql_functions_factory.get(function_name));
fillRow(res_columns, function_name, 0, {0}, create_query, FunctionOrigin::SQL_USER_DEFINED, user_defined_sql_functions_factory); fillRow(res_columns, function_name, 0, create_query, FunctionOrigin::SQL_USER_DEFINED, user_defined_sql_functions_factory);
} }
const auto & user_defined_executable_functions_factory = UserDefinedExecutableFunctionFactory::instance(); const auto & user_defined_executable_functions_factory = UserDefinedExecutableFunctionFactory::instance();
const auto & user_defined_executable_functions_names = user_defined_executable_functions_factory.getRegisteredNames(context); /// NOLINT(readability-static-accessed-through-instance) const auto & user_defined_executable_functions_names = user_defined_executable_functions_factory.getRegisteredNames(context); /// NOLINT(readability-static-accessed-through-instance)
for (const auto & function_name : user_defined_executable_functions_names) for (const auto & function_name : user_defined_executable_functions_names)
{ {
fillRow(res_columns, function_name, 0, {0}, "", FunctionOrigin::EXECUTABLE_USER_DEFINED, user_defined_executable_functions_factory); fillRow(res_columns, function_name, 0, "", FunctionOrigin::EXECUTABLE_USER_DEFINED, user_defined_executable_functions_factory);
} }
} }

View File

@ -74,7 +74,7 @@ ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr c
/// otherwise, we get table structure by dictionary structure. /// otherwise, we get table structure by dictionary structure.
auto dictionary_structure = external_loader.getDictionaryStructure(dictionary_name, context); auto dictionary_structure = external_loader.getDictionaryStructure(dictionary_name, context);
return ColumnsDescription(StorageDictionary::getNamesAndTypes(dictionary_structure)); return ColumnsDescription(StorageDictionary::getNamesAndTypes(dictionary_structure, false));
} }
StoragePtr TableFunctionDictionary::executeImpl( StoragePtr TableFunctionDictionary::executeImpl(

View File

@ -48,7 +48,19 @@ from git_helper import GIT_PREFIX, Git
from git_helper import Runner as GitRunner from git_helper import Runner as GitRunner
from github_helper import GitHub from github_helper import GitHub
from pr_info import PRInfo from pr_info import PRInfo
from report import ERROR, FAILURE, PENDING, SUCCESS, BuildResult, JobReport, TestResult from report import (
ERROR,
FAILURE,
PENDING,
SUCCESS,
BuildResult,
JobReport,
TestResult,
OK,
JOB_STARTED_TEST_NAME,
JOB_FINISHED_TEST_NAME,
FAIL,
)
from s3_helper import S3Helper from s3_helper import S3Helper
from stopwatch import Stopwatch from stopwatch import Stopwatch
from tee_popen import TeePopen from tee_popen import TeePopen
@ -263,7 +275,8 @@ def check_missing_images_on_dockerhub(
return result return result
def _pre_action(s3, indata, pr_info): def _pre_action(s3, job_name, batch, indata, pr_info):
no_cache = CiSettings.create_from_run_config(indata).no_ci_cache
print("Clear dmesg") print("Clear dmesg")
Utils.clear_dmesg() Utils.clear_dmesg()
CommitStatusData.cleanup() CommitStatusData.cleanup()
@ -282,6 +295,90 @@ def _pre_action(s3, indata, pr_info):
ci_cache.dump_run_config(indata) ci_cache.dump_run_config(indata)
to_be_skipped = False
skip_status = SUCCESS
# check if job was run already
if CI.is_build_job(job_name):
# this is a build job - check if a build report is present
build_result = (
BuildResult.load_any(job_name, pr_info.number, pr_info.head_ref)
if not no_cache
else None
)
if build_result:
if build_result.status == SUCCESS:
to_be_skipped = True
else:
print(
"Build report found but status is unsuccessful - will try to rerun"
)
print("::group::Build Report")
print(build_result.as_json())
print("::endgroup::")
else:
# this is a test job - check if GH commit status or cache record is present
commit = get_commit(GitHub(get_best_robot_token(), per_page=100), pr_info.sha)
# rerun helper check
# FIXME: Find a way to identify if job restarted manually (by developer) or by automatic workflow restart (died spot-instance)
# disable rerun check for the former
if job_name not in (
CI.JobNames.BUILD_CHECK,
): # we might want to rerun build report job
rerun_helper = RerunHelper(commit, _get_ext_check_name(job_name))
if rerun_helper.is_already_finished_by_status():
print("WARNING: Rerunning job with GH status ")
status = rerun_helper.get_finished_status()
assert status
print("::group::Commit Status")
print(status)
print("::endgroup::")
to_be_skipped = True
skip_status = status.state
# ci cache check
if not to_be_skipped and not no_cache:
ci_cache = CiCache(s3, indata["jobs_data"]["digests"]).update()
job_config = CI.get_job_config(job_name)
if ci_cache.is_successful(
job_name,
batch,
job_config.num_batches,
job_config.required_on_release_branch,
):
print("CICache record has be found - job will be skipped")
job_status = ci_cache.get_successful(
job_name, batch, job_config.num_batches
)
assert job_status, "BUG"
_create_gh_status(
commit,
job_name,
batch,
job_config.num_batches,
job_status,
)
to_be_skipped = True
# skip_status = SUCCESS already there
GHActions.print_in_group("Commit Status Data", job_status)
# create pre report
jr = JobReport.create_pre_report(status=skip_status, job_skipped=to_be_skipped)
jr.dump()
if not to_be_skipped:
print("push start record to ci db")
prepared_events = prepare_tests_results_for_clickhouse(
pr_info,
[TestResult(JOB_STARTED_TEST_NAME, OK)],
SUCCESS,
0.0,
JobReport.get_start_time_from_current(),
"",
_get_ext_check_name(job_name),
)
ClickHouseHelper().insert_events_into(
db="default", table="checks", events=prepared_events
)
print(f"Pre action done. Report files [{reports_files}] have been downloaded") print(f"Pre action done. Report files [{reports_files}] have been downloaded")
@ -1045,108 +1142,23 @@ def main() -> int:
### PRE action: start ### PRE action: start
elif args.pre: elif args.pre:
assert indata, "Run config must be provided via --infile" assert indata, "Run config must be provided via --infile"
_pre_action(s3, indata, pr_info) _pre_action(s3, args.job_name, args.batch, indata, pr_info)
JobReport.create_pre_report().dump()
### RUN action: start ### RUN action: start
elif args.run: elif args.run:
assert indata assert indata
ci_settings = CiSettings.create_from_run_config(indata) job_report = JobReport.load()
check_name = args.job_name check_name = args.job_name
check_name_with_group = _get_ext_check_name(check_name) check_name_with_group = _get_ext_check_name(check_name)
print( print(
f"Check if rerun for name: [{check_name}], extended name [{check_name_with_group}]" f"Check if rerun for name: [{check_name}], extended name [{check_name_with_group}]"
) )
previous_status = None
if CI.is_build_job(check_name):
# this is a build job - check if a build report is present
build_result = (
BuildResult.load_any(check_name, pr_info.number, pr_info.head_ref)
if not ci_settings.no_ci_cache
else None
)
if build_result:
if build_result.status == SUCCESS:
previous_status = build_result.status
JobReport(
status=SUCCESS,
description="",
test_results=[],
start_time="",
duration=0.0,
additional_files=[],
job_skipped=True,
).dump()
else:
# FIXME: Consider reusing failures for build jobs.
# Just remove this if/else - that makes build job starting and failing immediately
print(
"Build report found but status is unsuccessful - will try to rerun"
)
print("::group::Build Report")
print(build_result.as_json())
print("::endgroup::")
else:
# this is a test job - check if GH commit status or cache record is present
commit = get_commit(
GitHub(get_best_robot_token(), per_page=100), pr_info.sha
)
# rerun helper check if job_report.job_skipped and not args.force:
# FIXME: Find a way to identify if job restarted manually (by developer) or by automatic workflow restart (died spot-instance)
# disable rerun check for the former
if check_name not in (
CI.JobNames.BUILD_CHECK,
): # we might want to rerun build report job
rerun_helper = RerunHelper(commit, check_name_with_group)
if rerun_helper.is_already_finished_by_status():
print("WARNING: Rerunning job with GH status ")
status = rerun_helper.get_finished_status()
assert status
print("::group::Commit Status")
print(status)
print("::endgroup::")
previous_status = status.state
print("Create dummy job report with job_skipped flag")
JobReport(
status=status.state,
description="",
test_results=[],
start_time="",
duration=0.0,
additional_files=[],
job_skipped=True,
).dump()
# ci cache check
if not previous_status and not ci_settings.no_ci_cache:
ci_cache = CiCache(s3, indata["jobs_data"]["digests"]).update()
job_config = CI.get_job_config(check_name)
if ci_cache.is_successful(
check_name,
args.batch,
job_config.num_batches,
job_config.required_on_release_branch,
):
job_status = ci_cache.get_successful(
check_name, args.batch, job_config.num_batches
)
assert job_status, "BUG"
_create_gh_status(
commit,
check_name,
args.batch,
job_config.num_batches,
job_status,
)
previous_status = job_status.status
GHActions.print_in_group("Commit Status Data", job_status)
if previous_status and not args.force:
print( print(
f"Commit status or Build Report is already present - job will be skipped with status: [{previous_status}]" f"Commit status or Build Report is already present - job will be skipped with status: [{job_report.status}]"
) )
if previous_status == SUCCESS: if job_report.status == SUCCESS:
exit_code = 0 exit_code = 0
else: else:
exit_code = 1 exit_code = 1
@ -1166,7 +1178,8 @@ def main() -> int:
assert ( assert (
job_report job_report
), "BUG. There must be job report either real report, or pre-report if job was killed" ), "BUG. There must be job report either real report, or pre-report if job was killed"
if not job_report.job_skipped and not job_report.pre_report: error_description = ""
if not job_report.pre_report:
# it's a real job report # it's a real job report
ch_helper = ClickHouseHelper() ch_helper = ClickHouseHelper()
check_url = "" check_url = ""
@ -1244,7 +1257,6 @@ def main() -> int:
pr_info, pr_info,
dump_to_file=True, dump_to_file=True,
) )
print(f"Job report url: [{check_url}]") print(f"Job report url: [{check_url}]")
prepared_events = prepare_tests_results_for_clickhouse( prepared_events = prepare_tests_results_for_clickhouse(
pr_info, pr_info,
@ -1269,9 +1281,7 @@ def main() -> int:
) )
elif job_report.job_skipped: elif job_report.job_skipped:
print(f"Skipped after rerun check {[args.job_name]} - do nothing") print(f"Skipped after rerun check {[args.job_name]} - do nothing")
elif job_report.job_skipped: else:
print(f"Job was skipped {[args.job_name]} - do nothing")
elif job_report.pre_report:
print(f"ERROR: Job was killed - generate evidence") print(f"ERROR: Job was killed - generate evidence")
job_report.update_duration() job_report.update_duration()
ret_code = os.getenv("JOB_EXIT_CODE", "") ret_code = os.getenv("JOB_EXIT_CODE", "")
@ -1282,10 +1292,13 @@ def main() -> int:
pass pass
if Utils.is_killed_with_oom(): if Utils.is_killed_with_oom():
print("WARNING: OOM while job execution") print("WARNING: OOM while job execution")
error = f"Out Of Memory, exit_code {job_report.exit_code}, after {int(job_report.duration)}s" error_description = f"Out Of Memory, exit_code {job_report.exit_code}"
else: else:
error = f"Unknown, exit_code {job_report.exit_code}, after {int(job_report.duration)}s" error_description = f"Unknown, exit_code {job_report.exit_code}"
CIBuddy().post_error(error, job_name=_get_ext_check_name(args.job_name)) CIBuddy().post_error(
error_description + f" after {int(job_report.duration)}s",
job_name=_get_ext_check_name(args.job_name),
)
if CI.is_test_job(args.job_name): if CI.is_test_job(args.job_name):
gh = GitHub(get_best_robot_token(), per_page=100) gh = GitHub(get_best_robot_token(), per_page=100)
commit = get_commit(gh, pr_info.sha) commit = get_commit(gh, pr_info.sha)
@ -1293,11 +1306,32 @@ def main() -> int:
commit, commit,
ERROR, ERROR,
"", "",
"Error: " + error, "Error: " + error_description,
_get_ext_check_name(args.job_name), _get_ext_check_name(args.job_name),
pr_info, pr_info,
dump_to_file=True, dump_to_file=True,
) )
if not job_report.job_skipped:
print("push finish record to ci db")
prepared_events = prepare_tests_results_for_clickhouse(
pr_info,
[
TestResult(
JOB_FINISHED_TEST_NAME,
FAIL if error_description else OK,
raw_logs=error_description or None,
)
],
SUCCESS if not error_description else ERROR,
0.0,
JobReport.get_start_time_from_current(),
"",
_get_ext_check_name(args.job_name),
)
ClickHouseHelper().insert_events_into(
db="default", table="checks", events=prepared_events
)
### POST action: end ### POST action: end
### MARK SUCCESS action: start ### MARK SUCCESS action: start

View File

@ -737,17 +737,12 @@ class CiCache:
if job_name not in required_builds: if job_name not in required_builds:
remove_from_to_do.append(job_name) remove_from_to_do.append(job_name)
if not required_builds:
remove_from_to_do.append(CI.JobNames.BUILD_CHECK)
for job in remove_from_to_do: for job in remove_from_to_do:
print(f"Filter job [{job}] - not affected by the change") print(f"Filter job [{job}] - not affected by the change")
if job in self.jobs_to_do: if job in self.jobs_to_do:
del self.jobs_to_do[job] del self.jobs_to_do[job]
if job in self.jobs_to_wait: if job in self.jobs_to_wait:
del self.jobs_to_wait[job] del self.jobs_to_wait[job]
if job in self.jobs_to_skip:
self.jobs_to_skip.remove(job)
def await_pending_jobs(self, is_release: bool, dry_run: bool = False) -> None: def await_pending_jobs(self, is_release: bool, dry_run: bool = False) -> None:
""" """

View File

@ -160,6 +160,10 @@ class CiSettings:
else: else:
return False return False
if CI.is_build_job(job):
print(f"Build job [{job}] - always run")
return True
if self.exclude_keywords: if self.exclude_keywords:
for keyword in self.exclude_keywords: for keyword in self.exclude_keywords:
if keyword in normalize_string(job): if keyword in normalize_string(job):

View File

@ -247,6 +247,9 @@ BASE_HEADERS = ["Test name", "Test status"]
# should not be in TEMP directory or any directory that may be cleaned during the job execution # should not be in TEMP directory or any directory that may be cleaned during the job execution
JOB_REPORT_FILE = Path(GITHUB_WORKSPACE) / "job_report.json" JOB_REPORT_FILE = Path(GITHUB_WORKSPACE) / "job_report.json"
JOB_STARTED_TEST_NAME = "STARTED"
JOB_FINISHED_TEST_NAME = "COMPLETED"
@dataclass @dataclass
class TestResult: class TestResult:
@ -304,14 +307,19 @@ class JobReport:
exit_code: int = -1 exit_code: int = -1
@staticmethod @staticmethod
def create_pre_report() -> "JobReport": def get_start_time_from_current():
return datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
@classmethod
def create_pre_report(cls, status: str, job_skipped: bool) -> "JobReport":
return JobReport( return JobReport(
status=ERROR, status=status,
description="", description="",
test_results=[], test_results=[],
start_time=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), start_time=cls.get_start_time_from_current(),
duration=0.0, duration=0.0,
additional_files=[], additional_files=[],
job_skipped=job_skipped,
pre_report=True, pre_report=True,
) )

View File

@ -295,6 +295,13 @@ class TestCIConfig(unittest.TestCase):
continue continue
expected_jobs_to_do.append(job) expected_jobs_to_do.append(job)
for job, config in CI.JOB_CONFIGS.items(): for job, config in CI.JOB_CONFIGS.items():
if (
CI.is_build_job(job)
and not config.run_by_label
and job not in expected_jobs_to_do
):
# expected to run all builds jobs
expected_jobs_to_do.append(job)
if not any( if not any(
keyword in normalize_string(job) keyword in normalize_string(job)
for keyword in settings.include_keywords for keyword in settings.include_keywords

View File

@ -197,6 +197,10 @@ class TestCIOptions(unittest.TestCase):
"package_debug", "package_debug",
"package_msan", "package_msan",
"package_ubsan", "package_ubsan",
"package_aarch64",
"package_release_coverage",
"package_tsan",
"binary_release",
"Stateless tests (asan)", "Stateless tests (asan)",
"Stateless tests (azure, asan)", "Stateless tests (azure, asan)",
"Stateless tests flaky check (asan)", "Stateless tests flaky check (asan)",
@ -276,6 +280,7 @@ class TestCIOptions(unittest.TestCase):
filtered_jobs, filtered_jobs,
[ [
"Style check", "Style check",
"fuzzers",
], ],
) )
@ -291,9 +296,7 @@ class TestCIOptions(unittest.TestCase):
) )
self.assertCountEqual( self.assertCountEqual(
filtered_jobs, filtered_jobs,
[ ["Style check", "fuzzers"],
"Style check",
],
) )
def test_options_applied_4(self): def test_options_applied_4(self):
@ -329,5 +332,12 @@ class TestCIOptions(unittest.TestCase):
"Stateless tests (release, old analyzer, s3, DatabaseReplicated)", "Stateless tests (release, old analyzer, s3, DatabaseReplicated)",
"package_asan", "package_asan",
"fuzzers", "fuzzers",
"package_aarch64",
"package_release_coverage",
"package_debug",
"package_tsan",
"package_msan",
"package_ubsan",
"binary_release",
], ],
) )

View File

@ -1526,6 +1526,15 @@ class TestCase:
start_time = args.testcase_start_time start_time = args.testcase_start_time
database = args.testcase_database database = args.testcase_database
if args.client_log:
log_opt = " --client_logs_file=" + args.client_log + " "
client_options += log_opt
os.environ["CLICKHOUSE_CLIENT_OPT"] = (
os.environ["CLICKHOUSE_CLIENT_OPT"]
if "CLICKHOUSE_CLIENT_OPT" in os.environ
else ""
) + log_opt
# This is for .sh tests # This is for .sh tests
os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename
@ -2995,6 +3004,15 @@ def main(args):
else: else:
print(colored("\nNo queries hung.", args, "green", attrs=["bold"])) print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))
if args.client_log:
if os.path.exists(args.client_log):
with open(args.client_log, "rb") as stream:
content = stream.read().decode()
if len(content):
print("Has fatal logs from client:\n")
print(content)
os.remove(args.client_log)
if len(restarted_tests) > 0: if len(restarted_tests) > 0:
print("\nSome tests were restarted:\n") print("\nSome tests were restarted:\n")
@ -3396,6 +3414,11 @@ def parse_args():
default=os.environ.get("REPLACE_MT_WITH_SMT", False), default=os.environ.get("REPLACE_MT_WITH_SMT", False),
help="Replace ordinary MergeTree engine with SharedMergeTree", help="Replace ordinary MergeTree engine with SharedMergeTree",
) )
parser.add_argument(
"--client-log",
default="./client.fatal.log",
help="Path to file for fatal logs from client",
)
parser.add_argument( parser.add_argument(
"--run-sequential-tests-in-parallel", "--run-sequential-tests-in-parallel",

View File

@ -286,7 +286,6 @@ CREATE TABLE system.functions
( (
`name` String, `name` String,
`is_aggregate` UInt8, `is_aggregate` UInt8,
`is_deterministic` Nullable(UInt8),
`case_insensitive` UInt8, `case_insensitive` UInt8,
`alias_to` String, `alias_to` String,
`create_query` String, `create_query` String,

View File

@ -45,9 +45,4 @@ expect eof
EOF EOF
} }
run "$CLICKHOUSE_LOCAL --disable_suggestion" run "$CLICKHOUSE_LOCAL"
# Suggestions are off because the suggestion feature initializes itself by reading all available function
# names from "system.functions". Getting the value for field "is_obsolete" occasionally throws (e.g. for
# certain dictionary functions when dictionaries are not set up yet). Exceptions are properly handled, but
# they exist for a short time. This, in combination with CLICKHOUSE_TERMINATE_ON_ANY_EXCEPTION, terminates
# clickhouse-local and clickhouse-client when run in interactive mode *with* suggestions.

View File

@ -7,3 +7,4 @@ Tuple(\n k UInt8,\n j Int32)
Tuple(Int32, Int32, Int32, Int32) Tuple(Int32, Int32, Int32, Int32)
['1','2','3','4'] ['1','2','3','4']
(1,2,3) (1,2,3)
Tuple(Nullable(Nothing)) Tuple(Bool) Tuple(Bool)

View File

@ -28,4 +28,7 @@ create table tbl (x Tuple(a Int32, b Int32, c Int32)) engine MergeTree order by
insert into tbl values (tuple(1, 2, 3)); -- without tuple it's interpreted differently inside values block. insert into tbl values (tuple(1, 2, 3)); -- without tuple it's interpreted differently inside values block.
select * from tbl; select * from tbl;
drop table tbl drop table tbl;
-- Avoid generating named tuple for special keywords
select toTypeName(tuple(null)), toTypeName(tuple(true)), toTypeName(tuple(false));

View File

@ -0,0 +1,36 @@
-- { echoOn }
SELECT count() FROM src;
100
SELECT a, sum(b), uniq(b), FROM src GROUP BY a ORDER BY a;
0 450 10
1 460 10
2 470 10
3 480 10
4 490 10
5 500 10
6 510 10
7 520 10
8 530 10
9 540 10
SELECT count() FROM remote('127.0.0.{1..2}', currentDatabase(), src);
200
-- { echoOn }
INSERT INTO dst_null
SELECT a, b FROM src;
SELECT
a,
sumMerge(sum_b) AS sum_b,
uniqMerge(uniq_b) AS uniq_b
FROM mv_dst
GROUP BY a
ORDER BY a;
0 450 10
1 460 10
2 470 10
3 480 10
4 490 10
5 500 10
6 510 10
7 520 10
8 530 10
9 540 10

View File

@ -0,0 +1,51 @@
-- Test script: reads from a ReplicatedMergeTree table with the parallel-replicas
-- settings enabled, then runs INSERT ... SELECT from it into a Null-engine table
-- that feeds an aggregating materialized view.
-- NOTE(review): statements between the echoOn / echoOff hints are printed into the
-- expected output, so do not add comments or reformat anything inside those regions.
DROP TABLE IF EXISTS src;
CREATE TABLE src (a UInt64, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/03008_deduplication_remote_insert_select/src', '{replica}')
ORDER BY tuple();
-- 100 rows: a cycles through 0..9, b is the row number 0..99.
INSERT INTO src SELECT number % 10 as a, number as b FROM numbers(100);
-- Enable parallel reading from replicas for the queries below
-- (up to 3 replicas, using the cluster named 'parallel_replicas').
SET allow_experimental_parallel_reading_from_replicas=1;
SET max_parallel_replicas=3;
SET parallel_replicas_for_non_replicated_merge_tree=1;
SET cluster_for_parallel_replicas='parallel_replicas';
-- { echoOn }
SELECT count() FROM src;
SELECT a, sum(b), uniq(b), FROM src GROUP BY a ORDER BY a;
SELECT count() FROM remote('127.0.0.{1..2}', currentDatabase(), src);
-- { echoOff }
-- dst_null uses the Null engine; it is the source table of the materialized view mv_dst.
DROP TABLE IF EXISTS dst_null;
CREATE TABLE dst_null(a UInt64, b UInt64)
ENGINE = Null;
DROP TABLE IF EXISTS mv_dst;
-- mv_dst keeps, per value of a, the aggregate states sumState(b) and uniqState(b)
-- computed from dst_null; they are finalized later with sumMerge / uniqMerge.
CREATE MATERIALIZED VIEW mv_dst
ENGINE = AggregatingMergeTree()
ORDER BY a
AS SELECT
a,
sumState(b) AS sum_b,
uniqState(b) AS uniq_b
FROM dst_null
GROUP BY a;
-- { echoOn }
INSERT INTO dst_null
SELECT a, b FROM src;
SELECT
a,
sumMerge(sum_b) AS sum_b,
uniqMerge(uniq_b) AS uniq_b
FROM mv_dst
GROUP BY a
ORDER BY a;
-- { echoOff }
-- Cleanup of the objects created by this script.
DROP TABLE src;
DROP TABLE mv_dst;
DROP TABLE dst_null;

View File

@ -30,9 +30,9 @@ function alter_thread()
while true; do while true; do
REPLICA=$(($RANDOM % 3 + 1)) REPLICA=$(($RANDOM % 3 + 1))
ADD=$(($RANDOM % 5 + 1)) ADD=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_add_drop_steroids_$REPLICA ADD COLUMN value$ADD UInt32 DEFAULT 42 SETTINGS replication_alter_partitions_sync=0"; # additionaly we don't wait anything for more heavy concurrency $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_add_drop_steroids_$REPLICA ADD COLUMN value$ADD UInt32 DEFAULT 42 SETTINGS replication_alter_partitions_sync=0"; # additionally we don't wait anything for more heavy concurrency
DROP=$(($RANDOM % 5 + 1)) DROP=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_add_drop_steroids_$REPLICA DROP COLUMN value$DROP SETTINGS replication_alter_partitions_sync=0"; # additionaly we don't wait anything for more heavy concurrency $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_add_drop_steroids_$REPLICA DROP COLUMN value$DROP SETTINGS replication_alter_partitions_sync=0"; # additionally we don't wait anything for more heavy concurrency
sleep 0.$RANDOM sleep 0.$RANDOM
done done
} }

View File

@ -0,0 +1,4 @@
n1 UInt64
n2 UInt32
n1 UInt64
n2 UInt32

View File

@ -0,0 +1,41 @@
-- Test script for the dictionary_validate_primary_key_type setting, using
-- FLAT-layout dictionaries whose primary key n1 is declared with different types.

-- Created before the setting is switched on: a String primary key carries no
-- error hint here, i.e. the CREATE is expected to succeed.
CREATE DICTIONARY `test_dictionary0` (
`n1` String,
`n2` UInt32
)
PRIMARY KEY n1
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 DB 'test_db' TABLE 'table_01' USER 'default'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
-- All CREATE DICTIONARY statements below run with the setting enabled.
SET dictionary_validate_primary_key_type=1;
-- Same definition as test_dictionary0 (String key): now expected to fail with server error 36.
CREATE DICTIONARY `test_dictionary1` (
`n1` String,
`n2` UInt32
)
PRIMARY KEY n1
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 DB 'test_db' TABLE 'table_01' USER 'default'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT()); -- { serverError 36 }
-- UInt32 key: also expected to fail with server error 36.
CREATE DICTIONARY `test_dictionary2` (
`n1` UInt32,
`n2` UInt32
)
PRIMARY KEY n1
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 DB 'test_db' TABLE 'table_01' USER 'default'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT()); -- { serverError 36 }
-- UInt64 key: no error hint, i.e. the CREATE is expected to succeed.
CREATE DICTIONARY `test_dictionary3` (
`n1` UInt64,
`n2` UInt32
)
PRIMARY KEY n1
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 DB 'test_db' TABLE 'table_01' USER 'default'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
-- Print the column names and types of the two dictionaries that were created.
DESCRIBE `test_dictionary0`;
DESCRIBE `test_dictionary3`;