Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Ivan Blinkov 2018-08-22 16:15:22 +03:00
commit 5f4a155582
184 changed files with 1736 additions and 965 deletions

View File

@ -78,7 +78,7 @@ if (USE_STATIC_LIBRARIES)
list(REVERSE CMAKE_FIND_LIBRARY_SUFFIXES)
endif ()
if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "amd64.*|x86_64.*|AMD64.*")
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
if (OS_LINUX)

View File

@ -151,7 +151,7 @@ endif ()
if (USE_INTERNAL_LLVM_LIBRARY)
# ld: unknown option: --color-diagnostics
if (APPLE AND COMPILER_GCC)
if (APPLE)
set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "")
endif ()
add_subdirectory (llvm/llvm)

View File

@ -157,6 +157,7 @@ target_link_libraries (dbms
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${BTRIE_LIBRARIES}
${Boost_PROGRAM_OPTIONS_LIBRARY}
)
if (NOT USE_INTERNAL_RE2_LIBRARY)

View File

@ -90,6 +90,8 @@ namespace ErrorCodes
extern const int UNEXPECTED_PACKET_FROM_SERVER;
extern const int CLIENT_OUTPUT_FORMAT_SPECIFIED;
extern const int LOGICAL_ERROR;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_READLINE;
}
@ -480,6 +482,37 @@ private:
Poco::File(history_file).createFile();
}
#if USE_READLINE
/// Install Ctrl+C signal handler that will be used in interactive mode.
if (rl_initialize())
throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE);
auto clear_prompt_or_exit = [](int)
{
/// This is signal safe.
ssize_t res = write(STDOUT_FILENO, "\n", 1);
/// Allow to quit client while query is in progress by pressing Ctrl+C twice.
/// (First press to Ctrl+C will try to cancel query by InterruptListener).
if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE))
{
rl_replace_line("", 0);
if (rl_forced_update_display())
_exit(0);
}
else
{
/// A little dirty, but we struggle to find better way to correctly
/// force readline to exit after returning from the signal handler.
_exit(0);
}
};
if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR)
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
#endif
loop();
std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl;
@ -1656,7 +1689,7 @@ public:
config().setInt("port", options["port"].as<int>());
if (options.count("secure"))
config().setBool("secure", true);
if (options.count("user"))
if (options.count("user") && !options["user"].defaulted())
config().setString("user", options["user"].as<std::string>());
if (options.count("password"))
config().setString("password", options["password"].as<std::string>());

View File

@ -366,12 +366,16 @@ int Server::main(const std::vector<std::string> & /*args*/)
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context);
}
if (!TaskStatsInfoGetter::checkProcessHasRequiredPermissions())
#if defined(__linux__)
if (!TaskStatsInfoGetter::checkPermissions())
{
LOG_INFO(log, "It looks like the process has not CAP_NET_ADMIN capability, some performance statistics will be disabled."
" It could happen due to incorrect clickhouse package installation."
" You could resolve the problem manually calling 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'");
LOG_INFO(log, "It looks like the process has no CAP_NET_ADMIN capability, some performance statistics will be disabled."
" It could happen due to incorrect ClickHouse package installation."
" You could resolve the problem manually with 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'");
}
#else
LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled.");
#endif
{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);

View File

@ -175,7 +175,6 @@ ColumnPtr ColumnString::indexImpl(const PaddedPODArray<Type> & indexes, size_t l
Chars_t & res_chars = res->chars;
Offsets & res_offsets = res->offsets;
size_t new_chars_size = 0;
for (size_t i = 0; i < limit; ++i)
new_chars_size += sizeAt(indexes[i]);

View File

@ -331,7 +331,7 @@ template class ColumnVector<Int128>;
template class ColumnVector<Float32>;
template class ColumnVector<Float64>;
template class ColumnVector<Dec32>;
template class ColumnVector<Dec64>;
template class ColumnVector<Dec128>;
template class ColumnVector<Decimal32>;
template class ColumnVector<Decimal64>;
template class ColumnVector<Decimal128>;
}

View File

@ -124,39 +124,25 @@ template <> inline UInt64 unionCastToUInt64(Float32 x)
/// PaddedPODArray extended by Decimal scale
template <typename T, size_t INITIAL_SIZE = 4096>
class DecPaddedPODArray : public PODArray<T, INITIAL_SIZE, Allocator<false>, sizeof(T)-1>
template <typename T>
class DecimalPaddedPODArray : public PaddedPODArray<T>
{
public:
using Base = PODArray<T, INITIAL_SIZE, Allocator<false>, sizeof(T)-1>;
using Base = PaddedPODArray<T>;
using Base::operator[];
using Base::Base;
DecPaddedPODArray()
DecimalPaddedPODArray(std::initializer_list<T> il)
: DecimalPaddedPODArray(std::begin(il), std::end(il))
{}
DecPaddedPODArray(size_t n)
: Base(n)
{}
DecPaddedPODArray(size_t n, const T & x)
: Base(n, x)
{}
DecPaddedPODArray(typename Base::const_iterator from_begin, typename Base::const_iterator from_end)
: Base(from_begin, from_end)
{}
DecPaddedPODArray(std::initializer_list<T> il)
: DecPaddedPODArray(std::begin(il), std::end(il))
{}
DecPaddedPODArray(DecPaddedPODArray && other)
DecimalPaddedPODArray(DecimalPaddedPODArray && other)
{
this->swap(other);
std::swap(scale, other.scale);
}
DecPaddedPODArray & operator= (DecPaddedPODArray && other)
DecimalPaddedPODArray & operator=(DecimalPaddedPODArray && other)
{
this->swap(other);
std::swap(scale, other.scale);
@ -167,7 +153,7 @@ public:
UInt32 getScale() const { return scale; }
private:
UInt32 scale = std::numeric_limits<UInt32>::max();
UInt32 scale = DecimalField::wrongScale();
};
@ -185,7 +171,7 @@ private:
public:
using value_type = T;
using Container = std::conditional_t<decTrait<T>(), DecPaddedPODArray<value_type>, PaddedPODArray<value_type>>;
using Container = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<value_type>, PaddedPODArray<value_type>>;
private:
ColumnVector() {}
@ -269,12 +255,12 @@ public:
Field operator[](size_t n) const override
{
if constexpr (decTrait<T>())
if constexpr (IsDecimalNumber<T>)
{
UInt32 scale = data.getScale();
if (scale == std::numeric_limits<UInt32>::max())
if (scale == DecimalField::wrongScale())
throw Exception("Extracting Decimal field with unknown scale. Scale is lost.", ErrorCodes::LOGICAL_ERROR);
return DecField(data[n], scale);
return DecimalField(data[n], scale);
}
else
return typename NearestFieldType<T>::Type(data[n]);

View File

@ -387,6 +387,8 @@ namespace ErrorCodes
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING = 410;
extern const int PTHREAD_ERROR = 411;
extern const int NETLINK_ERROR = 412;
extern const int CANNOT_SET_SIGNAL_HANDLER = 413;
extern const int CANNOT_READLINE = 414;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -74,7 +74,9 @@
M(ZooKeeperCheck) \
M(ZooKeeperClose) \
M(ZooKeeperWatchResponse) \
M(ZooKeeperExceptions) \
M(ZooKeeperUserExceptions) \
M(ZooKeeperHardwareExceptions) \
M(ZooKeeperOtherExceptions) \
M(ZooKeeperWaitMicroseconds) \
M(ZooKeeperBytesSent) \
M(ZooKeeperBytesReceived) \

View File

@ -2,26 +2,27 @@
#include <Common/Exception.h>
#include <Core/Types.h>
#include <unistd.h>
#if defined(__linux__)
#include <common/unaligned.h>
#include <errno.h>
#include <linux/genetlink.h>
#include <linux/netlink.h>
#include <linux/taskstats.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <syscall.h>
#include <linux/genetlink.h>
#include <linux/netlink.h>
#include <linux/taskstats.h>
#include <linux/capability.h>
/// Basic idea is motivated by "iotop" tool.
/// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt
#define GENLMSG_DATA(glh) ((void *)((char*)NLMSG_DATA(glh) + GENL_HDRLEN))
#define GENLMSG_PAYLOAD(glh) (NLMSG_PAYLOAD(glh, 0) - GENL_HDRLEN)
#define NLA_DATA(na) ((void *)((char*)(na) + NLA_HDRLEN))
#define NLA_PAYLOAD(len) (len - NLA_HDRLEN)
namespace DB
{
@ -29,193 +30,253 @@ namespace DB
namespace ErrorCodes
{
extern const int NETLINK_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
{
static size_t constexpr MAX_MSG_SIZE = 1024;
/** The message contains:
* - Netlink protocol header;
* - Generic Netlink (is a sub-protocol of Netlink that we use) protocol header;
* - Payload
* -- that itself is a list of "Attributes" (sub-messages), each of them contains length (including header), type, and its own payload.
* -- and attribute payload may be represented by the list of embedded attributes.
*/
struct NetlinkMessage
{
::nlmsghdr n;
::genlmsghdr g;
char buf[MAX_MSG_SIZE];
static size_t constexpr MAX_MSG_SIZE = 1024;
alignas(NLMSG_ALIGNTO) ::nlmsghdr header;
/// One Netlink attribute: a fixed ::nlattr header immediately followed by its payload bytes.
struct Attribute
{
    ::nlattr header;

    /// Zero-length array: only marks where the payload begins, it reserves no storage.
    alignas(NLMSG_ALIGNTO) char payload[0];

    /// Address of the attribute that follows this one.
    /// The step is this attribute's declared length (header.nla_len) rounded up by NLA_ALIGN.
    const Attribute * next() const
    {
        const char * self_bytes = reinterpret_cast<const char *>(this);
        const char * next_bytes = self_bytes + NLA_ALIGN(header.nla_len);
        return reinterpret_cast<const Attribute *>(next_bytes);
    }
};
union alignas(NLMSG_ALIGNTO)
{
struct
{
::genlmsghdr generic_header;
union alignas(NLMSG_ALIGNTO)
{
char buf[MAX_MSG_SIZE];
Attribute attribute; /// First attribute. There may be more.
} payload;
};
::nlmsgerr error;
};
/// Number of bytes in the message after the Netlink header and the Generic Netlink header,
/// computed from the total length stored in header.nlmsg_len.
size_t payload_size() const
{
    const size_t headers_size = sizeof(header) + sizeof(generic_header);
    return header.nlmsg_len - headers_size;
}
/// Pointer just past the last byte of the message (start of the object plus header.nlmsg_len),
/// typed as an attribute pointer so that it can serve as the upper bound when walking attributes.
const Attribute * end() const
{
    const char * message_begin = reinterpret_cast<const char *>(this);
    const char * message_end = message_begin + header.nlmsg_len;
    return reinterpret_cast<const Attribute *>(message_end);
}
/// Sends this message (the first header.nlmsg_len bytes of the object) through the Netlink socket `fd`.
/// Keeps calling sendto() until the whole message is written.
/// Throws with NETLINK_ERROR code if the system call fails or reports an impossible byte count.
void send(int fd) const
{
    const char * request_buf = reinterpret_cast<const char *>(this);
    ssize_t request_size = header.nlmsg_len;

    /// Zero-initialized address with only the family set.
    ::sockaddr_nl nladdr{};
    nladdr.nl_family = AF_NETLINK;

    while (true)
    {
        ssize_t bytes_sent = ::sendto(fd, request_buf, request_size, 0, reinterpret_cast<const ::sockaddr *>(&nladdr), sizeof(nladdr));

        if (bytes_sent < 0)
        {
            /// Transient conditions: the call did nothing, so it is safe to repeat it as is.
            if (errno == EAGAIN || errno == EINTR)
                continue;
            else
                throwFromErrno("Can't send a Netlink command", ErrorCodes::NETLINK_ERROR);
        }

        /// errno is meaningful only after a failed call, so it must not be consulted here:
        /// a stale EAGAIN would turn a zero-byte result into an endless retry loop.
        if (bytes_sent == 0)
            throw Exception("Wrong result of sendto system call: no bytes were sent", ErrorCodes::NETLINK_ERROR);

        if (bytes_sent > request_size)
            throw Exception("Wrong result of sendto system call: bytes_sent is greater than request size", ErrorCodes::NETLINK_ERROR);

        if (bytes_sent == request_size)
            break;

        /// Partial write: continue from the first unsent byte.
        request_buf += bytes_sent;
        request_size -= bytes_sent;
    }
}
void receive(int fd)
{
ssize_t bytes_received = ::recv(fd, this, sizeof(*this), 0);
if (header.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&header), bytes_received))
throw Exception("Can't receive Netlink response, error: " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR);
}
};
int sendCommand(
int sock_fd,
UInt16 nlmsg_type,
UInt32 nlmsg_pid,
UInt8 genl_cmd,
UInt16 nla_type,
void * nla_data,
int nla_len) noexcept
NetlinkMessage query(
int fd,
UInt16 type,
UInt32 pid,
UInt8 command,
UInt16 attribute_type,
const void * attribute_data,
int attribute_size)
{
NetlinkMessage msg{};
NetlinkMessage request;
msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN);
msg.n.nlmsg_type = nlmsg_type;
msg.n.nlmsg_flags = NLM_F_REQUEST;
msg.n.nlmsg_seq = 0;
msg.n.nlmsg_pid = nlmsg_pid;
msg.g.cmd = genl_cmd;
msg.g.version = 1;
request.header.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); /// Length of both headers.
request.header.nlmsg_type = type;
request.header.nlmsg_flags = NLM_F_REQUEST; /// A request.
request.header.nlmsg_seq = 0;
request.header.nlmsg_pid = pid;
::nlattr * attr = static_cast<::nlattr *>(GENLMSG_DATA(&msg));
attr->nla_type = nla_type;
attr->nla_len = nla_len + 1 + NLA_HDRLEN;
request.generic_header.cmd = command;
request.generic_header.version = 1;
memcpy(NLA_DATA(attr), nla_data, nla_len);
msg.n.nlmsg_len += NLMSG_ALIGN(attr->nla_len);
request.payload.attribute.header.nla_type = attribute_type;
request.payload.attribute.header.nla_len = attribute_size + 1 + NLA_HDRLEN;
char * buf = reinterpret_cast<char *>(&msg);
ssize_t buflen = msg.n.nlmsg_len;
memcpy(&request.payload.attribute.payload, attribute_data, attribute_size);
::sockaddr_nl nladdr{};
nladdr.nl_family = AF_NETLINK;
request.header.nlmsg_len += NLMSG_ALIGN(request.payload.attribute.header.nla_len);
while (true)
{
ssize_t r = ::sendto(sock_fd, buf, buflen, 0, reinterpret_cast<const ::sockaddr *>(&nladdr), sizeof(nladdr));
request.send(fd);
if (r >= buflen)
break;
NetlinkMessage response;
response.receive(fd);
if (r > 0)
{
buf += r;
buflen -= r;
}
else if (errno != EAGAIN)
return -1;
}
return 0;
return response;
}
UInt16 getFamilyId(int nl_sock_fd) noexcept
UInt16 getFamilyIdImpl(int fd)
{
struct
{
::nlmsghdr header;
::genlmsghdr ge_header;
char buf[256];
} answer;
NetlinkMessage answer = query(fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, CTRL_ATTR_FAMILY_NAME, TASKSTATS_GENL_NAME, strlen(TASKSTATS_GENL_NAME) + 1);
static char name[] = TASKSTATS_GENL_NAME;
/// NOTE Why the relevant info is located in the second attribute?
const NetlinkMessage::Attribute * attr = answer.payload.attribute.next();
if (sendCommand(
nl_sock_fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY,
CTRL_ATTR_FAMILY_NAME, (void *) name,
strlen(TASKSTATS_GENL_NAME) + 1))
return 0;
if (attr->header.nla_type != CTRL_ATTR_FAMILY_ID)
throw Exception("Received wrong attribute as an answer to GET_FAMILY Netlink command", ErrorCodes::NETLINK_ERROR);
UInt16 id = 0;
ssize_t rep_len = ::recv(nl_sock_fd, &answer, sizeof(answer), 0);
if (answer.header.nlmsg_type == NLMSG_ERROR || (rep_len < 0) || !NLMSG_OK((&answer.header), rep_len))
return 0;
return unalignedLoad<UInt16>(attr->payload);
}
const ::nlattr * attr;
attr = static_cast<const ::nlattr *>(GENLMSG_DATA(&answer));
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(attr) + NLA_ALIGN(attr->nla_len));
if (attr->nla_type == CTRL_ATTR_FAMILY_ID)
id = *static_cast<const UInt16 *>(NLA_DATA(attr));
return id;
bool checkPermissionsImpl()
{
/// See man getcap.
__user_cap_header_struct request{};
request.version = _LINUX_CAPABILITY_VERSION_1; /// It's enough to check just single CAP_NET_ADMIN capability we are interested.
request.pid = getpid();
__user_cap_data_struct response{};
/// Avoid dependency on 'libcap'.
if (0 != syscall(SYS_capget, &request, &response))
throwFromErrno("Cannot do 'capget' syscall", ErrorCodes::NETLINK_ERROR);
return (1 << CAP_NET_ADMIN) & response.effective;
}
/// Returns the family id obtained from getFamilyIdImpl.
/// The value is computed on the first successful call (using the `fd` passed to that call)
/// and the same value is returned afterwards, whatever `fd` is passed.
UInt16 getFamilyId(int fd)
{
    /// Initialization of a function-local static is thread safe since C++11;
    /// if the initializer throws, it will be attempted again on the next call.
    static const UInt16 cached_family_id = getFamilyIdImpl(fd);
    return cached_family_id;
}
}
TaskStatsInfoGetter::TaskStatsInfoGetter() = default;
void TaskStatsInfoGetter::init()
bool TaskStatsInfoGetter::checkPermissions()
{
if (netlink_socket_fd >= 0)
return;
static bool res = checkPermissionsImpl();
return res;
}
TaskStatsInfoGetter::TaskStatsInfoGetter()
{
if (!checkPermissions())
throw Exception("Logical error: TaskStatsInfoGetter is not usable without CAP_NET_ADMIN. Check permissions before creating the object.",
ErrorCodes::LOGICAL_ERROR);
netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
if (netlink_socket_fd < 0)
throwFromErrno("Can't create PF_NETLINK socket");
throwFromErrno("Can't create PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
/// On some containerized environments, operation on Netlink socket could hang forever.
/// We set reasonably small timeout to overcome this issue.
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 50000;
if (0 != ::setsockopt(netlink_socket_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char *>(&tv), sizeof(tv)))
throwFromErrno("Can't set timeout on PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
::sockaddr_nl addr{};
addr.nl_family = AF_NETLINK;
if (::bind(netlink_socket_fd, reinterpret_cast<const ::sockaddr *>(&addr), sizeof(addr)) < 0)
throwFromErrno("Can't bind PF_NETLINK socket");
throwFromErrno("Can't bind PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
netlink_family_id = getFamilyId(netlink_socket_fd);
taskstats_family_id = getFamilyId(netlink_socket_fd);
}
bool TaskStatsInfoGetter::getStatImpl(int tid, ::taskstats & out_stats, bool throw_on_error)
void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid)
{
init();
NetlinkMessage answer = query(netlink_socket_fd, taskstats_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(tid));
if (sendCommand(netlink_socket_fd, netlink_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(pid_t)))
throwFromErrno("Can't send a Netlink command");
NetlinkMessage msg;
ssize_t rv = ::recv(netlink_socket_fd, &msg, sizeof(msg), 0);
if (msg.n.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&msg.n), rv))
for (const NetlinkMessage::Attribute * attr = &answer.payload.attribute;
attr < answer.end();
attr = attr->next())
{
const ::nlmsgerr * err = static_cast<const ::nlmsgerr *>(NLMSG_DATA(&msg));
if (throw_on_error)
throw Exception("Can't get Netlink response, error: " + std::to_string(err->error), ErrorCodes::NETLINK_ERROR);
else
return false;
}
rv = GENLMSG_PAYLOAD(&msg.n);
const ::nlattr * attr = static_cast<const ::nlattr *>(GENLMSG_DATA(&msg));
ssize_t len = 0;
while (len < rv)
{
len += NLA_ALIGN(attr->nla_len);
if (attr->nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->nla_type == TASKSTATS_TYPE_AGGR_PID)
if (attr->header.nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->header.nla_type == TASKSTATS_TYPE_AGGR_PID)
{
int aggr_len = NLA_PAYLOAD(attr->nla_len);
int len2 = 0;
attr = static_cast<const ::nlattr *>(NLA_DATA(attr));
while (len2 < aggr_len)
for (const NetlinkMessage::Attribute * nested_attr = reinterpret_cast<const NetlinkMessage::Attribute *>(attr->payload);
nested_attr < attr->next();
nested_attr = nested_attr->next())
{
if (attr->nla_type == TASKSTATS_TYPE_STATS)
if (nested_attr->header.nla_type == TASKSTATS_TYPE_STATS)
{
const ::taskstats * ts = static_cast<const ::taskstats *>(NLA_DATA(attr));
out_stats = *ts;
out_stats = unalignedLoad<::taskstats>(nested_attr->payload);
return;
}
len2 += NLA_ALIGN(attr->nla_len);
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(attr) + len2);
}
}
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(GENLMSG_DATA(&msg)) + len);
}
return true;
throw Exception("There is no TASKSTATS_TYPE_STATS attribute in the Netlink response", ErrorCodes::NETLINK_ERROR);
}
void TaskStatsInfoGetter::getStat(::taskstats & stat, int tid)
pid_t TaskStatsInfoGetter::getCurrentTID()
{
tid = tid < 0 ? getDefaultTID() : tid;
getStatImpl(tid, stat, true);
/// This call is always successful. - man gettid
return static_cast<pid_t>(syscall(SYS_gettid));
}
bool TaskStatsInfoGetter::tryGetStat(::taskstats & stat, int tid)
{
tid = tid < 0 ? getDefaultTID() : tid;
return getStatImpl(tid, stat, false);
}
TaskStatsInfoGetter::~TaskStatsInfoGetter()
{
@ -223,32 +284,43 @@ TaskStatsInfoGetter::~TaskStatsInfoGetter()
close(netlink_socket_fd);
}
int TaskStatsInfoGetter::getCurrentTID()
}
#else
namespace DB
{
/// This call is always successful. - man gettid
return static_cast<int>(syscall(SYS_gettid));
}
int TaskStatsInfoGetter::getDefaultTID()
namespace ErrorCodes
{
if (default_tid < 0)
default_tid = getCurrentTID();
return default_tid;
extern const int NOT_IMPLEMENTED;
}
static bool tryGetTaskStats()
bool TaskStatsInfoGetter::checkPermissions()
{
TaskStatsInfoGetter getter;
::taskstats stat;
return getter.tryGetStat(stat);
return false;
}
bool TaskStatsInfoGetter::checkProcessHasRequiredPermissions()
TaskStatsInfoGetter::TaskStatsInfoGetter()
{
throw Exception("TaskStats are not implemented for this OS.", ErrorCodes::NOT_IMPLEMENTED);
}
void TaskStatsInfoGetter::getStat(::taskstats &, pid_t)
{
}
pid_t TaskStatsInfoGetter::getCurrentTID()
{
return 0;
}
TaskStatsInfoGetter::~TaskStatsInfoGetter()
{
/// It is thread- and exception- safe since C++11
static bool res = tryGetTaskStats();
return res;
}
}
#endif

