Merge remote-tracking branch 'upstream/master' into fix3

This commit is contained in:
proller 2018-08-22 15:22:58 +03:00
commit b6dabaacb3
38 changed files with 648 additions and 406 deletions

View File

@ -482,6 +482,37 @@ private:
Poco::File(history_file).createFile();
}
#if USE_READLINE
/// Install Ctrl+C signal handler that will be used in interactive mode.
if (rl_initialize())
throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE);
auto clear_prompt_or_exit = [](int)
{
/// This is signal safe.
ssize_t res = write(STDOUT_FILENO, "\n", 1);
/// Allow to quit client while query is in progress by pressing Ctrl+C twice.
/// (First press to Ctrl+C will try to cancel query by InterruptListener).
if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE))
{
rl_replace_line("", 0);
if (rl_forced_update_display())
_exit(0);
}
else
{
/// A little dirty, but we struggle to find better way to correctly
/// force readline to exit after returning from the signal handler.
_exit(0);
}
};
if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR)
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
#endif
loop();
std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl;
@ -1518,35 +1549,6 @@ public:
}
}
#if USE_READLINE
if (rl_initialize())
throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE);
auto clear_prompt_or_exit = [](int)
{
/// This is signal safe.
ssize_t res = write(STDOUT_FILENO, "\n", 1);
/// Allow to quit client while query is in progress by pressing Ctrl+C twice.
/// (First press to Ctrl+C will try to cancel query by InterruptListener).
if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE))
{
rl_replace_line("", 0);
if (rl_forced_update_display())
_exit(0);
}
else
{
/// A little dirty, but we struggle to find better way to correctly
/// force readline to exit after returning from the signal handler.
_exit(0);
}
};
if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR)
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
#endif
ioctl(0, TIOCGWINSZ, &terminal_size);
namespace po = boost::program_options;

View File

@ -366,12 +366,16 @@ int Server::main(const std::vector<std::string> & /*args*/)
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context);
}
#if defined(__linux__)
if (!TaskStatsInfoGetter::checkPermissions())
{
LOG_INFO(log, "It looks like the process has no CAP_NET_ADMIN capability, some performance statistics will be disabled."
" It could happen due to incorrect ClickHouse package installation."
" You could resolve the problem manually with 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'");
}
#else
LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled.");
#endif
{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);

View File

