diff --git a/dbms/programs/client/Client.cpp b/dbms/programs/client/Client.cpp index a94e6865730..f5a66e4dd15 100644 --- a/dbms/programs/client/Client.cpp +++ b/dbms/programs/client/Client.cpp @@ -482,6 +482,37 @@ private: Poco::File(history_file).createFile(); } +#if USE_READLINE + /// Install Ctrl+C signal handler that will be used in interactive mode. + + if (rl_initialize()) + throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE); + + auto clear_prompt_or_exit = [](int) + { + /// This is signal safe. + ssize_t res = write(STDOUT_FILENO, "\n", 1); + + /// Allow to quit client while query is in progress by pressing Ctrl+C twice. + /// (First press to Ctrl+C will try to cancel query by InterruptListener). + if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE)) + { + rl_replace_line("", 0); + if (rl_forced_update_display()) + _exit(0); + } + else + { + /// A little dirty, but we struggle to find better way to correctly + /// force readline to exit after returning from the signal handler. + _exit(0); + } + }; + + if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR) + throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER); +#endif + loop(); std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl; @@ -1518,35 +1549,6 @@ public: } } -#if USE_READLINE - if (rl_initialize()) - throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE); - - auto clear_prompt_or_exit = [](int) - { - /// This is signal safe. - ssize_t res = write(STDOUT_FILENO, "\n", 1); - - /// Allow to quit client while query is in progress by pressing Ctrl+C twice. - /// (First press to Ctrl+C will try to cancel query by InterruptListener). - if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE)) - { - rl_replace_line("", 0); - if (rl_forced_update_display()) - _exit(0); - } - else - { - /// A little dirty, but we struggle to find better way to correctly - /// force readline to exit after returning from the signal handler. - _exit(0); - } - }; - - if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR) - throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER); -#endif - ioctl(0, TIOCGWINSZ, &terminal_size); namespace po = boost::program_options; diff --git a/dbms/programs/server/Server.cpp b/dbms/programs/server/Server.cpp index b22ab82559d..17f49ef6ddd 100644 --- a/dbms/programs/server/Server.cpp +++ b/dbms/programs/server/Server.cpp @@ -366,12 +366,16 @@ int Server::main(const std::vector & /*args*/) dns_cache_updater = std::make_unique(*global_context); } +#if defined(__linux__) if (!TaskStatsInfoGetter::checkPermissions()) { LOG_INFO(log, "It looks like the process has no CAP_NET_ADMIN capability, some performance statistics will be disabled." " It could happen due to incorrect ClickHouse package installation." " You could resolve the problem manually with 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'"); } +#else + LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled."); +#endif { Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0); diff --git a/dbms/src/Common/TaskStatsInfoGetter.cpp b/dbms/src/Common/TaskStatsInfoGetter.cpp index d9fd8ec34ff..07bf502d1ac 100644 --- a/dbms/src/Common/TaskStatsInfoGetter.cpp +++ b/dbms/src/Common/TaskStatsInfoGetter.cpp @@ -2,36 +2,27 @@ #include #include -#include -#if defined(__linux__) -#include -#include -#include -#include -#endif +#include +#if defined(__linux__) + +#include + +#include #include #include #include #include -#include -#include - -#if __has_include() -#include -#else #include -#endif +#include +#include +#include +#include /// Basic idea is motivated by "iotop" tool. /// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt -#define GENLMSG_DATA(glh) ((void *)((char*)NLMSG_DATA(glh) + GENL_HDRLEN)) -#define GENLMSG_PAYLOAD(glh) (NLMSG_PAYLOAD(glh, 0) - GENL_HDRLEN) -#define NLA_DATA(na) ((void *)((char*)(na) + NLA_HDRLEN)) -#define NLA_PAYLOAD(len) (len - NLA_HDRLEN) - namespace DB { @@ -39,131 +30,203 @@ namespace DB namespace ErrorCodes { extern const int NETLINK_ERROR; + extern const int LOGICAL_ERROR; } namespace { -static size_t constexpr MAX_MSG_SIZE = 1024; - +/** The message contains: + * - Netlink protocol header; + * - Generic Netlink (is a sub-protocol of Netlink that we use) protocol header; + * - Payload + * -- that itself is a list of "Attributes" (sub-messages), each of them contains length (including header), type, and its own payload. + * -- and attribute payload may be represented by the list of embedded attributes. + */ struct NetlinkMessage { -#if defined(__linux__) - ::nlmsghdr n; - ::genlmsghdr g; - char buf[MAX_MSG_SIZE]; -#endif + static size_t constexpr MAX_MSG_SIZE = 1024; + + alignas(NLMSG_ALIGNTO) ::nlmsghdr header; + + struct Attribute + { + ::nlattr header; + + alignas(NLMSG_ALIGNTO) char payload[0]; + + const Attribute * next() const + { + return reinterpret_cast(reinterpret_cast(this) + NLA_ALIGN(header.nla_len)); + } + }; + + union alignas(NLMSG_ALIGNTO) + { + struct + { + ::genlmsghdr generic_header; + + union alignas(NLMSG_ALIGNTO) + { + char buf[MAX_MSG_SIZE]; + Attribute attribute; /// First attribute. There may be more. + } payload; + }; + + ::nlmsgerr error; + }; + + size_t payload_size() const + { + return header.nlmsg_len - sizeof(header) - sizeof(generic_header); + } + + const Attribute * end() const + { + return reinterpret_cast(reinterpret_cast(this) + header.nlmsg_len); + } + + void send(int fd) const + { + const char * request_buf = reinterpret_cast(this); + ssize_t request_size = header.nlmsg_len; + + ::sockaddr_nl nladdr{}; + nladdr.nl_family = AF_NETLINK; + + while (true) + { + ssize_t bytes_sent = ::sendto(fd, request_buf, request_size, 0, reinterpret_cast(&nladdr), sizeof(nladdr)); + + if (bytes_sent <= 0) + { + if (errno == EAGAIN) + continue; + else + throwFromErrno("Can't send a Netlink command", ErrorCodes::NETLINK_ERROR); + } + + if (bytes_sent > request_size) + throw Exception("Wrong result of sendto system call: bytes_sent is greater than request size", ErrorCodes::NETLINK_ERROR); + + if (bytes_sent == request_size) + break; + + request_buf += bytes_sent; + request_size -= bytes_sent; + } + } + + void receive(int fd) + { + ssize_t bytes_received = ::recv(fd, this, sizeof(*this), 0); + + if (header.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&header), bytes_received)) + throw Exception("Can't receive Netlink response, error: " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR); + } }; -void sendCommand( - int sock_fd, - UInt16 nlmsg_type, - UInt32 nlmsg_pid, - UInt8 genl_cmd, - UInt16 nla_type, - void * nla_data, - int nla_len) +NetlinkMessage query( + int fd, + UInt16 type, + UInt32 pid, + UInt8 command, + UInt16 attribute_type, + const void * attribute_data, + int attribute_size) { -#if defined(__linux__) - NetlinkMessage msg{}; + NetlinkMessage request; - msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); - msg.n.nlmsg_type = nlmsg_type; - msg.n.nlmsg_flags = NLM_F_REQUEST; - msg.n.nlmsg_seq = 0; - msg.n.nlmsg_pid = nlmsg_pid; - msg.g.cmd = genl_cmd; - msg.g.version = 1; + request.header.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); /// Length of both headers. + request.header.nlmsg_type = type; + request.header.nlmsg_flags = NLM_F_REQUEST; /// A request. + request.header.nlmsg_seq = 0; + request.header.nlmsg_pid = pid; - ::nlattr * attr = static_cast<::nlattr *>(GENLMSG_DATA(&msg)); - attr->nla_type = nla_type; - attr->nla_len = nla_len + 1 + NLA_HDRLEN; + request.generic_header.cmd = command; + request.generic_header.version = 1; - memcpy(NLA_DATA(attr), nla_data, nla_len); - msg.n.nlmsg_len += NLMSG_ALIGN(attr->nla_len); + request.payload.attribute.header.nla_type = attribute_type; + request.payload.attribute.header.nla_len = attribute_size + 1 + NLA_HDRLEN; - char * buf = reinterpret_cast(&msg); - ssize_t buflen = msg.n.nlmsg_len; + memcpy(&request.payload.attribute.payload, attribute_data, attribute_size); - ::sockaddr_nl nladdr{}; - nladdr.nl_family = AF_NETLINK; + request.header.nlmsg_len += NLMSG_ALIGN(request.payload.attribute.header.nla_len); - while (true) - { - ssize_t r = ::sendto(sock_fd, buf, buflen, 0, reinterpret_cast(&nladdr), sizeof(nladdr)); + request.send(fd); - if (r >= buflen) - break; + NetlinkMessage response; + response.receive(fd); - if (r > 0) - { - buf += r; - buflen -= r; - } - else if (errno != EAGAIN) - throwFromErrno("Can't send a Netlink command", ErrorCodes::NETLINK_ERROR); - } -#endif + return response; } -UInt16 getFamilyId(int nl_sock_fd) +UInt16 getFamilyIdImpl(int fd) { -#if defined(__linux__) - struct - { - ::nlmsghdr header; - ::genlmsghdr ge_header; - char buf[256]; - } answer; + NetlinkMessage answer = query(fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, CTRL_ATTR_FAMILY_NAME, TASKSTATS_GENL_NAME, strlen(TASKSTATS_GENL_NAME) + 1); - static char name[] = TASKSTATS_GENL_NAME; + /// NOTE Why the relevant info is located in the second attribute? + const NetlinkMessage::Attribute * attr = answer.payload.attribute.next(); - sendCommand( - nl_sock_fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, - CTRL_ATTR_FAMILY_NAME, (void *) name, - strlen(TASKSTATS_GENL_NAME) + 1); + if (attr->header.nla_type != CTRL_ATTR_FAMILY_ID) + throw Exception("Received wrong attribute as an answer to GET_FAMILY Netlink command", ErrorCodes::NETLINK_ERROR); - UInt16 id = 0; - ssize_t rep_len = ::recv(nl_sock_fd, &answer, sizeof(answer), 0); - if (rep_len < 0) - throwFromErrno("Cannot get the family id for " + std::string(TASKSTATS_GENL_NAME) + " from the Netlink socket", ErrorCodes::NETLINK_ERROR); + return unalignedLoad(attr->payload); +} - if (answer.header.nlmsg_type == NLMSG_ERROR ||!NLMSG_OK((&answer.header), rep_len)) - throw Exception("Received an error instead of the family id for " + std::string(TASKSTATS_GENL_NAME) - + " from the Netlink socket", ErrorCodes::NETLINK_ERROR); - const ::nlattr * attr; - attr = static_cast(GENLMSG_DATA(&answer)); - attr = reinterpret_cast(reinterpret_cast(attr) + NLA_ALIGN(attr->nla_len)); - if (attr->nla_type == CTRL_ATTR_FAMILY_ID) - id = *static_cast(NLA_DATA(attr)); +bool checkPermissionsImpl() +{ + /// See man getcap. + __user_cap_header_struct request{}; + request.version = _LINUX_CAPABILITY_VERSION_1; /// It's enough to check just single CAP_NET_ADMIN capability we are interested. + request.pid = getpid(); - return id; -#else - return 0; -#endif + __user_cap_data_struct response{}; + + /// Avoid dependency on 'libcap'. + if (0 != syscall(SYS_capget, &request, &response)) + throwFromErrno("Cannot do 'capget' syscall", ErrorCodes::NETLINK_ERROR); + + return (1 << CAP_NET_ADMIN) & response.effective; +} + + +UInt16 getFamilyId(int fd) +{ + /// It is thread and exception safe since C++11 and even before. + static UInt16 res = getFamilyIdImpl(fd); + return res; } } -TaskStatsInfoGetter::TaskStatsInfoGetter() = default; - - -void TaskStatsInfoGetter::init() +bool TaskStatsInfoGetter::checkPermissions() { -#if defined(__linux__) - if (netlink_socket_fd >= 0) - return; + static bool res = checkPermissionsImpl(); + return res; +} + + +TaskStatsInfoGetter::TaskStatsInfoGetter() +{ + if (!checkPermissions()) + throw Exception("Logical error: TaskStatsInfoGetter is not usable without CAP_NET_ADMIN. Check permissions before creating the object.", + ErrorCodes::LOGICAL_ERROR); netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC); if (netlink_socket_fd < 0) throwFromErrno("Can't create PF_NETLINK socket", ErrorCodes::NETLINK_ERROR); + /// On some containerized environments, operation on Netlink socket could hang forever. + /// We set reasonably small timeout to overcome this issue. + struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 50000; @@ -177,112 +240,41 @@ void TaskStatsInfoGetter::init() if (::bind(netlink_socket_fd, reinterpret_cast(&addr), sizeof(addr)) < 0) throwFromErrno("Can't bind PF_NETLINK socket", ErrorCodes::NETLINK_ERROR); - netlink_family_id = getFamilyId(netlink_socket_fd); -#endif + taskstats_family_id = getFamilyId(netlink_socket_fd); } -#if defined(__linux__) -void TaskStatsInfoGetter::getStatImpl(int tid, ::taskstats & out_stats) +void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid) { - init(); + NetlinkMessage answer = query(netlink_socket_fd, taskstats_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(tid)); - sendCommand(netlink_socket_fd, netlink_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(pid_t)); - - NetlinkMessage msg; - ssize_t rv = ::recv(netlink_socket_fd, &msg, sizeof(msg), 0); - - if (msg.n.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&msg.n), rv)) + for (const NetlinkMessage::Attribute * attr = &answer.payload.attribute; + attr < answer.end(); + attr = attr->next()) { - const ::nlmsgerr * err = static_cast(NLMSG_DATA(&msg)); - throw Exception("Can't get Netlink response, error: " + std::to_string(err->error), ErrorCodes::NETLINK_ERROR); - } - - rv = GENLMSG_PAYLOAD(&msg.n); - - const ::nlattr * attr = static_cast(GENLMSG_DATA(&msg)); - ssize_t len = 0; - - while (len < rv) - { - len += NLA_ALIGN(attr->nla_len); - - if (attr->nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->nla_type == TASKSTATS_TYPE_AGGR_PID) + if (attr->header.nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->header.nla_type == TASKSTATS_TYPE_AGGR_PID) { - int aggr_len = NLA_PAYLOAD(attr->nla_len); - int len2 = 0; - - attr = static_cast(NLA_DATA(attr)); - while (len2 < aggr_len) + for (const NetlinkMessage::Attribute * nested_attr = reinterpret_cast(attr->payload); + nested_attr < attr->next(); + nested_attr = nested_attr->next()) { - if (attr->nla_type == TASKSTATS_TYPE_STATS) + if (nested_attr->header.nla_type == TASKSTATS_TYPE_STATS) { - const ::taskstats * ts = static_cast(NLA_DATA(attr)); - out_stats = *ts; + out_stats = unalignedLoad<::taskstats>(nested_attr->payload); + return; } - - len2 += NLA_ALIGN(attr->nla_len); - attr = reinterpret_cast(reinterpret_cast(attr) + len2); } } - - attr = reinterpret_cast(reinterpret_cast(GENLMSG_DATA(&msg)) + len); } + + throw Exception("There is no TASKSTATS_TYPE_STATS attribute in the Netlink response", ErrorCodes::NETLINK_ERROR); } -void TaskStatsInfoGetter::getStat(::taskstats & stat, int tid) +pid_t TaskStatsInfoGetter::getCurrentTID() { - tid = tid < 0 ? getDefaultTID() : tid; - getStatImpl(tid, stat); -} - -#endif - -int TaskStatsInfoGetter::getCurrentTID() -{ -#if defined(__linux__) /// This call is always successful. - man gettid - return static_cast(syscall(SYS_gettid)); -#else - return 0; -#endif -} - -int TaskStatsInfoGetter::getDefaultTID() -{ - if (default_tid < 0) - default_tid = getCurrentTID(); - - return default_tid; -} - - -static bool checkPermissionsImpl() -{ -#if defined(__linux__) - /// See man getcap. - __user_cap_header_struct request{}; - request.version = _LINUX_CAPABILITY_VERSION_1; /// It's enough to check just single CAP_NET_ADMIN capability we are interested. - request.pid = getpid(); - - __user_cap_data_struct response{}; - - /// Avoid dependency on 'libcap'. - if (0 != syscall(SYS_capget, &request, &response)) - throwFromErrno("Cannot do 'capget' syscall", ErrorCodes::NETLINK_ERROR); - - return (1 << CAP_NET_ADMIN) & response.effective; -#else - return false; -#endif -} - -bool TaskStatsInfoGetter::checkPermissions() -{ - /// It is thread- and exception- safe since C++11 - static bool res = checkPermissionsImpl(); - return res; + return static_cast(syscall(SYS_gettid)); } @@ -293,3 +285,42 @@ TaskStatsInfoGetter::~TaskStatsInfoGetter() } } + + +#else + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + +bool TaskStatsInfoGetter::checkPermissions() +{ + return false; +} + + +TaskStatsInfoGetter::TaskStatsInfoGetter() +{ + throw Exception("TaskStats are not implemented for this OS.", ErrorCodes::NOT_IMPLEMENTED); +} + +void TaskStatsInfoGetter::getStat(::taskstats &, pid_t) +{ +} + +pid_t TaskStatsInfoGetter::getCurrentTID() +{ + return 0; +} + +TaskStatsInfoGetter::~TaskStatsInfoGetter() +{ +} + +} + +#endif diff --git a/dbms/src/Common/TaskStatsInfoGetter.h b/dbms/src/Common/TaskStatsInfoGetter.h index 0b44d10f295..4ff5b94da37 100644 --- a/dbms/src/Common/TaskStatsInfoGetter.h +++ b/dbms/src/Common/TaskStatsInfoGetter.h @@ -1,47 +1,39 @@ #pragma once +#include #include +#include +#if defined(__linux__) struct taskstats; +#else +struct taskstats {}; +#endif namespace DB { -class Exception; - - /// Get taskstat info from OS kernel via Netlink protocol. -class TaskStatsInfoGetter +class TaskStatsInfoGetter : private boost::noncopyable { public: TaskStatsInfoGetter(); - TaskStatsInfoGetter(const TaskStatsInfoGetter &) = delete; - -#if defined(__linux__) - void getStat(::taskstats & stat, int tid = -1); -#endif - ~TaskStatsInfoGetter(); + void getStat(::taskstats & stat, pid_t tid); + /// Make a syscall and returns Linux thread id - static int getCurrentTID(); + static pid_t getCurrentTID(); /// Whether the current process has permissions (sudo or cap_net_admin capabilties) to get taskstats info static bool checkPermissions(); -private: - /// Caches current thread tid to avoid extra sys calls - int getDefaultTID(); - int default_tid = -1; - #if defined(__linux__) - void getStatImpl(int tid, ::taskstats & out_stats); -#endif - void init(); - +private: int netlink_socket_fd = -1; - UInt16 netlink_family_id = 0; + UInt16 taskstats_family_id = 0; +#endif }; } diff --git a/dbms/src/Common/ThreadProfileEvents.h b/dbms/src/Common/ThreadProfileEvents.h index 0edbe46c43b..511f2f33e22 100644 --- a/dbms/src/Common/ThreadProfileEvents.h +++ b/dbms/src/Common/ThreadProfileEvents.h @@ -1,5 +1,5 @@ #pragma once -#include + #include #include @@ -10,6 +10,7 @@ #include #endif + namespace ProfileEvents { extern const Event RealTimeMicroseconds; @@ -20,6 +21,7 @@ namespace ProfileEvents extern const Event VoluntaryContextSwitches; extern const Event InvoluntaryContextSwitches; +#if defined(__linux__) extern const Event OSIOWaitMicroseconds; extern const Event OSCPUWaitMicroseconds; extern const Event OSCPUVirtualTimeMicroseconds; @@ -27,6 +29,7 @@ namespace ProfileEvents extern const Event OSWriteChars; extern const Event OSReadBytes; extern const Event OSWriteBytes; +#endif } @@ -85,12 +88,7 @@ struct RUsageCounters static RUsageCounters current(UInt64 real_time_ = getCurrentTimeNanoseconds()) { ::rusage rusage; -#if defined(__APPLE__) - ::getrusage(RUSAGE_SELF, &rusage); -#else ::getrusage(RUSAGE_THREAD, &rusage); -#endif - return RUsageCounters(rusage, real_time_); } @@ -113,18 +111,18 @@ struct RUsageCounters }; +#if defined(__linux__) + struct TasksStatsCounters { -#if defined(__linux__) ::taskstats stat; -#endif + TasksStatsCounters() = default; static TasksStatsCounters current(); static void incrementProfileEvents(const TasksStatsCounters & prev, const TasksStatsCounters & curr, ProfileEvents::Counters & profile_events) { -#if defined(__linux__) profile_events.increment(ProfileEvents::OSCPUWaitMicroseconds, safeDiff(prev.stat.cpu_delay_total, curr.stat.cpu_delay_total) / 1000U); profile_events.increment(ProfileEvents::OSIOWaitMicroseconds, @@ -140,7 +138,6 @@ struct TasksStatsCounters profile_events.increment(ProfileEvents::OSWriteChars, safeDiff(prev.stat.write_char, curr.stat.write_char)); profile_events.increment(ProfileEvents::OSReadBytes, safeDiff(prev.stat.read_bytes, curr.stat.read_bytes)); profile_events.increment(ProfileEvents::OSWriteBytes, safeDiff(prev.stat.write_bytes, curr.stat.write_bytes)); -#endif } static void updateProfileEvents(TasksStatsCounters & last_counters, ProfileEvents::Counters & profile_events) @@ -151,4 +148,17 @@ struct TasksStatsCounters } }; +#else + +struct TasksStatsCounters +{ + ::taskstats stat; + + static TasksStatsCounters current() { return {}; } + static void incrementProfileEvents(const TasksStatsCounters &, const TasksStatsCounters &, ProfileEvents::Counters &) {} + static void updateProfileEvents(TasksStatsCounters &, ProfileEvents::Counters &) {} +}; + +#endif + } diff --git a/dbms/src/Common/ThreadStatus.cpp b/dbms/src/Common/ThreadStatus.cpp index d92c589a5f2..3259b1ec716 100644 --- a/dbms/src/Common/ThreadStatus.cpp +++ b/dbms/src/Common/ThreadStatus.cpp @@ -1,10 +1,13 @@ -#include "ThreadStatus.h" -#include +#include + +#include #include #include #include +#include +#include -#include +#include #include @@ -41,7 +44,6 @@ ThreadStatus::ThreadStatus() last_rusage = std::make_unique(); last_taskstats = std::make_unique(); - taskstats_getter = std::make_unique(); memory_tracker.setDescription("(for thread)"); log = &Poco::Logger::get("ThreadStatus"); @@ -72,9 +74,12 @@ void ThreadStatus::initPerformanceCounters() ++queries_started; *last_rusage = RUsageCounters::current(query_start_time_nanoseconds); - has_permissions_for_taskstats = TaskStatsInfoGetter::checkPermissions(); - if (has_permissions_for_taskstats) + + if (TaskStatsInfoGetter::checkPermissions()) + { + taskstats_getter = std::make_unique(); *last_taskstats = TasksStatsCounters::current(); + } } void ThreadStatus::updatePerformanceCounters() @@ -82,7 +87,7 @@ void ThreadStatus::updatePerformanceCounters() try { RUsageCounters::updateProfileEvents(*last_rusage, performance_counters); - if (has_permissions_for_taskstats) + if (taskstats_getter) TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters); } catch (...) diff --git a/dbms/src/Common/ThreadStatus.h b/dbms/src/Common/ThreadStatus.h index b708b3dce03..045cfde3a46 100644 --- a/dbms/src/Common/ThreadStatus.h +++ b/dbms/src/Common/ThreadStatus.h @@ -33,7 +33,6 @@ using InternalTextLogsQueueWeakPtr = std::weak_ptr; class ThreadGroupStatus { public: - mutable std::shared_mutex mutex; ProfileEvents::Counters performance_counters{VariableContext::Process}; @@ -126,7 +125,6 @@ public: ~ThreadStatus(); protected: - ThreadStatus(); void initPerformanceCounters(); @@ -160,11 +158,11 @@ protected: /// Use ptr not to add extra dependencies in the header std::unique_ptr last_rusage; std::unique_ptr last_taskstats; + + /// Set only if we have enough capabilities. std::unique_ptr taskstats_getter; - bool has_permissions_for_taskstats = false; public: - /// Implicitly finalizes current thread in the destructor class CurrentThreadScope { diff --git a/dbms/src/Core/BackgroundSchedulePool.cpp b/dbms/src/Core/BackgroundSchedulePool.cpp index 9cdec4087a4..c9abf559092 100644 --- a/dbms/src/Core/BackgroundSchedulePool.cpp +++ b/dbms/src/Core/BackgroundSchedulePool.cpp @@ -143,12 +143,6 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size) { LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads"); - /// Put all threads of both thread pools to one thread group - /// The master thread exits immediately - CurrentThread::initializeQuery(); - thread_group = CurrentThread::getGroup(); - CurrentThread::detachQuery(); - threads.resize(size); for (auto & thread : threads) thread = std::thread([this] { threadFunction(); }); @@ -217,14 +211,29 @@ void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lo } +void BackgroundSchedulePool::attachToThreadGroup() +{ + std::lock_guard lock(delayed_tasks_mutex); + + if (thread_group) + { + /// Put all threads to one thread pool + CurrentThread::attachTo(thread_group); + } + else + { + CurrentThread::initializeQuery(); + thread_group = CurrentThread::getGroup(); + } +} + + void BackgroundSchedulePool::threadFunction() { setThreadName("BackgrSchedPool"); - /// Put all threads to one thread pool - CurrentThread::attachTo(thread_group); + attachToThreadGroup(); SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); }); - CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool); while (!shutdown) @@ -242,8 +251,7 @@ void BackgroundSchedulePool::delayExecutionThreadFunction() { setThreadName("BckSchPoolDelay"); - /// Put all threads to one thread pool - CurrentThread::attachTo(thread_group); + attachToThreadGroup(); SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); }); while (!shutdown) diff --git a/dbms/src/Core/BackgroundSchedulePool.h b/dbms/src/Core/BackgroundSchedulePool.h index f55cc95dbbc..b7aa1592c19 100644 --- a/dbms/src/Core/BackgroundSchedulePool.h +++ b/dbms/src/Core/BackgroundSchedulePool.h @@ -142,6 +142,8 @@ private: /// Thread group used for profiling purposes ThreadGroupStatusPtr thread_group; + + void attachToThreadGroup(); }; using BackgroundSchedulePoolPtr = std::shared_ptr; diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 022366cbc04..608bc06b713 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -300,7 +300,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt try { if (thread_group) - CurrentThread::attachTo(thread_group); + CurrentThread::attachToIfDetached(thread_group); setThreadName("MergeAggMergThr"); while (!parallel_merge_data->finish) diff --git a/dbms/src/IO/AIO.cpp b/dbms/src/IO/AIO.cpp index 9c8160919f1..e73319319b1 100644 --- a/dbms/src/IO/AIO.cpp +++ b/dbms/src/IO/AIO.cpp @@ -1,4 +1,4 @@ -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include #include diff --git a/dbms/src/IO/AIO.h b/dbms/src/IO/AIO.h index 3717f9ab90d..d99505fb017 100644 --- a/dbms/src/IO/AIO.h +++ b/dbms/src/IO/AIO.h @@ -1,6 +1,6 @@ #pragma once -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include diff --git a/dbms/src/IO/AIOContextPool.cpp b/dbms/src/IO/AIOContextPool.cpp index 336c03be7dd..1251bb651b3 100644 --- a/dbms/src/IO/AIOContextPool.cpp +++ b/dbms/src/IO/AIOContextPool.cpp @@ -1,4 +1,4 @@ -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include #include diff --git a/dbms/src/IO/AIOContextPool.h b/dbms/src/IO/AIOContextPool.h index 3e1c4a039d7..8a2d3e4adbe 100644 --- a/dbms/src/IO/AIOContextPool.h +++ b/dbms/src/IO/AIOContextPool.h @@ -1,6 +1,6 @@ #pragma once -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include #include diff --git a/dbms/src/IO/ReadBufferAIO.cpp b/dbms/src/IO/ReadBufferAIO.cpp index 9243b65e48e..ca50e11db7e 100644 --- a/dbms/src/IO/ReadBufferAIO.cpp +++ b/dbms/src/IO/ReadBufferAIO.cpp @@ -1,4 +1,4 @@ -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include #include diff --git a/dbms/src/IO/ReadBufferAIO.h b/dbms/src/IO/ReadBufferAIO.h index 77e35f8e35a..a30057565c0 100644 --- a/dbms/src/IO/ReadBufferAIO.h +++ b/dbms/src/IO/ReadBufferAIO.h @@ -1,6 +1,6 @@ #pragma once -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include #include diff --git a/dbms/src/IO/ReadBufferFromFileDescriptor.cpp b/dbms/src/IO/ReadBufferFromFileDescriptor.cpp index 9bacf699cc8..70cc84567f3 100644 --- a/dbms/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/dbms/src/IO/ReadBufferFromFileDescriptor.cpp @@ -67,8 +67,9 @@ bool ReadBufferFromFileDescriptor::nextImpl() if (res > 0) bytes_read += res; - /// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one and we will count cpu time of other thread - /// It is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS) + /// It reports real time spent including the time spent while thread was preempted doing nothing. + /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). + /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS). watch.stop(); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); diff --git a/dbms/src/IO/WriteBufferAIO.cpp b/dbms/src/IO/WriteBufferAIO.cpp index dcd42e3c8fe..e8da7a17add 100644 --- a/dbms/src/IO/WriteBufferAIO.cpp +++ b/dbms/src/IO/WriteBufferAIO.cpp @@ -1,4 +1,4 @@ -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include #include diff --git a/dbms/src/IO/WriteBufferAIO.h b/dbms/src/IO/WriteBufferAIO.h index f5b01637471..7b8d275dfcd 100644 --- a/dbms/src/IO/WriteBufferAIO.h +++ b/dbms/src/IO/WriteBufferAIO.h @@ -1,6 +1,6 @@ #pragma once -#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER)) +#if defined(__linux__) #include #include diff --git a/dbms/src/IO/createReadBufferFromFileBase.cpp b/dbms/src/IO/createReadBufferFromFileBase.cpp index beb73eda861..b16189c9e5d 100644 --- a/dbms/src/IO/createReadBufferFromFileBase.cpp +++ b/dbms/src/IO/createReadBufferFromFileBase.cpp @@ -1,6 +1,6 @@ #include #include -#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER) +#if defined(__linux__) #include #endif #include @@ -14,10 +14,10 @@ namespace ProfileEvents namespace DB { -#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER) +#if !defined(__linux__) namespace ErrorCodes { - extern const int NOT_IMPLEMENTED; + extern const int NOT_IMPLEMENTED; } #endif @@ -31,7 +31,7 @@ std::unique_ptr createReadBufferFromFileBase(const std:: } else { -#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER) +#if defined(__linux__) ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO); return std::make_unique(filename_, buffer_size_, flags_, existing_memory_); #else diff --git a/dbms/src/IO/createWriteBufferFromFileBase.cpp b/dbms/src/IO/createWriteBufferFromFileBase.cpp index de06e2eb1cb..b5670b0b16b 100644 --- a/dbms/src/IO/createWriteBufferFromFileBase.cpp +++ b/dbms/src/IO/createWriteBufferFromFileBase.cpp @@ -1,6 +1,6 @@ #include #include -#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER) +#if defined(__linux__) #include #endif #include @@ -15,10 +15,10 @@ namespace ProfileEvents namespace DB { -#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER) +#if !defined(__linux__) namespace ErrorCodes { - extern const int NOT_IMPLEMENTED; + extern const int NOT_IMPLEMENTED; } #endif @@ -33,7 +33,7 @@ WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & file } else { -#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER) +#if defined(__linux__) ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO); return new WriteBufferAIO(filename_, buffer_size_, flags_, mode, existing_memory_); #else diff --git a/dbms/src/Interpreters/SystemLog.h b/dbms/src/Interpreters/SystemLog.h index 5bf3dcfd200..21c0cd8ba1c 100644 --- a/dbms/src/Interpreters/SystemLog.h +++ b/dbms/src/Interpreters/SystemLog.h @@ -51,7 +51,7 @@ namespace DB */ -#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1024 +#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576 class Context; class QueryLog; diff --git a/dbms/src/Interpreters/tests/CMakeLists.txt b/dbms/src/Interpreters/tests/CMakeLists.txt index 04808feb926..fb79250cd7c 100644 --- a/dbms/src/Interpreters/tests/CMakeLists.txt +++ b/dbms/src/Interpreters/tests/CMakeLists.txt @@ -48,5 +48,7 @@ add_check(in_join_subqueries_preprocessor) add_executable (users users.cpp) target_link_libraries (users dbms ${Boost_FILESYSTEM_LIBRARY}) -add_executable (internal_iotop internal_iotop.cpp) -target_link_libraries (internal_iotop dbms) +if (OS_LINUX) + add_executable (internal_iotop internal_iotop.cpp) + target_link_libraries (internal_iotop dbms) +endif () diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index c7e49f59e7a..d7a0294a1ab 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -36,7 +36,7 @@ void BackgroundProcessingPoolTaskInfo::wake() Poco::Timestamp current_time; { - std::unique_lock lock(pool.tasks_mutex); + std::unique_lock lock(pool.tasks_mutex); auto next_time_to_execute = iterator->first; auto this_task_handle = iterator->second; @@ -58,12 +58,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_) { LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads"); - /// Put all threads to one thread group - /// The master thread exits immediately - CurrentThread::initializeQuery(); - thread_group = CurrentThread::getGroup(); - CurrentThread::detachQuery(); - threads.resize(size); for (auto & thread : threads) thread = std::thread([this] { threadFunction(); }); @@ -77,7 +71,7 @@ BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Tas Poco::Timestamp current_time; { - std::unique_lock lock(tasks_mutex); + std::unique_lock lock(tasks_mutex); res->iterator = tasks.emplace(current_time, res); } @@ -93,11 +87,11 @@ void BackgroundProcessingPool::removeTask(const TaskHandle & task) /// Wait for all executions of this task. { - std::unique_lock wlock(task->rwlock); + std::unique_lock wlock(task->rwlock); } { - std::unique_lock lock(tasks_mutex); + std::unique_lock lock(tasks_mutex); tasks.erase(task->iterator); } } @@ -122,10 +116,22 @@ void BackgroundProcessingPool::threadFunction() { setThreadName("BackgrProcPool"); - /// Put all threads to one thread pool - CurrentThread::attachTo(thread_group); - SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); }); + { + std::lock_guard lock(tasks_mutex); + if (thread_group) + { + /// Put all threads to one thread pool + CurrentThread::attachTo(thread_group); + } + else + { + CurrentThread::initializeQuery(); + thread_group = CurrentThread::getGroup(); + } + } + + SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); }); CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool); pcg64 rng(randomSeed()); @@ -141,7 +147,7 @@ void BackgroundProcessingPool::threadFunction() Poco::Timestamp min_time; { - std::unique_lock lock(tasks_mutex); + std::unique_lock lock(tasks_mutex); if (!tasks.empty()) { @@ -162,7 +168,7 @@ void BackgroundProcessingPool::threadFunction() if (!task) { - std::unique_lock lock(tasks_mutex); + std::unique_lock lock(tasks_mutex); wake_event.wait_for(lock, std::chrono::duration(sleep_seconds + std::uniform_real_distribution(0, sleep_seconds_random_part)(rng))); @@ -173,12 +179,12 @@ void BackgroundProcessingPool::threadFunction() Poco::Timestamp current_time; if (min_time > current_time) { - std::unique_lock lock(tasks_mutex); + std::unique_lock lock(tasks_mutex); wake_event.wait_for(lock, std::chrono::microseconds( min_time - current_time + std::uniform_int_distribution(0, sleep_seconds_random_part * 1000000)(rng))); } - std::shared_lock rlock(task->rwlock); + std::shared_lock rlock(task->rwlock); if (task->removed) continue; @@ -202,7 +208,7 @@ void BackgroundProcessingPool::threadFunction() Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000); { - std::unique_lock lock(tasks_mutex); + std::unique_lock lock(tasks_mutex); if (task->removed) continue; diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index fd2fdb50897..98de7b0399c 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -298,16 +298,19 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( MergeTreeData::DataPart::Checksums * additional_column_checksums) { /// Finish columns serialization. - auto & settings = storage.context.getSettingsRef(); - IDataType::SerializeBinaryBulkSettings serialize_settings; - serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size; - serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0; - OffsetColumns offset_columns; - auto it = columns_list.begin(); - for (size_t i = 0; i < columns_list.size(); ++i, ++it) + if (!serialization_states.empty()) { - serialize_settings.getter = createStreamGetter(it->name, offset_columns, false); - it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); + auto & settings = storage.context.getSettingsRef(); + IDataType::SerializeBinaryBulkSettings serialize_settings; + serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size; + serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0; + OffsetColumns offset_columns; + auto it = columns_list.begin(); + for (size_t i = 0; i < columns_list.size(); ++i, ++it) + { + serialize_settings.getter = createStreamGetter(it->name, offset_columns, false); + it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); + } } if (!total_column_list) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 0b63a52e83d..bb97aabe691 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -52,34 +52,6 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage check_period_ms = storage.data.settings.check_delay_period * 1000; task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); }); - task->schedule(); -} - -ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread() -{ - try - { - /// Stop restarting_thread before stopping other tasks - so that it won't restart them again. - need_stop = true; - task->deactivate(); - LOG_TRACE(log, "Restarting thread finished"); - - /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. - storage.fetcher.blocker.cancelForever(); - storage.merger_mutator.actions_blocker.cancelForever(); - - /// Stop other tasks. - - partialShutdown(); - - if (storage.queue_task_handle) - storage.context.getBackgroundPool().removeTask(storage.queue_task_handle); - storage.queue_task_handle.reset(); - } - catch (...) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - } } void ReplicatedMergeTreeRestartingThread::run() @@ -215,10 +187,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.alter_thread.start(); storage.part_check_thread.start(); - if (!storage.queue_task_handle) - storage.queue_task_handle = storage.context.getBackgroundPool().addTask( - std::bind(&StorageReplicatedMergeTree::queueTask, &storage)); - return true; } catch (...) @@ -368,4 +336,16 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() LOG_TRACE(log, "Threads finished"); } + +void ReplicatedMergeTreeRestartingThread::shutdown() +{ + /// Stop restarting_thread before stopping other tasks - so that it won't restart them again. + need_stop = true; + task->deactivate(); + LOG_TRACE(log, "Restarting thread finished"); + + /// Stop other tasks. + partialShutdown(); +} + } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h index 1192ec45703..28314a7d2c1 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h @@ -23,10 +23,17 @@ class ReplicatedMergeTreeRestartingThread { public: ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_); - ~ReplicatedMergeTreeRestartingThread(); + + void start() + { + task->activate(); + task->schedule(); + } void wakeup() { task->schedule(); } + void shutdown(); + private: StorageReplicatedMergeTree & storage; String log_name; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 586405acf75..1ac71665580 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -215,7 +215,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( [this] (const std::string & name) { enqueuePartForCheck(name); }), reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this), fetcher(data), - cleanup_thread(*this), alter_thread(*this), part_check_thread(*this), + cleanup_thread(*this), alter_thread(*this), part_check_thread(*this), restarting_thread(*this), log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")) { if (path_.empty()) @@ -2063,9 +2063,7 @@ void StorageReplicatedMergeTree::queueUpdatingTask() if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED) { - /// Can be called before starting restarting_thread - if (restarting_thread) - restarting_thread->wakeup(); + restarting_thread.wakeup(); return; } @@ -2787,8 +2785,10 @@ void StorageReplicatedMergeTree::startup() data_parts_exchange_endpoint_holder = std::make_shared( data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, context.getInterserverIOHandler()); + queue_task_handle = context.getBackgroundPool().addTask([this] { return queueTask(); }); + /// In this thread replica will be activated. - restarting_thread = std::make_unique(*this); + restarting_thread.start(); /// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it startup_event.wait(); @@ -2797,15 +2797,21 @@ void StorageReplicatedMergeTree::startup() void StorageReplicatedMergeTree::shutdown() { - restarting_thread.reset(); + /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. + fetcher.blocker.cancelForever(); + merger_mutator.actions_blocker.cancelForever(); + + restarting_thread.shutdown(); + + if (queue_task_handle) + context.getBackgroundPool().removeTask(queue_task_handle); + queue_task_handle.reset(); if (data_parts_exchange_endpoint_holder) { data_parts_exchange_endpoint_holder->getBlocker().cancelForever(); data_parts_exchange_endpoint_holder = nullptr; } - - fetcher.blocker.cancelForever(); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index ebe28524910..180f893e56c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -198,7 +198,6 @@ private: void clearOldPartsAndRemoveFromZK(); friend class ReplicatedMergeTreeBlockOutputStream; - friend class ReplicatedMergeTreeRestartingThread; friend class ReplicatedMergeTreePartCheckThread; friend class ReplicatedMergeTreeCleanupThread; friend class ReplicatedMergeTreeAlterThread; @@ -303,7 +302,7 @@ private: ReplicatedMergeTreePartCheckThread part_check_thread; /// A thread that processes reconnection to ZooKeeper when the session expires. - std::unique_ptr restarting_thread; + ReplicatedMergeTreeRestartingThread restarting_thread; /// An event that awakens `alter` method from waiting for the completion of the ALTER query. zkutil::EventPtr alter_query_event = std::make_shared(); diff --git a/docs/ru/data_types/date.md b/docs/ru/data_types/date.md index ff9e40cc7ab..c458deaf627 100644 --- a/docs/ru/data_types/date.md +++ b/docs/ru/data_types/date.md @@ -1,3 +1,5 @@ + + # Date Дата. Хранится в двух байтах в виде (беззнакового) числа дней, прошедших от 1970-01-01. Позволяет хранить значения от чуть больше, чем начала unix-эпохи до верхнего порога, определяющегося константой на этапе компиляции (сейчас - до 2106 года, последний полностью поддерживаемый год - 2105). diff --git a/docs/ru/operations/table_engines/collapsingmergetree.md b/docs/ru/operations/table_engines/collapsingmergetree.md index 6336c0d3000..83dfaf99d72 100644 --- a/docs/ru/operations/table_engines/collapsingmergetree.md +++ b/docs/ru/operations/table_engines/collapsingmergetree.md @@ -1,3 +1,5 @@ + + # CollapsingMergeTree *Движок достаточно специфичен для Яндекс.Метрики.* @@ -33,5 +35,3 @@ CollapsingMergeTree(EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID 1. Написать запрос с GROUP BY и агрегатными функциями, учитывающими знак. Например, чтобы посчитать количество, надо вместо count() написать sum(Sign); чтобы посчитать сумму чего-либо, надо вместо sum(x) написать sum(Sign \* x) и т. п., а также добавить HAVING sum(Sign) `>` 0. Не все величины можно посчитать подобным образом. Например, агрегатные функции min, max не могут быть переписаны. 2. Если необходимо вынимать данные без агрегации (например, проверить наличие строк, самые новые значения которых удовлетворяют некоторым условиям), можно использовать модификатор FINAL для секции FROM. Это вариант существенно менее эффективен. - - diff --git a/docs/ru/operations/table_engines/merge.md b/docs/ru/operations/table_engines/merge.md index aa5d44e71f5..7aa5ebd3348 100644 --- a/docs/ru/operations/table_engines/merge.md +++ b/docs/ru/operations/table_engines/merge.md @@ -1,3 +1,5 @@ + + # Merge Движок `Merge` (не путайте с движком `MergeTree`) не хранит данные самостоятельно, а позволяет читать одновременно из произвольного количества других таблиц. diff --git a/docs/ru/operations/table_engines/mergetree.md b/docs/ru/operations/table_engines/mergetree.md index 62ea3dc2e2f..4d035bcc216 100644 --- a/docs/ru/operations/table_engines/mergetree.md +++ b/docs/ru/operations/table_engines/mergetree.md @@ -2,55 +2,174 @@ # MergeTree -Движок MergeTree поддерживает индекс по первичному ключу и по дате и обеспечивает возможность обновления данных в реальном времени. -Это наиболее продвинутый движок таблиц в ClickHouse. Не путайте с движком Merge. +Движок `MergeTree`, а также другие движки этого семейства (`*MergeTree`) — это наиболее функциональные движки таблиц ClickHousе. -Движок принимает параметры: имя столбца типа Date, содержащего дату; выражение для семплирования (не обязательно); кортеж, определяющий первичный ключ таблицы; гранулированность индекса. +!!!info + Движок [Merge](merge.md#table_engine-merge) не относится к семейству `*MergeTree`. -Пример без поддержки сэмплирования. +Основные возможности: -```text -MergeTree(EventDate, (CounterID, EventDate), 8192) +- Хранит данные, отсортированные по первичному ключу. + + Это позволяет создавать разреженный индекс небольшого объёма, который позволяет быстрее находить данные. + +- Позволяет оперировать партициями, если задан [ключ партиционирования](custom_partitioning_key.md#table_engines-custom_partitioning_key). + + ClickHouse поддерживает отдельные операции с партициями, которые работают эффективнее, чем общие операции с этим же результатом над этими же данными. Также, ClickHouse автоматически отсекает данные по партициям там, где ключ партиционирования указан в запросе. Это также увеличивает эффективность выполнения запросов. + +- Поддерживает репликацию данных. + + Для этого используется семейство таблиц `ReplicatedMergeTree`. Подробнее читайте в разделе [Репликация данных](replication.md#table_engines-replication). + +- Поддерживает сэмплирование данных. + + При необходимости можно задать способ сэмплирования данных в таблице. + + +## Конфигурирование движка при создании таблицы + +``` +ENGINE [=] MergeTree() [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name=value, ...] ``` -Пример с поддержкой сэмплирования. +**Секции ENGINE** -```text +- `ORDER BY` — первичный ключ. + + Кортеж столбцов или произвольных выражений. Пример: `ORDER BY (CounerID, EventDate)`. + Если используется ключ сэмплирования, то первичный ключ должен содержать его. Пример: `ORDER BY (CounerID, EventDate, intHash32(UserID))`. + +- `PARTITION BY` — [ключ партиционирования](custom_partitioning_key.md#table_engines-custom_partitioning_key). + + Для партиционирования по месяцам используйте выражение `toYYYYMM(date_column)`, где `date_column` — столбец с датой типа [Date](../../data_types/date.md#data_type-date). В этом случае имена партиций имеют формат `"YYYYMM"`. + +- `SAMPLE BY` — выражение для сэмплирования (не обязательно). Пример: `intHash32(UserID))`. + +- `SETTINGS` — дополнительные параметры, регулирующие поведение `MergeTree` (не обязательно): + + - `index_granularity` — гранулярность индекса. Число строк данных между «засечками» индекса. По умолчанию — 8192. + +**Пример** + +``` +ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity=8192 +``` + +В примере мы устанавливаем партиционирование по месяцам. + +Также мы задаем выражение для сэмплирования в виде хэша по идентификатору посетителя. Это позволяет псевдослучайным образом перемешать данные в таблице для каждого `CounterID` и `EventDate`. Если при выборке данных задать секцию [SAMPLE](../../query_language/select.md#select-section-sample) то ClickHouse вернёт равномерно-псевдослучайную выборку данных для подмножества посетителей. + +`index_granularity` можно было не указывать, поскольку 8192 — это значение по умолчанию. + +### Устаревший способ конфигурирования движка + +!!!attention + Не используйте этот способ в новых проектах и по возможности переведите старые проекты на способ описанный выше. + +``` +ENGINE [=] MergeTree(date-column [, sampling_expression], (primary, key), index_granularity) +``` + +**Параметры MergeTree()** + +- `date-column` — имя столбца с типом [Date](../../data_types/date.md#data_type-date). На основе этого столбца ClickHouse автоматически создаёт партиции по месяцам. Имена партиций имеют формат `"YYYYMM"`. +- `sampling_expression` — выражение для сэмплирования. +- `(primary, key)` — первичный ключ. Тип — [Tuple()](../../data_types/tuple.md#data_type-tuple). Может состоять из произвольных выражений, но обычно это кортеж столбцов. Обязательно должен включать в себя выражение для сэмплирования, если оно задано. Не обязан включать в себя столбец с датой `date-column`. +- `index_granularity` — гранулярность индекса. Число строк данных между «засечками» индекса. Для большинства задач подходит значение 8192. + +**Пример** + +``` MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192) ``` -В таблице типа MergeTree обязательно должен быть отдельный столбец, содержащий дату, здесь это столбец EventDate. Тип столбца с датой — обязательно Date (а не DateTime). +Движок `MergeTree` сконфигурирован таким же образом, как и в примере выше для основного способа конфигурирования движка. -Первичным ключом может быть кортеж из произвольных выражений (обычно это просто кортеж столбцов) или одно выражение. +## Хранение данных -Выражение для сэмплирования (использовать не обязательно) — произвольное выражение. Оно должно также присутствовать в первичном ключе. В примере используется хэширование по идентификатору посетителя, чтобы псевдослучайно перемешать данные в таблице для каждого CounterID и EventDate. То есть, при использовании секции SAMPLE в запросе вы получите равномерно-псевдослучайную выборку данных для подмножества посетителей. +Таблица состоит из *кусков* данных (data parts), отсортированных по первичному ключу. -Таблица реализована как набор кусочков. Каждый кусочек сортирован по первичному ключу. Также для каждого кусочка прописана минимальная и максимальная дата. При вставке в таблицу создаётся новый сортированный кусочек. В фоне периодически инициируется процесс слияния. При слиянии выбирается несколько кусочков, обычно наименьших, и сливаются в один большой сортированный кусочек. +При вставке в таблицу создаются отдельные куски данных, каждый из которых лексикографически отсортирован по первичному ключу. Например, если первичный ключ — `(CounterID, Date)`, то данные в куске будут лежать в порядке `CounterID`, а для каждого `CounterID` в порядке `Date`. -То есть, при вставке в таблицу производится инкрементальная сортировка. Слияние реализовано таким образом, что таблица постоянно состоит из небольшого количества сортированных кусочков, а также само слияние делает не слишком много работы. +Данные, относящиеся к разным партициям, разбиваются на разные куски. В фоновом режиме ClickHouse выполняет слияния (merge) кусков данных для более эффективного хранения. Куски, относящиеся к разным партициям не объединяются. -При вставке данные, относящиеся к разным месяцам, разбиваются на разные кусочки. Кусочки, соответствующие разным месяцам, никогда не объединяются. Это сделано, чтобы обеспечить локальность модификаций данных (для упрощения бэкапов). +Для каждого куска данных ClickHouse создаёт индексный файл, который содержит значение первичного ключа для каждой индексной строки («засечка»). Номера индексных строк определяются как `n * index_granularity`, а максимальное значение `n` равно целой части от деления общего количества строк на `index_granularity`. Для каждого столбца также пишутся «засечки» для тех же индексных строк, что и для первичного ключа, эти «засечки» позволяют находить непосредственно данные в столбцах. -Кусочки объединяются до некоторого предельного размера, чтобы не было слишком длительных слияний. +Вы можете использовать одну большую таблицу, постоянно добавляя в неё данные пачками, именно для этого предназначен движок `MergeTree`. -Для каждого кусочка также пишется индексный файл. Индексный файл содержит значение первичного ключа для каждой index_granularity строки таблицы. То есть, это разреженный индекс сортированных данных. +## Первичные ключи и индексы в запросах -Для столбцов также пишутся «засечки» каждую index_granularity строку, чтобы данные можно было читать в определённом диапазоне. +Рассмотрим первичный ключ — `(CounterID, Date)`, в этом случае, сортировку и индекс можно проиллюстрировать следующим образом: -При чтении из таблицы, запрос SELECT анализируется на предмет того, можно ли использовать индексы. -Индекс может использоваться, если в секции WHERE/PREWHERE, в качестве одного из элементов конъюнкции, или целиком, есть выражение, представляющее операции сравнения на равенства, неравенства, а также IN или LIKE с фиксированным префиксом, над столбцами или выражениями, входящими в первичный ключ или ключ партиционирования, либо над некоторыми частистично монотонными функциями от этих столбцов, а также логические связки над такими выражениями. +``` +Whole data: [-------------------------------------------------------------------------] +CounterID: [aaaaaaaaaaaaaaaaaabbbbcdeeeeeeeeeeeeefgggggggghhhhhhhhhiiiiiiiiikllllllll] +Date: [1111111222222233331233211111222222333211111112122222223111112223311122333] +Marks: | | | | | | | | | | | + a,1 a,2 a,3 b,3 e,2 e,3 g,1 h,2 i,1 i,3 l,3 +Marks numbers: 0 1 2 3 4 5 6 7 8 9 10 +``` + +Если в запросе к данным указать: + +- `CounterID IN ('a', 'h')`, то сервер читает данные в диапазонах засечек `[0, 3)` и `[6, 8)`. +- `CounterID IN ('a', 'h') AND Date = 3`, то сервер читает данные в диапазонах засечек `[1, 3)` и `[7, 8)`. +- `Date = 3`, то сервер читает данные в диапазоне засечек `[1, 10)`. + +Примеры выше показывают, что использование индекса всегда эффективнее, чем full scan. + +Разреженный индекс допускает чтение лишних строк. При чтении одного диапазона первичного ключа, может быть прочитано до `index_granularity * 2` лишних строк в каждом блоке данных. В большинстве случаев ClickHouse не теряет производительности при `index_granularity = 8192`. + +Разреженность индекса позволяет работать даже с очень большим количеством строк в таблицах, поскольку такой индекс всегда помещается в оперативную память компьютера. + +ClickHouse не требует уникального первичного ключа. Можно вставить много строк с одинаковым первичным ключом. + +### Выбор первичного ключа + +Количество столбцов в первичном ключе не ограничено явным образом. В зависимости от структуры данных в первичный ключ можно включать больше или меньше столбцов. Это может: + +- Увеличить эффективность индекса. + + Пусть первичный ключ — `(a, b)`, тогда добавление ещё одного столбца `c` повысит эффективность, если выполнены условия: + + - Есть запросы с условием на столбец `c`. + - Часто встречаются достаточно длинные (в несколько раз больше `index_granularity`) диапазоны данных с одинаковыми значениями `(a, b)`. Иначе говоря, когда добавление ещё одного столбца позволит пропускать достаточно длинные диапазоны данных. + +- Улучшить сжатие данных. + + ClickHouse сортирует данные по первичному ключу, поэтому чем выше однородность, тем лучше сжатие. + +- Обеспечить дополнительную логику при слиянии в движках [CollapsingMergeTree](collapsingmergetree.md#table_engine-collapsingmergetree) и [SummingMergeTree](summingmergetree.md#table_engine-summingmergetree). + + Может потребоваться иметь много полей в первичном ключе, даже если они не нужны для выполнения предыдущих пунктов. + +Длинный первичный ключ будет негативно влиять на производительность вставки и потребление памяти, однако на производительность ClickHouse при запросах `SELECT` лишние столбцы в первичном ключе не влияют. + +### Использование индексов и партиций в запросах + +Для запросов `SELECT` ClickHouse анализирует возможность использования индекса. Индекс может использоваться, если в секции `WHERE/PREWHERE`, в качестве одного из элементов конъюнкции, или целиком, есть выражение, представляющее операции сравнения на равенства, неравенства, а также `IN` или `LIKE` с фиксированным префиксом, над столбцами или выражениями, входящими в первичный ключ или ключ партиционирования, либо над некоторыми частично монотонными функциями от этих столбцов, а также логические связки над такими выражениями. Таким образом, обеспечивается возможность быстро выполнять запросы по одному или многим диапазонам первичного ключа. Например, в указанном примере будут быстро работать запросы для конкретного счётчика; для конкретного счётчика и диапазона дат; для конкретного счётчика и даты, для нескольких счётчиков и диапазона дат и т. п. +Рассмотрим движок сконфигурированный следующим образом: + +``` +ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity=8192 +``` + +В этом случае в запросах: + ```sql SELECT count() FROM table WHERE EventDate = toDate(now()) AND CounterID = 34 SELECT count() FROM table WHERE EventDate = toDate(now()) AND (CounterID = 34 OR CounterID = 42) SELECT count() FROM table WHERE ((EventDate >= toDate('2014-01-01') AND EventDate <= toDate('2014-01-31')) OR EventDate = toDate('2014-05-01')) AND CounterID IN (101500, 731962, 160656) AND (CounterID = 101500 OR EventDate != toDate('2014-05-01')) ``` -Во всех этих случаях будет использоваться индекс по дате и по первичному ключу. Видно, что индекс используется даже для достаточно сложных выражений. Чтение из таблицы организовано так, что использование индекса не может быть медленнее full scan-а. +ClickHouse будет использовать индекс по первичному ключу для отсечения не подходящих данных, а также ключ партиционирования по месяцам для отсечения партиций, которые находятся в не подходящих диапазонах дат. -В этом примере индекс не может использоваться. +Запросы выше показывают, что индекс используется даже для сложных выражений. Чтение из таблицы организовано так, что использование индекса не может быть медленнее, чем full scan. + +В примере ниже индекс не может использоваться. ```sql SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%' @@ -58,14 +177,11 @@ SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%' Чтобы проверить, сможет ли ClickHouse использовать индекс при выполнении запроса, используйте настройки [force_index_by_date](../settings/settings.md#settings-settings-force_index_by_date) и [force_primary_key](../settings/settings.md#settings-settings-force_primary_key). -Индекс по дате обеспечивает чтение только кусков, содержащих даты из нужного диапазона. При этом кусок данных может содержать данные за многие даты (до целого месяца), а в пределах одного куска данные лежат упорядоченными по первичному ключу, который может не содержать дату в качестве первого столбца. В связи с этим, при использовании запроса с указанием условия только на дату, но не на префикс первичного ключа, будет читаться данных больше, чем за одну дату. +Ключ партиционирования по месяцам обеспечивает чтение только тех блоков данных, которые содержат даты из нужного диапазона. При этом блок данных может содержать данные за многие даты (до целого месяца). В пределах одного блока данные упорядочены по первичному ключу, который может не содержать дату в качестве первого столбца. В связи с этим, при использовании запроса с указанием условия только на дату, но не на префикс первичного ключа, будет читаться данных больше, чем за одну дату. -Для конкуррентного доступа к таблице используется мультиверсионность. То есть, при одновременном чтении и обновлении таблицы, данные будут читаться из набора кусочков, актуального на момент запроса. Длинных блокировок нет. Вставки никак не мешают чтениям. + +## Конкурентный доступ к данным + +Для конкурентного доступа к таблице используется мультиверсионность. То есть, при одновременном чтении и обновлении таблицы, данные будут читаться из набора кусочков, актуального на момент запроса. Длинных блокировок нет. Вставки никак не мешают чтениям. Чтения из таблицы автоматически распараллеливаются. - -Поддерживается запрос `OPTIMIZE`, который вызывает один внеочередной шаг слияния. - -Вы можете использовать одну большую таблицу, постоянно добавляя в неё данные небольшими пачками, именно для этого предназначен движок MergeTree. - -Для всех типов таблиц семейства MergeTree возможна репликация данных — смотрите раздел «Репликация данных». diff --git a/docs/ru/operations/table_engines/replication.md b/docs/ru/operations/table_engines/replication.md index b5459b77b55..1dec0f4f42c 100644 --- a/docs/ru/operations/table_engines/replication.md +++ b/docs/ru/operations/table_engines/replication.md @@ -152,6 +152,8 @@ sudo -u clickhouse touch /var/lib/clickhouse/flags/force_restore_data Отсутствует ограничение на использование сетевой полосы при восстановлении. Имейте это ввиду, если восстанавливаете сразу много реплик. + + ## Преобразование из MergeTree в ReplicatedMergeTree Здесь и далее, под `MergeTree` подразумеваются все движки таблиц семейства `MergeTree`, так же для `ReplicatedMergeTree`. diff --git a/docs/ru/operations/table_engines/summingmergetree.md b/docs/ru/operations/table_engines/summingmergetree.md index 189aac06504..6aa2f116d72 100644 --- a/docs/ru/operations/table_engines/summingmergetree.md +++ b/docs/ru/operations/table_engines/summingmergetree.md @@ -1,3 +1,5 @@ + + # SummingMergeTree Отличается от `MergeTree` тем, что суммирует данные при слиянии. diff --git a/docs/ru/query_language/functions/rounding_functions.md b/docs/ru/query_language/functions/rounding_functions.md index a4fc9fa4a05..849c35013b9 100644 --- a/docs/ru/query_language/functions/rounding_functions.md +++ b/docs/ru/query_language/functions/rounding_functions.md @@ -17,10 +17,41 @@ N может быть отрицательным. В остальном, аналогично функции floor, см. выше. ## round(x\[, N\]) -Возвращает ближайшее к num круглое число, которое может быть меньше или больше или равно x. -Если x находится посередине от ближайших круглых чисел, то возвращается какое-либо одно из них (implementation specific). -Число -0. может считаться или не считаться круглым (implementation specific). -В остальном, аналогично функциям floor и ceil, см. выше. + +Реализует [банковское округление](https://en.wikipedia.org/wiki/Rounding#Round_half_to_even), т.е. округление до ближайшего чётного. + +**Аргументы функции** + +- `x` — число для округления. [Тип](../../data_types/index.md#data_types) — любой числовой. +- `N` — позиция цифры после запятой, до которой следует округлять. + +**Возвращаемое значение** + +Округлённое число того же типа, что и входное число `x`. + +**Пример** + +```sql +SELECT + number / 2 AS x, + round(x) +FROM system.numbers +LIMIT 10 +``` +``` +┌───x─┬─round(divide(number, 2))─┐ +│ 0 │ 0 │ +│ 0.5 │ 0 │ +│ 1 │ 1 │ +│ 1.5 │ 2 │ +│ 2 │ 2 │ +│ 2.5 │ 2 │ +│ 3 │ 3 │ +│ 3.5 │ 4 │ +│ 4 │ 4 │ +│ 4.5 │ 4 │ +└─────┴──────────────────────────┘ +``` ## roundToExp2(num) Принимает число. Если число меньше единицы - возвращает 0. Иначе округляет число вниз до ближайшей (целой неотрицательной) степени двух. diff --git a/docs/ru/query_language/select.md b/docs/ru/query_language/select.md index 232386c485f..d3e8b8a985b 100644 --- a/docs/ru/query_language/select.md +++ b/docs/ru/query_language/select.md @@ -46,6 +46,8 @@ SELECT [DISTINCT] expr_list Модификатор FINAL может быть использован только при SELECT-е из таблицы типа CollapsingMergeTree. При указании FINAL, данные будут выбираться полностью "сколлапсированными". Стоит учитывать, что использование FINAL приводит к выбору кроме указанных в SELECT-е столбцов также столбцов, относящихся к первичному ключу. Также, запрос будет выполняться в один поток, и при выполнении запроса будет выполняться слияние данных. Это приводит к тому, что при использовании FINAL, запрос выполняется медленнее. В большинстве случаев, следует избегать использования FINAL. Подробнее смотрите раздел "Движок CollapsingMergeTree". + + ### Секция SAMPLE Секция SAMPLE позволяет выполнить запрос приближённо. Приближённое выполнение запроса поддерживается только таблицами типа MergeTree\* и только если при создании таблицы было указано выражение, по которому производится выборка (смотрите раздел "Движок MergeTree"). diff --git a/docs/ru/query_language/table_functions/file.md b/docs/ru/query_language/table_functions/file.md index f3e35c7f8f0..9e029a6b729 100644 --- a/docs/ru/query_language/table_functions/file.md +++ b/docs/ru/query_language/table_functions/file.md @@ -2,17 +2,46 @@ # file -`file(path, format, structure)` - возвращает таблицу со столбцами, указанными в structure, созданную из файла path типа format. +Создаёт таблицу из файла. -path - относительный путь до файла от [user_files_path](../../operations/server_settings/settings.md#user_files_path). +``` +file(path, format, structure) +``` -format - [формат](../../interfaces/formats.md#formats) файла. +**Входные параметры** -structure - структура таблицы в форме 'UserID UInt64, URL String'. Определяет имена и типы столбцов. +- `path` — относительный путь до файла от [user_files_path](../../operations/server_settings/settings.md#user_files_path). +- `format` — [формат](../../interfaces/formats.md#formats) файла. +- `structure` — структура таблицы. Формат `'colunmn1_name column1_ype, column2_name column2_type, ...'`. + +**Возвращаемое значение** + +Таблица с указанной структурой, предназначенная для чтения или записи данных в указанном файле. **Пример** -```sql --- получение первых 10 строк таблицы, состоящей из трёх колонок типа UInt32 из CSV файла -SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32') LIMIT 10 +Настройка `user_files_path` и содержимое файла `test.csv`: + +```bash +$ grep user_files_path /etc/clickhouse-server/config.xml + /var/lib/clickhouse/user_files/ + +$ cat /var/lib/clickhouse/user_files/test.csv + 1,2,3 + 3,2,1 + 78,43,45 +``` + +Таблица из `test.csv` и выборка первых двух строк из неё: + +```sql +SELECT * +FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32') +LIMIT 2 +``` +``` +┌─column1─┬─column2─┬─column3─┐ +│ 1 │ 2 │ 3 │ +│ 3 │ 2 │ 1 │ +└─────────┴─────────┴─────────┘ ```