View File

@ -1,43 +1,39 @@
#pragma once
#include <Core/Types.h>
#include <sys/types.h>
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
#if defined(__linux__)
struct taskstats;
#else
struct taskstats {};
#endif
namespace DB
{
class Exception;
/// Get taskstat info from OS kernel via Netlink protocol.
class TaskStatsInfoGetter
class TaskStatsInfoGetter : private boost::noncopyable
{
public:
TaskStatsInfoGetter();
TaskStatsInfoGetter(const TaskStatsInfoGetter &) = delete;
void getStat(::taskstats & stat, int tid = -1);
bool tryGetStat(::taskstats & stat, int tid = -1);
~TaskStatsInfoGetter();
void getStat(::taskstats & stat, pid_t tid);
/// Makes a syscall and returns the Linux thread id
static int getCurrentTID();
static pid_t getCurrentTID();
/// Whether the current process has permissions (sudo or cap_net_admin capabilities) to get taskstats info
static bool checkProcessHasRequiredPermissions();
static bool checkPermissions();
#if defined(__linux__)
private:
/// Caches current thread tid to avoid extra sys calls
int getDefaultTID();
int default_tid = -1;
bool getStatImpl(int tid, ::taskstats & out_stats, bool throw_on_error = false);
void init();
int netlink_socket_fd = -1;
UInt16 netlink_family_id = 0;
UInt16 taskstats_family_id = 0;
#endif
};
}

View File

@ -1,11 +1,14 @@
#pragma once
#include <Common/TaskStatsInfoGetter.h>
#include <Common/ProfileEvents.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#if defined(__linux__)
#include <linux/taskstats.h>
#endif
namespace ProfileEvents
@ -18,6 +21,7 @@ namespace ProfileEvents
extern const Event VoluntaryContextSwitches;
extern const Event InvoluntaryContextSwitches;
#if defined(__linux__)
extern const Event OSIOWaitMicroseconds;
extern const Event OSCPUWaitMicroseconds;
extern const Event OSCPUVirtualTimeMicroseconds;
@ -25,6 +29,7 @@ namespace ProfileEvents
extern const Event OSWriteChars;
extern const Event OSReadBytes;
extern const Event OSWriteBytes;
#endif
}
@ -106,6 +111,8 @@ struct RUsageCounters
};
#if defined(__linux__)
struct TasksStatsCounters
{
::taskstats stat;
@ -141,4 +148,17 @@ struct TasksStatsCounters
}
};
#else
/// Placeholder with the same members as the Linux variant of this struct, selected when
/// __linux__ is not defined. Every operation is a no-op, so no profile events are ever changed.
struct TasksStatsCounters
{
    ::taskstats stat;

    /// Returns a value-initialized object; nothing is queried from the OS.
    static TasksStatsCounters current()
    {
        return TasksStatsCounters{};
    }

    /// Intentionally does nothing.
    static void incrementProfileEvents(const TasksStatsCounters &, const TasksStatsCounters &, ProfileEvents::Counters &)
    {
    }

    /// Intentionally does nothing.
    static void updateProfileEvents(TasksStatsCounters &, ProfileEvents::Counters &)
    {
    }
};
#endif
}

View File

@ -1,10 +1,13 @@
#include "ThreadStatus.h"
#include <common/logger_useful.h>
#include <sstream>
#include <common/Types.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Common/ThreadStatus.h>
#include <Poco/Thread.h>
#include <Poco/Logger.h>
#include <Poco/Ext/ThreadNumber.h>
@ -39,7 +42,6 @@ ThreadStatus::ThreadStatus()
last_rusage = std::make_unique<RUsageCounters>();
last_taskstats = std::make_unique<TasksStatsCounters>();
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus");
@ -70,9 +72,12 @@ void ThreadStatus::initPerformanceCounters()
++queries_started;
*last_rusage = RUsageCounters::current(query_start_time_nanoseconds);
has_permissions_for_taskstats = TaskStatsInfoGetter::checkProcessHasRequiredPermissions();
if (has_permissions_for_taskstats)
if (TaskStatsInfoGetter::checkPermissions())
{
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
*last_taskstats = TasksStatsCounters::current();
}
}
void ThreadStatus::updatePerformanceCounters()
@ -80,7 +85,7 @@ void ThreadStatus::updatePerformanceCounters()
try
{
RUsageCounters::updateProfileEvents(*last_rusage, performance_counters);
if (has_permissions_for_taskstats)
if (taskstats_getter)
TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters);
}
catch (...)

View File