@ -2,36 +2,27 @@
#include <Common/Exception.h>
#include <Core/Types.h>
#include <errno.h>
#if defined(__linux__)
#include <linux/genetlink.h>
#include <linux/netlink.h>
#include <linux/taskstats.h>
#include <linux/capability.h>
#endif
#include <unistd.h>
#if defined(__linux__)
#include <common/unaligned.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#if __has_include(<sys/syscall.h>)
#include <sys/syscall.h>
#else
#include <syscall.h>
#endif
#include <linux/genetlink.h>
#include <linux/netlink.h>
#include <linux/taskstats.h>
#include <linux/capability.h>
/// Basic idea is motivated by "iotop" tool.
/// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt
#define GENLMSG_DATA(glh) ((void *)((char*)NLMSG_DATA(glh) + GENL_HDRLEN))
#define GENLMSG_PAYLOAD(glh) (NLMSG_PAYLOAD(glh, 0) - GENL_HDRLEN)
#define NLA_DATA(na) ((void *)((char*)(na) + NLA_HDRLEN))
#define NLA_PAYLOAD(len) (len - NLA_HDRLEN)
namespace DB
{
@ -39,131 +30,203 @@ namespace DB
namespace ErrorCodes
{
extern const int NETLINK_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
{
static size_t constexpr MAX_MSG_SIZE = 1024;
/** The message contains:
* - Netlink protocol header;
* - Generic Netlink (is a sub-protocol of Netlink that we use) protocol header;
* - Payload
* -- that itself is a list of "Attributes" (sub-messages), each of them contains length (including header), type, and its own payload.
* -- and attribute payload may be represented by the list of embedded attributes.
*/
struct NetlinkMessage
{
#if defined(__linux__)
::nlmsghdr n;
::genlmsghdr g;
char buf[MAX_MSG_SIZE];
#endif
static size_t constexpr MAX_MSG_SIZE = 1024;
alignas(NLMSG_ALIGNTO) ::nlmsghdr header;
struct Attribute
{
::nlattr header;
alignas(NLMSG_ALIGNTO) char payload[0];
const Attribute * next() const
{
return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + NLA_ALIGN(header.nla_len));
}
};
union alignas(NLMSG_ALIGNTO)
{
struct
{
::genlmsghdr generic_header;
union alignas(NLMSG_ALIGNTO)
{
char buf[MAX_MSG_SIZE];
Attribute attribute; /// First attribute. There may be more.
} payload;
};
::nlmsgerr error;
};
size_t payload_size() const
{
return header.nlmsg_len - sizeof(header) - sizeof(generic_header);
}
const Attribute * end() const
{
return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + header.nlmsg_len);
}
void send(int fd) const
{
const char * request_buf = reinterpret_cast<const char *>(this);
ssize_t request_size = header.nlmsg_len;
::sockaddr_nl nladdr{};
nladdr.nl_family = AF_NETLINK;
while (true)
{
ssize_t bytes_sent = ::sendto(fd, request_buf, request_size, 0, reinterpret_cast<const ::sockaddr *>(&nladdr), sizeof(nladdr));
if (bytes_sent <= 0)
{
if (errno == EAGAIN)
continue;
else
throwFromErrno("Can't send a Netlink command", ErrorCodes::NETLINK_ERROR);
}
if (bytes_sent > request_size)
throw Exception("Wrong result of sendto system call: bytes_sent is greater than request size", ErrorCodes::NETLINK_ERROR);
if (bytes_sent == request_size)
break;
request_buf += bytes_sent;
request_size -= bytes_sent;
}
}
void receive(int fd)
{
ssize_t bytes_received = ::recv(fd, this, sizeof(*this), 0);
if (header.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&header), bytes_received))
throw Exception("Can't receive Netlink response, error: " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR);
}
};
void sendCommand(
int sock_fd,
UInt16 nlmsg_type,
UInt32 nlmsg_pid,
UInt8 genl_cmd,
UInt16 nla_type,
void * nla_data,
int nla_len)
NetlinkMessage query(
int fd,
UInt16 type,
UInt32 pid,
UInt8 command,
UInt16 attribute_type,
const void * attribute_data,
int attribute_size)
{
#if defined(__linux__)
NetlinkMessage msg{};
NetlinkMessage request;
msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN);
msg.n.nlmsg_type = nlmsg_type;
msg.n.nlmsg_flags = NLM_F_REQUEST;
msg.n.nlmsg_seq = 0;
msg.n.nlmsg_pid = nlmsg_pid;
msg.g.cmd = genl_cmd;
msg.g.version = 1;
request.header.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); /// Length of both headers.
request.header.nlmsg_type = type;
request.header.nlmsg_flags = NLM_F_REQUEST; /// A request.
request.header.nlmsg_seq = 0;
request.header.nlmsg_pid = pid;
::nlattr * attr = static_cast<::nlattr *>(GENLMSG_DATA(&msg));
attr->nla_type = nla_type;
attr->nla_len = nla_len + 1 + NLA_HDRLEN;
request.generic_header.cmd = command;
request.generic_header.version = 1;
memcpy(NLA_DATA(attr), nla_data, nla_len);
msg.n.nlmsg_len += NLMSG_ALIGN(attr->nla_len);
request.payload.attribute.header.nla_type = attribute_type;
request.payload.attribute.header.nla_len = attribute_size + 1 + NLA_HDRLEN;
char * buf = reinterpret_cast<char *>(&msg);
ssize_t buflen = msg.n.nlmsg_len;
memcpy(&request.payload.attribute.payload, attribute_data, attribute_size);
::sockaddr_nl nladdr{};
nladdr.nl_family = AF_NETLINK;
request.header.nlmsg_len += NLMSG_ALIGN(request.payload.attribute.header.nla_len);
while (true)
{
ssize_t r = ::sendto(sock_fd, buf, buflen, 0, reinterpret_cast<const ::sockaddr *>(&nladdr), sizeof(nladdr));
request.send(fd);
if (r >= buflen)
break;
NetlinkMessage response;
response.receive(fd);
if (r > 0)
{
buf += r;
buflen -= r;
}
else if (errno != EAGAIN)
throwFromErrno("Can't send a Netlink command", ErrorCodes::NETLINK_ERROR);
}
#endif
return response;
}
UInt16 getFamilyId(int nl_sock_fd)
UInt16 getFamilyIdImpl(int fd)
{
#if defined(__linux__)
struct
{
::nlmsghdr header;
::genlmsghdr ge_header;
char buf[256];
} answer;
NetlinkMessage answer = query(fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, CTRL_ATTR_FAMILY_NAME, TASKSTATS_GENL_NAME, strlen(TASKSTATS_GENL_NAME) + 1);
static char name[] = TASKSTATS_GENL_NAME;
/// NOTE Why the relevant info is located in the second attribute?
const NetlinkMessage::Attribute * attr = answer.payload.attribute.next();
sendCommand(
nl_sock_fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY,
CTRL_ATTR_FAMILY_NAME, (void *) name,
strlen(TASKSTATS_GENL_NAME) + 1);
if (attr->header.nla_type != CTRL_ATTR_FAMILY_ID)
throw Exception("Received wrong attribute as an answer to GET_FAMILY Netlink command", ErrorCodes::NETLINK_ERROR);
UInt16 id = 0;
ssize_t rep_len = ::recv(nl_sock_fd, &answer, sizeof(answer), 0);
if (rep_len < 0)
throwFromErrno("Cannot get the family id for " + std::string(TASKSTATS_GENL_NAME) + " from the Netlink socket", ErrorCodes::NETLINK_ERROR);
return unalignedLoad<UInt16>(attr->payload);
}
if (answer.header.nlmsg_type == NLMSG_ERROR ||!NLMSG_OK((&answer.header), rep_len))
throw Exception("Received an error instead of the family id for " + std::string(TASKSTATS_GENL_NAME)
+ " from the Netlink socket", ErrorCodes::NETLINK_ERROR);
const ::nlattr * attr;
attr = static_cast<const ::nlattr *>(GENLMSG_DATA(&answer));
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(attr) + NLA_ALIGN(attr->nla_len));
if (attr->nla_type == CTRL_ATTR_FAMILY_ID)
id = *static_cast<const UInt16 *>(NLA_DATA(attr));
bool checkPermissionsImpl()
{
/// See man getcap.
__user_cap_header_struct request{};
request.version = _LINUX_CAPABILITY_VERSION_1; /// It's enough to check just single CAP_NET_ADMIN capability we are interested.
request.pid = getpid();
return id;
#else
return 0;
#endif
__user_cap_data_struct response{};
/// Avoid dependency on 'libcap'.
if (0 != syscall(SYS_capget, &request, &response))
throwFromErrno("Cannot do 'capget' syscall", ErrorCodes::NETLINK_ERROR);
return (1 << CAP_NET_ADMIN) & response.effective;
}
UInt16 getFamilyId(int fd)
{
/// It is thread and exception safe since C++11 and even before.
static UInt16 res = getFamilyIdImpl(fd);
return res;
}
}
TaskStatsInfoGetter::TaskStatsInfoGetter() = default;
void TaskStatsInfoGetter::init()
bool TaskStatsInfoGetter::checkPermissions()
{
#if defined(__linux__)
if (netlink_socket_fd >= 0)
return;
static bool res = checkPermissionsImpl();
return res;
}
TaskStatsInfoGetter::TaskStatsInfoGetter()
{
if (!checkPermissions())
throw Exception("Logical error: TaskStatsInfoGetter is not usable without CAP_NET_ADMIN. Check permissions before creating the object.",
ErrorCodes::LOGICAL_ERROR);
netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
if (netlink_socket_fd < 0)
throwFromErrno("Can't create PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
/// On some containerized environments, operation on Netlink socket could hang forever.
/// We set reasonably small timeout to overcome this issue.
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 50000;
@ -177,112 +240,41 @@ void TaskStatsInfoGetter::init()
if (::bind(netlink_socket_fd, reinterpret_cast<const ::sockaddr *>(&addr), sizeof(addr)) < 0)
throwFromErrno("Can't bind PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
netlink_family_id = getFamilyId(netlink_socket_fd);
#endif
taskstats_family_id = getFamilyId(netlink_socket_fd);
}
#if defined(__linux__)
void TaskStatsInfoGetter::getStatImpl(int tid, ::taskstats & out_stats)
void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid)
{
init();
NetlinkMessage answer = query(netlink_socket_fd, taskstats_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(tid));
sendCommand(netlink_socket_fd, netlink_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(pid_t));
NetlinkMessage msg;
ssize_t rv = ::recv(netlink_socket_fd, &msg, sizeof(msg), 0);
if (msg.n.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&msg.n), rv))
for (const NetlinkMessage::Attribute * attr = &answer.payload.attribute;
attr < answer.end();
attr = attr->next())
{
const ::nlmsgerr * err = static_cast<const ::nlmsgerr *>(NLMSG_DATA(&msg));
throw Exception("Can't get Netlink response, error: " + std::to_string(err->error), ErrorCodes::NETLINK_ERROR);
}
rv = GENLMSG_PAYLOAD(&msg.n);
const ::nlattr * attr = static_cast<const ::nlattr *>(GENLMSG_DATA(&msg));
ssize_t len = 0;
while (len < rv)
{
len += NLA_ALIGN(attr->nla_len);
if (attr->nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->nla_type == TASKSTATS_TYPE_AGGR_PID)
if (attr->header.nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->header.nla_type == TASKSTATS_TYPE_AGGR_PID)
{
int aggr_len = NLA_PAYLOAD(attr->nla_len);
int len2 = 0;
attr = static_cast<const ::nlattr *>(NLA_DATA(attr));
while (len2 < aggr_len)
for (const NetlinkMessage::Attribute * nested_attr = reinterpret_cast<const NetlinkMessage::Attribute *>(attr->payload);
nested_attr < attr->next();
nested_attr = nested_attr->next())
{
if (attr->nla_type == TASKSTATS_TYPE_STATS)
if (nested_attr->header.nla_type == TASKSTATS_TYPE_STATS)
{
const ::taskstats * ts = static_cast<const ::taskstats *>(NLA_DATA(attr));
out_stats = *ts;
out_stats = unalignedLoad<::taskstats>(nested_attr->payload);
return;
}
len2 += NLA_ALIGN(attr->nla_len);
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(attr) + len2);
}
}
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(GENLMSG_DATA(&msg)) + len);
}
throw Exception("There is no TASKSTATS_TYPE_STATS attribute in the Netlink response", ErrorCodes::NETLINK_ERROR);
}
void TaskStatsInfoGetter::getStat(::taskstats & stat, int tid)
pid_t TaskStatsInfoGetter::getCurrentTID()
{
tid = tid < 0 ? getDefaultTID() : tid;
getStatImpl(tid, stat);
}
#endif
int TaskStatsInfoGetter::getCurrentTID()
{
#if defined(__linux__)
/// This call is always successful. - man gettid
return static_cast<int>(syscall(SYS_gettid));
#else
return 0;
#endif
}
int TaskStatsInfoGetter::getDefaultTID()
{
if (default_tid < 0)
default_tid = getCurrentTID();
return default_tid;
}
static bool checkPermissionsImpl()
{
#if defined(__linux__)
/// See man getcap.
__user_cap_header_struct request{};
request.version = _LINUX_CAPABILITY_VERSION_1; /// It's enough to check just single CAP_NET_ADMIN capability we are interested.
request.pid = getpid();
__user_cap_data_struct response{};
/// Avoid dependency on 'libcap'.
if (0 != syscall(SYS_capget, &request, &response))
throwFromErrno("Cannot do 'capget' syscall", ErrorCodes::NETLINK_ERROR);
return (1 << CAP_NET_ADMIN) & response.effective;
#else
return false;
#endif
}
bool TaskStatsInfoGetter::checkPermissions()
{
/// It is thread- and exception- safe since C++11
static bool res = checkPermissionsImpl();
return res;
return static_cast<pid_t>(syscall(SYS_gettid));
}
@ -293,3 +285,42 @@ TaskStatsInfoGetter::~TaskStatsInfoGetter()
}
}
#else
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
bool TaskStatsInfoGetter::checkPermissions()
{
return false;
}
TaskStatsInfoGetter::TaskStatsInfoGetter()
{
throw Exception("TaskStats are not implemented for this OS.", ErrorCodes::NOT_IMPLEMENTED);
}
void TaskStatsInfoGetter::getStat(::taskstats &, pid_t)
{
}
pid_t TaskStatsInfoGetter::getCurrentTID()
{
return 0;
}
TaskStatsInfoGetter::~TaskStatsInfoGetter()
{
}
}
#endif

