mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
Merge branch 'master' of github.com:yandex/ClickHouse
This commit is contained in:
commit
bb1909667c
@ -626,161 +626,164 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
|
||||
for (const auto & listen_host : listen_hosts)
|
||||
{
|
||||
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
|
||||
uint16_t listen_port = 0;
|
||||
try
|
||||
auto create_server = [&](const char * port_name, auto && func)
|
||||
{
|
||||
/// HTTP
|
||||
if (config().has("http_port"))
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
listen_port = config().getInt("http_port");
|
||||
auto address = socket_bind_listen(socket, listen_host, listen_port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
|
||||
if (!config().has(port_name))
|
||||
return;
|
||||
|
||||
LOG_INFO(log, "Listening http://" + address.toString());
|
||||
auto port = config().getInt(port_name);
|
||||
try
|
||||
{
|
||||
func(port);
|
||||
}
|
||||
|
||||
/// HTTPS
|
||||
if (config().has("https_port"))
|
||||
catch (const Poco::Exception &)
|
||||
{
|
||||
#if USE_POCO_NETSSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
listen_port = config().getInt("https_port");
|
||||
auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new HTTPHandlerFactory(*this, "HTTPSHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
std::string message = "Listen [" + listen_host + "]:" + std::to_string(port) + " failed: " + getCurrentExceptionMessage(false);
|
||||
|
||||
LOG_INFO(log, "Listening https://" + address.toString());
|
||||
if (listen_try)
|
||||
{
|
||||
LOG_ERROR(log, message
|
||||
<< ". If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
|
||||
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
|
||||
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
|
||||
" Example for disabled IPv4: <listen_host>::</listen_host>");
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception{message, ErrorCodes::NETWORK_ERROR};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// HTTP
|
||||
create_server("http_port", [&](UInt16 port)
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
|
||||
LOG_INFO(log, "Listening http://" + address.toString());
|
||||
});
|
||||
|
||||
/// HTTPS
|
||||
create_server("https_port", [&](UInt16 port)
|
||||
{
|
||||
#if USE_POCO_NETSSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new HTTPHandlerFactory(*this, "HTTPSHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
|
||||
LOG_INFO(log, "Listening https://" + address.toString());
|
||||
#else
|
||||
throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.",
|
||||
throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.",
|
||||
ErrorCodes::SUPPORT_IS_DISABLED};
|
||||
#endif
|
||||
});
|
||||
|
||||
/// TCP
|
||||
create_server("tcp_port", [&](UInt16 port)
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
|
||||
new TCPHandlerFactory(*this),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
|
||||
LOG_INFO(log, "Listening for connections with native protocol (tcp): " + address.toString());
|
||||
});
|
||||
|
||||
/// TCP with SSL
|
||||
create_server("tcp_port_secure", [&](UInt16 port)
|
||||
{
|
||||
#if USE_POCO_NETSSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
|
||||
new TCPHandlerFactory(*this, /* secure= */ true),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): " + address.toString());
|
||||
#else
|
||||
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
|
||||
ErrorCodes::SUPPORT_IS_DISABLED};
|
||||
#endif
|
||||
});
|
||||
|
||||
/// Interserver IO HTTP
|
||||
create_server("interserver_http_port", [&](UInt16 port)
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
|
||||
LOG_INFO(log, "Listening for replica communication (interserver) http://" + address.toString());
|
||||
});
|
||||
|
||||
create_server("interserver_https_port", [&](UInt16 port)
|
||||
{
|
||||
#if USE_POCO_NETSSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
|
||||
LOG_INFO(log, "Listening for secure replica communication (interserver) https://" + address.toString());
|
||||
#else
|
||||
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
|
||||
ErrorCodes::SUPPORT_IS_DISABLED};
|
||||
#endif
|
||||
}
|
||||
});
|
||||
|
||||
/// TCP
|
||||
if (config().has("tcp_port"))
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
listen_port = config().getInt("tcp_port");
|
||||
auto address = socket_bind_listen(socket, listen_host, listen_port);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
|
||||
new TCPHandlerFactory(*this),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
|
||||
LOG_INFO(log, "Listening for connections with native protocol (tcp): " + address.toString());
|
||||
}
|
||||
|
||||
/// TCP with SSL
|
||||
if (config().has("tcp_port_secure"))
|
||||
{
|
||||
create_server("mysql_port", [&](UInt16 port)
|
||||
{
|
||||
#if USE_POCO_NETSSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
listen_port = config().getInt("tcp_port_secure");
|
||||
auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(settings.receive_timeout);
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
|
||||
new TCPHandlerFactory(*this, /* secure= */ true),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): " + address.toString());
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(Poco::Timespan());
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
|
||||
new MySQLHandlerFactory(*this),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
|
||||
LOG_INFO(log, "Listening for MySQL compatibility protocol: " + address.toString());
|
||||
#else
|
||||
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
|
||||
throw Exception{"SSL support for MySQL protocol is disabled because Poco library was built without NetSSL support.",
|
||||
ErrorCodes::SUPPORT_IS_DISABLED};
|
||||
#endif
|
||||
}
|
||||
|
||||
/// At least one of TCP and HTTP servers must be created.
|
||||
if (servers.empty())
|
||||
throw Exception("No 'tcp_port' and 'http_port' is specified in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
|
||||
|
||||
/// Interserver IO HTTP
|
||||
if (config().has("interserver_http_port"))
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
listen_port = config().getInt("interserver_http_port");
|
||||
auto address = socket_bind_listen(socket, listen_host, listen_port);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
|
||||
LOG_INFO(log, "Listening for replica communication (interserver) http://" + address.toString());
|
||||
}
|
||||
|
||||
if (config().has("interserver_https_port"))
|
||||
{
|
||||
#if USE_POCO_NETSSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
listen_port = config().getInt("interserver_https_port");
|
||||
auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
|
||||
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
|
||||
LOG_INFO(log, "Listening for secure replica communication (interserver) https://" + address.toString());
|
||||
#else
|
||||
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
|
||||
ErrorCodes::SUPPORT_IS_DISABLED};
|
||||
#endif
|
||||
}
|
||||
|
||||
if (config().has("mysql_port"))
|
||||
{
|
||||
#if USE_POCO_NETSSL
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socket_bind_listen(socket, listen_host, config().getInt("mysql_port"), /* secure = */ true);
|
||||
socket.setReceiveTimeout(Poco::Timespan());
|
||||
socket.setSendTimeout(settings.send_timeout);
|
||||
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
|
||||
new MySQLHandlerFactory(*this),
|
||||
server_pool,
|
||||
socket,
|
||||
new Poco::Net::TCPServerParams));
|
||||
|
||||
LOG_INFO(log, "Listening for MySQL compatibility protocol: " + address.toString());
|
||||
#else
|
||||
throw Exception{"SSL support for MySQL protocol is disabled because Poco library was built without NetSSL support.",
|
||||
ErrorCodes::SUPPORT_IS_DISABLED};
|
||||
#endif
|
||||
}
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
std::string message = "Listen [" + listen_host + "]:" + std::to_string(listen_port) + " failed: " + std::to_string(e.code()) + ": " + e.what() + ": " + e.message();
|
||||
if (listen_try)
|
||||
LOG_ERROR(log, message
|
||||
<< " If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
|
||||
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
|
||||
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
|
||||
" Example for disabled IPv4: <listen_host>::</listen_host>");
|
||||
else
|
||||
throw Exception{message, ErrorCodes::NETWORK_ERROR};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (servers.empty())
|
||||
@ -818,10 +821,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
current_connections += server->currentConnections();
|
||||
}
|
||||
|
||||
LOG_DEBUG(log,
|
||||
LOG_INFO(log,
|
||||
"Closed all listening sockets."
|
||||
<< (current_connections ? " Waiting for " + toString(current_connections) + " outstanding connections." : ""));
|
||||
|
||||
/// Killing remaining queries.
|
||||
global_context->getProcessList().killAllQueries();
|
||||
|
||||
if (current_connections)
|
||||
{
|
||||
const int sleep_max_ms = 1000 * config().getInt("shutdown_wait_unfinished", 5);
|
||||
@ -839,13 +845,24 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG(
|
||||
LOG_INFO(
|
||||
log, "Closed connections." << (current_connections ? " But " + toString(current_connections) + " remains."
|
||||
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>" : ""));
|
||||
|
||||
dns_cache_updater.reset();
|
||||
main_config_reloader.reset();
|
||||
users_config_reloader.reset();
|
||||
|
||||
if (current_connections)
|
||||
{
|
||||
/// There is no better way to force connections to close in Poco.
|
||||
/// Otherwise connection handlers will continue to live
|
||||
/// (they are effectively dangling objects, but they use global thread pool
|
||||
/// and global thread pool destructor will wait for threads, preventing server shutdown).
|
||||
|
||||
LOG_INFO(log, "Will shutdown forcefully.");
|
||||
_exit(Application::EXIT_OK);
|
||||
}
|
||||
});
|
||||
|
||||
/// try to load dictionaries immediately, throw on error and die
|
||||
|
@ -59,10 +59,13 @@ void TCPHandler::runImpl()
|
||||
connection_context = server.context();
|
||||
connection_context.makeSessionContext();
|
||||
|
||||
Settings global_settings = connection_context.getSettings();
|
||||
/// These timeouts can be changed after receiving query.
|
||||
|
||||
socket().setReceiveTimeout(global_settings.receive_timeout);
|
||||
socket().setSendTimeout(global_settings.send_timeout);
|
||||
auto global_receive_timeout = connection_context.getSettingsRef().receive_timeout;
|
||||
auto global_send_timeout = connection_context.getSettingsRef().send_timeout;
|
||||
|
||||
socket().setReceiveTimeout(global_receive_timeout);
|
||||
socket().setSendTimeout(global_send_timeout);
|
||||
socket().setNoDelay(true);
|
||||
|
||||
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
|
||||
@ -74,6 +77,7 @@ void TCPHandler::runImpl()
|
||||
return;
|
||||
}
|
||||
|
||||
/// User will be authenticated here. It will also set settings from user profile into connection_context.
|
||||
try
|
||||
{
|
||||
receiveHello();
|
||||
@ -117,6 +121,8 @@ void TCPHandler::runImpl()
|
||||
connection_context.setCurrentDatabase(default_database);
|
||||
}
|
||||
|
||||
Settings connection_settings = connection_context.getSettings();
|
||||
|
||||
sendHello();
|
||||
|
||||
connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
|
||||
@ -126,9 +132,10 @@ void TCPHandler::runImpl()
|
||||
/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
|
||||
{
|
||||
Stopwatch idle_time;
|
||||
while (!static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000) && !server.isCancelled())
|
||||
while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
|
||||
std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
|
||||
{
|
||||
if (idle_time.elapsedSeconds() > global_settings.idle_connection_timeout)
|
||||
if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
|
||||
{
|
||||
LOG_TRACE(log, "Closing idle connection");
|
||||
return;
|
||||
@ -182,13 +189,13 @@ void TCPHandler::runImpl()
|
||||
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level.value);
|
||||
}
|
||||
|
||||
query_context->setExternalTablesInitializer([&global_settings, this] (Context & context)
|
||||
query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
|
||||
{
|
||||
if (&context != &*query_context)
|
||||
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
/// Get blocks of temporary tables
|
||||
readData(global_settings);
|
||||
readData(connection_settings);
|
||||
|
||||
/// Reset the input stream, as we received an empty block while receiving external table data.
|
||||
/// So, the stream has been marked as cancelled and we can't read from it anymore.
|
||||
@ -210,7 +217,7 @@ void TCPHandler::runImpl()
|
||||
|
||||
/// Does the request require receive data from client?
|
||||
if (state.need_receive_data_for_insert)
|
||||
processInsertQuery(global_settings);
|
||||
processInsertQuery(connection_settings);
|
||||
else if (state.io.pipeline.initialized())
|
||||
processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);
|
||||
else
|
||||
@ -317,12 +324,12 @@ void TCPHandler::runImpl()
|
||||
}
|
||||
|
||||
|
||||
void TCPHandler::readData(const Settings & global_settings)
|
||||
void TCPHandler::readData(const Settings & connection_settings)
|
||||
{
|
||||
const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
|
||||
|
||||
/// Poll interval should not be greater than receive_timeout
|
||||
const size_t default_poll_interval = global_settings.poll_interval.value * 1000000;
|
||||
const size_t default_poll_interval = connection_settings.poll_interval.value * 1000000;
|
||||
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
|
||||
constexpr size_t min_poll_interval = 5000; // 5 ms
|
||||
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
|
||||
@ -372,7 +379,7 @@ void TCPHandler::readData(const Settings & global_settings)
|
||||
}
|
||||
|
||||
|
||||
void TCPHandler::processInsertQuery(const Settings & global_settings)
|
||||
void TCPHandler::processInsertQuery(const Settings & connection_settings)
|
||||
{
|
||||
/** Made above the rest of the lines, so that in case of `writePrefix` function throws an exception,
|
||||
* client receive exception before sending data.
|
||||
@ -393,7 +400,7 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
|
||||
/// Send block to the client - table structure.
|
||||
sendData(state.io.out->getHeader());
|
||||
|
||||
readData(global_settings);
|
||||
readData(connection_settings);
|
||||
state.io.out->writeSuffix();
|
||||
state.io.onFinish();
|
||||
}
|
||||
|
@ -11,7 +11,7 @@
|
||||
#endif
|
||||
|
||||
#include <pcg_random.hpp>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__)
|
||||
#include <malloc.h>
|
||||
@ -86,10 +86,8 @@ struct RandomHint
|
||||
{
|
||||
void * mmap_hint()
|
||||
{
|
||||
return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(rng));
|
||||
return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(thread_local_rng));
|
||||
}
|
||||
private:
|
||||
pcg64 rng{randomSeed()};
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
#include "QueryProfiler.h"
|
||||
|
||||
#include <random>
|
||||
#include <pcg_random.hpp>
|
||||
#include <common/Pipe.h>
|
||||
#include <common/phdr_cache.h>
|
||||
#include <common/config_common.h>
|
||||
@ -10,7 +9,7 @@
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/WriteBufferFromFileDescriptor.h>
|
||||
|
||||
@ -63,7 +62,6 @@ namespace
|
||||
constexpr size_t QUERY_ID_MAX_LEN = 1024;
|
||||
|
||||
thread_local size_t write_trace_iteration = 0;
|
||||
thread_local pcg64 rng{randomSeed()};
|
||||
|
||||
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * info, void * context)
|
||||
{
|
||||
@ -161,7 +159,7 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const
|
||||
/// It will allow to sample short queries even if timer period is large.
|
||||
/// (For example, with period of 1 second, query with 50 ms duration will be sampled with 1 / 20 probability).
|
||||
/// It also helps to avoid interference (moire).
|
||||
UInt32 period_rand = std::uniform_int_distribution<UInt32>(0, period)(rng);
|
||||
UInt32 period_rand = std::uniform_int_distribution<UInt32>(0, period)(thread_local_rng);
|
||||
|
||||
struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION};
|
||||
struct timespec offset{.tv_sec = period_rand / TIMER_PRECISION, .tv_nsec = period_rand % TIMER_PRECISION};
|
||||
|
@ -4,14 +4,13 @@
|
||||
#include "TestKeeper.h"
|
||||
|
||||
#include <random>
|
||||
#include <pcg_random.hpp>
|
||||
#include <functional>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/PODArray.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <Poco/Net/NetException.h>
|
||||
@ -159,8 +158,7 @@ struct ZooKeeperArgs
|
||||
}
|
||||
|
||||
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
|
||||
pcg64 rng(randomSeed());
|
||||
std::shuffle(hosts_strings.begin(), hosts_strings.end(), rng);
|
||||
std::shuffle(hosts_strings.begin(), hosts_strings.end(), thread_local_rng);
|
||||
|
||||
for (auto & host : hosts_strings)
|
||||
{
|
||||
|
4
dbms/src/Common/thread_local_rng.cpp
Normal file
4
dbms/src/Common/thread_local_rng.cpp
Normal file
@ -0,0 +1,4 @@
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/randomSeed.h>
|
||||
|
||||
thread_local pcg64 thread_local_rng{randomSeed()};
|
5
dbms/src/Common/thread_local_rng.h
Normal file
5
dbms/src/Common/thread_local_rng.h
Normal file
@ -0,0 +1,5 @@
|
||||
#pragma once
|
||||
#include <pcg_random.hpp>
|
||||
|
||||
/// Fairly good thread-safe random number generator, but probably slow-down thread creation a little.
|
||||
extern thread_local pcg64 thread_local_rng;
|
@ -101,6 +101,7 @@ struct PerformanceStatistics
|
||||
|
||||
Element data[NUM_ELEMENTS];
|
||||
|
||||
/// It's Ok that generator is not seeded.
|
||||
pcg64 rng;
|
||||
|
||||
/// To select from different algorithms we use a kind of "bandits" algorithm.
|
||||
|
@ -246,26 +246,17 @@ class PacketPayloadWriteBuffer : public WriteBuffer
|
||||
{
|
||||
public:
|
||||
PacketPayloadWriteBuffer(WriteBuffer & out, size_t payload_length, uint8_t & sequence_id)
|
||||
: WriteBuffer(out.position(), 0)
|
||||
, out(out)
|
||||
, sequence_id(sequence_id)
|
||||
, total_left(payload_length)
|
||||
: WriteBuffer(out.position(), 0), out(out), sequence_id(sequence_id), total_left(payload_length)
|
||||
{
|
||||
startPacket();
|
||||
startNewPacket();
|
||||
setWorkingBuffer();
|
||||
pos = out.position();
|
||||
}
|
||||
|
||||
void checkPayloadSize()
|
||||
bool remainingPayloadSize()
|
||||
{
|
||||
if (bytes_written + offset() < payload_length)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Incomplete payload. Written " << bytes << " bytes, expected " << payload_length << " bytes.";
|
||||
throw Exception(ss.str(), 0);
|
||||
|
||||
}
|
||||
return total_left;
|
||||
}
|
||||
|
||||
~PacketPayloadWriteBuffer() override { next(); }
|
||||
private:
|
||||
WriteBuffer & out;
|
||||
uint8_t & sequence_id;
|
||||
@ -273,8 +264,9 @@ private:
|
||||
size_t total_left = 0;
|
||||
size_t payload_length = 0;
|
||||
size_t bytes_written = 0;
|
||||
bool eof = false;
|
||||
|
||||
void startPacket()
|
||||
void startNewPacket()
|
||||
{
|
||||
payload_length = std::min(total_left, MAX_PACKET_LENGTH);
|
||||
bytes_written = 0;
|
||||
@ -282,33 +274,38 @@ private:
|
||||
|
||||
out.write(reinterpret_cast<char *>(&payload_length), 3);
|
||||
out.write(sequence_id++);
|
||||
|
||||
working_buffer = WriteBuffer::Buffer(out.position(), out.position() + std::min(payload_length - bytes_written, out.available()));
|
||||
pos = working_buffer.begin();
|
||||
bytes += 4;
|
||||
}
|
||||
|
||||
/// Sets working buffer to the rest of current packet payload.
|
||||
void setWorkingBuffer()
|
||||
{
|
||||
out.nextIfAtEnd();
|
||||
working_buffer = WriteBuffer::Buffer(out.position(), out.position() + std::min(payload_length - bytes_written, out.available()));
|
||||
|
||||
if (payload_length - bytes_written == 0)
|
||||
{
|
||||
/// Finished writing packet. Due to an implementation of WriteBuffer, working_buffer cannot be empty. Further write attempts will throw Exception.
|
||||
eof = true;
|
||||
working_buffer.resize(1);
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
void nextImpl() override
|
||||
{
|
||||
int written = pos - working_buffer.begin();
|
||||
const int written = pos - working_buffer.begin();
|
||||
if (eof)
|
||||
throw Exception("Cannot write after end of buffer.", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
|
||||
|
||||
out.position() += written;
|
||||
bytes_written += written;
|
||||
|
||||
if (bytes_written < payload_length)
|
||||
{
|
||||
out.nextIfAtEnd();
|
||||
working_buffer = WriteBuffer::Buffer(out.position(), out.position() + std::min(payload_length - bytes_written, out.available()));
|
||||
}
|
||||
else if (total_left > 0 || payload_length == MAX_PACKET_LENGTH)
|
||||
{
|
||||
// Starting new packet, since packets of size greater than MAX_PACKET_LENGTH should be split.
|
||||
startPacket();
|
||||
}
|
||||
else
|
||||
{
|
||||
// Finished writing packet. Buffer is set to empty to prevent rewriting (pos will be set to the beginning of a working buffer in next()).
|
||||
// Further attempts to write will stall in the infinite loop.
|
||||
working_buffer = WriteBuffer::Buffer(out.position(), out.position());
|
||||
}
|
||||
/// Packets of size greater than MAX_PACKET_LENGTH are split into few packets of size MAX_PACKET_LENGTH and las packet of size < MAX_PACKET_LENGTH.
|
||||
if (bytes_written == payload_length && (total_left > 0 || payload_length == MAX_PACKET_LENGTH))
|
||||
startNewPacket();
|
||||
|
||||
setWorkingBuffer();
|
||||
}
|
||||
};
|
||||
|
||||
@ -320,7 +317,13 @@ public:
|
||||
{
|
||||
PacketPayloadWriteBuffer buf(buffer, getPayloadSize(), sequence_id);
|
||||
writePayloadImpl(buf);
|
||||
buf.checkPayloadSize();
|
||||
buf.next();
|
||||
if (buf.remainingPayloadSize())
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Incomplete payload. Written " << getPayloadSize() - buf.remainingPayloadSize() << " bytes, expected " << getPayloadSize() << " bytes.";
|
||||
throw Exception(ss.str(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
virtual ~WritePacket() = default;
|
||||
|
@ -255,6 +255,10 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i
|
||||
if (desired_microseconds > total_elapsed_microseconds)
|
||||
{
|
||||
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
|
||||
|
||||
/// Never sleep more than one second (it should be enough to limit speed for a reasonable amount, and otherwise it's too easy to make query hang).
|
||||
sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds);
|
||||
|
||||
sleepForMicroseconds(sleep_microseconds);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
|
||||
@ -349,7 +353,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
|
||||
ErrorCodes::TOO_SLOW);
|
||||
|
||||
/// If the predicted execution time is longer than `max_execution_time`.
|
||||
if (limits.max_execution_time != 0 && total_rows)
|
||||
if (limits.max_execution_time != 0 && total_rows && progress.read_rows)
|
||||
{
|
||||
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.read_rows);
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include <random>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <DataStreams/ConcatBlockInputStream.h>
|
||||
|
||||
|
||||
@ -21,8 +20,7 @@ BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t wid
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
distribution[i] = i % width;
|
||||
|
||||
pcg64 generator(randomSeed());
|
||||
std::shuffle(distribution.begin(), distribution.end(), generator);
|
||||
std::shuffle(distribution.begin(), distribution.end(), thread_local_rng);
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
partitions[distribution[i]].push_back(inputs[i]);
|
||||
|
@ -55,16 +55,18 @@ void MySQLWireBlockOutputStream::write(const Block & block)
|
||||
|
||||
void MySQLWireBlockOutputStream::writeSuffix()
|
||||
{
|
||||
QueryStatus * process_list_elem = context.getProcessListElement();
|
||||
CurrentThread::finalizePerformanceCounters();
|
||||
QueryStatusInfo info = process_list_elem->getInfo();
|
||||
size_t affected_rows = info.written_rows;
|
||||
|
||||
size_t affected_rows = 0;
|
||||
std::stringstream human_readable_info;
|
||||
human_readable_info << std::fixed << std::setprecision(3)
|
||||
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
|
||||
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
|
||||
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
|
||||
if (QueryStatus * process_list_elem = context.getProcessListElement())
|
||||
{
|
||||
CurrentThread::finalizePerformanceCounters();
|
||||
QueryStatusInfo info = process_list_elem->getInfo();
|
||||
affected_rows = info.written_rows;
|
||||
human_readable_info << std::fixed << std::setprecision(3)
|
||||
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
|
||||
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
|
||||
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
|
||||
}
|
||||
|
||||
if (header.columns() == 0)
|
||||
packet_sender.sendPacket(OK_Packet(0x0, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
|
||||
|
210
dbms/src/Interpreters/CollectJoinOnKeysVisitor.cpp
Normal file
210
dbms/src/Interpreters/CollectJoinOnKeysVisitor.cpp
Normal file
@ -0,0 +1,210 @@
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
#include <Interpreters/CollectJoinOnKeysVisitor.h>
|
||||
#include <Interpreters/IdentifierSemantic.h>
|
||||
#include <Interpreters/AnalyzedJoin.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INVALID_JOIN_ON_EXPRESSION;
|
||||
extern const int AMBIGUOUS_COLUMN_NAME;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
void CollectJoinOnKeysMatcher::Data::addJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast,
|
||||
const std::pair<size_t, size_t> & table_no)
|
||||
{
|
||||
ASTPtr left = left_ast->clone();
|
||||
ASTPtr right = right_ast->clone();
|
||||
|
||||
if (table_no.first == 1 || table_no.second == 2)
|
||||
analyzed_join.addOnKeys(left, right);
|
||||
else if (table_no.first == 2 || table_no.second == 1)
|
||||
analyzed_join.addOnKeys(right, left);
|
||||
else
|
||||
throw Exception("Cannot detect left and right JOIN keys. JOIN ON section is ambiguous.",
|
||||
ErrorCodes::AMBIGUOUS_COLUMN_NAME);
|
||||
has_some = true;
|
||||
}
|
||||
|
||||
void CollectJoinOnKeysMatcher::Data::addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast,
|
||||
const std::pair<size_t, size_t> & table_no)
|
||||
{
|
||||
if (table_no.first == 1 || table_no.second == 2)
|
||||
{
|
||||
asof_left_key = left_ast->clone();
|
||||
asof_right_key = right_ast->clone();
|
||||
return;
|
||||
}
|
||||
|
||||
throw Exception("ASOF JOIN for (left_table.x <= right_table.x) is not implemented", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
void CollectJoinOnKeysMatcher::Data::asofToJoinKeys()
|
||||
{
|
||||
if (!asof_left_key || !asof_right_key)
|
||||
throw Exception("No inequality in ASOF JOIN ON section.", ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
|
||||
addJoinKeys(asof_left_key, asof_right_key, {1, 2});
|
||||
}
|
||||
|
||||
|
||||
void CollectJoinOnKeysMatcher::visit(const ASTFunction & func, const ASTPtr & ast, Data & data)
|
||||
{
|
||||
if (func.name == "and")
|
||||
return; /// go into children
|
||||
|
||||
if (func.name == "equals")
|
||||
{
|
||||
ASTPtr left = func.arguments->children.at(0);
|
||||
ASTPtr right = func.arguments->children.at(1);
|
||||
auto table_numbers = getTableNumbers(ast, left, right, data);
|
||||
data.addJoinKeys(left, right, table_numbers);
|
||||
return;
|
||||
}
|
||||
|
||||
bool less_or_equals = (func.name == "lessOrEquals");
|
||||
bool greater_or_equals = (func.name == "greaterOrEquals");
|
||||
|
||||
if (data.is_asof && (less_or_equals || greater_or_equals))
|
||||
{
|
||||
if (data.asof_left_key || data.asof_right_key)
|
||||
throwSyntaxException("ASOF JOIN expects exactly one inequality in ON section, unexpected " + queryToString(ast) + ".");
|
||||
|
||||
ASTPtr left = func.arguments->children.at(0);
|
||||
ASTPtr right = func.arguments->children.at(1);
|
||||
auto table_numbers = getTableNumbers(ast, left, right, data);
|
||||
|
||||
if (greater_or_equals)
|
||||
data.addAsofJoinKeys(left, right, table_numbers);
|
||||
else
|
||||
data.addAsofJoinKeys(right, left, std::make_pair(table_numbers.second, table_numbers.first));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
throwSyntaxException("Expected equals expression, got " + queryToString(ast) + ".");
|
||||
}
|
||||
|
||||
void CollectJoinOnKeysMatcher::getIdentifiers(const ASTPtr & ast, std::vector<const ASTIdentifier *> & out)
|
||||
{
|
||||
if (const auto * ident = ast->as<ASTIdentifier>())
|
||||
{
|
||||
if (IdentifierSemantic::getColumnName(*ident))
|
||||
out.push_back(ident);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto & child : ast->children)
|
||||
getIdentifiers(child, out);
|
||||
}
|
||||
|
||||
std::pair<size_t, size_t> CollectJoinOnKeysMatcher::getTableNumbers(const ASTPtr & expr, const ASTPtr & left_ast, const ASTPtr & right_ast,
|
||||
Data & data)
|
||||
{
|
||||
std::vector<const ASTIdentifier *> left_identifiers;
|
||||
std::vector<const ASTIdentifier *> right_identifiers;
|
||||
|
||||
getIdentifiers(left_ast, left_identifiers);
|
||||
getIdentifiers(right_ast, right_identifiers);
|
||||
|
||||
size_t left_idents_table = getTableForIdentifiers(left_identifiers, data);
|
||||
size_t right_idents_table = getTableForIdentifiers(right_identifiers, data);
|
||||
|
||||
if (left_idents_table && left_idents_table == right_idents_table)
|
||||
{
|
||||
auto left_name = queryToString(*left_identifiers[0]);
|
||||
auto right_name = queryToString(*right_identifiers[0]);
|
||||
|
||||
throwSyntaxException("In expression " + queryToString(expr) + " columns " + left_name + " and " + right_name
|
||||
+ " are from the same table but from different arguments of equal function.");
|
||||
}
|
||||
|
||||
return std::make_pair(left_idents_table, right_idents_table);
|
||||
}
|
||||
|
||||
const ASTIdentifier * CollectJoinOnKeysMatcher::unrollAliases(const ASTIdentifier * identifier, const Aliases & aliases)
|
||||
{
|
||||
if (identifier->compound())
|
||||
return identifier;
|
||||
|
||||
UInt32 max_attempts = 100;
|
||||
for (auto it = aliases.find(identifier->name); it != aliases.end();)
|
||||
{
|
||||
const ASTIdentifier * parent = identifier;
|
||||
identifier = it->second->as<ASTIdentifier>();
|
||||
if (!identifier)
|
||||
break; /// not a column alias
|
||||
if (identifier == parent)
|
||||
break; /// alias to itself with the same name: 'a as a'
|
||||
if (identifier->compound())
|
||||
break; /// not an alias. Break to prevent cycle through short names: 'a as b, t1.b as a'
|
||||
|
||||
it = aliases.find(identifier->name);
|
||||
if (!max_attempts--)
|
||||
throw Exception("Cannot unroll aliases for '" + identifier->name + "'", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
return identifier;
|
||||
}
|
||||
|
||||
/// @returns 1 if identifiers belongs to left table, 2 for right table and 0 if unknown. Throws on table mix.
|
||||
/// Place detected identifier into identifiers[0] if any.
|
||||
size_t CollectJoinOnKeysMatcher::getTableForIdentifiers(std::vector<const ASTIdentifier *> & identifiers, const Data & data)
|
||||
{
|
||||
size_t table_number = 0;
|
||||
|
||||
for (auto & ident : identifiers)
|
||||
{
|
||||
const ASTIdentifier * identifier = unrollAliases(ident, data.aliases);
|
||||
if (!identifier)
|
||||
continue;
|
||||
|
||||
/// Column name could be cropped to a short form in TranslateQualifiedNamesVisitor.
|
||||
/// In this case it saves membership in IdentifierSemantic.
|
||||
size_t membership = IdentifierSemantic::getMembership(*identifier);
|
||||
|
||||
if (!membership)
|
||||
{
|
||||
const String & name = identifier->name;
|
||||
bool in_left_table = data.source_columns.count(name);
|
||||
bool in_right_table = data.joined_columns.count(name);
|
||||
|
||||
if (in_left_table && in_right_table)
|
||||
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
|
||||
|
||||
if (in_left_table)
|
||||
membership = 1;
|
||||
if (in_right_table)
|
||||
membership = 2;
|
||||
}
|
||||
|
||||
if (membership && table_number == 0)
|
||||
{
|
||||
table_number = membership;
|
||||
std::swap(ident, identifiers[0]); /// move first detected identifier to the first position
|
||||
}
|
||||
|
||||
if (membership && membership != table_number)
|
||||
{
|
||||
throw Exception("Invalid columns in JOIN ON section. Columns "
|
||||
+ identifiers[0]->getAliasOrColumnName() + " and " + ident->getAliasOrColumnName()
|
||||
+ " are from different tables.", ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
|
||||
}
|
||||
}
|
||||
|
||||
return table_number;
|
||||
}
|
||||
|
||||
[[noreturn]] void CollectJoinOnKeysMatcher::throwSyntaxException(const String & msg)
|
||||
{
|
||||
throw Exception("Invalid expression for JOIN ON. " + msg +
|
||||
" Supported syntax: JOIN ON Expr([table.]column, ...) = Expr([table.]column, ...) "
|
||||
"[AND Expr([table.]column, ...) = Expr([table.]column, ...) ...]",
|
||||
ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
|
||||
}
|
||||
|
||||
}
|
@ -1,23 +1,16 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Names.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
#include <Interpreters/InDepthNodeVisitor.h>
|
||||
#include <Interpreters/Aliases.h>
|
||||
#include <Interpreters/SyntaxAnalyzer.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INVALID_JOIN_ON_EXPRESSION;
|
||||
extern const int AMBIGUOUS_COLUMN_NAME;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
class ASTIdentifier;
|
||||
struct AnalyzedJoin;
|
||||
|
||||
class CollectJoinOnKeysMatcher
|
||||
{
|
||||
@ -30,7 +23,14 @@ public:
|
||||
const NameSet & source_columns;
|
||||
const NameSet & joined_columns;
|
||||
const Aliases & aliases;
|
||||
bool has_some = false;
|
||||
const bool is_asof;
|
||||
ASTPtr asof_left_key{};
|
||||
ASTPtr asof_right_key{};
|
||||
bool has_some{false};
|
||||
|
||||
void addJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no);
|
||||
void addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no);
|
||||
void asofToJoinKeys();
|
||||
};
|
||||
|
||||
static void visit(const ASTPtr & ast, Data & data)
|
||||
@ -48,146 +48,14 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
static void visit(const ASTFunction & func, const ASTPtr & ast, Data & data)
|
||||
{
|
||||
if (func.name == "and")
|
||||
return; /// go into children
|
||||
static void visit(const ASTFunction & func, const ASTPtr & ast, Data & data);
|
||||
|
||||
if (func.name == "equals")
|
||||
{
|
||||
ASTPtr left = func.arguments->children.at(0)->clone();
|
||||
ASTPtr right = func.arguments->children.at(1)->clone();
|
||||
addJoinKeys(ast, left, right, data);
|
||||
return;
|
||||
}
|
||||
static void getIdentifiers(const ASTPtr & ast, std::vector<const ASTIdentifier *> & out);
|
||||
static std::pair<size_t, size_t> getTableNumbers(const ASTPtr & expr, const ASTPtr & left_ast, const ASTPtr & right_ast, Data & data);
|
||||
static const ASTIdentifier * unrollAliases(const ASTIdentifier * identifier, const Aliases & aliases);
|
||||
static size_t getTableForIdentifiers(std::vector<const ASTIdentifier *> & identifiers, const Data & data);
|
||||
|
||||
throwSyntaxException("Expected equals expression, got " + queryToString(ast) + ".");
|
||||
}
|
||||
|
||||
static void getIdentifiers(const ASTPtr & ast, std::vector<const ASTIdentifier *> & out)
|
||||
{
|
||||
if (const auto * ident = ast->as<ASTIdentifier>())
|
||||
{
|
||||
if (IdentifierSemantic::getColumnName(*ident))
|
||||
out.push_back(ident);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto & child : ast->children)
|
||||
getIdentifiers(child, out);
|
||||
}
|
||||
|
||||
static void addJoinKeys(const ASTPtr & expr, ASTPtr left_ast, ASTPtr right_ast, Data & data)
|
||||
{
|
||||
std::vector<const ASTIdentifier *> left_identifiers;
|
||||
std::vector<const ASTIdentifier *> right_identifiers;
|
||||
|
||||
getIdentifiers(left_ast, left_identifiers);
|
||||
getIdentifiers(right_ast, right_identifiers);
|
||||
|
||||
size_t left_idents_table = getTableForIdentifiers(left_identifiers, data);
|
||||
size_t right_idents_table = getTableForIdentifiers(right_identifiers, data);
|
||||
|
||||
if (left_idents_table && left_idents_table == right_idents_table)
|
||||
{
|
||||
auto left_name = queryToString(*left_identifiers[0]);
|
||||
auto right_name = queryToString(*right_identifiers[0]);
|
||||
|
||||
throwSyntaxException("In expression " + queryToString(expr) + " columns " + left_name + " and " + right_name
|
||||
+ " are from the same table but from different arguments of equal function.");
|
||||
}
|
||||
|
||||
if (left_idents_table == 1 || right_idents_table == 2)
|
||||
data.analyzed_join.addOnKeys(left_ast, right_ast);
|
||||
else if (left_idents_table == 2 || right_idents_table == 1)
|
||||
data.analyzed_join.addOnKeys(right_ast, left_ast);
|
||||
else
|
||||
throw Exception("Cannot detect left and right JOIN keys. JOIN ON section is ambiguous.",
|
||||
ErrorCodes::AMBIGUOUS_COLUMN_NAME);
|
||||
|
||||
data.has_some = true;
|
||||
}
|
||||
|
||||
static const ASTIdentifier * unrollAliases(const ASTIdentifier * identifier, const Aliases & aliases)
|
||||
{
|
||||
if (identifier->compound())
|
||||
return identifier;
|
||||
|
||||
UInt32 max_attempts = 100;
|
||||
for (auto it = aliases.find(identifier->name); it != aliases.end();)
|
||||
{
|
||||
const ASTIdentifier * parent = identifier;
|
||||
identifier = it->second->as<ASTIdentifier>();
|
||||
if (!identifier)
|
||||
break; /// not a column alias
|
||||
if (identifier == parent)
|
||||
break; /// alias to itself with the same name: 'a as a'
|
||||
if (identifier->compound())
|
||||
break; /// not an alias. Break to prevent cycle through short names: 'a as b, t1.b as a'
|
||||
|
||||
it = aliases.find(identifier->name);
|
||||
if (!max_attempts--)
|
||||
throw Exception("Cannot unroll aliases for '" + identifier->name + "'", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
return identifier;
|
||||
}
|
||||
|
||||
/// @returns 1 if identifiers belongs to left table, 2 for right table and 0 if unknown. Throws on table mix.
|
||||
/// Place detected identifier into identifiers[0] if any.
|
||||
static size_t getTableForIdentifiers(std::vector<const ASTIdentifier *> & identifiers, const Data & data)
|
||||
{
|
||||
size_t table_number = 0;
|
||||
|
||||
for (auto & ident : identifiers)
|
||||
{
|
||||
const ASTIdentifier * identifier = unrollAliases(ident, data.aliases);
|
||||
if (!identifier)
|
||||
continue;
|
||||
|
||||
/// Column name could be cropped to a short form in TranslateQualifiedNamesVisitor.
|
||||
/// In this case it saves membership in IdentifierSemantic.
|
||||
size_t membership = IdentifierSemantic::getMembership(*identifier);
|
||||
|
||||
if (!membership)
|
||||
{
|
||||
const String & name = identifier->name;
|
||||
bool in_left_table = data.source_columns.count(name);
|
||||
bool in_right_table = data.joined_columns.count(name);
|
||||
|
||||
if (in_left_table && in_right_table)
|
||||
throw Exception("Column '" + name + "' is ambiguous", ErrorCodes::AMBIGUOUS_COLUMN_NAME);
|
||||
|
||||
if (in_left_table)
|
||||
membership = 1;
|
||||
if (in_right_table)
|
||||
membership = 2;
|
||||
}
|
||||
|
||||
if (membership && table_number == 0)
|
||||
{
|
||||
table_number = membership;
|
||||
std::swap(ident, identifiers[0]); /// move first detected identifier to the first position
|
||||
}
|
||||
|
||||
if (membership && membership != table_number)
|
||||
{
|
||||
throw Exception("Invalid columns in JOIN ON section. Columns "
|
||||
+ identifiers[0]->getAliasOrColumnName() + " and " + ident->getAliasOrColumnName()
|
||||
+ " are from different tables.", ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
|
||||
}
|
||||
}
|
||||
|
||||
return table_number;
|
||||
}
|
||||
|
||||
[[noreturn]] static void throwSyntaxException(const String & msg)
|
||||
{
|
||||
throw Exception("Invalid expression for JOIN ON. " + msg +
|
||||
" Supported syntax: JOIN ON Expr([table.]column, ...) = Expr([table.]column, ...) "
|
||||
"[AND Expr([table.]column, ...) = Expr([table.]column, ...) ...]",
|
||||
ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
|
||||
}
|
||||
[[noreturn]] static void throwSyntaxException(const String & msg);
|
||||
};
|
||||
|
||||
/// Parse JOIN ON expression and collect ASTs for joined columns.
|
||||
|
@ -5,12 +5,12 @@
|
||||
#include <Poco/Mutex.h>
|
||||
#include <Poco/UUID.h>
|
||||
#include <Poco/Net/IPAddress.h>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Compression/ICompressionCodec.h>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
@ -205,8 +205,6 @@ struct ContextShared
|
||||
|
||||
Context::ApplicationType application_type = Context::ApplicationType::SERVER;
|
||||
|
||||
pcg64 rng{randomSeed()};
|
||||
|
||||
/// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
|
||||
std::vector<std::unique_ptr<ShellCommand>> bridge_commands;
|
||||
|
||||
@ -1172,12 +1170,8 @@ void Context::setCurrentQueryId(const String & query_id)
|
||||
} words;
|
||||
} random;
|
||||
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
random.words.a = shared->rng();
|
||||
random.words.b = shared->rng();
|
||||
}
|
||||
random.words.a = thread_local_rng();
|
||||
random.words.b = thread_local_rng();
|
||||
|
||||
/// Use protected constructor.
|
||||
struct UUID : Poco::UUID
|
||||
|
@ -406,6 +406,15 @@ CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id,
|
||||
}
|
||||
|
||||
|
||||
void ProcessList::killAllQueries()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
for (auto & process : processes)
|
||||
process.cancelQuery(true);
|
||||
}
|
||||
|
||||
|
||||
QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
|
||||
{
|
||||
QueryStatusInfo res;
|
||||
|
@ -315,6 +315,8 @@ public:
|
||||
|
||||
/// Try call cancel() for input and output streams of query with specified id and user
|
||||
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false);
|
||||
|
||||
void killAllQueries();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include <Parsers/ASTTablesInSelectQuery.h>
|
||||
#include <Parsers/ParserTablesInSelectQuery.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
|
||||
@ -509,11 +510,14 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & s
|
||||
for (const auto & col : analyzed_join.columns_from_joined_table)
|
||||
joined_columns.insert(col.original_name);
|
||||
|
||||
CollectJoinOnKeysVisitor::Data data{analyzed_join, source_columns, joined_columns, aliases};
|
||||
bool is_asof = (table_join.strictness == ASTTableJoin::Strictness::Asof);
|
||||
CollectJoinOnKeysVisitor::Data data{analyzed_join, source_columns, joined_columns, aliases, is_asof};
|
||||
CollectJoinOnKeysVisitor(data).visit(table_join.on_expression);
|
||||
if (!data.has_some)
|
||||
throw Exception("Cannot get JOIN keys from JOIN ON section: " + queryToString(table_join.on_expression),
|
||||
ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
|
||||
if (is_asof)
|
||||
data.asofToJoinKeys();
|
||||
}
|
||||
|
||||
bool make_nullable = join_use_nulls && isLeftOrFull(table_join.kind);
|
||||
|
@ -71,16 +71,18 @@ void MySQLOutputFormat::consume(Chunk chunk)
|
||||
|
||||
void MySQLOutputFormat::finalize()
|
||||
{
|
||||
QueryStatus * process_list_elem = context.getProcessListElement();
|
||||
CurrentThread::finalizePerformanceCounters();
|
||||
QueryStatusInfo info = process_list_elem->getInfo();
|
||||
size_t affected_rows = info.written_rows;
|
||||
|
||||
size_t affected_rows = 0;
|
||||
std::stringstream human_readable_info;
|
||||
human_readable_info << std::fixed << std::setprecision(3)
|
||||
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
|
||||
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
|
||||
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
|
||||
if (QueryStatus * process_list_elem = context.getProcessListElement())
|
||||
{
|
||||
CurrentThread::finalizePerformanceCounters();
|
||||
QueryStatusInfo info = process_list_elem->getInfo();
|
||||
affected_rows = info.written_rows;
|
||||
human_readable_info << std::fixed << std::setprecision(3)
|
||||
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
|
||||
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
|
||||
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
|
||||
}
|
||||
|
||||
auto & header = getPort(PortKind::Main).getHeader();
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
#include <Storages/AlterCommands.h>
|
||||
@ -162,14 +163,6 @@ static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
|
||||
extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER = 5 * 60;
|
||||
|
||||
|
||||
/** For randomized selection of replicas. */
|
||||
/// avoid error: non-local variable 'DB::rng' declared '__thread' needs dynamic initialization
|
||||
#ifndef __APPLE__
|
||||
thread_local
|
||||
#endif
|
||||
pcg64 rng{randomSeed()};
|
||||
|
||||
|
||||
void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
|
||||
{
|
||||
std::lock_guard lock(current_zookeeper_mutex);
|
||||
@ -708,7 +701,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
|
||||
part->columns, part->checksums);
|
||||
|
||||
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
|
||||
std::shuffle(replicas.begin(), replicas.end(), rng);
|
||||
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
|
||||
bool has_been_already_added = false;
|
||||
|
||||
for (const String & replica : replicas)
|
||||
@ -2445,7 +2438,7 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
|
||||
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
|
||||
|
||||
/// Select replicas in uniformly random order.
|
||||
std::shuffle(replicas.begin(), replicas.end(), rng);
|
||||
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
|
||||
|
||||
for (const String & replica : replicas)
|
||||
{
|
||||
@ -2470,7 +2463,7 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr
|
||||
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
|
||||
|
||||
/// Select replicas in uniformly random order.
|
||||
std::shuffle(replicas.begin(), replicas.end(), rng);
|
||||
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
|
||||
|
||||
for (const String & replica : replicas)
|
||||
{
|
||||
@ -2529,7 +2522,7 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
|
||||
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
|
||||
|
||||
/// Select replicas in uniformly random order.
|
||||
std::shuffle(replicas.begin(), replicas.end(), rng);
|
||||
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
|
||||
|
||||
String largest_part_found;
|
||||
String largest_replica_found;
|
||||
|
@ -2,8 +2,7 @@
|
||||
|
||||
#include <algorithm>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
|
||||
|
||||
extern const char * auto_contributors[];
|
||||
@ -23,8 +22,7 @@ void StorageSystemContributors::fillData(MutableColumns & res_columns, const Con
|
||||
for (auto it = auto_contributors; *it; ++it)
|
||||
contributors.emplace_back(*it);
|
||||
|
||||
pcg64 rng(randomSeed());
|
||||
std::shuffle(contributors.begin(), contributors.end(), rng);
|
||||
std::shuffle(contributors.begin(), contributors.end(), thread_local_rng);
|
||||
|
||||
for (auto & it : contributors)
|
||||
res_columns[0]->insert(String(it));
|
||||
|
@ -35,6 +35,7 @@
|
||||
<value>Parquet</value>
|
||||
<value>ODBCDriver2</value>
|
||||
<value>Null</value>
|
||||
<value>MySQLWire</value>
|
||||
</values>
|
||||
</substitution>
|
||||
</substitutions>
|
||||
|
@ -44,6 +44,7 @@
|
||||
<value>Native</value>
|
||||
<value>XML</value>
|
||||
<value>ODBCDriver2</value>
|
||||
<value>MySQLWire</value>
|
||||
</values>
|
||||
</substitution>
|
||||
</substitutions>
|
||||
|
@ -12,7 +12,7 @@ INSERT INTO B(k,t,b) VALUES (2,3,3);
|
||||
|
||||
SELECT A.k, toString(A.t, 'UTC'), A.a, B.b, toString(B.t, 'UTC'), B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
|
||||
|
||||
SELECT A.k, toString(A.t, 'UTC'), A.a, B.b, toString(B.t, 'UTC'), B.k FROM A ASOF INNER JOIN B ON A.k == B.k AND A.t == B.t ORDER BY (A.k, A.t);
|
||||
SELECT A.k, toString(A.t, 'UTC'), A.a, B.b, toString(B.t, 'UTC'), B.k FROM A ASOF INNER JOIN B ON A.k == B.k AND A.t >= B.t ORDER BY (A.k, A.t);
|
||||
|
||||
SELECT A.k, toString(A.t, 'UTC'), A.a, B.b, toString(B.t, 'UTC'), B.k FROM A ASOF JOIN B USING(k,t) ORDER BY (A.k, A.t);
|
||||
|
||||
|
13
dbms/tests/queries/0_stateless/00976_asof_join_on.reference
Normal file
13
dbms/tests/queries/0_stateless/00976_asof_join_on.reference
Normal file
@ -0,0 +1,13 @@
|
||||
1 1 0 0
|
||||
1 2 1 2
|
||||
1 3 1 2
|
||||
2 1 0 0
|
||||
2 2 0 0
|
||||
2 3 2 3
|
||||
3 1 0 0
|
||||
3 2 0 0
|
||||
3 3 0 0
|
||||
9
|
||||
1 2 1 2
|
||||
1 3 1 2
|
||||
2 3 2 3
|
21
dbms/tests/queries/0_stateless/00976_asof_join_on.sql
Normal file
21
dbms/tests/queries/0_stateless/00976_asof_join_on.sql
Normal file
@ -0,0 +1,21 @@
|
||||
DROP TABLE IF EXISTS A;
|
||||
DROP TABLE IF EXISTS B;
|
||||
|
||||
CREATE TABLE A(a UInt32, t UInt32) ENGINE = Memory;
|
||||
CREATE TABLE B(b UInt32, t UInt32) ENGINE = Memory;
|
||||
|
||||
INSERT INTO A (a,t) VALUES (1,1),(1,2),(1,3), (2,1),(2,2),(2,3), (3,1),(3,2),(3,3);
|
||||
INSERT INTO B (b,t) VALUES (1,2),(1,4),(2,3);
|
||||
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF LEFT JOIN B ON A.a == B.b AND A.t >= B.t ORDER BY (A.a, A.t);
|
||||
SELECT count() FROM A ASOF LEFT JOIN B ON A.a == B.b AND B.t <= A.t;
|
||||
SELECT A.a, A.t, B.b, B.t FROM A ASOF INNER JOIN B ON B.t <= A.t AND A.a == B.b;
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t <= B.t; -- { serverError 48 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND B.t >= A.t; -- { serverError 48 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t > B.t; -- { serverError 403 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t < B.t; -- { serverError 403 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t == B.t; -- { serverError 403 }
|
||||
SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t != B.t; -- { serverError 403 }
|
||||
|
||||
DROP TABLE A;
|
||||
DROP TABLE B;
|
@ -0,0 +1,2 @@
|
||||
SET max_execution_speed = 1, max_execution_time = 3;
|
||||
SELECT count() FROM system.numbers; -- { serverError 159 }
|
@ -15,10 +15,26 @@
|
||||
#define USE_JEMALLOC 0
|
||||
#include <cstdlib>
|
||||
#endif
|
||||
#else
|
||||
#include <cstdlib>
|
||||
#endif
|
||||
|
||||
#define ALWAYS_INLINE inline __attribute__((__always_inline__))
|
||||
#define NO_INLINE __attribute__((__noinline__))
|
||||
// Also defined in Core/Defines.h
|
||||
#if !defined(ALWAYS_INLINE)
|
||||
#if defined(_MSC_VER)
|
||||
#define ALWAYS_INLINE inline __forceinline
|
||||
#else
|
||||
#define ALWAYS_INLINE inline __attribute__((__always_inline__))
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#if !defined(NO_INLINE)
|
||||
#if defined(_MSC_VER)
|
||||
#define NO_INLINE static __declspec(noinline)
|
||||
#else
|
||||
#define NO_INLINE __attribute__((__noinline__))
|
||||
#endif
|
||||
#endif
|
||||
|
||||
namespace Memory
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user