@ -33,7 +33,6 @@ using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
class ThreadGroupStatus
{
public:
mutable std::shared_mutex mutex;
ProfileEvents::Counters performance_counters{VariableContext::Process};
@ -126,7 +125,6 @@ public:
~ThreadStatus();
protected:
ThreadStatus();
void initPerformanceCounters();
@ -160,11 +158,11 @@ protected:
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> last_taskstats;
/// Set only if we have enough capabilities.
std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
bool has_permissions_for_taskstats = false;
public:
/// Implicitly finalizes current thread in the destructor
class CurrentThreadScope
{

View File

@ -7,27 +7,6 @@ namespace zkutil
{
/// You should reinitialize ZooKeeper session in case of these errors
inline bool isHardwareError(int32_t zk_return_code)
{
return zk_return_code == ZooKeeperImpl::ZooKeeper::ZINVALIDSTATE
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZSESSIONMOVED
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZCONNECTIONLOSS
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZOPERATIONTIMEOUT;
}
/// Valid errors sent from server
inline bool isUserError(int32_t zk_return_code)
{
return zk_return_code == ZooKeeperImpl::ZooKeeper::ZNONODE
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZBADVERSION
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZNOCHILDRENFOREPHEMERALS
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS
|| zk_return_code == ZooKeeperImpl::ZooKeeper::ZNOTEMPTY;
}
using KeeperException = ZooKeeperImpl::Exception;

View File

@ -6,7 +6,7 @@
#include <memory>
#include <common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
namespace ProfileEvents

View File

@ -493,7 +493,7 @@ Responses ZooKeeper::multi(const Requests & requests)
int32_t ZooKeeper::tryMulti(const Requests & requests, Responses & responses)
{
int32_t code = multiImpl(requests, responses);
if (code && !isUserError(code))
if (code && !ZooKeeperImpl::ZooKeeper::isUserError(code))
throw KeeperException(code);
return code;
}
@ -824,7 +824,7 @@ size_t KeeperMultiException::getFailedOpIndex(int32_t code, const Responses & re
if (responses[index]->error)
return index;
if (!isUserError(code))
if (!ZooKeeperImpl::ZooKeeper::isUserError(code))
throw DB::Exception("There are no failed OPs because '" + ZooKeeper::error2string(code) + "' is not valid response code for that",
DB::ErrorCodes::LOGICAL_ERROR);
@ -850,7 +850,7 @@ void KeeperMultiException::check(int32_t code, const Requests & requests, const
if (!code)
return;
if (isUserError(code))
if (ZooKeeperImpl::ZooKeeper::isUserError(code))
throw KeeperMultiException(code, requests, responses);
else
throw KeeperException(code);

View File

@ -25,7 +25,9 @@ namespace DB
namespace ProfileEvents
{
extern const Event ZooKeeperExceptions;
extern const Event ZooKeeperUserExceptions;
extern const Event ZooKeeperHardwareExceptions;
extern const Event ZooKeeperOtherExceptions;
extern const Event ZooKeeperInit;
extern const Event ZooKeeperTransactions;
extern const Event ZooKeeperCreate;
@ -267,7 +269,12 @@ namespace ZooKeeperImpl
Exception::Exception(const std::string & msg, const int32_t code, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code)
{
ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions);
if (ZooKeeper::isUserError(code))
ProfileEvents::increment(ProfileEvents::ZooKeeperUserExceptions);
else if (ZooKeeper::isHardwareError(code))
ProfileEvents::increment(ProfileEvents::ZooKeeperHardwareExceptions);
else
ProfileEvents::increment(ProfileEvents::ZooKeeperOtherExceptions);
}
Exception::Exception(const std::string & msg, const int32_t code)
@ -515,6 +522,25 @@ const char * ZooKeeper::errorMessage(int32_t code)
return "unknown error";
}
/// Returns true if the return code is one of the codes this function classifies as a "hardware" error:
/// invalid state, session expired, session moved, connection loss, marshalling error or operation timeout.
bool ZooKeeper::isHardwareError(int32_t zk_return_code)
{
    switch (zk_return_code)
    {
        case ZooKeeperImpl::ZooKeeper::ZINVALIDSTATE:
        case ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED:
        case ZooKeeperImpl::ZooKeeper::ZSESSIONMOVED:
        case ZooKeeperImpl::ZooKeeper::ZCONNECTIONLOSS:
        case ZooKeeperImpl::ZooKeeper::ZMARSHALLINGERROR:
        case ZooKeeperImpl::ZooKeeper::ZOPERATIONTIMEOUT:
            return true;
        default:
            return false;
    }
}
/// Returns true if the return code is one of the codes this function classifies as a "user" error:
/// no node, bad version, no children for ephemerals, node exists or not empty.
bool ZooKeeper::isUserError(int32_t zk_return_code)
{
    switch (zk_return_code)
    {
        case ZooKeeperImpl::ZooKeeper::ZNONODE:
        case ZooKeeperImpl::ZooKeeper::ZBADVERSION:
        case ZooKeeperImpl::ZooKeeper::ZNOCHILDRENFOREPHEMERALS:
        case ZooKeeperImpl::ZooKeeper::ZNODEEXISTS:
        case ZooKeeperImpl::ZooKeeper::ZNOTEMPTY:
            return true;
        default:
            return false;
    }
}
ZooKeeper::~ZooKeeper()
{

View File

@ -549,6 +549,12 @@ public:
ZSESSIONMOVED = -118 /// Session moved to another server, so operation is ignored
};
/// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors
static bool isHardwareError(int32_t code);
/// Valid errors sent from the server about database state (like "no node"). Logical and authentication errors (like "bad arguments") are not here.
static bool isUserError(int32_t code);
static const char * errorMessage(int32_t code);
/// For watches.

View File

@ -1,4 +1,4 @@
#include <Common/BackgroundSchedulePool.h>
#include "BackgroundSchedulePool.h"
#include <Common/MemoryTracker.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
@ -143,12 +143,6 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
{
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");
/// Put all threads of both thread pools to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -217,14 +211,29 @@ void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lo
}
/// Makes the calling thread a member of the pool's shared thread group.
/// The first caller finds no group yet: it initializes a query context for itself and stores
/// the resulting group in `thread_group`; every later caller attaches to that stored group.
/// The whole operation runs under `delayed_tasks_mutex`.
void BackgroundSchedulePool::attachToThreadGroup()
{
    std::lock_guard lock(delayed_tasks_mutex);

    if (!thread_group)
    {
        /// No group exists yet: create one from this thread and remember it for the others.
        CurrentThread::initializeQuery();
        thread_group = CurrentThread::getGroup();
        return;
    }

    /// Put all threads to one thread pool
    CurrentThread::attachTo(thread_group);
}
void BackgroundSchedulePool::threadFunction()
{
setThreadName("BackgrSchedPool");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
while (!shutdown)
@ -242,8 +251,7 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
{
setThreadName("BckSchPoolDelay");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)

View File

@ -142,6 +142,8 @@ private:
/// Thread group used for profiling purposes
ThreadGroupStatusPtr thread_group;
void attachToThreadGroup();
};
using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;

View File

@ -273,21 +273,21 @@ namespace DB
}
bool DecField::operator < (const DecField & r) const
bool DecimalField::operator < (const DecimalField & r) const
{
using Comparator = DecimalComparison<Dec128, Dec128, LessOp>;
return Comparator::compare(Dec128(dec), Dec128(r.dec), scale, r.scale);
using Comparator = DecimalComparison<Decimal128, Decimal128, LessOp>;
return Comparator::compare(Decimal128(dec), Decimal128(r.dec), scale, r.scale);
}
bool DecField::operator <= (const DecField & r) const
bool DecimalField::operator <= (const DecimalField & r) const
{
using Comparator = DecimalComparison<Dec128, Dec128, LessOrEqualsOp>;
return Comparator::compare(Dec128(dec), Dec128(r.dec), scale, r.scale);
using Comparator = DecimalComparison<Decimal128, Decimal128, LessOrEqualsOp>;
return Comparator::compare(Decimal128(dec), Decimal128(r.dec), scale, r.scale);
}
bool DecField::operator == (const DecField & r) const
bool DecimalField::operator == (const DecimalField & r) const
{
using Comparator = DecimalComparison<Dec128, Dec128, EqualsOp>;
return Comparator::compare(Dec128(dec), Dec128(r.dec), scale, r.scale);
using Comparator = DecimalComparison<Decimal128, Decimal128, EqualsOp>;
return Comparator::compare(Decimal128(dec), Decimal128(r.dec), scale, r.scale);
}
}

View File

@ -28,29 +28,29 @@ using TupleBackend = std::vector<Field>;
STRONG_TYPEDEF(TupleBackend, Tuple) /// Array and Tuple are different types with equal representation inside Field.
class DecField
class DecimalField
{
public:
static constexpr UInt32 wrongScale() { return std::numeric_limits<UInt32>::max(); }
DecField(Int128 value, UInt32 scale_ = wrongScale())
DecimalField(Int128 value, UInt32 scale_ = wrongScale())
: dec(value),
scale(scale_)
{}
operator Dec32() const { return dec; }
operator Dec64() const { return dec; }
operator Dec128() const { return dec; }
operator Decimal32() const { return dec; }
operator Decimal64() const { return dec; }
operator Decimal128() const { return dec; }
UInt32 getScale() const { return scale; }
bool operator < (const DecField & r) const;
bool operator <= (const DecField & r) const;
bool operator == (const DecField & r) const;
bool operator < (const DecimalField & r) const;
bool operator <= (const DecimalField & r) const;
bool operator == (const DecimalField & r) const;
bool operator > (const DecField & r) const { return r < *this; }
bool operator >= (const DecField & r) const { return r <= * this; }
bool operator != (const DecField & r) const { return !(*this == r); }
bool operator > (const DecimalField & r) const { return r < *this; }
bool operator >= (const DecimalField & r) const { return r <= * this; }
bool operator != (const DecimalField & r) const { return !(*this == r); }
private:
Int128 dec;
@ -294,7 +294,7 @@ public:
case Types::String: return get<String>() < rhs.get<String>();
case Types::Array: return get<Array>() < rhs.get<Array>();
case Types::Tuple: return get<Tuple>() < rhs.get<Tuple>();
case Types::Decimal: return get<DecField>() < rhs.get<DecField>();
case Types::Decimal: return get<DecimalField>() < rhs.get<DecimalField>();
}
throw Exception("Bad type of Field", ErrorCodes::BAD_TYPE_OF_FIELD);
@ -323,7 +323,7 @@ public:
case Types::String: return get<String>() <= rhs.get<String>();
case Types::Array: return get<Array>() <= rhs.get<Array>();
case Types::Tuple: return get<Tuple>() <= rhs.get<Tuple>();
case Types::Decimal: return get<DecField>() <= rhs.get<DecField>();
case Types::Decimal: return get<DecimalField>() <= rhs.get<DecimalField>();
}
throw Exception("Bad type of Field", ErrorCodes::BAD_TYPE_OF_FIELD);
@ -350,7 +350,7 @@ public:
case Types::Tuple: return get<Tuple>() == rhs.get<Tuple>();
case Types::UInt128: return get<UInt128>() == rhs.get<UInt128>();
case Types::Int128: return get<Int128>() == rhs.get<Int128>();
case Types::Decimal: return get<DecField>() == rhs.get<DecField>();
case Types::Decimal: return get<DecimalField>() == rhs.get<DecimalField>();
}
throw Exception("Bad type of Field", ErrorCodes::BAD_TYPE_OF_FIELD);
@ -363,7 +363,7 @@ public:
private:
std::aligned_union_t<DBMS_MIN_FIELD_SIZE - sizeof(Types::Which),
Null, UInt64, UInt128, Int64, Int128, Float64, String, Array, Tuple, DecField
Null, UInt64, UInt128, Int64, Int128, Float64, String, Array, Tuple, DecimalField
> storage;
Types::Which which;
@ -412,7 +412,7 @@ private:
case Types::String: f(field.template get<String>()); return;
case Types::Array: f(field.template get<Array>()); return;
case Types::Tuple: f(field.template get<Tuple>()); return;
case Types::Decimal: f(field.template get<DecField>()); return;
case Types::Decimal: f(field.template get<DecimalField>()); return;
default:
throw Exception("Bad type of Field", ErrorCodes::BAD_TYPE_OF_FIELD);
@ -496,7 +496,7 @@ template <> struct Field::TypeToEnum<Float64> { static const Types::Which value
template <> struct Field::TypeToEnum<String> { static const Types::Which value = Types::String; };
template <> struct Field::TypeToEnum<Array> { static const Types::Which value = Types::Array; };
template <> struct Field::TypeToEnum<Tuple> { static const Types::Which value = Types::Tuple; };
template <> struct Field::TypeToEnum<DecField>{ static const Types::Which value = Types::Decimal; };
template <> struct Field::TypeToEnum<DecimalField>{ static const Types::Which value = Types::Decimal; };
template <> struct Field::EnumToType<Field::Types::Null> { using Type = Null; };
template <> struct Field::EnumToType<Field::Types::UInt64> { using Type = UInt64; };
@ -507,7 +507,7 @@ template <> struct Field::EnumToType<Field::Types::Float64> { using Type = Float
template <> struct Field::EnumToType<Field::Types::String> { using Type = String; };
template <> struct Field::EnumToType<Field::Types::Array> { using Type = Array; };
template <> struct Field::EnumToType<Field::Types::Tuple> { using Type = Tuple; };
template <> struct Field::EnumToType<Field::Types::Decimal> { using Type = DecField; };
template <> struct Field::EnumToType<Field::Types::Decimal> { using Type = DecimalField; };
template <typename T>
@ -551,9 +551,9 @@ template <> struct NearestFieldType<Int16> { using Type = Int64; };
template <> struct NearestFieldType<Int32> { using Type = Int64; };
template <> struct NearestFieldType<Int64> { using Type = Int64; };
template <> struct NearestFieldType<Int128> { using Type = Int128; };
template <> struct NearestFieldType<Dec32> { using Type = DecField; };
template <> struct NearestFieldType<Dec64> { using Type = DecField; };
template <> struct NearestFieldType<Dec128> { using Type = DecField; };
template <> struct NearestFieldType<Decimal32> { using Type = DecimalField; };
template <> struct NearestFieldType<Decimal64> { using Type = DecimalField; };
template <> struct NearestFieldType<Decimal128> { using Type = DecimalField; };
template <> struct NearestFieldType<Float32> { using Type = Float64; };
template <> struct NearestFieldType<Float64> { using Type = Float64; };
template <> struct NearestFieldType<String> { using Type = String; };

View File

@ -118,60 +118,55 @@ template <> struct is_arithmetic<__int128>
namespace DB
{
/// Own FieldType for Decimal
/// Own FieldType for Decimal.
/// It is only a "storage" for decimal. To perform operations, you also have to provide a scale (number of digits after point).
template <typename T>
struct Dec
struct Decimal
{
using NativeType = T;
Dec() = default;
Dec(Dec<T> &&) = default;
Dec(const Dec<T> &) = default;
Decimal() = default;
Decimal(Decimal<T> &&) = default;
Decimal(const Decimal<T> &) = default;
Dec(const T & value_)
Decimal(const T & value_)
: value(value_)
{}
template <typename U>
Dec(const Dec<U> & x)
Decimal(const Decimal<U> & x)
: value(x)
{}
constexpr Dec<T> & operator = (Dec<T> &&) = default;
constexpr Dec<T> & operator = (const Dec<T> &) = default;
constexpr Decimal<T> & operator = (Decimal<T> &&) = default;
constexpr Decimal<T> & operator = (const Decimal<T> &) = default;
operator T () const { return value; }
const Dec<T> & operator += (const T & x) { value += x; return *this; }
const Dec<T> & operator -= (const T & x) { value -= x; return *this; }
const Dec<T> & operator *= (const T & x) { value *= x; return *this; }
const Dec<T> & operator /= (const T & x) { value /= x; return *this; }
const Dec<T> & operator %= (const T & x) { value %= x; return *this; }
const Decimal<T> & operator += (const T & x) { value += x; return *this; }
const Decimal<T> & operator -= (const T & x) { value -= x; return *this; }
const Decimal<T> & operator *= (const T & x) { value *= x; return *this; }
const Decimal<T> & operator /= (const T & x) { value /= x; return *this; }
const Decimal<T> & operator %= (const T & x) { value %= x; return *this; }
T value;
};
using Dec32 = Dec<Int32>;
using Dec64 = Dec<Int64>;
using Dec128 = Dec<Int128>;
using Decimal32 = Decimal<Int32>;
using Decimal64 = Decimal<Int64>;
using Decimal128 = Decimal<Int128>;
template <> struct TypeName<Dec32> { static const char * get() { return "Dec32"; } };
template <> struct TypeName<Dec64> { static const char * get() { return "Dec64"; } };
template <> struct TypeName<Dec128> { static const char * get() { return "Dec128"; } };
template <> struct TypeName<Decimal32> { static const char * get() { return "Decimal32"; } };
template <> struct TypeName<Decimal64> { static const char * get() { return "Decimal64"; } };
template <> struct TypeName<Decimal128> { static const char * get() { return "Decimal128"; } };
template <> struct TypeId<Dec32> { static constexpr const size_t value = 16; };
template <> struct TypeId<Dec64> { static constexpr const size_t value = 17; };
template <> struct TypeId<Dec128> { static constexpr const size_t value = 18; };
template <> struct TypeId<Decimal32> { static constexpr const size_t value = 16; };
template <> struct TypeId<Decimal64> { static constexpr const size_t value = 17; };
template <> struct TypeId<Decimal128> { static constexpr const size_t value = 18; };
template <typename T>
inline constexpr bool decTrait() { return false; }
template <> constexpr bool decTrait<Dec32>() { return true; }
template <> constexpr bool decTrait<Dec64>() { return true; }
template <> constexpr bool decTrait<Dec128>() { return true; }
template <typename T>
inline constexpr bool decBaseTrait() { return false; }
template <> constexpr bool decBaseTrait<Int32>() { return true; }
template <> constexpr bool decBaseTrait<Int64>() { return true; }
template <> constexpr bool decBaseTrait<Int128>() { return true; }
constexpr bool IsDecimalNumber = false;
template <> constexpr bool IsDecimalNumber<Decimal32> = true;
template <> constexpr bool IsDecimalNumber<Decimal64> = true;
template <> constexpr bool IsDecimalNumber<Decimal128> = true;
}

View File

@ -15,6 +15,20 @@ namespace ErrorCodes
}
/// Converts `src_elem` to the type of `res_elem` by calling castColumn.
/// If the conversion throws an Exception, the names of the source and destination columns
/// are appended to its message and the same exception is rethrown.
static ColumnPtr castColumnWithDiagnostic(const ColumnWithTypeAndName & src_elem, const ColumnWithTypeAndName & res_elem, const Context & context)
{
    try
    {
        return castColumn(src_elem, res_elem.type, context);
    }
    catch (Exception & e)
    {
        /// Name both ends of the failed conversion so the error points at the offending column pair.
        const auto source_name = backQuoteIfNeed(src_elem.name);
        const auto destination_name = backQuoteIfNeed(res_elem.name);

        e.addMessage("while converting source column " + source_name + " to destination column " + destination_name);

        /// Rethrow the exception currently being handled, now carrying the extra message.
        throw;
    }
}
ConvertingBlockInputStream::ConvertingBlockInputStream(
const Context & context_,
const BlockInputStreamPtr & input,
@ -69,7 +83,7 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
/// Check conversion by dry run CAST function.
castColumn(src_elem, res_elem.type, context);
castColumnWithDiagnostic(src_elem, res_elem, context);
}
}
@ -87,7 +101,7 @@ Block ConvertingBlockInputStream::readImpl()
const auto & src_elem = src.getByPosition(conversion[res_pos]);
auto & res_elem = res.getByPosition(res_pos);
ColumnPtr converted = castColumn(src_elem, res_elem.type, context);
ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context);
if (src_elem.column->isColumnConst() && !res_elem.column->isColumnConst())
converted = converted->convertToFullColumnIfConst();