View File

@ -1,47 +1,39 @@
#pragma once
#include <sys/types.h>
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
#if defined(__linux__)
struct taskstats;
#else
struct taskstats {};
#endif
namespace DB
{
class Exception;
/// Get taskstat info from OS kernel via Netlink protocol.
class TaskStatsInfoGetter
class TaskStatsInfoGetter : private boost::noncopyable
{
public:
TaskStatsInfoGetter();
TaskStatsInfoGetter(const TaskStatsInfoGetter &) = delete;
#if defined(__linux__)
void getStat(::taskstats & stat, int tid = -1);
#endif
~TaskStatsInfoGetter();
void getStat(::taskstats & stat, pid_t tid);
/// Make a syscall and returns Linux thread id
static int getCurrentTID();
static pid_t getCurrentTID();
/// Whether the current process has permissions (sudo or cap_net_admin capabilties) to get taskstats info
static bool checkPermissions();
private:
/// Caches current thread tid to avoid extra sys calls
int getDefaultTID();
int default_tid = -1;
#if defined(__linux__)
void getStatImpl(int tid, ::taskstats & out_stats);
#endif
void init();
private:
int netlink_socket_fd = -1;
UInt16 netlink_family_id = 0;
UInt16 taskstats_family_id = 0;
#endif
};
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <Common/TaskStatsInfoGetter.h>
#include <Common/ProfileEvents.h>
#include <sys/time.h>
@ -10,6 +10,7 @@
#include <linux/taskstats.h>
#endif
namespace ProfileEvents
{
extern const Event RealTimeMicroseconds;
@ -20,6 +21,7 @@ namespace ProfileEvents
extern const Event VoluntaryContextSwitches;
extern const Event InvoluntaryContextSwitches;
#if defined(__linux__)
extern const Event OSIOWaitMicroseconds;
extern const Event OSCPUWaitMicroseconds;
extern const Event OSCPUVirtualTimeMicroseconds;
@ -27,6 +29,7 @@ namespace ProfileEvents
extern const Event OSWriteChars;
extern const Event OSReadBytes;
extern const Event OSWriteBytes;
#endif
}
@ -85,12 +88,7 @@ struct RUsageCounters
static RUsageCounters current(UInt64 real_time_ = getCurrentTimeNanoseconds())
{
::rusage rusage;
#if defined(__APPLE__)
::getrusage(RUSAGE_SELF, &rusage);
#else
::getrusage(RUSAGE_THREAD, &rusage);
#endif
return RUsageCounters(rusage, real_time_);
}
@ -113,18 +111,18 @@ struct RUsageCounters
};
#if defined(__linux__)
struct TasksStatsCounters
{
#if defined(__linux__)
::taskstats stat;
#endif
TasksStatsCounters() = default;
static TasksStatsCounters current();
static void incrementProfileEvents(const TasksStatsCounters & prev, const TasksStatsCounters & curr, ProfileEvents::Counters & profile_events)
{
#if defined(__linux__)
profile_events.increment(ProfileEvents::OSCPUWaitMicroseconds,
safeDiff(prev.stat.cpu_delay_total, curr.stat.cpu_delay_total) / 1000U);
profile_events.increment(ProfileEvents::OSIOWaitMicroseconds,
@ -140,7 +138,6 @@ struct TasksStatsCounters
profile_events.increment(ProfileEvents::OSWriteChars, safeDiff(prev.stat.write_char, curr.stat.write_char));
profile_events.increment(ProfileEvents::OSReadBytes, safeDiff(prev.stat.read_bytes, curr.stat.read_bytes));
profile_events.increment(ProfileEvents::OSWriteBytes, safeDiff(prev.stat.write_bytes, curr.stat.write_bytes));
#endif
}
static void updateProfileEvents(TasksStatsCounters & last_counters, ProfileEvents::Counters & profile_events)
@ -151,4 +148,17 @@ struct TasksStatsCounters
}
};
#else
struct TasksStatsCounters
{
::taskstats stat;
static TasksStatsCounters current() { return {}; }
static void incrementProfileEvents(const TasksStatsCounters &, const TasksStatsCounters &, ProfileEvents::Counters &) {}
static void updateProfileEvents(TasksStatsCounters &, ProfileEvents::Counters &) {}
};
#endif
}