View File

@ -17,8 +17,9 @@ namespace ErrorCodes
}
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name)
: expression(expression_)
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_,
const String & filter_column_name, bool remove_filter)
: remove_filter(remove_filter), expression(expression_)
{
children.push_back(input);
@ -40,6 +41,9 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), UInt64(1));
}
if (remove_filter)
header.erase(filter_column_name);
}
@ -69,7 +73,7 @@ Block FilterBlockInputStream::readImpl()
Block res;
if (constant_filter_description.always_false)
return res;
return removeFilterIfNeed(std::move(res));
/// Until non-empty block after filtering or end of stream.
while (1)
@ -81,7 +85,7 @@ Block FilterBlockInputStream::readImpl()
expression->execute(res);
if (constant_filter_description.always_true)
return res;
return removeFilterIfNeed(std::move(res));
size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column;
@ -100,7 +104,7 @@ Block FilterBlockInputStream::readImpl()
}
if (constant_filter_description.always_true)
return res;
return removeFilterIfNeed(std::move(res));
FilterDescription filter_and_holder(*column);
@ -142,7 +146,7 @@ Block FilterBlockInputStream::readImpl()
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1));
/// No need to touch the rest of the columns.
return res;
return removeFilterIfNeed(std::move(res));
}
/// Filter the rest of the columns.
@ -170,9 +174,18 @@ Block FilterBlockInputStream::readImpl()
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
}
return res;
return removeFilterIfNeed(std::move(res));
}
}
/// Erases the filter column from `block` when `block` evaluates to true and `remove_filter` is set;
/// in every other case the block is handed back unchanged.
Block FilterBlockInputStream::removeFilterIfNeed(Block && block)
{
    const bool nothing_to_remove = !block || !remove_filter;

    /// `block` is a named rvalue reference, so std::move is required to move it into the result.
    if (nothing_to_remove)
        return std::move(block);

    /// filter_column is kept as a signed position; it is cast to size_t for the erase call.
    block.erase(static_cast<size_t>(filter_column));
    return std::move(block);
}
}

View File

@ -20,7 +20,8 @@ private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_);
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_,
const String & filter_column_name_, bool remove_filter = false);
String getName() const override;
Block getTotals() override;
@ -29,12 +30,16 @@ public:
protected:
Block readImpl() override;
bool remove_filter;
private:
ExpressionActionsPtr expression;
Block header;
ssize_t filter_column;
ConstantFilterDescription constant_filter_description;
Block removeFilterIfNeed(Block && block);
};
}

View File

@ -300,7 +300,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
try
{
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggMergThr");
while (!parallel_merge_data->finish)

View File

@ -139,7 +139,7 @@ void RemoteBlockInputStream::sendExternalTables()
for (const auto & table : external_tables)
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum read_from_table_stage = QueryProcessingStage::Complete;
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
BlockInputStreams input = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)

View File

@ -47,7 +47,7 @@ try
Names column_names;
column_names.push_back("number");
QueryProcessingStage::Enum stage;
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in;
in = table->read(column_names, {}, context, stage, 8192, 1)[0];

View File

@ -52,7 +52,7 @@ try
Names column_names;
column_names.push_back("number");
QueryProcessingStage::Enum stage;
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in = table->read(column_names, {}, context, stage, 8192, 1)[0];
in = std::make_shared<FilterBlockInputStream>(in, expression, "equals(modulo(number, 3), 1)");

View File

@ -34,7 +34,7 @@ try
StoragePtr table = context.getTable("default", "hits6");
QueryProcessingStage::Enum stage;
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreams streams = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
for (size_t i = 0, size = streams.size(); i < size; ++i)

View File

@ -669,6 +669,7 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
}
};
bool first_dictionary = true;
while (limit)
{
if (state_with_dictionary->num_pending_rows == 0)
@ -681,8 +682,11 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
index_type.deserialize(*indexes_stream);
if (index_type.need_global_dictionary && (!global_dictionary || index_type.need_update_dictionary))
if (index_type.need_global_dictionary && (!global_dictionary || index_type.need_update_dictionary || (first_dictionary && !settings.continuous_reading)))
{
readDictionary();
first_dictionary = false;
}
if (state_with_dictionary->index_type.has_additional_keys)
readAdditionalKeys();

View File

@ -86,7 +86,7 @@ T DataTypeDecimal<T>::parseFromString(const String & str) const
template <typename T>
void DataTypeDecimal<T>::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
FieldType x = get<DecField>(field);
FieldType x = get<DecimalField>(field);
writeBinary(x, ostr);
}
@ -116,7 +116,7 @@ void DataTypeDecimal<T>::deserializeBinary(Field & field, ReadBuffer & istr) con
{
typename FieldType::NativeType x;
readBinary(x, istr);
field = DecField(T(x), scale);
field = DecimalField(T(x), scale);
}
template <typename T>
@ -141,7 +141,7 @@ void DataTypeDecimal<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & is
template <typename T>
Field DataTypeDecimal<T>::getDefault() const
{
return DecField(T(0), scale);
return DecimalField(T(0), scale);
}
@ -172,17 +172,17 @@ static DataTypePtr create(const ASTPtr & arguments)
UInt64 precision_value = precision->value.get<UInt64>();
Int64 scale_value = scale->value.get<Int64>();
if (precision_value < minDecimalPrecision() || precision_value > maxDecimalPrecision<Dec128>())
if (precision_value < minDecimalPrecision() || precision_value > maxDecimalPrecision<Decimal128>())
throw Exception("Wrong precision", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (scale_value < 0 || static_cast<UInt64>(scale_value) > precision_value)
throw Exception("Negative scales and scales larger than presicion are not supported", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (precision_value <= maxDecimalPrecision<Dec32>())
return std::make_shared<DataTypeDecimal<Dec32>>(precision_value, scale_value);
else if (precision_value <= maxDecimalPrecision<Dec64>())
return std::make_shared<DataTypeDecimal<Dec64>>(precision_value, scale_value);
return std::make_shared<DataTypeDecimal<Dec128>>(precision_value, scale_value);
if (precision_value <= maxDecimalPrecision<Decimal32>())
return std::make_shared<DataTypeDecimal<Decimal32>>(precision_value, scale_value);
else if (precision_value <= maxDecimalPrecision<Decimal64>())
return std::make_shared<DataTypeDecimal<Decimal64>>(precision_value, scale_value);
return std::make_shared<DataTypeDecimal<Decimal128>>(precision_value, scale_value);
}
@ -194,27 +194,27 @@ void registerDataTypeDecimal(DataTypeFactory & factory)
template <>
Dec32 DataTypeDecimal<Dec32>::getScaleMultiplier(UInt32 scale_)
Decimal32 DataTypeDecimal<Decimal32>::getScaleMultiplier(UInt32 scale_)
{
return common::exp10_i32(scale_);
}
template <>
Dec64 DataTypeDecimal<Dec64>::getScaleMultiplier(UInt32 scale_)
Decimal64 DataTypeDecimal<Decimal64>::getScaleMultiplier(UInt32 scale_)
{
return common::exp10_i64(scale_);
}
template <>
Dec128 DataTypeDecimal<Dec128>::getScaleMultiplier(UInt32 scale_)
Decimal128 DataTypeDecimal<Decimal128>::getScaleMultiplier(UInt32 scale_)
{
return common::exp10_i128(scale_);
}
/// Explicit template instantiations.
template class DataTypeDecimal<Dec32>;
template class DataTypeDecimal<Dec64>;
template class DataTypeDecimal<Dec128>;
template class DataTypeDecimal<Decimal32>;
template class DataTypeDecimal<Decimal64>;
template class DataTypeDecimal<Decimal128>;
}

View File

@ -64,9 +64,9 @@ class DataTypeSimpleSerialization : public IDataType
static constexpr size_t minDecimalPrecision() { return 1; }
template <typename T> static constexpr size_t maxDecimalPrecision() { return 0; }
template <> constexpr size_t maxDecimalPrecision<Dec32>() { return 9; }
template <> constexpr size_t maxDecimalPrecision<Dec64>() { return 18; }
template <> constexpr size_t maxDecimalPrecision<Dec128>() { return 38; }
template <> constexpr size_t maxDecimalPrecision<Decimal32>() { return 9; }
template <> constexpr size_t maxDecimalPrecision<Decimal64>() { return 18; }
template <> constexpr size_t maxDecimalPrecision<Decimal128>() { return 38; }
/// Implements Decimal(P, S), where P is precision, S is scale.
@ -244,11 +244,11 @@ inline const DataTypeDecimal<T> * checkDecimal(const IDataType & data_type)
inline bool isDecimal(const IDataType & data_type)
{
if (typeid_cast<const DataTypeDecimal<Dec32> *>(&data_type))
if (typeid_cast<const DataTypeDecimal<Decimal32> *>(&data_type))
return true;
if (typeid_cast<const DataTypeDecimal<Dec64> *>(&data_type))
if (typeid_cast<const DataTypeDecimal<Decimal64> *>(&data_type))
return true;
if (typeid_cast<const DataTypeDecimal<Dec128> *>(&data_type))
if (typeid_cast<const DataTypeDecimal<Decimal128> *>(&data_type))
return true;
return false;
}

View File

@ -135,6 +135,9 @@ public:
InputStreamGetter getter;
SubstreamPath path;
/// True if reading continues from the previous position in the file. False if a seek to the start of a new granule was made.
bool continuous_reading = true;
bool position_independent_encoding = true;
/// If not zero, may be used to avoid reallocations while reading column of String type.
double avg_value_size_hint = 0;

View File

@ -206,9 +206,9 @@ bool callByTypeAndNumber(UInt8 number, F && f)
case TypeId<Int64>::value: f(T(), Int64()); break;
case TypeId<Int128>::value: f(T(), Int128()); break;
case TypeId<Dec32>::value: f(T(), Dec32()); break;
case TypeId<Dec64>::value: f(T(), Dec64()); break;
case TypeId<Dec128>::value: f(T(), Dec128()); break;
case TypeId<Decimal32>::value: f(T(), Decimal32()); break;
case TypeId<Decimal64>::value: f(T(), Decimal64()); break;
case TypeId<Decimal128>::value: f(T(), Decimal128()); break;
default:
return false;
}
@ -234,9 +234,9 @@ inline bool callByNumbers(UInt8 type_num1, UInt8 type_num2, F && f)
case TypeId<Int64>::value: return callByTypeAndNumber<Int64>(type_num2, std::forward<F>(f));
case TypeId<Int128>::value: return callByTypeAndNumber<Int128>(type_num2, std::forward<F>(f));
case TypeId<Dec32>::value: return callByTypeAndNumber<Dec32>(type_num2, std::forward<F>(f));
case TypeId<Dec64>::value: return callByTypeAndNumber<Dec64>(type_num2, std::forward<F>(f));
case TypeId<Dec128>::value: return callByTypeAndNumber<Dec128>(type_num2, std::forward<F>(f));
case TypeId<Decimal32>::value: return callByTypeAndNumber<Decimal32>(type_num2, std::forward<F>(f));
case TypeId<Decimal64>::value: return callByTypeAndNumber<Decimal64>(type_num2, std::forward<F>(f));
case TypeId<Decimal128>::value: return callByTypeAndNumber<Decimal128>(type_num2, std::forward<F>(f));
default:
break;

View File

@ -577,7 +577,7 @@ using GreatestImpl = std::conditional_t<!NumberTraits::LeastGreatestSpecialCase<
template <typename A>
struct NegateImpl
{
using ResultType = std::conditional_t<decTrait<A>(), A, typename NumberTraits::ResultOfNegate<A>::Type>;
using ResultType = std::conditional_t<IsDecimalNumber<A>, A, typename NumberTraits::ResultOfNegate<A>::Type>;
static inline ResultType apply(A a)
{
@ -619,11 +619,11 @@ struct BitNotImpl
template <typename A>
struct AbsImpl
{
using ResultType = std::conditional_t<decTrait<A>(), A, typename NumberTraits::ResultOfAbs<A>::Type>;
using ResultType = std::conditional_t<IsDecimalNumber<A>, A, typename NumberTraits::ResultOfAbs<A>::Type>;
static inline ResultType apply(A a)
{
if constexpr (decTrait<A>())
if constexpr (IsDecimalNumber<A>)
return a < 0 ? A(-a) : a;
else if constexpr (std::is_integral_v<A> && std::is_signed_v<A>)
return a < 0 ? static_cast<ResultType>(~a) + 1 : a;
@ -717,9 +717,9 @@ struct IntExp10Impl
template <typename T> struct NativeType { using Type = T; };
template <> struct NativeType<Dec32> { using Type = Int32; };
template <> struct NativeType<Dec64> { using Type = Int64; };
template <> struct NativeType<Dec128> { using Type = Int128; };
template <> struct NativeType<Decimal32> { using Type = Int32; };
template <> struct NativeType<Decimal64> { using Type = Int64; };
template <> struct NativeType<Decimal128> { using Type = Int128; };
/// Binary operations for Decimals need scale args
/// +|- scale one of args (which scale factor is not 1). ScaleR = oneof(Scale1, Scale2);
@ -763,7 +763,7 @@ struct DecimalBinaryOperation
return;
}
}
else if constexpr (is_division && decTrait<B>())
else if constexpr (is_division && IsDecimalNumber<B>)
{
for (size_t i = 0; i < size; ++i)
c[i] = applyScaledDiv(a[i], b[i], scale_a);
@ -794,7 +794,7 @@ struct DecimalBinaryOperation
return;
}
}
else if constexpr (is_division && decTrait<B>())
else if constexpr (is_division && IsDecimalNumber<B>)
{
for (size_t i = 0; i < size; ++i)
c[i] = applyScaledDiv(a[i], b, scale_a);
@ -825,7 +825,7 @@ struct DecimalBinaryOperation
return;
}
}
else if constexpr (is_division && decTrait<B>())
else if constexpr (is_division && IsDecimalNumber<B>)
{
for (size_t i = 0; i < size; ++i)
c[i] = applyScaledDiv(a, b[i], scale_a);
@ -846,7 +846,7 @@ struct DecimalBinaryOperation
else if (scale_b != 1)
return applyScaled<false>(a, b, scale_b);
}
else if constexpr (is_division && decTrait<B>())
else if constexpr (is_division && IsDecimalNumber<B>)
return applyScaledDiv(a, b, scale_a);
return apply(a, b);
}
@ -896,7 +896,7 @@ private:
if constexpr (is_division)
{
bool overflow = false;
if constexpr (!decTrait<A>())
if constexpr (!IsDecimalNumber<A>)
overflow |= common::mulOverflow(scale, scale, scale);
overflow |= common::mulOverflow(a, scale, a);
if (overflow)
@ -931,14 +931,14 @@ template <> constexpr bool IsDateOrDateTime<DataTypeDate> = true;
template <> constexpr bool IsDateOrDateTime<DataTypeDateTime> = true;
template <typename DataType> constexpr bool IsDecimal = false;
template <> constexpr bool IsDecimal<DataTypeDecimal<Dec32>> = true;
template <> constexpr bool IsDecimal<DataTypeDecimal<Dec64>> = true;
template <> constexpr bool IsDecimal<DataTypeDecimal<Dec128>> = true;
template <> constexpr bool IsDecimal<DataTypeDecimal<Decimal32>> = true;
template <> constexpr bool IsDecimal<DataTypeDecimal<Decimal64>> = true;
template <> constexpr bool IsDecimal<DataTypeDecimal<Decimal128>> = true;
template <typename T0, typename T1> constexpr bool UseLeftDecimal = false;
template <> constexpr bool UseLeftDecimal<DataTypeDecimal<Dec128>, DataTypeDecimal<Dec32>> = true;
template <> constexpr bool UseLeftDecimal<DataTypeDecimal<Dec128>, DataTypeDecimal<Dec64>> = true;
template <> constexpr bool UseLeftDecimal<DataTypeDecimal<Dec64>, DataTypeDecimal<Dec32>> = true;
template <> constexpr bool UseLeftDecimal<DataTypeDecimal<Decimal128>, DataTypeDecimal<Decimal32>> = true;
template <> constexpr bool UseLeftDecimal<DataTypeDecimal<Decimal128>, DataTypeDecimal<Decimal64>> = true;
template <> constexpr bool UseLeftDecimal<DataTypeDecimal<Decimal64>, DataTypeDecimal<Decimal32>> = true;
template <typename T> using DataTypeFromFieldType = std::conditional_t<std::is_same_v<T, NumberTraits::Error>, InvalidType, DataTypeNumber<T>>;
@ -1020,9 +1020,9 @@ class FunctionBinaryArithmetic : public IFunction
DataTypeFloat64,
DataTypeDate,
DataTypeDateTime,
DataTypeDecimal<Dec32>,
DataTypeDecimal<Dec64>,
DataTypeDecimal<Dec128>
DataTypeDecimal<Decimal32>,
DataTypeDecimal<Decimal64>,
DataTypeDecimal<Decimal128>
>(type, std::forward<F>(f));
}
@ -1345,9 +1345,9 @@ class FunctionUnaryArithmetic : public IFunction
DataTypeInt64,
DataTypeFloat32,
DataTypeFloat64,
DataTypeDecimal<Dec32>,
DataTypeDecimal<Dec64>,
DataTypeDecimal<Dec128>
DataTypeDecimal<Decimal32>,
DataTypeDecimal<Decimal64>,
DataTypeDecimal<Decimal128>
>(type, std::forward<F>(f));
}

View File

@ -215,13 +215,13 @@ template <> struct ConstructDecInt<16> { using Type = Int128; };
template <typename T, typename U>
struct DecCompareInt
{
using Type = typename ConstructDecInt<(!decTrait<U>() || sizeof(T) > sizeof(U)) ? sizeof(T) : sizeof(U)>::Type;
using Type = typename ConstructDecInt<(!IsDecimalNumber<U> || sizeof(T) > sizeof(U)) ? sizeof(T) : sizeof(U)>::Type;
using TypeA = Type;
using TypeB = Type;
};
///
template <typename A, typename B, template <typename, typename> typename Operation, bool _actual = decTrait<A>() || decTrait<B>()>
template <typename A, typename B, template <typename, typename> typename Operation, bool _actual = IsDecimalNumber<A> || IsDecimalNumber<B>>
class DecimalComparison
{
public:
@ -257,7 +257,7 @@ public:
static bool compare(A a, B b, UInt32 scale_a, UInt32 scale_b)
{
static const UInt32 max_scale = maxDecimalPrecision<Dec128>();
static const UInt32 max_scale = maxDecimalPrecision<Decimal128>();
if (scale_a > max_scale || scale_b > max_scale)
throw Exception("Bad scale of decimal field", ErrorCodes::DECIMAL_OVERFLOW);
@ -292,7 +292,7 @@ private:
}
template <typename T, typename U>
static std::enable_if_t<decTrait<T>() && decTrait<U>(), Shift>
static std::enable_if_t<IsDecimalNumber<T> && IsDecimalNumber<U>, Shift>
getScales(const DataTypePtr & left_type, const DataTypePtr & right_type)
{
const DataTypeDecimal<T> * decimal0 = checkDecimal<T>(*left_type);
@ -314,7 +314,7 @@ private:
}
template <typename T, typename U>
static std::enable_if_t<decTrait<T>() && !decTrait<U>(), Shift>
static std::enable_if_t<IsDecimalNumber<T> && !IsDecimalNumber<U>, Shift>
getScales(const DataTypePtr & left_type, const DataTypePtr &)
{
Shift shift;
@ -325,7 +325,7 @@ private:
}
template <typename T, typename U>
static std::enable_if_t<!decTrait<T>() && decTrait<U>(), Shift>
static std::enable_if_t<!IsDecimalNumber<T> && IsDecimalNumber<U>, Shift>
getScales(const DataTypePtr &, const DataTypePtr & right_type)
{
Shift shift;

View File

@ -108,7 +108,8 @@ private:
inner,
outer,
singleLine,
pairOfLinesSinglePolygon,
pairOfLinesSingleConvexPolygon,
pairOfLinesSingleNonConvexPolygons,
pairOfLinesDifferentPolygons,
complexPolygon
};
@ -180,6 +181,9 @@ private:
/// Returns the list of half-planes formed from the edges of the intersection, excluding the box edges.
inline std::vector<HalfPlane> findHalfPlanes(const Box & box, const Polygon & intersection);
/// Check that polygon.outer() is convex.
inline bool isConvex(const Polygon & polygon);
using Distance = typename boost::geometry::default_comparable_distance_result<Point, Segment>::type;
/// min(distance(point, edge) : edge in polygon)
@ -306,9 +310,10 @@ bool PointInPolygonWithGrid<CoordinateType>::contains(CoordinateType x, Coordina
return false;
case CellType::singleLine:
return cell.half_planes[0].contains(x, y);
case CellType::pairOfLinesSinglePolygon:
case CellType::pairOfLinesSingleConvexPolygon:
return cell.half_planes[0].contains(x, y) && cell.half_planes[1].contains(x, y);
case CellType::pairOfLinesDifferentPolygons:
case CellType::pairOfLinesSingleNonConvexPolygons:
return cell.half_planes[0].contains(x, y) || cell.half_planes[1].contains(x, y);
case CellType::complexPolygon:
return boost::geometry::within(Point(x, y), polygons[cell.index_of_inner_polygon]);
@ -335,6 +340,35 @@ PointInPolygonWithGrid<CoordinateType>::distance(
return distance;
}
/// Checks the outer ring of `polygon` for convexity by looking at the turn made at every vertex.
/// Returns false when the ring has fewer than four points, or when any pair of consecutive edge
/// vectors (including the pair formed by the last edge and the first one) has a negative cross product.
template <typename CoordinateType>
bool PointInPolygonWithGrid<CoordinateType>::isConvex(const PointInPolygonWithGrid<CoordinateType>::Polygon & polygon)
{
    const auto & ring = polygon.outer();

    /// Segment or point.
    if (ring.size() < 4)
        return false;

    /// Vector leading from one ring point to the next one.
    auto edgeBetween = [](const Point & from, const Point & to) -> Point
    {
        return Point(to.x() - from.x(), to.y() - from.y());
    };

    /// Cross product of two 2D vectors; its sign tells which way the second vector turns relative to the first.
    auto crossProduct = [](const Point & lhs, const Point & rhs) { return lhs.x() * rhs.y() - lhs.y() * rhs.x(); };

    const Point first_edge = edgeBetween(ring[0], ring[1]);
    Point last_edge = first_edge;

    for (auto i : ext::range(1, ring.size() - 1))
    {
        const Point next_edge = edgeBetween(ring[i], ring[i + 1]);

        /// A negative product means this vertex turns the opposite way: not convex.
        if (crossProduct(last_edge, next_edge) < 0)
            return false;

        last_edge = next_edge;
    }

    /// Finally check the turn from the last edge back onto the first one.
    return crossProduct(last_edge, first_edge) >= 0;
}
template <typename CoordinateType>
std::vector<typename PointInPolygonWithGrid<CoordinateType>::HalfPlane>
PointInPolygonWithGrid<CoordinateType>::findHalfPlanes(
@ -423,7 +457,8 @@ void PointInPolygonWithGrid<CoordinateType>::addCell(
}
else if (half_planes.size() == 2)
{
cells[index].type = CellType::pairOfLinesSinglePolygon;
cells[index].type = isConvex(intersection) ? CellType::pairOfLinesSingleConvexPolygon
: CellType::pairOfLinesSingleNonConvexPolygons;
cells[index].half_planes[0] = half_planes[0];
cells[index].half_planes[1] = half_planes[1];
}

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <boost/noncopyable.hpp>
#include <Common/Exception.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
/// https://stackoverflow.com/questions/20759750/resolving-redefinition-of-timespec-in-time-h
#define timespec linux_timespec

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <Common/Exception.h>
#include <common/logger_useful.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <ext/singleton.h>
#include <condition_variable>

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/ReadBufferAIO.h>
#include <IO/AIOContextPool.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBuffer.h>

View File

@ -67,8 +67,9 @@ bool ReadBufferFromFileDescriptor::nextImpl()
if (res > 0)
bytes_read += res;
/// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one and we will count cpu time of other thread
/// It is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS)
/// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS).
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/WriteBufferAIO.h>
#include <Common/ProfileEvents.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>

View File

@ -1,6 +1,6 @@
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
#include <IO/ReadBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -14,10 +14,10 @@ namespace ProfileEvents
namespace DB
{
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
#endif
@ -31,7 +31,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::
}
else
{
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
#else

View File

@ -1,6 +1,6 @@
#include <IO/createWriteBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
#include <IO/WriteBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -15,10 +15,10 @@ namespace ProfileEvents
namespace DB
{
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
#endif
@ -33,7 +33,7 @@ WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & file
}
else
{
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return new WriteBufferAIO(filename_, buffer_size_, flags_, mode, existing_memory_);
#else

View File

@ -1,21 +1,18 @@
#include <map>
#include <set>
#include <boost/functional/hash/hash.hpp>
#include <Poco/Mutex.h>
#include <Poco/File.h>
#include <Poco/UUID.h>
#include <Poco/Net/IPAddress.h>
#include <common/logger_useful.h>
#include <pcg_random.hpp>
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>

View File

@ -864,7 +864,7 @@ void DDLWorker::run()
}
catch (const zkutil::KeeperException & e)
{
if (!zkutil::isHardwareError(e.code))
if (!ZooKeeperImpl::ZooKeeper::isHardwareError(e.code))
throw;
}
}
@ -892,7 +892,7 @@ void DDLWorker::run()
}
catch (zkutil::KeeperException & e)
{
if (zkutil::isHardwareError(e.code))
if (ZooKeeperImpl::ZooKeeper::isHardwareError(e.code))
{
LOG_DEBUG(log, "Recovering ZooKeeper session after: " << getCurrentExceptionMessage(false));

View File

@ -1079,10 +1079,25 @@ void ExpressionActionsChain::finalize()
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
{
Names required_output = steps[i].required_output;
std::unordered_map<String, size_t> required_output_indexes;
for (size_t j = 0; j < required_output.size(); ++j)
required_output_indexes[required_output[j]] = j;
auto & can_remove_required_output = steps[i].can_remove_required_output;
if (i + 1 < static_cast<int>(steps.size()))
{
const NameSet & additional_input = steps[i + 1].additional_input;
for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes())
required_output.push_back(it.name);
{
if (additional_input.count(it.name) == 0)
{
auto iter = required_output_indexes.find(it.name);
if (iter == required_output_indexes.end())
required_output.push_back(it.name);
else if (!can_remove_required_output.empty())
can_remove_required_output[iter->second] = false;
}
}
}
steps[i].actions->finalize(required_output);
}

View File

@ -241,7 +241,14 @@ struct ExpressionActionsChain
struct Step
{
ExpressionActionsPtr actions;
/// Columns that were added to the block before the current step, in addition to the previous step's output.
NameSet additional_input;
/// Columns which are required in the result of current step.
Names required_output;
/// True if column from required_output is needed only for current step and not used in next actions
/// (and can be removed from block). Example: filter column for where actions.
/// If not empty, has the same size as required_output; is filled in finalize().
std::vector<bool> can_remove_required_output;
Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names())
: actions(actions_), required_output(required_output_) {}

View File

@ -273,8 +273,8 @@ ExpressionAnalyzer::ExpressionAnalyzer(
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns();
/// Push the predicate expression down to the sub-queries.
rewrite_sub_queries = PredicateExpressionsOptimizer(select_query, settings).optimize();
/// Push the predicate expression down to the subqueries.
rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings).optimize();
/// Delete the unnecessary from `source_columns` list. Create `unknown_required_source_columns`. Form `columns_added_by_join`.
collectUsedColumns();
@ -2733,6 +2733,67 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
return true;
}
/// Appends the PREWHERE step to the chain.
/// Returns false (leaving the chain untouched) if the query has no PREWHERE expression.
/// Otherwise: computes the prewhere actions in the last step of the chain, registers the
/// prewhere column as a required output of that step, and appends one more step whose input
/// is the prewhere output plus the source columns that prewhere does not read.
bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool only_types)
{
assertSelect();
if (!select_query->prewhere_expression)
return false;
initChain(chain, source_columns);
auto & step = chain.getLastStep();
getRootActions(select_query->prewhere_expression, only_types, false, step.actions);
String prewhere_column_name = select_query->prewhere_expression->getColumnName();
step.required_output.push_back(prewhere_column_name);
/// Initially mark the filter column as removable; ExpressionActionsChain::finalize()
/// resets the flag if a later step requires the column.
step.can_remove_required_output.push_back(true);
{
/// Remove unused source_columns from prewhere actions.
/// A temporary copy of the prewhere actions is finalized with only the filter column as output
/// (presumably to learn which source columns the prewhere expression really needs).
auto tmp_actions = std::make_shared<ExpressionActions>(source_columns, settings);
getRootActions(select_query->prewhere_expression, only_types, false, tmp_actions);
tmp_actions->finalize({prewhere_column_name});
auto required_columns = tmp_actions->getRequiredColumns();
NameSet required_source_columns(required_columns.begin(), required_columns.end());
/// Keep every column of the step's sample block except the source columns that prewhere does not need.
auto names = step.actions->getSampleBlock().getNames();
NameSet name_set(names.begin(), names.end());
for (const auto & column : source_columns)
if (required_source_columns.count(column.name) == 0)
name_set.erase(column.name);
Names required_output(name_set.begin(), name_set.end());
step.actions->finalize(required_output);
}
{
/// Add empty action with input = {prewhere actions output} + {unused source columns}
/// Reasons:
/// 1. Remove source columns which are used only in prewhere actions during prewhere actions execution.
/// Example: select A prewhere B > 0. B can be removed at prewhere step.
/// 2. Store side columns which were calculated during prewhere actions execution if they are used.
/// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step.
/// 3. Check if we can remove filter column at prewhere step. If we can, action will store single REMOVE_COLUMN.
ColumnsWithTypeAndName columns = step.actions->getSampleBlock().getColumnsWithTypeAndName();
auto required_columns = step.actions->getRequiredColumns();
NameSet prewhere_input_names(required_columns.begin(), required_columns.end());
NameSet unused_source_columns;
/// Source columns that are not an input of the prewhere actions are passed to the new step as additional input.
for (const auto & column : source_columns)
{
if (prewhere_input_names.count(column.name) == 0)
{
columns.emplace_back(column.type, column.name);
unused_source_columns.emplace(column.name);
}
}
chain.steps.emplace_back(std::make_shared<ExpressionActions>(std::move(columns), settings));
chain.steps.back().additional_input = std::move(unused_source_columns);
}
return true;
}
bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types)
{
@ -2745,6 +2806,8 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->where_expression->getColumnName());
step.can_remove_required_output = {true};
getRootActions(select_query->where_expression, only_types, false, step.actions);
return true;