View File

@ -1,10 +1,13 @@
#include "ThreadStatus.h"
#include <common/logger_useful.h>
#include <sstream>
#include <common/Types.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Common/ThreadStatus.h>
#include <Poco/Thread.h>
#include <Poco/Logger.h>
#include <Poco/Ext/ThreadNumber.h>
@ -41,7 +44,6 @@ ThreadStatus::ThreadStatus()
last_rusage = std::make_unique<RUsageCounters>();
last_taskstats = std::make_unique<TasksStatsCounters>();
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus");
@ -72,9 +74,12 @@ void ThreadStatus::initPerformanceCounters()
++queries_started;
*last_rusage = RUsageCounters::current(query_start_time_nanoseconds);
has_permissions_for_taskstats = TaskStatsInfoGetter::checkPermissions();
if (has_permissions_for_taskstats)
if (TaskStatsInfoGetter::checkPermissions())
{
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
*last_taskstats = TasksStatsCounters::current();
}
}
void ThreadStatus::updatePerformanceCounters()
@ -82,7 +87,7 @@ void ThreadStatus::updatePerformanceCounters()
try
{
RUsageCounters::updateProfileEvents(*last_rusage, performance_counters);
if (has_permissions_for_taskstats)
if (taskstats_getter)
TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters);
}
catch (...)

View File