View File

@ -152,6 +152,8 @@ public:
/// Before aggregation:
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain, bool only_types);
/// remove_filter is set in ExpressionActionsChain::finalize();
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
@ -189,7 +191,7 @@ public:
/// Create Set-s that we can from IN section to use the index on them.
void makeSetsForIndex();
bool isRewriteSubQueriesPredicate() { return rewrite_sub_queries; }
bool isRewriteSubqueriesPredicate() { return rewrite_subqueries; }
private:
ASTPtr ast;
@ -303,7 +305,7 @@ private:
size_t external_table_id = 1;
/// Predicate optimizer overrides the subqueries
bool rewrite_sub_queries = false;
bool rewrite_subqueries = false;
/** Remove all unnecessary columns from the list of all available columns of the table (`columns`).
* At the same time, form a set of unknown columns (`unknown_required_source_columns`),

View File

@ -198,7 +198,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!context.tryGetExternalTable(it.first))
context.addExternalTable(it.first, it.second);
if (query_analyzer->isRewriteSubQueriesPredicate())
if (query_analyzer->isRewriteSubqueriesPredicate())
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression, getSubqueryContext(context), required_columns, QueryProcessingStage::Complete, subquery_depth + 1, only_analyze);
}
@ -293,9 +293,37 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
bool has_prewhere = false;
bool has_where = false;
size_t where_step_num;
auto finalizeChain = [&](ExpressionActionsChain & chain)
{
chain.finalize();
if (has_prewhere)
res.prewhere_info->remove_prewhere_column = chain.steps.at(0).can_remove_required_output.at(0);
if (has_where)
res.remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
has_prewhere = has_where = false;
chain.clear();
};
{
ExpressionActionsChain chain;
if (query_analyzer->appendPrewhere(chain, !res.first_stage))
{
has_prewhere = true;
res.prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions, query.prewhere_expression->getColumnName());
chain.addStep();
}
res.need_aggregate = query_analyzer->hasAggregation();
query_analyzer->appendArrayJoin(chain, dry_run || !res.first_stage);
@ -309,7 +337,8 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
if (query_analyzer->appendWhere(chain, dry_run || !res.first_stage))
{
res.has_where = true;
where_step_num = chain.steps.size() - 1;
has_where = res.has_where = true;
res.before_where = chain.getLastActions();
chain.addStep();
}
@ -320,8 +349,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
query_analyzer->appendAggregateFunctionsArguments(chain, dry_run || !res.first_stage);
res.before_aggregation = chain.getLastActions();
chain.finalize();
chain.clear();
finalizeChain(chain);
if (query_analyzer->appendHaving(chain, dry_run || !res.second_stage))
{
@ -348,8 +376,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
query_analyzer->appendProjectResult(chain);
res.final_projection = chain.getLastActions();
chain.finalize();
chain.clear();
finalizeChain(chain);
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
@ -376,31 +403,65 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
* then perform the remaining operations with one resulting stream.
*/
const Settings & settings = context.getSettingsRef();
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// PREWHERE optimization
if (storage)
{
if (!dry_run)
from_stage = storage->getQueryProcessingStage(context);
query_analyzer->makeSetsForIndex();
auto optimize_prewhere = [&](auto & merge_tree)
{
SelectQueryInfo query_info;
query_info.query = query_ptr;
query_info.sets = query_analyzer->getPreparedSets();
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where_expression && !query.prewhere_expression && !query.final())
MergeTreeWhereOptimizer{query_info, context, merge_tree.getData(), query_analyzer->getRequiredSourceColumns(), log};
};
if (const StorageMergeTree * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
else if (const StorageReplicatedMergeTree * merge_tree = dynamic_cast<const StorageReplicatedMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
}
AnalysisResult expressions;
if (dry_run)
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
expressions = analyzeExpressions(QueryProcessingStage::FetchColumns, true);
if (expressions.prewhere_info)
pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
pipeline.streams.back(), expressions.prewhere_info->prewhere_actions,
expressions.prewhere_info->prewhere_column_name, expressions.prewhere_info->remove_prewhere_column
);
}
else
{
if (input)
pipeline.streams.push_back(input);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline);
expressions = analyzeExpressions(from_stage, false);
if (from_stage == QueryProcessingStage::WithMergeableState && to_stage == QueryProcessingStage::WithMergeableState)
if (from_stage == QueryProcessingStage::WithMergeableState &&
to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
executeFetchColumns(from_stage, pipeline, expressions.prewhere_info);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
expressions = analyzeExpressions(from_stage, false);
}
const Settings & settings = context.getSettingsRef();
if (to_stage > QueryProcessingStage::FetchColumns)
{
/// Now we will compose block streams that perform the necessary actions.
@ -433,7 +494,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
}
if (expressions.has_where)
executeWhere(pipeline, expressions.before_where);
executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
if (expressions.need_aggregate)
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
@ -562,7 +623,8 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
}
}
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline)
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, const PrewhereInfoPtr & prewhere_info)
{
/// Actions to calculate ALIAS if required.
ExpressionActionsPtr alias_actions;
@ -652,8 +714,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
max_streams = 1;
}
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
if (!pipeline.streams.empty())
{
@ -685,32 +745,24 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
if (max_streams > 1 && !is_remote)
max_streams *= settings.max_streams_to_max_threads_ratio;
query_analyzer->makeSetsForIndex();
SelectQueryInfo query_info;
query_info.query = query_ptr;
query_info.sets = query_analyzer->getPreparedSets();
query_info.prewhere_info = prewhere_info;
/// PREWHERE optimization
{
auto optimize_prewhere = [&](auto & merge_tree)
{
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where_expression && !query.prewhere_expression && !query.final())
MergeTreeWhereOptimizer{query_info, context, merge_tree.getData(), required_columns, log};
};
if (const StorageMergeTree * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
else if (const StorageReplicatedMergeTree * merge_tree = dynamic_cast<const StorageReplicatedMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
}
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
pipeline.streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
if (query_info.prewhere_info)
pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
pipeline.streams.back(), prewhere_info->prewhere_actions,
prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column
);
}
pipeline.transform([&](auto & stream)
{
stream->addTableLock(table_lock);
@ -755,23 +807,21 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
/// Aliases in table declaration.
if (from_stage == QueryProcessingStage::FetchColumns && alias_actions)
if (processing_stage == QueryProcessingStage::FetchColumns && alias_actions)
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, alias_actions);
});
}
return from_stage;
}
void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter)
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FilterBlockInputStream>(stream, expression, query.where_expression->getColumnName());
stream = std::make_shared<FilterBlockInputStream>(stream, expression, query.where_expression->getColumnName(), remove_fiter);
});
}

View File

@ -8,6 +8,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/SelectQueryInfo.h>
namespace Poco { class Logger; }
@ -137,6 +138,8 @@ private:
bool has_order_by = false;
bool has_limit_by = false;
bool remove_where_filter = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
@ -154,6 +157,7 @@ private:
bool second_stage = false;
SubqueriesForSets subqueries_for_sets;
PrewhereInfoPtr prewhere_info;
};
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run);
@ -168,10 +172,9 @@ private:
/// dry_run - don't read from table, use empty header block instead.
void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
/// Fetch data from the table. Returns the stage to which the query was processed in Storage.
QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline);
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, const PrewhereInfoPtr & prewhere_info);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row);

View File

@ -24,20 +24,20 @@ bool PredicateExpressionsOptimizer::optimize()
if (!settings.enable_optimize_predicate_expression || !ast_select || !ast_select->tables)
return false;
SubQueriesProjectionColumns all_subquery_projection_columns;
SubqueriesProjectionColumns all_subquery_projection_columns;
getAllSubqueryProjectionColumns(ast_select->tables.get(), all_subquery_projection_columns);
bool is_rewrite_sub_queries = false;
bool is_rewrite_subqueries = false;
if (!all_subquery_projection_columns.empty())
{
is_rewrite_sub_queries |= optimizeImpl(ast_select->where_expression, all_subquery_projection_columns, false);
is_rewrite_sub_queries |= optimizeImpl(ast_select->prewhere_expression, all_subquery_projection_columns, true);
is_rewrite_subqueries |= optimizeImpl(ast_select->where_expression, all_subquery_projection_columns, false);
is_rewrite_subqueries |= optimizeImpl(ast_select->prewhere_expression, all_subquery_projection_columns, true);
}
return is_rewrite_sub_queries;
return is_rewrite_subqueries;
}
bool PredicateExpressionsOptimizer::optimizeImpl(
ASTPtr & outer_expression, SubQueriesProjectionColumns & sub_queries_projection_columns, bool is_prewhere)
ASTPtr & outer_expression, SubqueriesProjectionColumns & subqueries_projection_columns, bool is_prewhere)
{
/// split predicate with `and`
PredicateExpressions outer_predicate_expressions = splitConjunctionPredicate(outer_expression);
@ -49,7 +49,7 @@ bool PredicateExpressionsOptimizer::optimizeImpl(
getExpressionDependentColumns(outer_predicate, outer_predicate_dependent);
/// TODO: remove origin expression
for (const auto & subquery_projection_columns : sub_queries_projection_columns)
for (const auto & subquery_projection_columns : subqueries_projection_columns)
{
auto subquery = static_cast<ASTSelectQuery *>(subquery_projection_columns.first);
const ProjectionsWithAliases projection_columns = subquery_projection_columns.second;
@ -168,7 +168,7 @@ bool PredicateExpressionsOptimizer::isAggregateFunction(ASTPtr & node)
return false;
}
void PredicateExpressionsOptimizer::getAllSubqueryProjectionColumns(IAST * node, SubQueriesProjectionColumns & all_subquery_projection_columns)
void PredicateExpressionsOptimizer::getAllSubqueryProjectionColumns(IAST * node, SubqueriesProjectionColumns & all_subquery_projection_columns)
{
if (auto ast_subquery = typeid_cast<ASTSubquery *>(node))
{
@ -221,7 +221,7 @@ bool PredicateExpressionsOptimizer::optimizeExpression(const ASTPtr & outer_expr
return true;
}
void PredicateExpressionsOptimizer::getSubqueryProjectionColumns(IAST * subquery, SubQueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections)
void PredicateExpressionsOptimizer::getSubqueryProjectionColumns(IAST * subquery, SubqueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections)
{
if (auto * with_union_subquery = typeid_cast<ASTSelectWithUnionQuery *>(subquery))
for (auto & select : with_union_subquery->list_of_selects->children)

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
using PredicateExpressions = std::vector<ASTPtr>;
using ProjectionWithAlias = std::pair<ASTPtr, String>;
using ProjectionsWithAliases = std::vector<ProjectionWithAlias>;
using SubQueriesProjectionColumns = std::map<IAST *, ProjectionsWithAliases>;
using SubqueriesProjectionColumns = std::map<IAST *, ProjectionsWithAliases>;
/** This class provides functions for Push-Down predicate expressions
@ -61,7 +61,7 @@ private:
bool optimizeExpression(const ASTPtr & outer_expression, ASTPtr & subquery_expression, ASTSelectQuery * subquery);
bool optimizeImpl(ASTPtr & outer_expression, SubQueriesProjectionColumns & sub_queries_projection_columns, bool is_prewhere);
bool optimizeImpl(ASTPtr & outer_expression, SubqueriesProjectionColumns & subqueries_projection_columns, bool is_prewhere);
bool cannotPushDownOuterPredicate(
const ProjectionsWithAliases & subquery_projection_columns, ASTSelectQuery * subquery,
@ -72,9 +72,9 @@ private:
ASTPtr & inner_predicate);
void getAllSubqueryProjectionColumns(IAST * node, SubQueriesProjectionColumns & all_subquery_projection_columns);
void getAllSubqueryProjectionColumns(IAST * node, SubqueriesProjectionColumns & all_subquery_projection_columns);
void getSubqueryProjectionColumns(IAST * subquery, SubQueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections);
void getSubqueryProjectionColumns(IAST * subquery, SubqueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections);
};
}

View File

@ -51,7 +51,7 @@ namespace DB
*/
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1024
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
class Context;
class QueryLog;

View File

@ -165,9 +165,9 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type)
if (typeid_cast<const DataTypeInt64 *>(&type)) return convertNumericType<Int64>(src, type);
if (typeid_cast<const DataTypeFloat32 *>(&type)) return convertNumericType<Float32>(src, type);
if (typeid_cast<const DataTypeFloat64 *>(&type)) return convertNumericType<Float64>(src, type);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Dec32> *>(&type)) return convertDecimalType(src, *ptype);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Dec64> *>(&type)) return convertDecimalType(src, *ptype);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Dec128> *>(&type)) return convertDecimalType(src, *ptype);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Decimal32> *>(&type)) return convertDecimalType(src, *ptype);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Decimal64> *>(&type)) return convertDecimalType(src, *ptype);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Decimal128> *>(&type)) return convertDecimalType(src, *ptype);
const bool is_date = typeid_cast<const DataTypeDate *>(&type);
bool is_datetime = false;

View File

@ -48,5 +48,7 @@ add_check(in_join_subqueries_preprocessor)
add_executable (users users.cpp)
target_link_libraries (users dbms ${Boost_FILESYSTEM_LIBRARY})
add_executable (internal_iotop internal_iotop.cpp)
target_link_libraries (internal_iotop dbms)
if (OS_LINUX)
add_executable (internal_iotop internal_iotop.cpp)
target_link_libraries (internal_iotop dbms)
endif ()

View File

@ -52,16 +52,9 @@ void do_io(size_t id)
int tid = TaskStatsInfoGetter::getCurrentTID();
TaskStatsInfoGetter get_info;
if (!get_info.tryGetStat(stat, tid))
{
std::lock_guard<std::mutex> lock(mutex);
std::cerr << "#" << id << ", tid " << tid << ". Can't get stat\n";
}
else
{
std::lock_guard<std::mutex> lock(mutex);
std::cerr << "#" << id << ", tid " << tid << ", intitial\n" << stat << "\n";
}
get_info.getStat(stat, tid);
std::lock_guard<std::mutex> lock(mutex);
std::cerr << "#" << id << ", tid " << tid << ", intitial\n" << stat << "\n";
size_t copy_size = 1048576 * (1 + id);
std::string path_dst = "test_out_" + std::to_string(id);

View File

@ -13,8 +13,8 @@ bool ParserUnionQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
if (!ParserSubquery().parse(pos, node, expected) && !ParserSelectQuery().parse(pos, node, expected))
return false;
if (auto * ast_sub_query = typeid_cast<ASTSubquery *>(node.get()))
node = ast_sub_query->children.at(0);
if (auto * ast_subquery = typeid_cast<ASTSubquery *>(node.get()))
node = ast_subquery->children.at(0);
return true;
}

View File

@ -157,6 +157,11 @@ public:
return res;
}
/** Returns the stage to which the query is going to be processed in the read() function.
* (Normally, read() only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
* This default implementation ignores the context and returns QueryProcessingStage::FetchColumns.
*/
virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
/** Read a set of columns from the table.
* Accepts a list of columns to read, as well as a description of the query,
@ -164,9 +169,7 @@ public:
* (indexes, locks, etc.)
* Returns a stream with which you can read data sequentially
* or multiple streams for parallel data reading.
* The `processed_stage` info is also written to what stage the request was processed.
* (Normally, the function only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
* The `processed_stage` must be the result of getQueryProcessingStage() function.
*
* context contains settings for one query.
* Usually Storage does not care about these settings, since they are used in the interpreter.
@ -181,7 +184,7 @@ public:
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
@ -344,6 +347,20 @@ protected:
using ITableDeclaration::ITableDeclaration;
using std::enable_shared_from_this<IStorage>::shared_from_this;
/// Verifies that `processed_stage` equals the stage reported by getQueryProcessingStage(context).
/// The comparison itself (and the throw on mismatch) is delegated to the two-stage overload.
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, const Context & context)
{
    checkQueryProcessingStage(processed_stage, getQueryProcessingStage(context));
}
/// Throws Exception with ErrorCodes::LOGICAL_ERROR when `processed_stage` differs from `expected_stage`;
/// does nothing when they match. The message names the storage (getName()) and both stages.
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum expected_stage)
{
    /// Matching stages are the normal case: nothing to report.
    if (processed_stage == expected_stage)
        return;

    throw Exception(
        "Unexpected query processing stage for storage " + getName() +
        ": expected " + QueryProcessingStage::toString(expected_stage) +
        ", got " + QueryProcessingStage::toString(processed_stage),
        ErrorCodes::LOGICAL_ERROR);
}
private:
friend class TableStructureReadLock;

View File

@ -269,12 +269,12 @@ BlockInputStreams StorageKafka::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
if (num_consumers == 0)
return BlockInputStreams();

View File

@ -39,7 +39,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -36,7 +36,7 @@ void BackgroundProcessingPoolTaskInfo::wake()
Poco::Timestamp current_time;
{
std::unique_lock<std::mutex> lock(pool.tasks_mutex);
std::unique_lock lock(pool.tasks_mutex);
auto next_time_to_execute = iterator->first;
auto this_task_handle = iterator->second;
@ -58,12 +58,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
{
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads");
/// Put all threads to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -77,7 +71,7 @@ BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Tas
Poco::Timestamp current_time;
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
res->iterator = tasks.emplace(current_time, res);
}
@ -93,11 +87,11 @@ void BackgroundProcessingPool::removeTask(const TaskHandle & task)
/// Wait for all executions of this task.
{
std::unique_lock<std::shared_mutex> wlock(task->rwlock);
std::unique_lock wlock(task->rwlock);
}
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
tasks.erase(task->iterator);
}
}
@ -122,10 +116,22 @@ void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
{
std::lock_guard lock(tasks_mutex);
if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
pcg64 rng(randomSeed());
@ -141,7 +147,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp min_time;
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
if (!tasks.empty())
{
@ -162,7 +168,7 @@ void BackgroundProcessingPool::threadFunction()
if (!task)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(sleep_seconds
+ std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
@ -173,12 +179,12 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp current_time;
if (min_time > current_time)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
}
std::shared_lock<std::shared_mutex> rlock(task->rwlock);
std::shared_lock rlock(task->rwlock);
if (task->removed)
continue;
@ -202,7 +208,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
if (task->removed)
continue;

View File

@ -3,10 +3,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Common/FieldVisitors.h>

View File

@ -6,6 +6,7 @@
#include <Columns/ColumnArray.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <DataTypes/DataTypeNothing.h>
namespace DB
@ -20,8 +21,7 @@ namespace ErrorCodes
MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,
UInt64 preferred_max_column_in_block_size_bytes,
@ -32,8 +32,7 @@ MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
const Names & virt_column_names)
:
storage(storage),
prewhere_actions(prewhere_actions),
prewhere_column_name(prewhere_column_name),
prewhere_info(prewhere_info),
max_block_size_rows(max_block_size_rows),
preferred_block_size_bytes(preferred_block_size_bytes),
preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes),
@ -117,20 +116,20 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (!task->range_reader.isInitialized())
{
if (prewhere_actions)
if (prewhere_info)
{
if (reader->getColumns().empty())
{
task->range_reader = MergeTreeRangeReader(
pre_reader.get(), index_granularity, nullptr, prewhere_actions,
&prewhere_column_name, &task->ordered_names,
pre_reader.get(), index_granularity, nullptr, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, true);
}
else
{
task->pre_range_reader = MergeTreeRangeReader(
pre_reader.get(), index_granularity, nullptr, prewhere_actions,
&prewhere_column_name, &task->ordered_names,
pre_reader.get(), index_granularity, nullptr, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, false);
task->range_reader = MergeTreeRangeReader(
@ -141,7 +140,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
else
{
task->range_reader = MergeTreeRangeReader(
reader.get(), index_granularity, nullptr, prewhere_actions,
reader.get(), index_granularity, nullptr, nullptr,
nullptr, &task->ordered_names, task->should_reorder, false, true);
}
}
@ -167,10 +166,10 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
task->size_predictor->update(read_result.block);
}
if (read_result.block && prewhere_actions && !task->remove_prewhere_column)
if (read_result.block && prewhere_info && !task->remove_prewhere_column)
{
/// Convert const column to full here because it's cheaper to filter const column than full.
auto & column = read_result.block.getByName(prewhere_column_name);
auto & column = read_result.block.getByName(prewhere_info->prewhere_column_name);
column.column = column.column->convertToFullColumnIfConst();
}
@ -215,6 +214,20 @@ void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block) const
}
/// Applies the PREWHERE step described by `prewhere_info` to `block`, in place.
/// Does nothing when `prewhere_info` is null.
void MergeTreeBaseBlockInputStream::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
{
    /// No PREWHERE: leave the block untouched.
    if (!prewhere_info)
        return;

    prewhere_info->prewhere_actions->execute(block);

    /// Drop the PREWHERE column itself when it was requested to be removed.
    if (prewhere_info->remove_prewhere_column)
        block.erase(prewhere_info->prewhere_column_name);

    /// If the block tests as empty after that, give it a placeholder column
    /// named "_nothing" of type DataTypeNothing (with a null column pointer).
    if (!block)
        block.insert({nullptr, std::make_shared<DataTypeNothing>(), "_nothing"});
}
MergeTreeBaseBlockInputStream::~MergeTreeBaseBlockInputStream() = default;
}

View File

@ -3,6 +3,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -18,8 +19,7 @@ class MergeTreeBaseBlockInputStream : public IProfilingBlockInputStream
public:
MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,
UInt64 preferred_max_column_in_block_size_bytes,
@ -31,8 +31,10 @@ public:
~MergeTreeBaseBlockInputStream() override;
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
protected:
Block readImpl() override final;
Block readImpl() final;
/// Creates new this->task, and initializes readers
virtual bool getNewTask() = 0;
@ -47,8 +49,7 @@ protected:
protected:
MergeTreeData & storage;
ExpressionActionsPtr prewhere_actions;
String prewhere_column_name;
PrewhereInfoPtr prewhere_info;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -24,8 +24,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
Names column_names,
const MarkRanges & mark_ranges_,
bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_,
String prewhere_column_,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
size_t min_bytes_to_use_direct_io_,
size_t max_read_buffer_size_,
@ -34,10 +33,10 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseBlockInputStream{storage_, prewhere_actions_, prewhere_column_, max_block_size_rows_,
MergeTreeBaseBlockInputStream{storage_, prewhere_info, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names},
ordered_names{column_names},
required_columns{column_names},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(mark_ranges_),
@ -61,7 +60,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
addTotalRowsApprox(total_rows);
header = storage.getSampleBlockForColumns(ordered_names);
header = storage.getSampleBlockForColumns(required_columns);
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.
@ -79,6 +78,9 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
}
injectVirtualColumns(header);
executePrewhereActions(header, prewhere_info);
ordered_names = getHeader().getNames();
}
@ -99,15 +101,15 @@ try
}
is_first_task = false;
Names pre_column_names, column_names = ordered_names;
bool remove_prewhere_column = false;
Names pre_column_names;
Names column_names = required_columns;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
if (prewhere_actions)
if (prewhere_info)
{
pre_column_names = prewhere_actions->getRequiredColumns();
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
@ -117,9 +119,6 @@ try
should_reorder = true;
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
/// If the expression in PREWHERE is not a column of the table, you do not need to output a column with it
/// (from storage expect to receive only the columns of the table).
remove_prewhere_column = !pre_name_set.count(prewhere_column_name);
Names post_column_names;
for (const auto & name : column_names)
@ -159,9 +158,9 @@ try
auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
task = std::make_unique<MergeTreeReadTask>(data_part, remaining_mark_ranges, part_index_in_query, ordered_names,
column_name_set, columns, pre_columns, remove_prewhere_column, should_reorder,
std::move(size_predictor));
task = std::make_unique<MergeTreeReadTask>(
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, columns, pre_columns,
prewhere_info && prewhere_info->remove_prewhere_column, should_reorder, std::move(size_predictor));
if (!reader)
{
@ -175,7 +174,7 @@ try
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_actions)
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, data_part, pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -24,8 +25,7 @@ public:
Names column_names,
const MarkRanges & mark_ranges,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
String prewhere_column,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
@ -51,6 +51,8 @@ private:
Block header;
/// Used by Task
Names required_columns;
/// Column names taken from the header; they define the order of columns in read blocks.
Names ordered_names;
NameSet column_name_set;
NamesAndTypesList columns;

View File

@ -1208,7 +1208,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
MarkRanges ranges{MarkRange(0, part->marks_count)};
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
false, nullptr, false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
auto compression_settings = this->context.chooseCompressionSettings(
part->bytes_on_disk,

View File

@ -20,7 +20,7 @@
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
@ -572,6 +572,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_settings = data.context.chooseCompressionSettings(
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
String rows_sources_file_path;
std::unique_ptr<WriteBuffer> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
@ -603,7 +612,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->marks_count)),
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
false, nullptr, true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(MergeProgressCallback(
merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg));
@ -674,10 +683,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (deduplicate)
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
auto compression_settings = data.context.chooseCompressionSettings(
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, aio_threshold};
@ -747,7 +752,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
false, nullptr, true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed));
@ -880,7 +885,8 @@ static BlockInputStreamPtr createInputStreamWithMutatedData(
select->select_expression_list = std::make_shared<ASTExpressionList>();
select->children.push_back(select->select_expression_list);
select->select_expression_list->children.push_back(std::make_shared<ASTAsterisk>());
for (const auto & column : storage->getColumns().getAllPhysical())
select->select_expression_list->children.push_back(std::make_shared<ASTIdentifier>(column.name));
/// For all commands that are in front of the list and are DELETE commands, we can push them down
/// to the SELECT statement and remove them from commands.
@ -969,6 +975,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
String new_part_tmp_path = new_data_part->getFullPath();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_settings = context.chooseCompressionSettings(
source_part->bytes_on_disk,
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
auto in = createInputStreamWithMutatedData(storage_from_source_part, commands, context_for_reading);
if (data.hasPrimaryKey())
@ -979,10 +994,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
auto compression_settings = context.chooseCompressionSettings(
source_part->bytes_on_disk,
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_settings);
MergeTreeDataPart::MinMaxIndex minmax_idx;

View File

@ -135,13 +135,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned num_streams,
Int64 max_block_number_to_read) const
{
return readFromParts(
data.getDataPartsVector(), column_names_to_return, query_info, context, processed_stage,
data.getDataPartsVector(), column_names_to_return, query_info, context,
max_block_size, num_streams, max_block_number_to_read);
}
@ -150,7 +149,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned num_streams,
Int64 max_block_number_to_read) const
@ -207,7 +205,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
data.check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
const Settings & settings = context.getSettingsRef();
Names primary_sort_columns = data.getPrimarySortColumns();
@ -510,23 +507,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition->toString());
/// PREWHERE
ExpressionActionsPtr prewhere_actions;
String prewhere_column;
if (select.prewhere_expression)
{
ExpressionAnalyzer analyzer(select.prewhere_expression, context, nullptr, available_real_columns);
prewhere_actions = analyzer.getActions(false);
prewhere_column = select.prewhere_expression->getColumnName();
SubqueriesForSets prewhere_subqueries = analyzer.getSubqueriesForSets();
/** Compute the subqueries right now.
* NOTE Disadvantage - these calculations do not fit into the query execution pipeline.
* They are done before the execution of the pipeline; they can not be interrupted; during the computation, packets of progress are not sent.
*/
if (!prewhere_subqueries.empty())
CreatingSetsBlockInputStream(std::make_shared<NullBlockInputStream>(Block()), prewhere_subqueries,
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode)).read();
}
RangesInDataParts parts_with_ranges;
@ -583,8 +566,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column,
query_info.prewhere_info,
virt_column_names,
settings);
}
@ -596,8 +578,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column,
query_info.prewhere_info,
virt_column_names,
settings);
}
@ -622,8 +603,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const
{
@ -658,7 +638,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true,
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_info, true,
column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
/// Let's estimate total number of rows for progress bar.
@ -670,7 +650,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::make_shared<MergeTreeThreadBlockInputStream>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
prewhere_actions, prewhere_column, settings, virt_columns));
prewhere_info, settings, virt_columns));
if (i == 0)
{
@ -744,7 +724,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io,
use_uncompressed_cache, prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
res.push_back(source_stream);
@ -763,8 +743,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const
{
@ -790,7 +769,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
virt_columns, part.part_index_in_query);
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));