@ -33,7 +33,6 @@ using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
class ThreadGroupStatus
{
public:
mutable std::shared_mutex mutex;
ProfileEvents::Counters performance_counters{VariableContext::Process};
@ -126,7 +125,6 @@ public:
~ThreadStatus();
protected:
ThreadStatus();
void initPerformanceCounters();
@ -160,11 +158,11 @@ protected:
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> last_taskstats;
/// Set only if we have enough capabilities.
std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
bool has_permissions_for_taskstats = false;
public:
/// Implicitly finalizes current thread in the destructor
class CurrentThreadScope
{

View File

@ -143,12 +143,6 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
{
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");
/// Put all threads of both thread pools to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -217,14 +211,29 @@ void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lo
}
void BackgroundSchedulePool::attachToThreadGroup()
{
std::lock_guard lock(delayed_tasks_mutex);
if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}
void BackgroundSchedulePool::threadFunction()
{
setThreadName("BackgrSchedPool");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
while (!shutdown)
@ -242,8 +251,7 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
{
setThreadName("BckSchPoolDelay");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)

View File

@ -142,6 +142,8 @@ private:
/// Thread group used for profiling purposes
ThreadGroupStatusPtr thread_group;
void attachToThreadGroup();
};
using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;

View File

@ -300,7 +300,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
try
{
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggMergThr");
while (!parallel_merge_data->finish)

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <boost/noncopyable.hpp>
#include <Common/Exception.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <boost/noncopyable.hpp>

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <Common/Exception.h>
#include <common/logger_useful.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <ext/singleton.h>
#include <condition_variable>

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/ReadBufferAIO.h>
#include <IO/AIOContextPool.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBuffer.h>

View File

@ -67,8 +67,9 @@ bool ReadBufferFromFileDescriptor::nextImpl()
if (res > 0)
bytes_read += res;
/// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one and we will count cpu time of other thread
/// It is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS)
/// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS).
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/WriteBufferAIO.h>
#include <Common/ProfileEvents.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>

View File

@ -1,6 +1,6 @@
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
#include <IO/ReadBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -14,10 +14,10 @@ namespace ProfileEvents
namespace DB
{
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
#endif
@ -31,7 +31,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::
}
else
{
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
#else

View File

@ -1,6 +1,6 @@
#include <IO/createWriteBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
#include <IO/WriteBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -15,10 +15,10 @@ namespace ProfileEvents
namespace DB
{
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
#endif
@ -33,7 +33,7 @@ WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & file
}
else
{
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return new WriteBufferAIO(filename_, buffer_size_, flags_, mode, existing_memory_);
#else

View File

@ -51,7 +51,7 @@ namespace DB
*/
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1024
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
class Context;
class QueryLog;

View File

@ -48,5 +48,7 @@ add_check(in_join_subqueries_preprocessor)
add_executable (users users.cpp)
target_link_libraries (users dbms ${Boost_FILESYSTEM_LIBRARY})
add_executable (internal_iotop internal_iotop.cpp)
target_link_libraries (internal_iotop dbms)
if (OS_LINUX)
add_executable (internal_iotop internal_iotop.cpp)
target_link_libraries (internal_iotop dbms)
endif ()

View File

@ -36,7 +36,7 @@ void BackgroundProcessingPoolTaskInfo::wake()
Poco::Timestamp current_time;
{
std::unique_lock<std::mutex> lock(pool.tasks_mutex);
std::unique_lock lock(pool.tasks_mutex);
auto next_time_to_execute = iterator->first;
auto this_task_handle = iterator->second;
@ -58,12 +58,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
{
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads");
/// Put all threads to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -77,7 +71,7 @@ BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Tas
Poco::Timestamp current_time;
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
res->iterator = tasks.emplace(current_time, res);
}
@ -93,11 +87,11 @@ void BackgroundProcessingPool::removeTask(const TaskHandle & task)
/// Wait for all executions of this task.
{
std::unique_lock<std::shared_mutex> wlock(task->rwlock);
std::unique_lock wlock(task->rwlock);
}
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
tasks.erase(task->iterator);
}
}
@ -122,10 +116,22 @@ void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
{
std::lock_guard lock(tasks_mutex);
if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
pcg64 rng(randomSeed());
@ -141,7 +147,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp min_time;
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
if (!tasks.empty())
{
@ -162,7 +168,7 @@ void BackgroundProcessingPool::threadFunction()
if (!task)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(sleep_seconds
+ std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
@ -173,12 +179,12 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp current_time;
if (min_time > current_time)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
}
std::shared_lock<std::shared_mutex> rlock(task->rwlock);
std::shared_lock rlock(task->rwlock);
if (task->removed)
continue;
@ -202,7 +208,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
if (task->removed)
continue;

View File

@ -298,16 +298,19 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish columns serialization.
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
if (!serialization_states.empty())
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
}
if (!total_column_list)

View File

@ -52,34 +52,6 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
check_period_ms = storage.data.settings.check_delay_period * 1000;
task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
}
ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
{
try
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
storage.fetcher.blocker.cancelForever();
storage.merger_mutator.actions_blocker.cancelForever();
/// Stop other tasks.
partialShutdown();
if (storage.queue_task_handle)
storage.context.getBackgroundPool().removeTask(storage.queue_task_handle);
storage.queue_task_handle.reset();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
void ReplicatedMergeTreeRestartingThread::run()
@ -215,10 +187,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.alter_thread.start();
storage.part_check_thread.start();
if (!storage.queue_task_handle)
storage.queue_task_handle = storage.context.getBackgroundPool().addTask(
std::bind(&StorageReplicatedMergeTree::queueTask, &storage));
return true;
}
catch (...)
@ -368,4 +336,16 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
LOG_TRACE(log, "Threads finished");
}
void ReplicatedMergeTreeRestartingThread::shutdown()
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Stop other tasks.
partialShutdown();
}
}

View File

@ -23,10 +23,17 @@ class ReplicatedMergeTreeRestartingThread
{
public:
ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeRestartingThread();
void start()
{
task->activate();
task->schedule();
}
void wakeup() { task->schedule(); }
void shutdown();
private:
StorageReplicatedMergeTree & storage;
String log_name;

View File

@ -215,7 +215,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
[this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this),
fetcher(data),
cleanup_thread(*this), alter_thread(*this), part_check_thread(*this),
cleanup_thread(*this), alter_thread(*this), part_check_thread(*this), restarting_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
if (path_.empty())
@ -2063,9 +2063,7 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
{
/// Can be called before starting restarting_thread
if (restarting_thread)
restarting_thread->wakeup();
restarting_thread.wakeup();
return;
}
@ -2787,8 +2785,10 @@ void StorageReplicatedMergeTree::startup()
data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>(
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, context.getInterserverIOHandler());
queue_task_handle = context.getBackgroundPool().addTask([this] { return queueTask(); });
/// In this thread replica will be activated.
restarting_thread = std::make_unique<ReplicatedMergeTreeRestartingThread>(*this);
restarting_thread.start();
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it
startup_event.wait();
@ -2797,15 +2797,21 @@ void StorageReplicatedMergeTree::startup()
void StorageReplicatedMergeTree::shutdown()
{
restarting_thread.reset();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.actions_blocker.cancelForever();
restarting_thread.shutdown();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
if (data_parts_exchange_endpoint_holder)
{
data_parts_exchange_endpoint_holder->getBlocker().cancelForever();
data_parts_exchange_endpoint_holder = nullptr;
}
fetcher.blocker.cancelForever();
}

View File

@ -198,7 +198,6 @@ private:
void clearOldPartsAndRemoveFromZK();
friend class ReplicatedMergeTreeBlockOutputStream;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;
@ -303,7 +302,7 @@ private:
ReplicatedMergeTreePartCheckThread part_check_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
ReplicatedMergeTreeRestartingThread restarting_thread;
/// An event that awakens `alter` method from waiting for the completion of the ALTER query.
zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>();

View File

@ -1,3 +1,5 @@
<a name="data_type-date"></a>
# Date
Дата. Хранится в двух байтах в виде (беззнакового) числа дней, прошедших от 1970-01-01. Позволяет хранить значения от чуть больше, чем начала unix-эпохи до верхнего порога, определяющегося константой на этапе компиляции (сейчас - до 2106 года, последний полностью поддерживаемый год - 2105).

View File

@ -1,3 +1,5 @@
<a name="table_engine-collapsingmergetree"></a>
# CollapsingMergeTree
*Движок достаточно специфичен для Яндекс.Метрики.*
@ -33,5 +35,3 @@ CollapsingMergeTree(EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID
1. Написать запрос с GROUP BY и агрегатными функциями, учитывающими знак. Например, чтобы посчитать количество, надо вместо count() написать sum(Sign); чтобы посчитать сумму чего-либо, надо вместо sum(x) написать sum(Sign \* x) и т. п., а также добавить HAVING sum(Sign) `>` 0. Не все величины можно посчитать подобным образом. Например, агрегатные функции min, max не могут быть переписаны.
2. Если необходимо вынимать данные без агрегации (например, проверить наличие строк, самые новые значения которых удовлетворяют некоторым условиям), можно использовать модификатор FINAL для секции FROM. Это вариант существенно менее эффективен.

View File

@ -1,3 +1,5 @@
<a name="table_engine-merge"></a>
# Merge
Движок `Merge` (не путайте с движком `MergeTree`) не хранит данные самостоятельно, а позволяет читать одновременно из произвольного количества других таблиц.

View File

@ -2,55 +2,174 @@
# MergeTree
Движок MergeTree поддерживает индекс по первичному ключу и по дате и обеспечивает возможность обновления данных в реальном времени.
Это наиболее продвинутый движок таблиц в ClickHouse. Не путайте с движком Merge.
Движок `MergeTree`, а также другие движки этого семейства (`*MergeTree`) — это наиболее функциональные движки таблиц ClickHousе.
Движок принимает параметры: имя столбца типа Date, содержащего дату; выражение для семплирования (не обязательно); кортеж, определяющий первичный ключ таблицы; гранулированность индекса.
!!!info
Движок [Merge](merge.md#table_engine-merge) не относится к семейству `*MergeTree`.
Пример без поддержки сэмплирования.
Основные возможности:
```text
MergeTree(EventDate, (CounterID, EventDate), 8192)
- Хранит данные, отсортированные по первичному ключу.
Это позволяет создавать разреженный индекс небольшого объёма, который позволяет быстрее находить данные.
- Позволяет оперировать партициями, если задан [ключ партиционирования](custom_partitioning_key.md#table_engines-custom_partitioning_key).
ClickHouse поддерживает отдельные операции с партициями, которые работают эффективнее, чем общие операции с этим же результатом над этими же данными. Также, ClickHouse автоматически отсекает данные по партициям там, где ключ партиционирования указан в запросе. Это также увеличивает эффективность выполнения запросов.
- Поддерживает репликацию данных.
Для этого используется семейство таблиц `ReplicatedMergeTree`. Подробнее читайте в разделе [Репликация данных](replication.md#table_engines-replication).
- Поддерживает сэмплирование данных.
При необходимости можно задать способ сэмплирования данных в таблице.
## Конфигурирование движка при создании таблицы
```
ENGINE [=] MergeTree() [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name=value, ...]
```
Пример с поддержкой сэмплирования.
**Секции ENGINE**
```text
- `ORDER BY` — первичный ключ.
Кортеж столбцов или произвольных выражений. Пример: `ORDER BY (CounerID, EventDate)`.
Если используется ключ сэмплирования, то первичный ключ должен содержать его. Пример: `ORDER BY (CounerID, EventDate, intHash32(UserID))`.
- `PARTITION BY` — [ключ партиционирования](custom_partitioning_key.md#table_engines-custom_partitioning_key).
Для партиционирования по месяцам используйте выражение `toYYYYMM(date_column)`, где `date_column` — столбец с датой типа [Date](../../data_types/date.md#data_type-date). В этом случае имена партиций имеют формат `"YYYYMM"`.
- `SAMPLE BY` — выражение для сэмплирования (не обязательно). Пример: `intHash32(UserID))`.
- `SETTINGS` — дополнительные параметры, регулирующие поведение `MergeTree` (не обязательно):
- `index_granularity` — гранулярность индекса. Число строк данных между «засечками» индекса. По умолчанию — 8192.
**Пример**
```
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity=8192
```
В примере мы устанавливаем партиционирование по месяцам.
Также мы задаем выражение для сэмплирования в виде хэша по идентификатору посетителя. Это позволяет псевдослучайным образом перемешать данные в таблице для каждого `CounterID` и `EventDate`. Если при выборке данных задать секцию [SAMPLE](../../query_language/select.md#select-section-sample) то ClickHouse вернёт равномерно-псевдослучайную выборку данных для подмножества посетителей.
`index_granularity` можно было не указывать, поскольку 8192 — это значение по умолчанию.
### Устаревший способ конфигурирования движка
!!!attention
Не используйте этот способ в новых проектах и по возможности переведите старые проекты на способ описанный выше.
```
ENGINE [=] MergeTree(date-column [, sampling_expression], (primary, key), index_granularity)
```
**Параметры MergeTree()**
- `date-column` — имя столбца с типом [Date](../../data_types/date.md#data_type-date). На основе этого столбца ClickHouse автоматически создаёт партиции по месяцам. Имена партиций имеют формат `"YYYYMM"`.
- `sampling_expression` — выражение для сэмплирования.
- `(primary, key)` — первичный ключ. Тип — [Tuple()](../../data_types/tuple.md#data_type-tuple). Может состоять из произвольных выражений, но обычно это кортеж столбцов. Обязательно должен включать в себя выражение для сэмплирования, если оно задано. Не обязан включать в себя столбец с датой `date-column`.
- `index_granularity` — гранулярность индекса. Число строк данных между «засечками» индекса. Для большинства задач подходит значение 8192.
**Пример**
```
MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192)
```
В таблице типа MergeTree обязательно должен быть отдельный столбец, содержащий дату, здесь это столбец EventDate. Тип столбца с датой — обязательно Date (а не DateTime).
Движок `MergeTree` сконфигурирован таким же образом, как и в примере выше для основного способа конфигурирования движка.
Первичным ключом может быть кортеж из произвольных выражений (обычно это просто кортеж столбцов) или одно выражение.
## Хранение данных
Выражение для сэмплирования (использовать не обязательно) — произвольное выражение. Оно должно также присутствовать в первичном ключе. В примере используется хэширование по идентификатору посетителя, чтобы псевдослучайно перемешать данные в таблице для каждого CounterID и EventDate. То есть, при использовании секции SAMPLE в запросе вы получите равномерно-псевдослучайную выборку данных для подмножества посетителей.
Таблица состоит из *кусков* данных (data parts), отсортированных по первичному ключу.
Таблица реализована как набор кусочков. Каждый кусочек сортирован по первичному ключу. Также для каждого кусочка прописана минимальная и максимальная дата. При вставке в таблицу создаётся новый сортированный кусочек. В фоне периодически инициируется процесс слияния. При слиянии выбирается несколько кусочков, обычно наименьших, и сливаются в один большой сортированный кусочек.
При вставке в таблицу создаются отдельные куски данных, каждый из которых лексикографически отсортирован по первичному ключу. Например, если первичный ключ — `(CounterID, Date)`, то данные в куске будут лежать в порядке `CounterID`, а для каждого `CounterID` в порядке `Date`.
То есть, при вставке в таблицу производится инкрементальная сортировка. Слияние реализовано таким образом, что таблица постоянно состоит из небольшого количества сортированных кусочков, а также само слияние делает не слишком много работы.
Данные, относящиеся к разным партициям, разбиваются на разные куски. В фоновом режиме ClickHouse выполняет слияния (merge) кусков данных для более эффективного хранения. Куски, относящиеся к разным партициям не объединяются.
При вставке данные, относящиеся к разным месяцам, разбиваются на разные кусочки. Кусочки, соответствующие разным месяцам, никогда не объединяются. Это сделано, чтобы обеспечить локальность модификаций данных (для упрощения бэкапов).
Для каждого куска данных ClickHouse создаёт индексный файл, который содержит значение первичного ключа для каждой индексной строки («засечка»). Номера индексных строк определяются как `n * index_granularity`, а максимальное значение `n` равно целой части от деления общего количества строк на `index_granularity`. Для каждого столбца также пишутся «засечки» для тех же индексных строк, что и для первичного ключа, эти «засечки» позволяют находить непосредственно данные в столбцах.
Кусочки объединяются до некоторого предельного размера, чтобы не было слишком длительных слияний.
Вы можете использовать одну большую таблицу, постоянно добавляя в неё данные пачками, именно для этого предназначен движок `MergeTree`.
Для каждого кусочка также пишется индексный файл. Индексный файл содержит значение первичного ключа для каждой index_granularity строки таблицы. То есть, это разреженный индекс сортированных данных.
## Первичные ключи и индексы в запросах
Для столбцов также пишутся «засечки» каждую index_granularity строку, чтобы данные можно было читать в определённом диапазоне.
Рассмотрим первичный ключ — `(CounterID, Date)`, в этом случае, сортировку и индекс можно проиллюстрировать следующим образом:
При чтении из таблицы, запрос SELECT анализируется на предмет того, можно ли использовать индексы.
Индекс может использоваться, если в секции WHERE/PREWHERE, в качестве одного из элементов конъюнкции, или целиком, есть выражение, представляющее операции сравнения на равенства, неравенства, а также IN или LIKE с фиксированным префиксом, над столбцами или выражениями, входящими в первичный ключ или ключ партиционирования, либо над некоторыми частистично монотонными функциями от этих столбцов, а также логические связки над такими выражениями.
```
Whole data: [-------------------------------------------------------------------------]
CounterID: [aaaaaaaaaaaaaaaaaabbbbcdeeeeeeeeeeeeefgggggggghhhhhhhhhiiiiiiiiikllllllll]
Date: [1111111222222233331233211111222222333211111112122222223111112223311122333]
Marks: | | | | | | | | | | |
a,1 a,2 a,3 b,3 e,2 e,3 g,1 h,2 i,1 i,3 l,3
Marks numbers: 0 1 2 3 4 5 6 7 8 9 10
```
Если в запросе к данным указать:
- `CounterID IN ('a', 'h')`, то сервер читает данные в диапазонах засечек `[0, 3)` и `[6, 8)`.
- `CounterID IN ('a', 'h') AND Date = 3`, то сервер читает данные в диапазонах засечек `[1, 3)` и `[7, 8)`.
- `Date = 3`, то сервер читает данные в диапазоне засечек `[1, 10)`.
Примеры выше показывают, что использование индекса всегда эффективнее, чем full scan.
Разреженный индекс допускает чтение лишних строк. При чтении одного диапазона первичного ключа, может быть прочитано до `index_granularity * 2` лишних строк в каждом блоке данных. В большинстве случаев ClickHouse не теряет производительности при `index_granularity = 8192`.
Разреженность индекса позволяет работать даже с очень большим количеством строк в таблицах, поскольку такой индекс всегда помещается в оперативную память компьютера.
ClickHouse не требует уникального первичного ключа. Можно вставить много строк с одинаковым первичным ключом.
### Выбор первичного ключа
Количество столбцов в первичном ключе не ограничено явным образом. В зависимости от структуры данных в первичный ключ можно включать больше или меньше столбцов. Это может:
- Увеличить эффективность индекса.
Пусть первичный ключ — `(a, b)`, тогда добавление ещё одного столбца `c` повысит эффективность, если выполнены условия:
- Есть запросы с условием на столбец `c`.
- Часто встречаются достаточно длинные (в несколько раз больше `index_granularity`) диапазоны данных с одинаковыми значениями `(a, b)`. Иначе говоря, когда добавление ещё одного столбца позволит пропускать достаточно длинные диапазоны данных.
- Улучшить сжатие данных.
ClickHouse сортирует данные по первичному ключу, поэтому чем выше однородность, тем лучше сжатие.
- Обеспечить дополнительную логику при слиянии в движках [CollapsingMergeTree](collapsingmergetree.md#table_engine-collapsingmergetree) и [SummingMergeTree](summingmergetree.md#table_engine-summingmergetree).
Может потребоваться иметь много полей в первичном ключе, даже если они не нужны для выполнения предыдущих пунктов.
Длинный первичный ключ будет негативно влиять на производительность вставки и потребление памяти, однако на производительность ClickHouse при запросах `SELECT` лишние столбцы в первичном ключе не влияют.
### Использование индексов и партиций в запросах
Для запросов `SELECT` ClickHouse анализирует возможность использования индекса. Индекс может использоваться, если в секции `WHERE/PREWHERE`, в качестве одного из элементов конъюнкции, или целиком, есть выражение, представляющее операции сравнения на равенства, неравенства, а также `IN` или `LIKE` с фиксированным префиксом, над столбцами или выражениями, входящими в первичный ключ или ключ партиционирования, либо над некоторыми частично монотонными функциями от этих столбцов, а также логические связки над такими выражениями.
Таким образом, обеспечивается возможность быстро выполнять запросы по одному или многим диапазонам первичного ключа. Например, в указанном примере будут быстро работать запросы для конкретного счётчика; для конкретного счётчика и диапазона дат; для конкретного счётчика и даты, для нескольких счётчиков и диапазона дат и т. п.
Рассмотрим движок сконфигурированный следующим образом:
```
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity=8192
```
В этом случае в запросах:
```sql
SELECT count() FROM table WHERE EventDate = toDate(now()) AND CounterID = 34
SELECT count() FROM table WHERE EventDate = toDate(now()) AND (CounterID = 34 OR CounterID = 42)
SELECT count() FROM table WHERE ((EventDate >= toDate('2014-01-01') AND EventDate <= toDate('2014-01-31')) OR EventDate = toDate('2014-05-01')) AND CounterID IN (101500, 731962, 160656) AND (CounterID = 101500 OR EventDate != toDate('2014-05-01'))
```
Во всех этих случаях будет использоваться индекс по дате и по первичному ключу. Видно, что индекс используется даже для достаточно сложных выражений. Чтение из таблицы организовано так, что использование индекса не может быть медленнее full scan-а.
ClickHouse будет использовать индекс по первичному ключу для отсечения не подходящих данных, а также ключ партиционирования по месяцам для отсечения партиций, которые находятся в не подходящих диапазонах дат.
В этом примере индекс не может использоваться.
Запросы выше показывают, что индекс используется даже для сложных выражений. Чтение из таблицы организовано так, что использование индекса не может быть медленнее, чем full scan.
В примере ниже индекс не может использоваться.
```sql
SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%'
@ -58,14 +177,11 @@ SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%'
Чтобы проверить, сможет ли ClickHouse использовать индекс при выполнении запроса, используйте настройки [force_index_by_date](../settings/settings.md#settings-settings-force_index_by_date) и [force_primary_key](../settings/settings.md#settings-settings-force_primary_key).
Индекс по дате обеспечивает чтение только кусков, содержащих даты из нужного диапазона. При этом кусок данных может содержать данные за многие даты (до целого месяца), а в пределах одного куска данные лежат упорядоченными по первичному ключу, который может не содержать дату в качестве первого столбца. В связи с этим, при использовании запроса с указанием условия только на дату, но не на префикс первичного ключа, будет читаться данных больше, чем за одну дату.
Ключ партиционирования по месяцам обеспечивает чтение только тех блоков данных, которые содержат даты из нужного диапазона. При этом блок данных может содержать данные за многие даты (до целого месяца). В пределах одного блока данные упорядочены по первичному ключу, который может не содержать дату в качестве первого столбца. В связи с этим, при использовании запроса с указанием условия только на дату, но не на префикс первичного ключа, будет читаться данных больше, чем за одну дату.
Для конкуррентного доступа к таблице используется мультиверсионность. То есть, при одновременном чтении и обновлении таблицы, данные будут читаться из набора кусочков, актуального на момент запроса. Длинных блокировок нет. Вставки никак не мешают чтениям.
## Конкурентный доступ к данным
Для конкурентного доступа к таблице используется мультиверсионность. То есть, при одновременном чтении и обновлении таблицы, данные будут читаться из набора кусочков, актуального на момент запроса. Длинных блокировок нет. Вставки никак не мешают чтениям.
Чтения из таблицы автоматически распараллеливаются.
Поддерживается запрос `OPTIMIZE`, который вызывает один внеочередной шаг слияния.
Вы можете использовать одну большую таблицу, постоянно добавляя в неё данные небольшими пачками, именно для этого предназначен движок MergeTree.
Для всех типов таблиц семейства MergeTree возможна репликация данных — смотрите раздел «Репликация данных».

View File

@ -152,6 +152,8 @@ sudo -u clickhouse touch /var/lib/clickhouse/flags/force_restore_data
Отсутствует ограничение на использование сетевой полосы при восстановлении. Имейте это ввиду, если восстанавливаете сразу много реплик.
<a name="convert-mergetree-to-replicated"></a>
## Преобразование из MergeTree в ReplicatedMergeTree
Здесь и далее, под `MergeTree` подразумеваются все движки таблиц семейства `MergeTree`, так же для `ReplicatedMergeTree`.

View File

@ -1,3 +1,5 @@
<a name="table_engine-summingmergetree"></a>
# SummingMergeTree
Отличается от `MergeTree` тем, что суммирует данные при слиянии.

View File

@ -17,10 +17,41 @@ N может быть отрицательным.
В остальном, аналогично функции floor, см. выше.
## round(x\[, N\])
Возвращает ближайшее к num круглое число, которое может быть меньше или больше или равно x.
Если x находится посередине от ближайших круглых чисел, то возвращается какое-либо одно из них (implementation specific).
Число -0. может считаться или не считаться круглым (implementation specific).
В остальном, аналогично функциям floor и ceil, см. выше.
Реализует [банковское округление](https://en.wikipedia.org/wiki/Rounding#Round_half_to_even), т.е. округление до ближайшего чётного.
**Аргументы функции**
- `x` — число для округления. [Тип](../../data_types/index.md#data_types) — любой числовой.
- `N` — позиция цифры после запятой, до которой следует округлять.
**Возвращаемое значение**
Округлённое число того же типа, что и входное число `x`.
**Пример**
```sql
SELECT
number / 2 AS x,
round(x)
FROM system.numbers
LIMIT 10
```
```
┌───x─┬─round(divide(number, 2))─┐
│ 0 │ 0 │
│ 0.5 │ 0 │
│ 1 │ 1 │
│ 1.5 │ 2 │
│ 2 │ 2 │
│ 2.5 │ 2 │
│ 3 │ 3 │
│ 3.5 │ 4 │
│ 4 │ 4 │
│ 4.5 │ 4 │
└─────┴──────────────────────────┘
```
## roundToExp2(num)
Принимает число. Если число меньше единицы - возвращает 0. Иначе округляет число вниз до ближайшей (целой неотрицательной) степени двух.

View File

@ -46,6 +46,8 @@ SELECT [DISTINCT] expr_list
Модификатор FINAL может быть использован только при SELECT-е из таблицы типа CollapsingMergeTree. При указании FINAL, данные будут выбираться полностью "сколлапсированными". Стоит учитывать, что использование FINAL приводит к выбору кроме указанных в SELECT-е столбцов также столбцов, относящихся к первичному ключу. Также, запрос будет выполняться в один поток, и при выполнении запроса будет выполняться слияние данных. Это приводит к тому, что при использовании FINAL, запрос выполняется медленнее. В большинстве случаев, следует избегать использования FINAL. Подробнее смотрите раздел "Движок CollapsingMergeTree".
<a name="select-section-sample"></a>
### Секция SAMPLE
Секция SAMPLE позволяет выполнить запрос приближённо. Приближённое выполнение запроса поддерживается только таблицами типа MergeTree\* и только если при создании таблицы было указано выражение, по которому производится выборка (смотрите раздел "Движок MergeTree").

View File

@ -2,17 +2,46 @@
# file
`file(path, format, structure)` - возвращает таблицу со столбцами, указанными в structure, созданную из файла path типа format.
Создаёт таблицу из файла.
path - относительный путь до файла от [user_files_path](../../operations/server_settings/settings.md#user_files_path).
```
file(path, format, structure)
```
format - [формат](../../interfaces/formats.md#formats) файла.
**Входные параметры**
structure - структура таблицы в форме 'UserID UInt64, URL String'. Определяет имена и типы столбцов.
- `path` — относительный путь до файла от [user_files_path](../../operations/server_settings/settings.md#user_files_path).
- `format` — [формат](../../interfaces/formats.md#formats) файла.
- `structure` — структура таблицы. Формат `'colunmn1_name column1_ype, column2_name column2_type, ...'`.
**Возвращаемое значение**
Таблица с указанной структурой, предназначенная для чтения или записи данных в указанном файле.
**Пример**
```sql
-- получение первых 10 строк таблицы, состоящей из трёх колонок типа UInt32 из CSV файла
SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32') LIMIT 10
Настройка `user_files_path` и содержимое файла `test.csv`:
```bash
$ grep user_files_path /etc/clickhouse-server/config.xml
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
$ cat /var/lib/clickhouse/user_files/test.csv
1,2,3
3,2,1
78,43,45
```
Таблица из `test.csv` и выборка первых двух строк из неё:
```sql
SELECT *
FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
LIMIT 2
```
```
┌─column1─┬─column2─┬─column3─┐
│ 1 │ 2 │ 3 │
│ 3 │ 2 │ 1 │
└─────────┴─────────┴─────────┘
```