View File

@ -26,7 +26,6 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams,
Int64 max_block_number_to_read) const;
@ -36,7 +35,6 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams,
Int64 max_block_number_to_read) const;
@ -52,8 +50,7 @@ private:
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const;
@ -62,8 +59,7 @@ private:
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const;

View File

@ -1,10 +1,12 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Columns/FilterDescription.h>
#include <ext/range.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnNothing.h>
#include <ext/range.h>
#if __SSE2__
#include <emmintrin.h>
#include <DataTypes/DataTypeNothing.h>
#endif
namespace DB
@ -436,7 +438,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
merge_tree_reader->evaluateMissingDefaults(read_result.block);
if (should_reorder || always_reorder || block.columns())
merge_tree_reader->reorderColumns(read_result.block, *ordered_names);
merge_tree_reader->reorderColumns(read_result.block, *ordered_names, prewhere_column_name);
}
}
else
@ -452,7 +454,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
merge_tree_reader->evaluateMissingDefaults(read_result.block);
if (should_reorder || always_reorder)
merge_tree_reader->reorderColumns(read_result.block, *ordered_names);
merge_tree_reader->reorderColumns(read_result.block, *ordered_names, prewhere_column_name);
}
}
@ -611,23 +613,25 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (!result.block)
return;
auto getNumRows = [&]()
{
/// If block has single column, it's filter. We need to count bytes in it in order to get the number of rows.
if (result.block.columns() > 1)
return result.block.rows();
else if (result.getFilter())
return countBytesInFilter(result.getFilter()->getData());
else
return prev_rows;
};
if (remove_prewhere_column)
result.block.erase(*prewhere_column_name);
else
{
/// Calculate the number of rows in block in order to create const column.
size_t rows = result.block.rows();
/// If block has single column, it's filter. We need to count bytes in it in order to get the number of rows.
if (result.block.columns() == 1)
{
if (result.getFilter())
rows = countBytesInFilter(result.getFilter()->getData());
else
rows = prev_rows;
}
prewhere_column.column = prewhere_column.type->createColumnConst(getNumRows(), UInt64(1));
prewhere_column.column = prewhere_column.type->createColumnConst(rows, UInt64(1));
}
/// If block is empty, create column in order to store rows number.
if (last_reader_in_chain && result.block.columns() == 0)
result.block.insert({ColumnNothing::create(getNumRows()), std::make_shared<DataTypeNothing>(), "_nothing"});
}
}

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <ext/range.h>
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
namespace ProfileEvents
@ -15,19 +16,20 @@ namespace DB
MergeTreeReadPool::MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks)
: backoff_settings{backoff_settings}, backoff_state{threads}, data{data},
column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks}, predict_block_size_bytes{preferred_block_size_bytes > 0}
column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks},
predict_block_size_bytes{preferred_block_size_bytes > 0}, prewhere_info{prewhere_info}
{
const auto per_part_sum_marks = fillPerPartInfo(parts, prewhere_actions, prewhere_column_name, check_columns);
const auto per_part_sum_marks = fillPerPartInfo(parts, prewhere_info, check_columns);
fillPerThreadInfo(threads, sum_marks, per_part_sum_marks, parts, min_marks_for_concurrent_read);
}
MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, const size_t thread)
MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names)
{
const std::lock_guard<std::mutex> lock{mutex};
@ -111,9 +113,9 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
: std::make_unique<MergeTreeBlockSizePredictor>(*per_part_size_predictor[part_idx]); /// make a copy
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, column_names,
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names,
per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx],
per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
}
@ -122,7 +124,6 @@ Block MergeTreeReadPool::getHeader() const
return data.getSampleBlockForColumns(column_names);
}
void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
{
if (backoff_settings.min_read_latency_ms == 0 || do_not_steal_tasks)
@ -165,8 +166,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,
const bool check_columns)
RangesInDataParts & parts, const PrewhereInfoPtr & prewhere_info, const bool check_columns)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = data.getSampleBlock();
@ -193,10 +193,10 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
Names required_pre_column_names;
if (prewhere_actions)
if (prewhere_info)
{
/// collect columns required for PREWHERE evaluation
required_pre_column_names = prewhere_actions->getRequiredColumns();
required_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
/// there must be at least one column required for PREWHERE
if (required_pre_column_names.empty())
@ -208,13 +208,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
should_reoder = true;
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const NameSet pre_name_set{
std::begin(required_pre_column_names), std::end(required_pre_column_names)
};
/** If expression in PREWHERE is not table column, then no need to return column with it to caller
* (because storage is expected only to read table columns).
*/
per_part_remove_prewhere_column.push_back(0 == pre_name_set.count(prewhere_column_name));
const NameSet pre_name_set(required_pre_column_names.begin(), required_pre_column_names.end());
Names post_column_names;
for (const auto & name : required_column_names)
@ -223,8 +217,6 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
required_column_names = post_column_names;
}
else
per_part_remove_prewhere_column.push_back(false);
per_part_column_name_set.emplace_back(std::begin(required_column_names), std::end(required_column_names));

View File

@ -3,6 +3,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/SelectQueryInfo.h>
#include <mutex>
@ -66,12 +67,12 @@ private:
public:
MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks = false);
MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread);
MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names);
/** Each worker could call this method and pass information about read performance.
* If read performance is too low, pool could decide to lower number of threads: do not assign more tasks to several threads.
@ -83,8 +84,7 @@ public:
private:
std::vector<size_t> fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,
const bool check_columns);
RangesInDataParts & parts, const PrewhereInfoPtr & prewhere_info, const bool check_columns);
void fillPerThreadInfo(
const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
@ -93,15 +93,15 @@ private:
std::vector<std::shared_lock<std::shared_mutex>> per_part_columns_lock;
MergeTreeData & data;
Names column_names;
Names ordered_names;
bool do_not_steal_tasks;
bool predict_block_size_bytes;
std::vector<NameSet> per_part_column_name_set;
std::vector<NamesAndTypesList> per_part_columns;
std::vector<NamesAndTypesList> per_part_pre_columns;
/// @todo actually all of these values are either true or false for the whole query, thus no vector required
std::vector<char> per_part_remove_prewhere_column;
std::vector<char> per_part_should_reorder;
std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
PrewhereInfoPtr prewhere_info;
struct Part
{

View File

@ -427,6 +427,7 @@ void MergeTreeReader::readData(
}
settings.getter = get_stream_getter(false);
settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_state);
IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
@ -542,7 +543,7 @@ void MergeTreeReader::fillMissingColumns(Block & res, bool & should_reorder, boo
}
}
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names)
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names, const String * filter_name)
{
try
{
@ -552,6 +553,9 @@ void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names)
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (filter_name && !ordered_block.has(*filter_name) && res.has(*filter_name))
ordered_block.insert(res.getByName(*filter_name));
std::swap(res, ordered_block);
}
catch (Exception & e)

View File

@ -45,7 +45,8 @@ public:
/// If at least one column was added, reorders all columns in the block according to ordered_names.
void fillMissingColumns(Block & res, bool & should_reorder, bool & should_evaluate_missing_defaults);
/// Sort columns to ensure consistent order among all blocks.
void reorderColumns(Block & res, const Names & ordered_names);
/// If filter_name is not nullptr and the block has the filter column, but it is not already placed via ordered_names, it is appended at the end of the block.
void reorderColumns(Block & res, const Names & ordered_names, const String * filter_name);
/// Evaluate defaulted columns if necessary.
void evaluateMissingDefaults(Block & res);

View File

@ -16,12 +16,11 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const bool use_uncompressed_cache,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,
const Names & virt_column_names)
:
MergeTreeBaseBlockInputStream{storage, prewhere_actions, prewhere_column, max_block_size_rows,
MergeTreeBaseBlockInputStream{storage, prewhere_info, max_block_size_rows,
preferred_block_size_bytes, preferred_max_column_in_block_size_bytes, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, use_uncompressed_cache, true, virt_column_names},
thread{thread},
@ -35,6 +34,8 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
}
else
min_marks_to_read = min_marks_to_read_;
ordered_names = getHeader().getNames();
}
@ -42,6 +43,7 @@ Block MergeTreeThreadBlockInputStream::getHeader() const
{
auto res = pool->getHeader();
injectVirtualColumns(res);
executePrewhereActions(res, prewhere_info);
return res;
}
@ -49,7 +51,7 @@ Block MergeTreeThreadBlockInputStream::getHeader() const
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadBlockInputStream::getNewTask()
{
task = pool->getTask(min_marks_to_read, thread);
task = pool->getTask(min_marks_to_read, thread, ordered_names);
if (!task)
{
@ -78,7 +80,7 @@ bool MergeTreeThreadBlockInputStream::getNewTask()
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
if (prewhere_actions)
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, task->mark_ranges, min_bytes_to_use_direct_io,
@ -92,7 +94,7 @@ bool MergeTreeThreadBlockInputStream::getNewTask()
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size,
reader->getAvgValueSizeHints(), profile_callback);
if (prewhere_actions)
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, task->mark_ranges, min_bytes_to_use_direct_io,

View File

@ -23,8 +23,7 @@ public:
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const bool use_uncompressed_cache,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,
const Names & virt_column_names);
@ -38,11 +37,15 @@ protected:
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool getNewTask() override;
private:
/// "thread" index (there are N threads and each thread is assigned index in interval [0..N-1])
size_t thread;
std::shared_ptr<MergeTreeReadPool> pool;
size_t min_marks_to_read;
/// Column names taken from the header; used to order the columns of read blocks.
Names ordered_names;
};
}

View File

@ -298,16 +298,19 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish columns serialization.
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
if (!serialization_states.empty())
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
}
if (!total_column_list)

View File

@ -1,7 +1,7 @@
#pragma once
#include <thread>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ZooKeeper/Types.h>
#include <Core/Types.h>
#include <common/logger_useful.h>

View File

@ -1,5 +1,4 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>
@ -309,7 +308,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// Lock nodes have been already deleted, do not delete them in destructor
block_number_lock->assumeUnlocked();
}
else if (zkutil::isUserError(multi_code))
else if (ZooKeeperImpl::ZooKeeper::isUserError(multi_code))
{
String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();
@ -338,7 +337,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
else if (zkutil::isHardwareError(multi_code))
else if (ZooKeeperImpl::ZooKeeper::isHardwareError(multi_code))
{
transaction.rollback();
throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "

View File

@ -4,10 +4,9 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <thread>
#include <map>
#include <pcg_random.hpp>

View File

@ -10,7 +10,7 @@
#include <Poco/Event.h>
#include <Core/Types.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{

View File

@ -10,7 +10,7 @@
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB

View File

@ -52,34 +52,6 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
check_period_ms = storage.data.settings.check_delay_period * 1000;
task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
}
/// Destructor. Stops the restarting task first, then cancels fetches/merges, stops the other tasks
/// via partialShutdown() and removes the queue task from the background pool.
/// Everything runs inside try/catch: a failure is logged and not rethrown.
ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
{
try
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
storage.fetcher.blocker.cancelForever();
storage.merger_mutator.actions_blocker.cancelForever();
/// Stop other tasks.
partialShutdown();
/// Unregister the queue task from the background pool (only if a handle exists), then drop the handle.
if (storage.queue_task_handle)
storage.context.getBackgroundPool().removeTask(storage.queue_task_handle);
storage.queue_task_handle.reset();
}
catch (...)
{
/// Log the current exception instead of letting it escape the destructor.
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
void ReplicatedMergeTreeRestartingThread::run()
@ -215,10 +187,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.alter_thread.start();
storage.part_check_thread.start();
if (!storage.queue_task_handle)
storage.queue_task_handle = storage.context.getBackgroundPool().addTask(
std::bind(&StorageReplicatedMergeTree::queueTask, &storage));
return true;
}
catch (...)
@ -368,4 +336,16 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
LOG_TRACE(log, "Threads finished");
}
/// Shuts the restarting thread down: raises the need_stop flag, deactivates its own task,
/// and then calls partialShutdown() for the remaining tasks.
/// The order matters: the restarting task is stopped first so that it cannot start the others again.
void ReplicatedMergeTreeRestartingThread::shutdown()
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Stop other tasks.
partialShutdown();
}
}

View File

@ -2,7 +2,7 @@
#include <Poco/Event.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Core/Types.h>
#include <thread>
#include <atomic>
@ -23,10 +23,17 @@ class ReplicatedMergeTreeRestartingThread
{
public:
ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeRestartingThread();
/// Activates the underlying task and then schedules it.
void start()
{
task->activate();
task->schedule();
}
/// Schedules the underlying task; unlike start(), it does not call activate().
void wakeup() { task->schedule(); }
void shutdown();
private:
StorageReplicatedMergeTree & storage;
String log_name;

View File

@ -22,12 +22,12 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams) override
{
return MergeTreeDataSelectExecutor(part->storage).readFromParts(
{part}, column_names, query_info, context, processed_stage, max_block_size, num_streams, 0);
{part}, column_names, query_info, context, max_block_size, num_streams, 0);
}
protected:

View File

@ -10,12 +10,29 @@ namespace DB
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Set;
using SetPtr = std::shared_ptr<Set>;
/// Information about calculated sets in right hand side of IN.
using PreparedSets = std::unordered_map<StringRange, SetPtr, StringRangePointersHash, StringRangePointersEqualTo>;
struct PrewhereInfo
{
/// Actions which are executed on block in order to get filter column for prewhere step.
ExpressionActionsPtr prewhere_actions;
String prewhere_column_name;
bool remove_prewhere_column = false;
PrewhereInfo() = default;
explicit PrewhereInfo(ExpressionActionsPtr prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
/** Query along with some additional data,
* that can be used during query processing
@ -25,6 +42,8 @@ struct SelectQueryInfo
{
ASTPtr query;
PrewhereInfoPtr prewhere_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;

Some files were not shown because too many files have changed in this diff Show More