Merge remote-tracking branch 'upstream/master' into CLICKHOUSE-3893

This commit is contained in:
CurtizJ 2018-08-24 18:09:53 +03:00
commit e0b3283ed3
185 changed files with 2022 additions and 1156 deletions

View File

@ -78,7 +78,7 @@ if (USE_STATIC_LIBRARIES)
list(REVERSE CMAKE_FIND_LIBRARY_SUFFIXES)
endif ()
if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "amd64.*|x86_64.*|AMD64.*")
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
if (OS_LINUX)

View File

@ -26,9 +26,9 @@ endif ()
if (NOT Boost_SYSTEM_LIBRARY)
set (USE_INTERNAL_BOOST_LIBRARY 1)
set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
set (Boost_SYSTEM_LIBRARY boost_system_internal)
set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal)
set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal ${Boost_SYSTEM_LIBRARY})
set (Boost_INCLUDE_DIRS)

View File

@ -1,6 +1,4 @@
if (NOT OS_FREEBSD AND NOT APPLE)
option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library instead of bundled" ${NOT_UNBUNDLED})
endif ()
option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library instead of bundled" ${NOT_UNBUNDLED})
if (NOT USE_INTERNAL_ZLIB_LIBRARY)
find_package (ZLIB)

View File

@ -104,14 +104,14 @@ endif ()
if (ENABLE_MYSQL AND USE_INTERNAL_MYSQL_LIBRARY)
add_subdirectory (mariadb-connector-c-cmake)
target_include_directories(mysqlclient PRIVATE BEFORE ${ZLIB_INCLUDE_DIR})
target_include_directories(mysqlclient PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
target_include_directories(mysqlclient BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
target_include_directories(mysqlclient BEFORE PRIVATE ${OPENSSL_INCLUDE_DIR})
endif ()
if (USE_INTERNAL_RDKAFKA_LIBRARY)
add_subdirectory (librdkafka-cmake)
target_include_directories(rdkafka PRIVATE BEFORE ${ZLIB_INCLUDE_DIR})
target_include_directories(rdkafka PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
target_include_directories(rdkafka BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
target_include_directories(rdkafka BEFORE PRIVATE ${OPENSSL_INCLUDE_DIR})
endif ()
if (ENABLE_ODBC AND USE_INTERNAL_ODBC_LIBRARY)

View File

@ -16,7 +16,7 @@ if (NOT MSVC)
add_definitions(-Wno-unused-variable -Wno-deprecated-declarations)
endif ()
add_library(boost_program_options_internal
add_library(boost_program_options_internal ${SPLIT_SHARED}
${LIBRARY_DIR}/libs/program_options/src/cmdline.cpp
${LIBRARY_DIR}/libs/program_options/src/config_file.cpp
${LIBRARY_DIR}/libs/program_options/src/convert.cpp
@ -29,7 +29,7 @@ ${LIBRARY_DIR}/libs/program_options/src/value_semantic.cpp
${LIBRARY_DIR}/libs/program_options/src/variables_map.cpp
${LIBRARY_DIR}/libs/program_options/src/winmain.cpp)
add_library(boost_filesystem_internal
add_library(boost_filesystem_internal ${SPLIT_SHARED}
${LIBRARY_DIR}/libs/filesystem/src/codecvt_error_category.cpp
${LIBRARY_DIR}/libs/filesystem/src/operations.cpp
${LIBRARY_DIR}/libs/filesystem/src/path.cpp
@ -39,9 +39,11 @@ ${LIBRARY_DIR}/libs/filesystem/src/unique_path.cpp
${LIBRARY_DIR}/libs/filesystem/src/utf8_codecvt_facet.cpp
${LIBRARY_DIR}/libs/filesystem/src/windows_file_codecvt.cpp)
add_library(boost_system_internal
add_library(boost_system_internal ${SPLIT_SHARED}
${LIBRARY_DIR}/libs/system/src/error_code.cpp)
target_link_libraries (boost_filesystem_internal PUBLIC boost_system_internal)
target_include_directories (boost_program_options_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_filesystem_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_system_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})

View File

@ -1,6 +1,6 @@
SET(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/cctz)
add_library(cctz
add_library(cctz ${SPLIT_SHARED}
${LIBRARY_DIR}/src/civil_time_detail.cc
${LIBRARY_DIR}/src/time_zone_fixed.cc
${LIBRARY_DIR}/src/time_zone_format.cc

View File

@ -54,7 +54,7 @@ ${RDKAFKA_SOURCE_DIR}/lz4hc.c
${RDKAFKA_SOURCE_DIR}/rdgz.c
)
add_library(rdkafka STATIC ${SRCS})
add_library(rdkafka ${SPLIT_SHARED} ${SRCS})
target_include_directories(rdkafka PRIVATE include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR})
target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})

View File

@ -23,7 +23,7 @@ ${ODBC_SOURCE_DIR}/libltdl/loaders/preopen.c
${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/libltdl/libltdlcS.c
)
add_library(ltdl STATIC ${SRCS})
add_library(ltdl ${SPLIT_SHARED} ${SRCS})
target_include_directories(ltdl PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/libltdl)
target_include_directories(ltdl PUBLIC ${ODBC_SOURCE_DIR}/libltdl)
@ -273,7 +273,7 @@ ${ODBC_SOURCE_DIR}/lst/lstSetFreeFunc.c
${ODBC_SOURCE_DIR}/lst/_lstVisible.c
)
add_library(unixodbc STATIC ${SRCS})
add_library(unixodbc ${SPLIT_SHARED} ${SRCS})
target_link_libraries(unixodbc ltdl)

View File

@ -125,6 +125,6 @@ IF (ZSTD_LEGACY_SUPPORT)
${LIBRARY_LEGACY_DIR}/zstd_v07.h)
ENDIF (ZSTD_LEGACY_SUPPORT)
ADD_LIBRARY(zstd ${Sources} ${Headers})
ADD_LIBRARY(zstd ${SPLIT_SHARED} ${Sources} ${Headers})
target_include_directories (zstd PUBLIC ${LIBRARY_DIR})

View File

@ -482,6 +482,37 @@ private:
Poco::File(history_file).createFile();
}
#if USE_READLINE
/// Install Ctrl+C signal handler that will be used in interactive mode.
if (rl_initialize())
throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE);
auto clear_prompt_or_exit = [](int)
{
/// This is signal safe.
ssize_t res = write(STDOUT_FILENO, "\n", 1);
/// Allow to quit client while query is in progress by pressing Ctrl+C twice.
/// (First press to Ctrl+C will try to cancel query by InterruptListener).
if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE))
{
rl_replace_line("", 0);
if (rl_forced_update_display())
_exit(0);
}
else
{
/// A little dirty, but we struggle to find better way to correctly
/// force readline to exit after returning from the signal handler.
_exit(0);
}
};
if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR)
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
#endif
loop();
std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl;
@ -1518,33 +1549,6 @@ public:
}
}
#if USE_READLINE
if (rl_initialize())
throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE);
auto clear_prompt_or_exit = [](int)
{
/// This is signal safe.
ssize_t res = write(STDOUT_FILENO, "\n", 1);
if (res == 1 && rl_line_buffer[0])
{
rl_replace_line("", 0);
if (rl_forced_update_display())
_exit(0);
}
else
{
/// A little dirty, but we struggle to find better way to correctly
/// force readline to exit after returning from the signal handler.
_exit(0);
}
};
if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR)
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
#endif
ioctl(0, TIOCGWINSZ, &terminal_size);
namespace po = boost::program_options;
@ -1685,7 +1689,7 @@ public:
config().setInt("port", options["port"].as<int>());
if (options.count("secure"))
config().setBool("secure", true);
if (options.count("user"))
if (options.count("user") && !options["user"].defaulted())
config().setString("user", options["user"].as<std::string>());
if (options.count("password"))
config().setString("password", options["password"].as<std::string>());

View File

@ -104,8 +104,8 @@ try
if (!config().has("query") && !config().has("table-structure")) /// Nothing to process
{
if (!config().hasOption("silent"))
std::cerr << "There are no queries to process." << std::endl;
if (config().hasOption("verbose"))
std::cerr << "There are no queries to process." << '\n';
return Application::EXIT_OK;
}
@ -200,8 +200,7 @@ try
}
catch (const Exception & e)
{
if (!config().hasOption("silent"))
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace"));
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;
@ -274,7 +273,7 @@ void LocalServer::processQueries()
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(*context);
bool echo_query = config().hasOption("echo") || config().hasOption("verbose");
bool echo_queries = config().hasOption("echo") || config().hasOption("verbose");
std::exception_ptr exception;
for (const auto & query : queries)
@ -282,8 +281,12 @@ void LocalServer::processQueries()
ReadBufferFromString read_buf(query);
WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO);
if (echo_query)
std::cerr << query << "\n";
if (echo_queries)
{
writeString(query, write_buf);
writeChar('\n', write_buf);
write_buf.next();
}
try
{
@ -297,8 +300,7 @@ void LocalServer::processQueries()
if (!exception)
exception = std::current_exception();
if (!config().has("silent"))
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace"));
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
}
}
@ -360,7 +362,7 @@ void LocalServer::setupUsers()
static void showClientVersion()
{
std::cout << DBMS_NAME << " client version " << VERSION_STRING << "." << std::endl;
std::cout << DBMS_NAME << " client version " << VERSION_STRING << "." << '\n';
}
std::string LocalServer::getHelpHeader() const
@ -421,7 +423,6 @@ void LocalServer::init(int argc, char ** argv)
("format,f", po::value<std::string>(), "default output format (clickhouse-client compatibility)")
("output-format", po::value<std::string>(), "default output format")
("silent,s", "quiet mode, do not print errors")
("stacktrace", "print stack traces of exceptions")
("echo", "print query before execution")
("verbose", "print query and other debugging info")
@ -477,8 +478,6 @@ void LocalServer::init(int argc, char ** argv)
if (options.count("output-format"))
config().setString("output-format", options["output-format"].as<std::string>());
if (options.count("silent"))
config().setBool("silent", true);
if (options.count("stacktrace"))
config().setBool("stacktrace", true);
if (options.count("echo"))
@ -507,7 +506,7 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}

View File

@ -10,6 +10,9 @@
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <Common/HTMLForm.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/parseQuery.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include "validateODBCConnectionString.h"
@ -51,7 +54,8 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message) {
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
@ -68,9 +72,15 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
process_error("No 'connection_string' in request URL");
return;
}
std::string schema_name = "";
std::string table_name = params.get("table");
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Will fetch info for table '" << table_name << "'");
if (params.has("schema"))
{
schema_name = params.get("schema");
LOG_TRACE(log, "Will fetch info for table '" << schema_name + "." + table_name << "'");
} else
LOG_TRACE(log, "Will fetch info for table '" << table_name << "'");
LOG_TRACE(log, "Got connection str '" << connection_string << "'");
try
@ -86,7 +96,18 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
SCOPE_EXIT(SQLFreeStmt(hstmt, SQL_DROP));
/// TODO Why not do SQLColumns instead?
std::string query = "SELECT * FROM " + table_name + " WHERE 1 = 0";
std::string name = schema_name.empty() ? table_name : schema_name + "." + table_name;
std::stringstream ss;
std::string input = "SELECT * FROM " + name + " WHERE 1 = 0";
ParserQueryWithOutput parser;
ASTPtr select = parseQuery(parser, input.data(), input.data() + input.size(), "", 0);
IAST::FormatSettings settings(ss, true);
settings.always_quote_identifiers = true;
settings.identifier_quoting_style = IdentifierQuotingStyle::DoubleQuotes;
select->format(settings);
std::string query = ss.str();
if (Poco::Data::ODBC::Utility::isError(Poco::Data::ODBC::SQLPrepare(hstmt, reinterpret_cast<SQLCHAR *>(&query[0]), query.size())))
throw Poco::Data::ODBC::DescriptorException(session.dbc());

View File

@ -49,7 +49,8 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message) {
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;

View File

@ -366,12 +366,16 @@ int Server::main(const std::vector<std::string> & /*args*/)
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context);
}
if (!TaskStatsInfoGetter::checkProcessHasRequiredPermissions())
#if defined(__linux__)
if (!TaskStatsInfoGetter::checkPermissions())
{
LOG_INFO(log, "It looks like the process has not CAP_NET_ADMIN capability, some performance statistics will be disabled."
" It could happen due to incorrect clickhouse package installation."
" You could resolve the problem manually calling 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'");
LOG_INFO(log, "It looks like the process has no CAP_NET_ADMIN capability, some performance statistics will be disabled."
" It could happen due to incorrect ClickHouse package installation."
" You could resolve the problem manually with 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'");
}
#else
LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled.");
#endif
{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);

View File

@ -88,7 +88,7 @@ void TCPHandler::runImpl()
try
{
/// We try to send error information to the client.
sendException(e);
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
}
catch (...) {}
@ -103,7 +103,7 @@ void TCPHandler::runImpl()
Exception e("Database " + default_database + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
LOG_ERROR(log, "Code: " << e.code() << ", e.displayText() = " << e.displayText()
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
sendException(e);
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
return;
}
@ -133,6 +133,8 @@ void TCPHandler::runImpl()
std::unique_ptr<Exception> exception;
bool network_error = false;
bool send_exception_with_stack_trace = connection_context.getSettingsRef().calculate_text_stack_trace;
try
{
/// Restore context of request.
@ -149,6 +151,8 @@ void TCPHandler::runImpl()
CurrentThread::initializeQuery();
send_exception_with_stack_trace = query_context.getSettingsRef().calculate_text_stack_trace;
/// Should we send internal logs to client?
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& query_context.getSettingsRef().send_logs_level.value != "none")
@ -158,7 +162,8 @@ void TCPHandler::runImpl()
CurrentThread::attachInternalTextLogsQueue(state.logs_queue);
}
query_context.setExternalTablesInitializer([&global_settings, this] (Context & context) {
query_context.setExternalTablesInitializer([&global_settings, this] (Context & context)
{
if (&context != &query_context)
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
@ -245,7 +250,7 @@ void TCPHandler::runImpl()
tryLogCurrentException(log, "Can't send logs to client");
}
sendException(*exception);
sendException(*exception, send_exception_with_stack_trace);
}
}
catch (...)
@ -829,10 +834,10 @@ void TCPHandler::sendLogData(const Block & block)
}
void TCPHandler::sendException(const Exception & e)
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
{
writeVarUInt(Protocol::Server::Exception, *out);
writeException(e, *out);
writeException(e, *out, with_stack_trace);
out->next();
}

View File

@ -146,7 +146,7 @@ private:
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendException(const Exception & e);
void sendException(const Exception & e, bool with_stack_trace);
void sendProgress();
void sendLogs();
void sendEndOfStream();

View File

@ -48,7 +48,7 @@ private:
Mean mean;
Weight weight;
WeightedValue operator+ (const WeightedValue& other)
WeightedValue operator+ (const WeightedValue & other)
{
return {mean + other.weight * (other.mean - mean) / (other.weight + weight), other.weight + weight};
}
@ -263,7 +263,7 @@ public:
compress(max_bins);
}
void merge(const AggregateFunctionHistogramData& other, UInt32 max_bins)
void merge(const AggregateFunctionHistogramData & other, UInt32 max_bins)
{
lower_bound = std::min(lower_bound, other.lower_bound);
upper_bound = std::max(lower_bound, other.upper_bound);
@ -354,7 +354,7 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto& data = this->data(const_cast<AggregateDataPtr>(place));
auto & data = this->data(const_cast<AggregateDataPtr>(place));
auto & to_array = static_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = to_array.getOffsets();

View File

@ -14,7 +14,10 @@ AggregateFunctionPtr createAggregateFunctionRetention(const std::string & name,
{
assertNoParameters(name, params);
if (arguments.size() > AggregateFunctionRetentionData::max_events )
if (arguments.size() < 2)
throw Exception("Not enough event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() > AggregateFunctionRetentionData::max_events)
throw Exception("Too many event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionRetention>(arguments);

View File

@ -126,19 +126,23 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto & data_to = static_cast<ColumnArray &>(to).getData();
auto & data_to = static_cast<ColumnUInt8 &>(static_cast<ColumnArray &>(to).getData()).getData();
auto & offsets_to = static_cast<ColumnArray &>(to).getOffsets();
ColumnArray::Offset current_offset = data_to.size();
data_to.resize(current_offset + events_size);
const bool first_flag = this->data(place).events.test(0);
data_to.insert(first_flag ? Field(static_cast<UInt64>(1)) : Field(static_cast<UInt64>(0)));
for (const auto i : ext::range(1, events_size))
data_to[current_offset] = first_flag;
++current_offset;
for (size_t i = 1; i < events_size; ++i)
{
if (first_flag && this->data(place).events.test(i))
data_to.insert(Field(static_cast<UInt64>(1)));
else
data_to.insert(Field(static_cast<UInt64>(0)));
data_to[current_offset] = (first_flag && this->data(place).events.test(i));
++current_offset;
}
offsets_to.push_back(offsets_to.size() == 0 ? events_size : offsets_to.back() + events_size);
offsets_to.push_back(current_offset);
}
const char * getHeaderFilePath() const override

View File

@ -68,7 +68,7 @@ public:
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override
{
auto & column_unique = static_cast<const IColumnUnique&>(rhs);
auto & column_unique = static_cast<const IColumnUnique &>(rhs);
return getNestedColumn()->compareAt(n, m, *column_unique.getNestedColumn(), nan_direction_hint);
}

View File

@ -112,8 +112,10 @@ public:
std::vector<MutableColumnPtr> scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override ;
void getExtremes(Field & min, Field & max) const override {
void gather(ColumnGathererStream & gatherer_stream) override;
void getExtremes(Field & min, Field & max) const override
{
return getDictionary().index(getIndexes(), 0)->getExtremes(min, max); /// TODO: optimize
}

View File

@ -3,6 +3,7 @@
#include <sstream>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -13,10 +14,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
ODBCBridgeHelper::ODBCBridgeHelper(
const Configuration & config_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
: config(config_), http_timeout(http_timeout_), connection_string(connection_string_)
@ -29,16 +32,17 @@ ODBCBridgeHelper::ODBCBridgeHelper(
ping_url.setScheme("http");
ping_url.setPath(PING_HANDLER);
}
void ODBCBridgeHelper::startODBCBridge() const
{
Poco::Path path{config.getString("application.dir", "")};
path.setFileName("clickhouse-odbc-bridge");
path.setFileName("clickhouse");
if (!path.isFile())
throw Exception("clickhouse-odbc-bridge is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
if (!Poco::File(path).exists())
throw Exception("clickhouse binary is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
std::stringstream command;
command << path.toString() << ' ';
command << path.toString() << " odbc-bridge ";
command << "--http-port " << config.getUInt("odbc_bridge.port", DEFAULT_PORT) << ' ';
command << "--listen-host " << config.getString("odbc_bridge.listen_host", DEFAULT_HOST) << ' ';
command << "--http-timeout " << http_timeout.totalMicroseconds() << ' ';

View File

@ -7,8 +7,11 @@
#include <sstream>
#include <Common/StackTrace.h>
#include <Common/SimpleCache.h>
#include <common/demangle.h>
/// Arcadia compatibility DEVTOOLS-3976
#if defined(BACKTRACE_INCLUDE)
#include BACKTRACE_INCLUDE
@ -19,12 +22,16 @@
StackTrace::StackTrace()
{
frames_size = BACKTRACE_FUNC(frames, STACK_TRACE_MAX_DEPTH);
frames_size = BACKTRACE_FUNC(frames.data(), STACK_TRACE_MAX_DEPTH);
for (size_t i = frames_size; i < STACK_TRACE_MAX_DEPTH; ++i)
frames[i] = nullptr;
}
std::string StackTrace::toString() const
std::string StackTrace::toStringImpl(const Frames & frames, size_t frames_size)
{
char ** symbols = backtrace_symbols(frames, frames_size);
char ** symbols = backtrace_symbols(frames.data(), frames_size);
std::stringstream res;
if (!symbols)
@ -72,3 +79,13 @@ std::string StackTrace::toString() const
free(symbols);
return res.str();
}
std::string StackTrace::toString() const
{
/// Calculation of stack trace text is extremely slow.
/// We use simple cache because otherwise the server could be overloaded by trash queries.
static SimpleCache<decltype(StackTrace::toStringImpl), &StackTrace::toStringImpl> func_cached;
return func_cached(frames, frames_size);
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <string>
#include <array>
#define STACK_TRACE_MAX_DEPTH 32
@ -17,6 +18,9 @@ public:
private:
using Frame = void*;
Frame frames[STACK_TRACE_MAX_DEPTH];
using Frames = std::array<Frame, STACK_TRACE_MAX_DEPTH>;
Frames frames;
size_t frames_size;
static std::string toStringImpl(const Frames & frames, size_t frames_size);
};

View File

@ -5,9 +5,10 @@ StopwatchRUsage::Timestamp StopwatchRUsage::Timestamp::current()
{
StopwatchRUsage::Timestamp res;
::rusage rusage;
::rusage rusage {};
#if !defined(__APPLE__)
::getrusage(RUSAGE_THREAD, &rusage);
#endif
res.user_ns = rusage.ru_utime.tv_sec * 1000000000UL + rusage.ru_utime.tv_usec * 1000UL;
res.sys_ns = rusage.ru_stime.tv_sec * 1000000000UL + rusage.ru_stime.tv_usec * 1000UL;
return res;

View File

@ -2,26 +2,27 @@
#include <Common/Exception.h>
#include <Core/Types.h>
#include <unistd.h>
#if defined(__linux__)
#include <common/unaligned.h>
#include <errno.h>
#include <linux/genetlink.h>
#include <linux/netlink.h>
#include <linux/taskstats.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <syscall.h>
#include <linux/genetlink.h>
#include <linux/netlink.h>
#include <linux/taskstats.h>
#include <linux/capability.h>
/// Basic idea is motivated by "iotop" tool.
/// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt
#define GENLMSG_DATA(glh) ((void *)((char*)NLMSG_DATA(glh) + GENL_HDRLEN))
#define GENLMSG_PAYLOAD(glh) (NLMSG_PAYLOAD(glh, 0) - GENL_HDRLEN)
#define NLA_DATA(na) ((void *)((char*)(na) + NLA_HDRLEN))
#define NLA_PAYLOAD(len) (len - NLA_HDRLEN)
namespace DB
{
@ -29,200 +30,253 @@ namespace DB
namespace ErrorCodes
{
extern const int NETLINK_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
{
static size_t constexpr MAX_MSG_SIZE = 1024;
/** The message contains:
* - Netlink protocol header;
* - Generic Netlink (is a sub-protocol of Netlink that we use) protocol header;
* - Payload
* -- that itself is a list of "Attributes" (sub-messages), each of them contains length (including header), type, and its own payload.
* -- and attribute payload may be represented by the list of embedded attributes.
*/
struct NetlinkMessage
{
::nlmsghdr n;
::genlmsghdr g;
char buf[MAX_MSG_SIZE];
static size_t constexpr MAX_MSG_SIZE = 1024;
alignas(NLMSG_ALIGNTO) ::nlmsghdr header;
struct Attribute
{
::nlattr header;
alignas(NLMSG_ALIGNTO) char payload[0];
const Attribute * next() const
{
return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + NLA_ALIGN(header.nla_len));
}
};
union alignas(NLMSG_ALIGNTO)
{
struct
{
::genlmsghdr generic_header;
union alignas(NLMSG_ALIGNTO)
{
char buf[MAX_MSG_SIZE];
Attribute attribute; /// First attribute. There may be more.
} payload;
};
::nlmsgerr error;
};
size_t payload_size() const
{
return header.nlmsg_len - sizeof(header) - sizeof(generic_header);
}
const Attribute * end() const
{
return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + header.nlmsg_len);
}
void send(int fd) const
{
const char * request_buf = reinterpret_cast<const char *>(this);
ssize_t request_size = header.nlmsg_len;
::sockaddr_nl nladdr{};
nladdr.nl_family = AF_NETLINK;
while (true)
{
ssize_t bytes_sent = ::sendto(fd, request_buf, request_size, 0, reinterpret_cast<const ::sockaddr *>(&nladdr), sizeof(nladdr));
if (bytes_sent <= 0)
{
if (errno == EAGAIN)
continue;
else
throwFromErrno("Can't send a Netlink command", ErrorCodes::NETLINK_ERROR);
}
if (bytes_sent > request_size)
throw Exception("Wrong result of sendto system call: bytes_sent is greater than request size", ErrorCodes::NETLINK_ERROR);
if (bytes_sent == request_size)
break;
request_buf += bytes_sent;
request_size -= bytes_sent;
}
}
void receive(int fd)
{
ssize_t bytes_received = ::recv(fd, this, sizeof(*this), 0);
if (header.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&header), bytes_received))
throw Exception("Can't receive Netlink response, error: " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR);
}
};
int sendCommand(
int sock_fd,
UInt16 nlmsg_type,
UInt32 nlmsg_pid,
UInt8 genl_cmd,
UInt16 nla_type,
void * nla_data,
int nla_len) noexcept
NetlinkMessage query(
int fd,
UInt16 type,
UInt32 pid,
UInt8 command,
UInt16 attribute_type,
const void * attribute_data,
int attribute_size)
{
NetlinkMessage msg{};
NetlinkMessage request;
msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN);
msg.n.nlmsg_type = nlmsg_type;
msg.n.nlmsg_flags = NLM_F_REQUEST;
msg.n.nlmsg_seq = 0;
msg.n.nlmsg_pid = nlmsg_pid;
msg.g.cmd = genl_cmd;
msg.g.version = 1;
request.header.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); /// Length of both headers.
request.header.nlmsg_type = type;
request.header.nlmsg_flags = NLM_F_REQUEST; /// A request.
request.header.nlmsg_seq = 0;
request.header.nlmsg_pid = pid;
::nlattr * attr = static_cast<::nlattr *>(GENLMSG_DATA(&msg));
attr->nla_type = nla_type;
attr->nla_len = nla_len + 1 + NLA_HDRLEN;
request.generic_header.cmd = command;
request.generic_header.version = 1;
memcpy(NLA_DATA(attr), nla_data, nla_len);
msg.n.nlmsg_len += NLMSG_ALIGN(attr->nla_len);
request.payload.attribute.header.nla_type = attribute_type;
request.payload.attribute.header.nla_len = attribute_size + 1 + NLA_HDRLEN;
char * buf = reinterpret_cast<char *>(&msg);
ssize_t buflen = msg.n.nlmsg_len;
memcpy(&request.payload.attribute.payload, attribute_data, attribute_size);
::sockaddr_nl nladdr{};
nladdr.nl_family = AF_NETLINK;
request.header.nlmsg_len += NLMSG_ALIGN(request.payload.attribute.header.nla_len);
while (true)
{
ssize_t r = ::sendto(sock_fd, buf, buflen, 0, reinterpret_cast<const ::sockaddr *>(&nladdr), sizeof(nladdr));
request.send(fd);
if (r >= buflen)
break;
NetlinkMessage response;
response.receive(fd);
if (r > 0)
{
buf += r;
buflen -= r;
}
else if (errno != EAGAIN)
return -1;
}
return 0;
return response;
}
UInt16 getFamilyId(int nl_sock_fd) noexcept
UInt16 getFamilyIdImpl(int fd)
{
struct
{
::nlmsghdr header;
::genlmsghdr ge_header;
char buf[256];
} answer;
NetlinkMessage answer = query(fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, CTRL_ATTR_FAMILY_NAME, TASKSTATS_GENL_NAME, strlen(TASKSTATS_GENL_NAME) + 1);
static char name[] = TASKSTATS_GENL_NAME;
/// NOTE Why the relevant info is located in the second attribute?
const NetlinkMessage::Attribute * attr = answer.payload.attribute.next();
if (sendCommand(
nl_sock_fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY,
CTRL_ATTR_FAMILY_NAME, (void *) name,
strlen(TASKSTATS_GENL_NAME) + 1))
return 0;
if (attr->header.nla_type != CTRL_ATTR_FAMILY_ID)
throw Exception("Received wrong attribute as an answer to GET_FAMILY Netlink command", ErrorCodes::NETLINK_ERROR);
UInt16 id = 0;
ssize_t rep_len = ::recv(nl_sock_fd, &answer, sizeof(answer), 0);
if (answer.header.nlmsg_type == NLMSG_ERROR || (rep_len < 0) || !NLMSG_OK((&answer.header), rep_len))
return 0;
return unalignedLoad<UInt16>(attr->payload);
}
const ::nlattr * attr;
attr = static_cast<const ::nlattr *>(GENLMSG_DATA(&answer));
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(attr) + NLA_ALIGN(attr->nla_len));
if (attr->nla_type == CTRL_ATTR_FAMILY_ID)
id = *static_cast<const UInt16 *>(NLA_DATA(attr));
return id;
bool checkPermissionsImpl()
{
/// See man getcap.
__user_cap_header_struct request{};
request.version = _LINUX_CAPABILITY_VERSION_1; /// It's enough to check just single CAP_NET_ADMIN capability we are interested.
request.pid = getpid();
__user_cap_data_struct response{};
/// Avoid dependency on 'libcap'.
if (0 != syscall(SYS_capget, &request, &response))
throwFromErrno("Cannot do 'capget' syscall", ErrorCodes::NETLINK_ERROR);
return (1 << CAP_NET_ADMIN) & response.effective;
}
UInt16 getFamilyId(int fd)
{
/// It is thread and exception safe since C++11 and even before.
static UInt16 res = getFamilyIdImpl(fd);
return res;
}
}
TaskStatsInfoGetter::TaskStatsInfoGetter() = default;
void TaskStatsInfoGetter::init()
bool TaskStatsInfoGetter::checkPermissions()
{
if (netlink_socket_fd >= 0)
return;
static bool res = checkPermissionsImpl();
return res;
}
TaskStatsInfoGetter::TaskStatsInfoGetter()
{
if (!checkPermissions())
throw Exception("Logical error: TaskStatsInfoGetter is not usable without CAP_NET_ADMIN. Check permissions before creating the object.",
ErrorCodes::LOGICAL_ERROR);
netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
if (netlink_socket_fd < 0)
throwFromErrno("Can't create PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
/// On some containerized environments, operation on Netlink socket could hang forever.
/// We set reasonably small timeout to overcome this issue.
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 50000;
netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
if (netlink_socket_fd < 0)
throwFromErrno("Can't create PF_NETLINK socket");
if (0 != ::setsockopt(netlink_socket_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char *>(&tv), sizeof(tv)))
throwFromErrno("Can't set timeout on PF_NETLINK socket");
throwFromErrno("Can't set timeout on PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
::sockaddr_nl addr{};
addr.nl_family = AF_NETLINK;
if (::bind(netlink_socket_fd, reinterpret_cast<const ::sockaddr *>(&addr), sizeof(addr)) < 0)
throwFromErrno("Can't bind PF_NETLINK socket");
throwFromErrno("Can't bind PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
netlink_family_id = getFamilyId(netlink_socket_fd);
taskstats_family_id = getFamilyId(netlink_socket_fd);
}
bool TaskStatsInfoGetter::getStatImpl(int tid, ::taskstats & out_stats, bool throw_on_error)
void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid)
{
init();
NetlinkMessage answer = query(netlink_socket_fd, taskstats_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(tid));
if (sendCommand(netlink_socket_fd, netlink_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(pid_t)))
throwFromErrno("Can't send a Netlink command");
NetlinkMessage msg;
ssize_t rv = ::recv(netlink_socket_fd, &msg, sizeof(msg), 0);
if (msg.n.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&msg.n), rv))
for (const NetlinkMessage::Attribute * attr = &answer.payload.attribute;
attr < answer.end();
attr = attr->next())
{
const ::nlmsgerr * err = static_cast<const ::nlmsgerr *>(NLMSG_DATA(&msg));
if (throw_on_error)
throw Exception("Can't get Netlink response, error: " + std::to_string(err->error), ErrorCodes::NETLINK_ERROR);
else
return false;
}
rv = GENLMSG_PAYLOAD(&msg.n);
const ::nlattr * attr = static_cast<const ::nlattr *>(GENLMSG_DATA(&msg));
ssize_t len = 0;
while (len < rv)
{
len += NLA_ALIGN(attr->nla_len);
if (attr->nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->nla_type == TASKSTATS_TYPE_AGGR_PID)
if (attr->header.nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->header.nla_type == TASKSTATS_TYPE_AGGR_PID)
{
int aggr_len = NLA_PAYLOAD(attr->nla_len);
int len2 = 0;
attr = static_cast<const ::nlattr *>(NLA_DATA(attr));
while (len2 < aggr_len)
for (const NetlinkMessage::Attribute * nested_attr = reinterpret_cast<const NetlinkMessage::Attribute *>(attr->payload);
nested_attr < attr->next();
nested_attr = nested_attr->next())
{
if (attr->nla_type == TASKSTATS_TYPE_STATS)
if (nested_attr->header.nla_type == TASKSTATS_TYPE_STATS)
{
const ::taskstats * ts = static_cast<const ::taskstats *>(NLA_DATA(attr));
out_stats = *ts;
out_stats = unalignedLoad<::taskstats>(nested_attr->payload);
return;
}
len2 += NLA_ALIGN(attr->nla_len);
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(attr) + len2);
}
}
attr = reinterpret_cast<const ::nlattr *>(reinterpret_cast<const char *>(GENLMSG_DATA(&msg)) + len);
}
return true;
throw Exception("There is no TASKSTATS_TYPE_STATS attribute in the Netlink response", ErrorCodes::NETLINK_ERROR);
}
void TaskStatsInfoGetter::getStat(::taskstats & stat, int tid)
pid_t TaskStatsInfoGetter::getCurrentTID()
{
tid = tid < 0 ? getDefaultTID() : tid;
getStatImpl(tid, stat, true);
/// This call is always successful. - man gettid
return static_cast<pid_t>(syscall(SYS_gettid));
}
bool TaskStatsInfoGetter::tryGetStat(::taskstats & stat, int tid)
{
tid = tid < 0 ? getDefaultTID() : tid;
return getStatImpl(tid, stat, false);
}
TaskStatsInfoGetter::~TaskStatsInfoGetter()
{
@ -230,32 +284,43 @@ TaskStatsInfoGetter::~TaskStatsInfoGetter()
close(netlink_socket_fd);
}
int TaskStatsInfoGetter::getCurrentTID()
}
#else
namespace DB
{
/// This call is always successful. - man gettid
return static_cast<int>(syscall(SYS_gettid));
}
int TaskStatsInfoGetter::getDefaultTID()
namespace ErrorCodes
{
if (default_tid < 0)
default_tid = getCurrentTID();
return default_tid;
extern const int NOT_IMPLEMENTED;
}
static bool tryGetTaskStats()
bool TaskStatsInfoGetter::checkPermissions()
{
TaskStatsInfoGetter getter;
::taskstats stat;
return getter.tryGetStat(stat);
return false;
}
bool TaskStatsInfoGetter::checkProcessHasRequiredPermissions()
TaskStatsInfoGetter::TaskStatsInfoGetter()
{
throw Exception("TaskStats are not implemented for this OS.", ErrorCodes::NOT_IMPLEMENTED);
}
void TaskStatsInfoGetter::getStat(::taskstats &, pid_t)
{
}
pid_t TaskStatsInfoGetter::getCurrentTID()
{
return 0;
}
TaskStatsInfoGetter::~TaskStatsInfoGetter()
{
/// It is thread- and exception- safe since C++11
static bool res = tryGetTaskStats();
return res;
}
}
#endif

View File

@ -1,43 +1,34 @@
#pragma once
#include <sys/types.h>
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
struct taskstats;
namespace DB
{
class Exception;
/// Get taskstat info from OS kernel via Netlink protocol.
class TaskStatsInfoGetter
class TaskStatsInfoGetter : private boost::noncopyable
{
public:
TaskStatsInfoGetter();
TaskStatsInfoGetter(const TaskStatsInfoGetter &) = delete;
void getStat(::taskstats & stat, int tid = -1);
bool tryGetStat(::taskstats & stat, int tid = -1);
~TaskStatsInfoGetter();
void getStat(::taskstats & stat, pid_t tid);
/// Make a syscall and returns Linux thread id
static int getCurrentTID();
static pid_t getCurrentTID();
/// Whether the current process has permissions (sudo or cap_net_admin capabilties) to get taskstats info
static bool checkProcessHasRequiredPermissions();
static bool checkPermissions();
#if defined(__linux__)
private:
/// Caches current thread tid to avoid extra sys calls
int getDefaultTID();
int default_tid = -1;
bool getStatImpl(int tid, ::taskstats & out_stats, bool throw_on_error = false);
void init();
int netlink_socket_fd = -1;
UInt16 netlink_family_id = 0;
UInt16 taskstats_family_id = 0;
#endif
};
}

View File

@ -1,11 +1,16 @@
#pragma once
#include <Common/TaskStatsInfoGetter.h>
#include <Common/ProfileEvents.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#if defined(__linux__)
#include <linux/taskstats.h>
#else
struct taskstats {};
#endif
namespace ProfileEvents
@ -18,6 +23,7 @@ namespace ProfileEvents
extern const Event VoluntaryContextSwitches;
extern const Event InvoluntaryContextSwitches;
#if defined(__linux__)
extern const Event OSIOWaitMicroseconds;
extern const Event OSCPUWaitMicroseconds;
extern const Event OSCPUVirtualTimeMicroseconds;
@ -25,6 +31,7 @@ namespace ProfileEvents
extern const Event OSWriteChars;
extern const Event OSReadBytes;
extern const Event OSWriteBytes;
#endif
}
@ -82,8 +89,10 @@ struct RUsageCounters
static RUsageCounters current(UInt64 real_time_ = getCurrentTimeNanoseconds())
{
::rusage rusage;
::rusage rusage {};
#if !defined(__APPLE__)
::getrusage(RUSAGE_THREAD, &rusage);
#endif
return RUsageCounters(rusage, real_time_);
}
@ -106,6 +115,8 @@ struct RUsageCounters
};
#if defined(__linux__)
struct TasksStatsCounters
{
::taskstats stat;
@ -141,4 +152,17 @@ struct TasksStatsCounters
}
};
#else
struct TasksStatsCounters
{
::taskstats stat;
static TasksStatsCounters current();
static void incrementProfileEvents(const TasksStatsCounters &, const TasksStatsCounters &, ProfileEvents::Counters &) {}
static void updateProfileEvents(TasksStatsCounters &, ProfileEvents::Counters &) {}
};
#endif
}

View File

@ -1,10 +1,13 @@
#include "ThreadStatus.h"
#include <common/logger_useful.h>
#include <sstream>
#include <common/Types.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Common/ThreadStatus.h>
#include <Poco/Thread.h>
#include <Poco/Logger.h>
#include <Poco/Ext/ThreadNumber.h>
@ -39,7 +42,6 @@ ThreadStatus::ThreadStatus()
last_rusage = std::make_unique<RUsageCounters>();
last_taskstats = std::make_unique<TasksStatsCounters>();
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus");
@ -70,9 +72,12 @@ void ThreadStatus::initPerformanceCounters()
++queries_started;
*last_rusage = RUsageCounters::current(query_start_time_nanoseconds);
has_permissions_for_taskstats = TaskStatsInfoGetter::checkProcessHasRequiredPermissions();
if (has_permissions_for_taskstats)
if (TaskStatsInfoGetter::checkPermissions())
{
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
*last_taskstats = TasksStatsCounters::current();
}
}
void ThreadStatus::updatePerformanceCounters()
@ -80,7 +85,7 @@ void ThreadStatus::updatePerformanceCounters()
try
{
RUsageCounters::updateProfileEvents(*last_rusage, performance_counters);
if (has_permissions_for_taskstats)
if (taskstats_getter)
TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters);
}
catch (...)

View File

@ -33,7 +33,6 @@ using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
class ThreadGroupStatus
{
public:
mutable std::shared_mutex mutex;
ProfileEvents::Counters performance_counters{VariableContext::Process};
@ -126,7 +125,6 @@ public:
~ThreadStatus();
protected:
ThreadStatus();
void initPerformanceCounters();
@ -160,11 +158,11 @@ protected:
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> last_taskstats;
/// Set only if we have enough capabilities.
std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
bool has_permissions_for_taskstats = false;
public:
/// Implicitly finalizes current thread in the destructor
class CurrentThreadScope
{

View File

@ -85,8 +85,7 @@ private:
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
task->activate();
task->schedule();
task->activateAndSchedule();
}
void releaseNode()

View File

@ -20,7 +20,7 @@ public:
/// вызывать из одного потока - не thread safe
template <typename... Args>
void init(Args&&... args);
void init(Args &&... args);
/// был ли класс инициализирован
bool isInitialized() const { return ptr != nullptr; }
@ -76,7 +76,7 @@ private:
};
template <typename... Args>
void ZooKeeperHolder::init(Args&&... args)
void ZooKeeperHolder::init(Args &&... args)
{
ptr = std::make_shared<ZooKeeper>(std::forward<Args>(args)...);
}

View File

@ -1,6 +1,5 @@
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <thread>
#include <fstream>
#if defined(__x86_64__)
@ -14,25 +13,6 @@
unsigned getNumberOfPhysicalCPUCores()
{
#if defined(__linux__)
/// On Linux we try to look at Cgroups limit if it is available.
std::ifstream cgroup_read_in("/sys/fs/cgroup/cpu/cpu.cfs_quota_us");
if (cgroup_read_in.is_open())
{
std::string allocated_cpus_share_str{ std::istreambuf_iterator<char>(cgroup_read_in), std::istreambuf_iterator<char>() };
int allocated_cpus_share_int = std::stoi(allocated_cpus_share_str);
cgroup_read_in.close();
// If a valid value is present
if (allocated_cpus_share_int > 0)
{
unsigned allocated_cpus = (allocated_cpus_share_int + 999) / 1000;
return allocated_cpus;
}
}
#endif
#if defined(__x86_64__)
cpu_raw_data_t raw_data;
if (0 != cpuid_get_raw_data(&raw_data))

View File

@ -1,4 +1,5 @@
#pragma once
#include <string>
/// Maps 0..15 to 0..9A..F or 0..9a..f correspondingly.

View File

@ -39,13 +39,17 @@ std::string getThreadName()
{
std::string name(16, '\0');
#if defined(__FreeBSD__) || defined(__APPLE__)
if (pthread_get_name_np(pthread_self(), name.data(), name.size());
throw DB::Exception("Cannot get thread name with pthread_get_name_np()", DB::ErrorCodes::PTHREAD_ERROR);
#if defined(__APPLE__)
if (pthread_getname_np(pthread_self(), name.data(), name.size()))
throw DB::Exception("Cannot get thread name with pthread_getname_np()", DB::ErrorCodes::PTHREAD_ERROR);
#elif defined(__FreeBSD__)
// TODO: make test. freebsd will have this function soon https://freshbsd.org/commit/freebsd/r337983
// if (pthread_get_name_np(pthread_self(), name.data(), name.size()))
// throw DB::Exception("Cannot get thread name with pthread_get_name_np()", DB::ErrorCodes::PTHREAD_ERROR);
#else
if (0 != prctl(PR_GET_NAME, name.data(), 0, 0, 0))
#endif
DB::throwFromErrno("Cannot get thread name with prctl(PR_GET_NAME)");
#endif
name.resize(std::strlen(name.data()));
return name;

View File

@ -190,7 +190,7 @@ bool test_concurrent()
bool res = true;
auto load_func = [](const std::string& result, std::chrono::seconds sleep_for, bool throw_exc)
auto load_func = [](const std::string & result, std::chrono::seconds sleep_for, bool throw_exc)
{
std::this_thread::sleep_for(sleep_for);
if (throw_exc)

View File

@ -43,17 +43,7 @@ bool BackgroundSchedulePool::TaskInfo::schedule()
if (deactivated || scheduled)
return false;
scheduled = true;
if (delayed)
pool.cancelDelayedTask(shared_from_this(), lock);
/// If the task is not executing at the moment, enqueue it for immediate execution.
/// But if it is currently executing, do nothing because it will be enqueued
/// at the end of the execute() method.
if (!executing)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
scheduleImpl(lock);
return true;
}
@ -89,6 +79,18 @@ void BackgroundSchedulePool::TaskInfo::activate()
deactivated = false;
}
bool BackgroundSchedulePool::TaskInfo::activateAndSchedule()
{
std::lock_guard lock(schedule_mutex);
deactivated = false;
if (scheduled)
return false;
scheduleImpl(lock);
return true;
}
void BackgroundSchedulePool::TaskInfo::execute()
{
Stopwatch watch;
@ -129,6 +131,20 @@ void BackgroundSchedulePool::TaskInfo::execute()
}
}
void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock)
{
scheduled = true;
if (delayed)
pool.cancelDelayedTask(shared_from_this(), schedule_mutex_lock);
/// If the task is not executing at the moment, enqueue it for immediate execution.
/// But if it is currently executing, do nothing because it will be enqueued
/// at the end of the execute() method.
if (!executing)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
return [t = shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &)
@ -143,12 +159,6 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
{
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");
/// Put all threads of both thread pools to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -217,14 +227,29 @@ void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lo
}
void BackgroundSchedulePool::attachToThreadGroup()
{
std::lock_guard lock(delayed_tasks_mutex);
if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}
void BackgroundSchedulePool::threadFunction()
{
setThreadName("BackgrSchedPool");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
while (!shutdown)
@ -242,8 +267,7 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
{
setThreadName("BckSchPoolDelay");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)

View File

@ -50,11 +50,14 @@ public:
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// Further attempts to schedule become no-op.
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();
void activate();
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get zkutil::WatchCallback needed for notifications from ZooKeeper watches.
zkutil::WatchCallback getWatchCallback();
@ -64,6 +67,8 @@ public:
void execute();
void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
BackgroundSchedulePool & pool;
std::string log_name;
TaskFunc function;
@ -142,6 +147,8 @@ private:
/// Thread group used for profiling purposes
ThreadGroupStatusPtr thread_group;
void attachToThreadGroup();
};
using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;

View File

@ -21,43 +21,31 @@ namespace ErrorCodes
ColumnGathererStream::ColumnGathererStream(
const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
size_t block_preferred_size_)
: column_name(column_name_), row_sources_buf(row_sources_buf_)
: column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
, block_preferred_size(block_preferred_size_), log(&Logger::get("ColumnGathererStream"))
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
children.assign(source_streams.begin(), source_streams.end());
}
void ColumnGathererStream::init()
{
sources.reserve(children.size());
for (size_t i = 0; i < children.size(); ++i)
{
sources.emplace_back(children[i]->read(), column_name);
Block & block = sources.back().block;
const Block & header = children[i]->getHeader();
/// Sometimes MergeTreeReader injects additional column with partitioning key
if (block.columns() > 2)
if (header.columns() > 2)
throw Exception(
"Block should have 1 or 2 columns, but contains " + toString(block.columns()),
"Block should have 1 or 2 columns, but contains " + toString(header.columns()),
ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (!block.has(column_name))
throw Exception(
"Not found column '" + column_name + "' in block.",
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
if (i == 0)
{
column.name = column_name;
column.type = block.getByName(column_name).type;
column.type = header.getByName(column_name).type;
column.column = column.type->createColumn();
}
if (block.getByName(column_name).column->getName() != column.column->getName())
else if (header.getByName(column_name).column->getName() != column.column->getName())
throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
@ -69,10 +57,6 @@ Block ColumnGathererStream::readImpl()
if (children.size() == 1 && row_sources_buf.eof())
return children[0]->read();
/// Initialize first source blocks
if (sources.empty())
init();
if (!source_to_fully_copy && row_sources_buf.eof())
return Block();

View File

@ -76,16 +76,11 @@ private:
/// Cache required fields
struct Source
{
const IColumn * column;
size_t pos;
size_t size;
const IColumn * column = nullptr;
size_t pos = 0;
size_t size = 0;
Block block;
Source(Block && block_, const String & name) : block(std::move(block_))
{
update(name);
}
void update(const String & name)
{
column = block.getByName(name).column.get();
@ -94,7 +89,6 @@ private:
}
};
void init();
void fetchNewBlock(Source & source, size_t source_num);
String column_name;

View File

@ -300,7 +300,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupSt
try
{
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggMergThr");
while (!parallel_merge_data->finish)

View File

@ -669,6 +669,9 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
}
};
if (!settings.continuous_reading)
state_with_dictionary->num_pending_rows = 0;
bool first_dictionary = true;
while (limit)
{

View File

@ -1,4 +1,5 @@
#pragma once
#include <Columns/ColumnVector.h>
#include <Columns/ColumnString.h>
#include <Columns/IColumn.h>
@ -21,23 +22,24 @@ namespace ErrorCodes
}
/*
* BlockInputStream implementation for external dictionaries
* read() returns single block consisting of the in-memory contents of the dictionaries
/* BlockInputStream implementation for external dictionaries
* read() returns blocks consisting of the in-memory contents of the dictionaries
*/
template <typename DictionaryType, typename Key>
class DictionaryBlockInputStream : public DictionaryBlockInputStreamBase
{
public:
using DictionatyPtr = std::shared_ptr<DictionaryType const>;
using DictionaryPtr = std::shared_ptr<DictionaryType const>;
DictionaryBlockInputStream(std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
PaddedPODArray<Key> && ids, const Names & column_names);
DictionaryBlockInputStream(std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
const std::vector<StringRef> & keys, const Names & column_names);
using GetColumnsFunction =
std::function<ColumnsWithTypeAndName(const Columns &, const std::vector<DictionaryAttribute> & attributes)>;
// Used to separate key columns format for storage and view.
// Calls get_key_columns_function to get key column for dictionary get fuction call
// and get_view_columns_function to get key representation.
@ -59,64 +61,69 @@ private:
// pointer types to getXXX functions
// for single key dictionaries
template <typename Type>
using DictionaryGetter = void (DictionaryType::*)(
const std::string &, const PaddedPODArray<Key> &, PaddedPODArray<Type> &) const;
using DictionaryStringGetter = void (DictionaryType::*)(
const std::string &, const PaddedPODArray<Key> &, ColumnString *) const;
using DictionaryGetter = void (DictionaryType::*)(const std::string &, const PaddedPODArray<Key> &, PaddedPODArray<Type> &) const;
using DictionaryStringGetter = void (DictionaryType::*)(const std::string &, const PaddedPODArray<Key> &, ColumnString *) const;
// for complex complex key dictionaries
template <typename Type>
using GetterByKey = void (DictionaryType::*)(
const std::string &, const Columns &, const DataTypes &, PaddedPODArray<Type> & out) const;
using StringGetterByKey = void (DictionaryType::*)(
const std::string &, const Columns &, const DataTypes &, ColumnString * out) const;
using GetterByKey = void (DictionaryType::*)(const std::string &, const Columns &, const DataTypes &, PaddedPODArray<Type> & out) const;
using StringGetterByKey = void (DictionaryType::*)(const std::string &, const Columns &, const DataTypes &, ColumnString * out) const;
// call getXXX
// for single key dictionaries
template <typename Type, typename Container>
void callGetter(DictionaryGetter<Type> getter, const PaddedPODArray<Key> & ids,
void callGetter(DictionaryGetter<Type> getter, const PaddedPODArray<Key> & ids_to_fill,
const Columns & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <typename Container>
void callGetter(DictionaryStringGetter getter, const PaddedPODArray<Key> & ids,
void callGetter(DictionaryStringGetter getter, const PaddedPODArray<Key> & ids_to_fill,
const Columns & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
// for complex complex key dictionaries
template <typename Type, typename Container>
void callGetter(GetterByKey<Type> getter, const PaddedPODArray<Key> & ids,
void callGetter(GetterByKey<Type> getter, const PaddedPODArray<Key> & ids_to_fill,
const Columns & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <typename Container>
void callGetter(StringGetterByKey getter, const PaddedPODArray<Key> & ids,
void callGetter(StringGetterByKey getter, const PaddedPODArray<Key> & ids_to_fill,
const Columns & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <template <typename> class Getter, typename StringGetter>
Block fillBlock(const PaddedPODArray<Key> & ids, const Columns & keys,
Block fillBlock(const PaddedPODArray<Key> & ids_to_fill, const Columns & keys,
const DataTypes & types, ColumnsWithTypeAndName && view) const;
template <typename AttributeType, typename Getter>
ColumnPtr getColumnFromAttribute(Getter getter, const PaddedPODArray<Key> & ids,
ColumnPtr getColumnFromAttribute(Getter getter, const PaddedPODArray<Key> & ids_to_fill,
const Columns & keys, const DataTypes & data_types,
const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <typename Getter>
ColumnPtr getColumnFromStringAttribute(Getter getter, const PaddedPODArray<Key> & ids,
ColumnPtr getColumnFromStringAttribute(Getter getter, const PaddedPODArray<Key> & ids_to_fill,
const Columns & keys, const DataTypes & data_types,
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const;
ColumnPtr getColumnFromIds(const PaddedPODArray<Key> & ids) const;
const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
ColumnPtr getColumnFromIds(const PaddedPODArray<Key> & ids_to_fill) const;
void fillKeyColumns(const std::vector<StringRef> & keys, size_t start, size_t size,
const DictionaryStructure & dictionary_structure, ColumnsWithTypeAndName & columns) const;
DictionatyPtr dictionary;
DictionaryPtr dictionary;
Names column_names;
PaddedPODArray<Key> ids;
ColumnsWithTypeAndName key_columns;
Poco::Logger * logger;
Block (DictionaryBlockInputStream<DictionaryType, Key>::*fillBlockFunction)(
const PaddedPODArray<Key> & ids, const Columns& keys,
using FillBlockFunction = Block (DictionaryBlockInputStream<DictionaryType, Key>::*)(
const PaddedPODArray<Key> & ids_to_fill, const Columns & keys,
const DataTypes & types, ColumnsWithTypeAndName && view) const;
FillBlockFunction fill_block_function;
Columns data_columns;
GetColumnsFunction get_key_columns_function;
GetColumnsFunction get_view_columns_function;
@ -131,15 +138,16 @@ private:
DictionaryKeyType key_type;
};
template <typename DictionaryType, typename Key>
DictionaryBlockInputStream<DictionaryType, Key>::DictionaryBlockInputStream(
std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
PaddedPODArray<Key> && ids, const Names& column_names)
PaddedPODArray<Key> && ids, const Names & column_names)
: DictionaryBlockInputStreamBase(ids.size(), max_block_size),
dictionary(std::static_pointer_cast<const DictionaryType>(dictionary)),
column_names(column_names), ids(std::move(ids)),
logger(&Poco::Logger::get("DictionaryBlockInputStream")),
fillBlockFunction(&DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<DictionaryGetter, DictionaryStringGetter>),
fill_block_function(&DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<DictionaryGetter, DictionaryStringGetter>),
key_type(DictionaryKeyType::Id)
{
}
@ -147,11 +155,11 @@ DictionaryBlockInputStream<DictionaryType, Key>::DictionaryBlockInputStream(
template <typename DictionaryType, typename Key>
DictionaryBlockInputStream<DictionaryType, Key>::DictionaryBlockInputStream(
std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
const std::vector<StringRef> & keys, const Names& column_names)
const std::vector<StringRef> & keys, const Names & column_names)
: DictionaryBlockInputStreamBase(keys.size(), max_block_size),
dictionary(std::static_pointer_cast<const DictionaryType>(dictionary)), column_names(column_names),
logger(&Poco::Logger::get("DictionaryBlockInputStream")),
fillBlockFunction(&DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<GetterByKey, StringGetterByKey>),
fill_block_function(&DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<GetterByKey, StringGetterByKey>),
key_type(DictionaryKeyType::ComplexKey)
{
const DictionaryStructure & dictionaty_structure = dictionary->getStructure();
@ -167,13 +175,14 @@ DictionaryBlockInputStream<DictionaryType, Key>::DictionaryBlockInputStream(
: DictionaryBlockInputStreamBase(data_columns.front()->size(), max_block_size),
dictionary(std::static_pointer_cast<const DictionaryType>(dictionary)), column_names(column_names),
logger(&Poco::Logger::get("DictionaryBlockInputStream")),
fillBlockFunction(&DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<GetterByKey, StringGetterByKey>),
fill_block_function(&DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<GetterByKey, StringGetterByKey>),
data_columns(data_columns),
get_key_columns_function(get_key_columns_function), get_view_columns_function(get_view_columns_function),
key_type(DictionaryKeyType::Callback)
{
}
template <typename DictionaryType, typename Key>
Block DictionaryBlockInputStream<DictionaryType, Key>::getBlock(size_t start, size_t length) const
{
@ -190,13 +199,15 @@ Block DictionaryBlockInputStream<DictionaryType, Key>::getBlock(size_t start, si
columns.emplace_back(column);
view_columns.emplace_back(column, key_column.type, key_column.name);
}
return (this->*fillBlockFunction)({}, columns, {}, std::move(view_columns));
return (this->*fill_block_function)({}, columns, {}, std::move(view_columns));
}
case DictionaryKeyType::Id:
{
PaddedPODArray<Key> block_ids(ids.begin() + start, ids.begin() + start + length);
return (this->*fillBlockFunction)(block_ids, {}, {}, {});
PaddedPODArray<Key> ids_to_fill(ids.begin() + start, ids.begin() + start + length);
return (this->*fill_block_function)(ids_to_fill, {}, {}, {});
}
case DictionaryKeyType::Callback:
{
Columns columns;
@ -214,12 +225,14 @@ Block DictionaryBlockInputStream<DictionaryType, Key>::getBlock(size_t start, si
columns.push_back(key_column.column);
types.push_back(key_column.type);
}
return (this->*fillBlockFunction)({}, columns, types, std::move(view_with_type_and_name));
return (this->*fill_block_function)({}, columns, types, std::move(view_with_type_and_name));
}
}
throw Exception("Unexpected DictionaryKeyType.", ErrorCodes::LOGICAL_ERROR);
}
template <typename DictionaryType, typename Key>
template <typename Type, typename Container>
void DictionaryBlockInputStream<DictionaryType, Key>::callGetter(
@ -260,6 +273,7 @@ void DictionaryBlockInputStream<DictionaryType, Key>::callGetter(
(dict.*getter)(attribute.name, keys, data_types, container);
}
template <typename DictionaryType, typename Key>
template <template <typename> class Getter, typename StringGetter>
Block DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
@ -287,7 +301,7 @@ Block DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
for (const auto idx : ext::range(0, structure.attributes.size()))
{
const DictionaryAttribute& attribute = structure.attributes[idx];
const DictionaryAttribute & attribute = structure.attributes[idx];
if (names.find(attribute.name) != names.end())
{
ColumnPtr column;
@ -332,7 +346,7 @@ Block DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
case AttributeUnderlyingType::String:
{
column = getColumnFromStringAttribute<StringGetter>(
&DictionaryType::getString, ids, keys, data_types, attribute, *dictionary);
&DictionaryType::getString, ids_to_fill, keys, data_types, attribute, *dictionary);
break;
}
}
@ -343,6 +357,7 @@ Block DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
return Block(block_columns);
}
template <typename DictionaryType, typename Key>
template <typename AttributeType, typename Getter>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttribute(
@ -358,12 +373,13 @@ ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttribut
return std::move(column_vector);
}
template <typename DictionaryType, typename Key>
template <typename Getter>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromStringAttribute(
Getter getter, const PaddedPODArray<Key> & ids_to_fill,
const Columns & keys, const DataTypes & data_types,
const DictionaryAttribute& attribute, const DictionaryType & dict) const
const DictionaryAttribute & attribute, const DictionaryType & dict) const
{
auto column_string = ColumnString::create();
auto ptr = column_string.get();
@ -371,6 +387,7 @@ ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromStringAt
return std::move(column_string);
}
template <typename DictionaryType, typename Key>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromIds(const PaddedPODArray<Key> & ids_to_fill) const
{
@ -381,6 +398,7 @@ ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromIds(cons
return std::move(column_vector);
}
template <typename DictionaryType, typename Key>
void DictionaryBlockInputStream<DictionaryType, Key>::fillKeyColumns(
const std::vector<StringRef> & keys, size_t start, size_t size,

View File

@ -4,7 +4,7 @@ namespace DB
{
DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size)
: rows_count(rows_count), max_block_size(max_block_size), next_row(0)
: rows_count(rows_count), max_block_size(max_block_size)
{
}

View File

@ -1,4 +1,5 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
@ -7,8 +8,6 @@ namespace DB
class DictionaryBlockInputStreamBase : public IProfilingBlockInputStream
{
protected:
//Block block;
DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size);
virtual Block getBlock(size_t start, size_t length) const = 0;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/Types.h>
#include <string>
struct RegionEntry
{

View File

@ -44,7 +44,7 @@ class IRegionsNamesDataProvider
{
public:
virtual ILanguageRegionsNamesDataSourcePtr getLanguageRegionsNamesSource(
const std::string& language) const = 0;
const std::string & language) const = 0;
virtual ~IRegionsNamesDataProvider() {}
};

View File

@ -44,7 +44,7 @@ public:
RegionsNamesDataProvider(const std::string & directory_);
ILanguageRegionsNamesDataSourcePtr getLanguageRegionsNamesSource(
const std::string& language) const override;
const std::string & language) const override;
private:
std::string getDataFilePath(const std::string & language) const;

View File

@ -187,7 +187,7 @@ private:
void resize(Attribute & attribute, const Key id);
template <typename T>
void setAttributeValueImpl(Attribute & attribute, const Key id, const T& value);
void setAttributeValueImpl(Attribute & attribute, const Key id, const T & value);
void setAttributeValue(Attribute & attribute, const Key id, const Field & value);

View File

@ -56,8 +56,8 @@ namespace
case Poco::MongoDB::ElementTraits<Int32>::TypeId:
static_cast<ColumnVector<T> &>(column).getData().push_back(static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value());
break;
case Poco::MongoDB::ElementTraits<Int64>::TypeId:
static_cast<ColumnVector<T> &>(column).getData().push_back(static_cast<const Poco::MongoDB::ConcreteElement<Int64> &>(value).value());
case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId:
static_cast<ColumnVector<T> &>(column).getData().push_back(static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value());
break;
case Poco::MongoDB::ElementTraits<Float64>::TypeId:
static_cast<ColumnVector<T> &>(column).getData().push_back(static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value());

View File

@ -12,7 +12,7 @@
// only after poco
// naming conflict:
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string& value);
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
// dbms/src/IO/WriteHelpers.h:146 #define writeCString(s, buf)
#include <Dictionaries/MongoDBDictionarySource.h>
#include <Dictionaries/MongoDBBlockInputStream.h>

View File

@ -22,10 +22,10 @@ template <typename DictionaryType, typename Key>
class RangeDictionaryBlockInputStream : public DictionaryBlockInputStreamBase
{
public:
using DictionatyPtr = std::shared_ptr<DictionaryType const>;
using DictionaryPtr = std::shared_ptr<DictionaryType const>;
RangeDictionaryBlockInputStream(
DictionatyPtr dictionary, size_t max_block_size, const Names & column_names, PaddedPODArray<Key> && ids,
DictionaryPtr dictionary, size_t max_block_size, const Names & column_names, PaddedPODArray<Key> && ids,
PaddedPODArray<UInt16> && start_dates, PaddedPODArray<UInt16> && end_dates);
String getName() const override
@ -43,10 +43,10 @@ private:
template <typename AttributeType>
ColumnPtr getColumnFromAttribute(DictionaryGetter<AttributeType> getter,
const PaddedPODArray<Key> & ids, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const;
ColumnPtr getColumnFromAttributeString(const PaddedPODArray<Key> & ids, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const;
const PaddedPODArray<Key> & ids_to_fill, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
ColumnPtr getColumnFromAttributeString(const PaddedPODArray<Key> & ids_to_fill, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <typename T>
ColumnPtr getColumnFromPODArray(const PaddedPODArray<T> & array) const;
@ -54,15 +54,15 @@ private:
void addSpecialColumn(
const std::optional<DictionarySpecialAttribute> & attribute, DataTypePtr type,
const std::string & default_name, const std::unordered_set<std::string> & column_names,
const PaddedPODArray<T> & values, ColumnsWithTypeAndName& columns) const;
const PaddedPODArray<T> & values, ColumnsWithTypeAndName & columns) const;
Block fillBlock(const PaddedPODArray<Key> & ids,
Block fillBlock(const PaddedPODArray<Key> & ids_to_fill,
const PaddedPODArray<UInt16> & start_dates, const PaddedPODArray<UInt16> & end_dates) const;
PaddedPODArray<UInt16> makeDateKey(
const PaddedPODArray<UInt16> & start_dates, const PaddedPODArray<UInt16> & end_dates) const;
DictionatyPtr dictionary;
DictionaryPtr dictionary;
Names column_names;
PaddedPODArray<Key> ids;
PaddedPODArray<UInt16> start_dates;
@ -72,7 +72,7 @@ private:
template <typename DictionaryType, typename Key>
RangeDictionaryBlockInputStream<DictionaryType, Key>::RangeDictionaryBlockInputStream(
DictionatyPtr dictionary, size_t max_column_size, const Names & column_names, PaddedPODArray<Key> && ids,
DictionaryPtr dictionary, size_t max_column_size, const Names & column_names, PaddedPODArray<Key> && ids,
PaddedPODArray<UInt16> && start_dates, PaddedPODArray<UInt16> && end_dates)
: DictionaryBlockInputStreamBase(ids.size(), max_column_size),
dictionary(dictionary), column_names(column_names),
@ -103,21 +103,21 @@ Block RangeDictionaryBlockInputStream<DictionaryType, Key>::getBlock(size_t star
template <typename DictionaryType, typename Key>
template <typename AttributeType>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttribute(
DictionaryGetter<AttributeType> getter, const PaddedPODArray<Key> & ids,
const PaddedPODArray<UInt16> & dates, const DictionaryAttribute& attribute, const DictionaryType& dictionary) const
DictionaryGetter<AttributeType> getter, const PaddedPODArray<Key> & ids_to_fill,
const PaddedPODArray<UInt16> & dates, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const
{
auto column_vector = ColumnVector<AttributeType>::create(ids.size());
(dictionary.*getter)(attribute.name, ids, dates, column_vector->getData());
auto column_vector = ColumnVector<AttributeType>::create(ids_to_fill.size());
(dictionary.*getter)(attribute.name, ids_to_fill, dates, column_vector->getData());
return std::move(column_vector);
}
template <typename DictionaryType, typename Key>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttributeString(
const PaddedPODArray<Key> & ids, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const
const PaddedPODArray<Key> & ids_to_fill, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute & attribute, const DictionaryType & dictionary) const
{
auto column_string = ColumnString::create();
dictionary.getString(attribute.name, ids, dates, column_string.get());
dictionary.getString(attribute.name, ids_to_fill, dates, column_string.get());
return std::move(column_string);
}
@ -137,7 +137,7 @@ template <typename DictionaryType, typename Key>
template <typename T>
void RangeDictionaryBlockInputStream<DictionaryType, Key>::addSpecialColumn(
const std::optional<DictionarySpecialAttribute> & attribute, DataTypePtr type,
const std::string& default_name, const std::unordered_set<std::string> & column_names,
const std::string & default_name, const std::unordered_set<std::string> & column_names,
const PaddedPODArray<T> & values, ColumnsWithTypeAndName & columns) const
{
std::string name = default_name;
@ -167,15 +167,15 @@ PaddedPODArray<UInt16> RangeDictionaryBlockInputStream<DictionaryType, Key>::mak
template <typename DictionaryType, typename Key>
Block RangeDictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
const PaddedPODArray<Key> & ids,
const PaddedPODArray<Key> & ids_to_fill,
const PaddedPODArray<UInt16> & start_dates, const PaddedPODArray<UInt16> & end_dates) const
{
ColumnsWithTypeAndName columns;
const DictionaryStructure& structure = dictionary->getStructure();
const DictionaryStructure & structure = dictionary->getStructure();
std::unordered_set<std::string> names(column_names.begin(), column_names.end());
addSpecialColumn(structure.id, std::make_shared<DataTypeUInt64>(), "ID", names, ids, columns);
addSpecialColumn(structure.id, std::make_shared<DataTypeUInt64>(), "ID", names, ids_to_fill, columns);
addSpecialColumn(structure.range_min, std::make_shared<DataTypeDate>(), "Range Start", names, start_dates, columns);
addSpecialColumn(structure.range_max, std::make_shared<DataTypeDate>(), "Range End", names, end_dates, columns);
@ -183,12 +183,12 @@ Block RangeDictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
for (const auto idx : ext::range(0, structure.attributes.size()))
{
const DictionaryAttribute& attribute = structure.attributes[idx];
const DictionaryAttribute & attribute = structure.attributes[idx];
if (names.find(attribute.name) != names.end())
{
ColumnPtr column;
#define GET_COLUMN_FORM_ATTRIBUTE(TYPE)\
column = getColumnFromAttribute<TYPE>(&DictionaryType::get##TYPE, ids, date_key, attribute, *dictionary)
column = getColumnFromAttribute<TYPE>(&DictionaryType::get##TYPE, ids_to_fill, date_key, attribute, *dictionary)
switch (attribute.underlying_type)
{
case AttributeUnderlyingType::UInt8:
@ -225,7 +225,7 @@ Block RangeDictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
GET_COLUMN_FORM_ATTRIBUTE(Float64);
break;
case AttributeUnderlyingType::String:
column = getColumnFromAttributeString(ids, date_key, attribute, *dictionary);
column = getColumnFromAttributeString(ids_to_fill, date_key, attribute, *dictionary);
break;
}

View File

@ -376,7 +376,7 @@ void RangeHashedDictionary::getIdsAndDates(PaddedPODArray<Key> & ids,
}
template <typename T>
void RangeHashedDictionary::getIdsAndDates(const Attribute& attribute, PaddedPODArray<Key> & ids,
void RangeHashedDictionary::getIdsAndDates(const Attribute & attribute, PaddedPODArray<Key> & ids,
PaddedPODArray<UInt16> & start_dates, PaddedPODArray<UInt16> & end_dates) const
{
const HashMap<UInt64, Values<T>> & attr = *std::get<Ptr<T>>(attribute.maps);

View File

@ -945,14 +945,13 @@ void FunctionArrayEnumerate::executeImpl(Block & block, const ColumnNumbers & ar
ColumnUInt32::Container & res_values = res_nested->getData();
res_values.resize(array->getData().size());
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
ColumnArray::Offset prev_off = 0;
for (ColumnArray::Offset i = 0; i < offsets.size(); ++i)
{
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
{
ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j)
res_values[j] = j - prev_off + 1;
}
prev_off = off;
}
@ -1101,13 +1100,13 @@ bool FunctionArrayUniq::executeNumber(const ColumnArray * array, const IColumn *
null_map_data = &static_cast<const ColumnUInt8 *>(null_map)->getData();
Set set;
size_t prev_off = 0;
ColumnArray::Offset prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
set.clear();
bool found_null = false;
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j)
{
if (null_map_data && ((*null_map_data)[j] == 1))
found_null = true;
@ -1147,13 +1146,13 @@ bool FunctionArrayUniq::executeString(const ColumnArray * array, const IColumn *
null_map_data = &static_cast<const ColumnUInt8 *>(null_map)->getData();
Set set;
size_t prev_off = 0;
ColumnArray::Offset prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
set.clear();
bool found_null = false;
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j)
{
if (null_map_data && ((*null_map_data)[j] == 1))
found_null = true;
@ -1209,26 +1208,26 @@ bool FunctionArrayUniq::execute128bit(
/// Each binary blob is inserted into a hash table.
///
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
ColumnArray::Offset prev_off = 0;
for (ColumnArray::Offset i = 0; i < offsets.size(); ++i)
{
set.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j)
{
if (has_nullable_columns)
{
KeysNullMap<UInt128> bitmap{};
for (size_t i = 0; i < columns.size(); ++i)
for (ColumnArray::Offset i = 0; i < columns.size(); ++i)
{
if (null_maps[i])
{
const auto & null_map = static_cast<const ColumnUInt8 &>(*null_maps[i]).getData();
if (null_map[j] == 1)
{
size_t bucket = i / 8;
size_t offset = i % 8;
ColumnArray::Offset bucket = i / 8;
ColumnArray::Offset offset = i % 8;
bitmap[bucket] |= UInt8(1) << offset;
}
}
@ -1257,12 +1256,12 @@ void FunctionArrayUniq::executeHashed(
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
ColumnArray::Offset prev_off = 0;
for (ColumnArray::Offset i = 0; i < offsets.size(); ++i)
{
set.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j)
set.insert(hash128(j, count, columns));
res_values[i] = set.size();
@ -1308,9 +1307,6 @@ void FunctionArrayDistinct::executeImpl(Block & block, const ColumnNumbers & arg
const IColumn & src_data = array->getData();
const ColumnArray::Offsets & offsets = array->getOffsets();
ColumnRawPtrs original_data_columns;
original_data_columns.push_back(&src_data);
IColumn & res_data = res.getData();
ColumnArray::Offsets & res_offsets = res.getOffsets();
@ -1339,13 +1335,14 @@ void FunctionArrayDistinct::executeImpl(Block & block, const ColumnNumbers & arg
|| executeNumber<Float32>(*inner_col, offsets, res_data, res_offsets, nullable_col)
|| executeNumber<Float64>(*inner_col, offsets, res_data, res_offsets, nullable_col)
|| executeString(*inner_col, offsets, res_data, res_offsets, nullable_col)))
executeHashed(offsets, original_data_columns, res_data, res_offsets, nullable_col);
executeHashed(*inner_col, offsets, res_data, res_offsets, nullable_col);
block.getByPosition(result).column = std::move(res_ptr);
}
template <typename T>
bool FunctionArrayDistinct::executeNumber(const IColumn & src_data,
bool FunctionArrayDistinct::executeNumber(
const IColumn & src_data,
const ColumnArray::Offsets & src_offsets,
IColumn & res_data_col,
ColumnArray::Offsets & res_offsets,
@ -1364,9 +1361,7 @@ bool FunctionArrayDistinct::executeNumber(const IColumn & src_data,
const PaddedPODArray<UInt8> * src_null_map = nullptr;
if (nullable_col)
{
src_null_map = &static_cast<const ColumnUInt8 *>(&nullable_col->getNullMapColumn())->getData();
}
using Set = ClearableHashSet<T,
DefaultHash<T>,
@ -1374,22 +1369,31 @@ bool FunctionArrayDistinct::executeNumber(const IColumn & src_data,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < src_offsets.size(); ++i)
ColumnArray::Offset prev_src_offset = 0;
ColumnArray::Offset res_offset = 0;
for (ColumnArray::Offset i = 0; i < src_offsets.size(); ++i)
{
set.clear();
size_t off = src_offsets[i];
for (size_t j = prev_off; j < off; ++j)
ColumnArray::Offset curr_src_offset = src_offsets[i];
for (ColumnArray::Offset j = prev_src_offset; j < curr_src_offset; ++j)
{
if ((set.find(values[j]) == set.end()) && (!nullable_col || (*src_null_map)[j] == 0))
if (nullable_col && (*src_null_map)[j])
continue;
if (set.find(values[j]) == set.end())
{
res_data.emplace_back(values[j]);
set.insert(values[j]);
}
}
res_offsets.emplace_back(set.size() + prev_off);
prev_off = off;
res_offset += set.size();
res_offsets.emplace_back(res_offset);
prev_src_offset = curr_src_offset;
}
return true;
}
@ -1404,9 +1408,7 @@ bool FunctionArrayDistinct::executeString(
const ColumnString * src_data_concrete = checkAndGetColumn<ColumnString>(&src_data);
if (!src_data_concrete)
{
return false;
}
ColumnString & res_data_column_string = typeid_cast<ColumnString &>(res_data_col);
@ -1418,70 +1420,86 @@ bool FunctionArrayDistinct::executeString(
const PaddedPODArray<UInt8> * src_null_map = nullptr;
if (nullable_col)
{
src_null_map = &static_cast<const ColumnUInt8 *>(&nullable_col->getNullMapColumn())->getData();
}
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < src_offsets.size(); ++i)
ColumnArray::Offset prev_src_offset = 0;
ColumnArray::Offset res_offset = 0;
for (ColumnArray::Offset i = 0; i < src_offsets.size(); ++i)
{
set.clear();
size_t off = src_offsets[i];
for (size_t j = prev_off; j < off; ++j)
ColumnArray::Offset curr_src_offset = src_offsets[i];
for (ColumnArray::Offset j = prev_src_offset; j < curr_src_offset; ++j)
{
if (nullable_col && (*src_null_map)[j])
continue;
StringRef str_ref = src_data_concrete->getDataAt(j);
if (set.find(str_ref) == set.end() && (!nullable_col || (*src_null_map)[j] == 0))
if (set.find(str_ref) == set.end())
{
set.insert(str_ref);
res_data_column_string.insertData(str_ref.data, str_ref.size);
}
}
res_offsets.emplace_back(set.size() + prev_off);
prev_off = off;
res_offset += set.size();
res_offsets.emplace_back(res_offset);
prev_src_offset = curr_src_offset;
}
return true;
}
void FunctionArrayDistinct::executeHashed(
const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,
const IColumn & src_data,
const ColumnArray::Offsets & src_offsets,
IColumn & res_data_col,
ColumnArray::Offsets & res_offsets,
const ColumnNullable * nullable_col)
{
size_t count = columns.size();
using Set = ClearableHashSet<UInt128, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
const PaddedPODArray<UInt8> * src_null_map = nullptr;
if (nullable_col)
{
src_null_map = &static_cast<const ColumnUInt8 *>(&nullable_col->getNullMapColumn())->getData();
}
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
ColumnArray::Offset prev_src_offset = 0;
ColumnArray::Offset res_offset = 0;
for (ColumnArray::Offset i = 0; i < src_offsets.size(); ++i)
{
set.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
ColumnArray::Offset curr_src_offset = src_offsets[i];
for (ColumnArray::Offset j = prev_src_offset; j < curr_src_offset; ++j)
{
auto hash = hash128(j, count, columns);
if (set.find(hash) == set.end() && (!nullable_col || (*src_null_map)[j] == 0))
if (nullable_col && (*src_null_map)[j])
continue;
UInt128 hash;
SipHash hash_function;
src_data.updateHashWithValue(j, hash_function);
hash_function.get128(reinterpret_cast<char *>(&hash));
if (set.find(hash) == set.end())
{
set.insert(hash);
res_data_col.insertFrom(*columns[0], j);
res_data_col.insertFrom(src_data, j);
}
}
res_offsets.emplace_back(set.size() + prev_off);
prev_off = off;
res_offset += set.size();
res_offsets.emplace_back(res_offset);
prev_src_offset = curr_src_offset;
}
}

View File

@ -1252,8 +1252,8 @@ private:
const ColumnNullable * nullable_col);
void executeHashed(
const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,
const IColumn & src_data,
const ColumnArray::Offsets & src_offsets,
IColumn & res_data_col,
ColumnArray::Offsets & res_offsets,
const ColumnNullable * nullable_col);

View File

@ -72,7 +72,7 @@ struct HalfMD5Impl
union
{
unsigned char char_data[16];
Poco::UInt64 uint64_data;
uint64_t uint64_data;
} buf;
MD5_CTX ctx;

View File

@ -234,8 +234,8 @@ void PreparedFunctionImpl::executeWithoutColumnsWithDictionary(Block & block, co
executeImpl(block, args, result, input_rows_count);
}
static ColumnPtr replaceColumnsWithDictionaryByNestedAndGetDictionaryIndexes(Block & block, const ColumnNumbers & args,
bool can_be_executed_on_default_arguments)
static ColumnPtr replaceColumnsWithDictionaryByNestedAndGetDictionaryIndexes(
Block & block, const ColumnNumbers & args, bool can_be_executed_on_default_arguments)
{
size_t num_rows = 0;
ColumnPtr indexes;

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <boost/noncopyable.hpp>
#include <Common/Exception.h>

View File

@ -1,6 +1,8 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <boost/noncopyable.hpp>
/// https://stackoverflow.com/questions/20759750/resolving-redefinition-of-timespec-in-time-h
#define timespec linux_timespec

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <Common/Exception.h>
#include <common/logger_useful.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <ext/singleton.h>
#include <condition_variable>

View File

@ -13,14 +13,14 @@ class LimitReadBuffer : public ReadBuffer
{
private:
ReadBuffer & in;
UInt64 limit;
size_t limit;
bool throw_exception;
std::string exception_message;
bool nextImpl() override;
public:
LimitReadBuffer(ReadBuffer & in, UInt64 limit, bool throw_exception, std::string exception_message = {});
LimitReadBuffer(ReadBuffer & in, size_t limit, bool throw_exception, std::string exception_message = {});
~LimitReadBuffer() override;
};

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/ReadBufferAIO.h>
#include <IO/AIOContextPool.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBuffer.h>

View File

@ -67,8 +67,9 @@ bool ReadBufferFromFileDescriptor::nextImpl()
if (res > 0)
bytes_read += res;
/// NOTE: it is quite inaccurate on high loads since the thread could be replaced by another one and we will count cpu time of other thread
/// It is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS)
/// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS).
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());

View File

@ -870,8 +870,10 @@ void readException(Exception & e, ReadBuffer & buf, const String & additional_me
if (name != "DB::Exception")
out << name << ". ";
out << message
<< ". Stack trace:\n\n" << stack_trace;
out << message << ".";
if (!stack_trace.empty())
out << " Stack trace:\n\n" << stack_trace;
if (has_nested)
{

View File

@ -1,4 +1,4 @@
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/WriteBufferAIO.h>
#include <Common/ProfileEvents.h>

View File

@ -1,6 +1,6 @@
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#if defined(__linux__)
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>

View File

@ -49,18 +49,22 @@ void formatUUID(std::reverse_iterator<const UInt8 *> src16, UInt8 * dst36)
void writeException(const Exception & e, WriteBuffer & buf)
void writeException(const Exception & e, WriteBuffer & buf, bool with_stack_trace)
{
writeBinary(e.code(), buf);
writeBinary(String(e.name()), buf);
writeBinary(e.displayText(), buf);
writeBinary(e.getStackTrace().toString(), buf);
if (with_stack_trace)
writeBinary(e.getStackTrace().toString(), buf);
else
writeBinary(String(), buf);
bool has_nested = e.nested() != nullptr;
writeBinary(has_nested, buf);
if (has_nested)
writeException(Exception(*e.nested()), buf);
writeException(Exception(*e.nested()), buf, with_stack_trace);
}
}

View File

@ -823,7 +823,7 @@ void writeText(const std::vector<T> & x, WriteBuffer & buf)
/// Serialize exception (so that it can be transferred over the network)
void writeException(const Exception & e, WriteBuffer & buf);
void writeException(const Exception & e, WriteBuffer & buf, bool with_stack_trace);
/// An easy-to-use method for converting something to a string in text form.

View File

@ -1,6 +1,6 @@
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
#include <IO/ReadBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -14,10 +14,10 @@ namespace ProfileEvents
namespace DB
{
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
#endif
@ -31,7 +31,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::
}
else
{
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
#else

View File

@ -1,6 +1,6 @@
#include <IO/createWriteBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
#include <IO/WriteBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -15,10 +15,10 @@ namespace ProfileEvents
namespace DB
{
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(_MSC_VER)
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
#endif
@ -33,7 +33,7 @@ WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & file
}
else
{
#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(_MSC_VER)
#if defined(__linux__)
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return new WriteBufferAIO(filename_, buffer_size_, flags_, mode, existing_memory_);
#else

View File

@ -8,11 +8,11 @@
abcdefghijklmn!@Aab#AAabcdefghijklmn$%
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* There are two lines. First line make sense. Second line contains padding to make file size large enough.
* Compile with
* Compile with
* cmake -D SANITIZE=address
* and run:
./zlib_ng_bug data2.bin
./zlib_ng_bug data2.bin
=================================================================
==204952==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x6310000147ff at pc 0x000000596d7a bp 0x7ffd139edd50 sp 0x7ffd139edd48
READ of size 1 at 0x6310000147ff thread T0

View File

@ -632,7 +632,7 @@ void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) c
return;
}
if (!shared->security_manager->hasAccessToDatabase(client_info.current_user, database_name))
throw Exception("Access denied to database " + database_name, ErrorCodes::DATABASE_ACCESS_DENIED);
throw Exception("Access denied to database " + database_name + " for user " + client_info.current_user , ErrorCodes::DATABASE_ACCESS_DENIED);
}
void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)

View File

@ -66,6 +66,7 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/evaluateQualified.h>
namespace DB
@ -164,35 +165,6 @@ void removeDuplicateColumns(NamesAndTypesList & columns)
}
String DatabaseAndTableWithAlias::getQualifiedNamePrefix() const
{
return (!alias.empty() ? alias : (database + '.' + table)) + '.';
}
void DatabaseAndTableWithAlias::makeQualifiedName(const ASTPtr & ast) const
{
if (auto identifier = typeid_cast<ASTIdentifier *>(ast.get()))
{
String prefix = getQualifiedNamePrefix();
identifier->name.insert(identifier->name.begin(), prefix.begin(), prefix.end());
Names qualifiers;
if (!alias.empty())
qualifiers.push_back(alias);
else
{
qualifiers.push_back(database);
qualifiers.push_back(table);
}
for (const auto & qualifier : qualifiers)
identifier->children.emplace_back(std::make_shared<ASTIdentifier>(qualifier));
}
}
ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & ast_,
const Context & context_,
@ -274,7 +246,7 @@ ExpressionAnalyzer::ExpressionAnalyzer(
getArrayJoinedColumns();
/// Push the predicate expression down to the subqueries.
rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings).optimize();
rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();
/// Delete the unnecessary from `source_columns` list. Create `unknown_required_source_columns`. Form `columns_added_by_join`.
collectUsedColumns();
@ -293,46 +265,6 @@ ExpressionAnalyzer::ExpressionAnalyzer(
analyzeAggregation();
}
static DatabaseAndTableWithAlias getTableNameWithAliasFromTableExpression(const ASTTableExpression & table_expression,
const Context & context)
{
DatabaseAndTableWithAlias database_and_table_with_alias;
if (table_expression.database_and_table_name)
{
const auto & identifier = static_cast<const ASTIdentifier &>(*table_expression.database_and_table_name);
database_and_table_with_alias.alias = identifier.tryGetAlias();
if (table_expression.database_and_table_name->children.empty())
{
database_and_table_with_alias.database = context.getCurrentDatabase();
database_and_table_with_alias.table = identifier.name;
}
else
{
if (table_expression.database_and_table_name->children.size() != 2)
throw Exception("Logical error: number of components in table expression not equal to two", ErrorCodes::LOGICAL_ERROR);
database_and_table_with_alias.database = static_cast<const ASTIdentifier &>(*identifier.children[0]).name;
database_and_table_with_alias.table = static_cast<const ASTIdentifier &>(*identifier.children[1]).name;
}
}
else if (table_expression.table_function)
{
database_and_table_with_alias.alias = table_expression.table_function->tryGetAlias();
}
else if (table_expression.subquery)
{
database_and_table_with_alias.alias = table_expression.subquery->tryGetAlias();
}
else
throw Exception("Logical error: no known elements in ASTTableExpression", ErrorCodes::LOGICAL_ERROR);
return database_and_table_with_alias;
}
void ExpressionAnalyzer::translateQualifiedNames()
{
if (!select_query || !select_query->tables || select_query->tables->children.empty())
@ -357,80 +289,6 @@ void ExpressionAnalyzer::translateQualifiedNames()
translateQualifiedNamesImpl(ast, tables);
}
/// Get the number of components of identifier which are correspond to 'alias.', 'table.' or 'databas.table.' from names.
static size_t getNumComponentsToStripInOrderToTranslateQualifiedName(const ASTIdentifier & identifier,
const DatabaseAndTableWithAlias & names)
{
size_t num_qualifiers_to_strip = 0;
auto get_identifier_name = [](const ASTPtr & ast) { return static_cast<const ASTIdentifier &>(*ast).name; };
/// It is compound identifier
if (!identifier.children.empty())
{
size_t num_components = identifier.children.size();
/// database.table.column
if (num_components >= 3
&& !names.database.empty()
&& get_identifier_name(identifier.children[0]) == names.database
&& get_identifier_name(identifier.children[1]) == names.table)
{
num_qualifiers_to_strip = 2;
}
/// table.column or alias.column. If num_components > 2, it is like table.nested.column.
if (num_components >= 2
&& ((!names.table.empty() && get_identifier_name(identifier.children[0]) == names.table)
|| (!names.alias.empty() && get_identifier_name(identifier.children[0]) == names.alias)))
{
num_qualifiers_to_strip = 1;
}
}
return num_qualifiers_to_strip;
}
/// Checks that ast is ASTIdentifier and remove num_qualifiers_to_strip components from left.
/// Example: 'database.table.name' -> (num_qualifiers_to_strip = 2) -> 'name'.
static void stripIdentifier(ASTPtr & ast, size_t num_qualifiers_to_strip)
{
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(ast.get());
if (!identifier)
throw Exception("ASTIdentifier expected for stripIdentifier", ErrorCodes::LOGICAL_ERROR);
if (num_qualifiers_to_strip)
{
size_t num_components = identifier->children.size();
/// plain column
if (num_components - num_qualifiers_to_strip == 1)
{
String node_alias = identifier->tryGetAlias();
ast = identifier->children.back();
if (!node_alias.empty())
ast->setAlias(node_alias);
}
else
/// nested column
{
identifier->children.erase(identifier->children.begin(), identifier->children.begin() + num_qualifiers_to_strip);
String new_name;
for (const auto & child : identifier->children)
{
if (!new_name.empty())
new_name += '.';
new_name += static_cast<const ASTIdentifier &>(*child.get()).name;
}
identifier->name = new_name;
}
}
}
void ExpressionAnalyzer::translateQualifiedNamesImpl(ASTPtr & ast, const std::vector<DatabaseAndTableWithAlias> & tables)
{
if (auto * identifier = typeid_cast<ASTIdentifier *>(ast.get()))
@ -509,7 +367,6 @@ void ExpressionAnalyzer::translateQualifiedNamesImpl(ASTPtr & ast, const std::ve
}
}
void ExpressionAnalyzer::optimizeIfWithConstantCondition()
{
optimizeIfWithConstantConditionImpl(ast, aliases);
@ -566,10 +423,13 @@ void ExpressionAnalyzer::optimizeIfWithConstantConditionImpl(ASTPtr & current_as
optimizeIfWithConstantConditionImpl(function_node->arguments, aliases);
ASTExpressionList * args = typeid_cast<ASTExpressionList *>(function_node->arguments.get());
ASTPtr condition_expr = args->children.at(0);
ASTPtr then_expr = args->children.at(1);
ASTPtr else_expr = args->children.at(2);
if (args->children.size() != 3)
throw Exception("Wrong number of arguments for function 'if' (" + toString(args->children.size()) + " instead of 3)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTPtr condition_expr = args->children[0];
ASTPtr then_expr = args->children[1];
ASTPtr else_expr = args->children[2];
bool condition;
if (tryExtractConstValueFromCondition(condition_expr, condition))
@ -765,23 +625,6 @@ void ExpressionAnalyzer::findExternalTables(ASTPtr & ast)
external_tables[node->name] = external_storage;
}
static std::pair<String, String> getDatabaseAndTableNameFromIdentifier(const ASTIdentifier & identifier)
{
std::pair<String, String> res;
res.second = identifier.name;
if (!identifier.children.empty())
{
if (identifier.children.size() != 2)
throw Exception("Qualified table name could have only two components", ErrorCodes::LOGICAL_ERROR);
res.first = typeid_cast<const ASTIdentifier &>(*identifier.children[0]).name;
res.second = typeid_cast<const ASTIdentifier &>(*identifier.children[1]).name;
}
return res;
}
static std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
const ASTPtr & subquery_or_table_name, const Context & context, size_t subquery_depth, const Names & required_source_columns)
{

View File

@ -4,6 +4,7 @@
#include <Interpreters/Settings.h>
#include <Core/Block.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateQualified.h>
#include <Interpreters/ProjectionManipulation.h>
#include <Parsers/StringRange.h>
#include <Parsers/ASTTablesInSelectQuery.h>
@ -91,19 +92,6 @@ struct ScopeStack
const Block & getSampleBlock() const;
};
struct DatabaseAndTableWithAlias
{
String database;
String table;
String alias;
/// "alias." or "database.table." if alias is empty
String getQualifiedNamePrefix() const;
/// If ast is ASTIdentifier, prepend getQualifiedNamePrefix() to it's name.
void makeQualifiedName(const ASTPtr & ast) const;
};
/** Transforms an expression from a syntax tree into a sequence of actions to execute it.
*
* NOTE: if `ast` is a SELECT query from a table, the structure of this table should not change during the lifetime of ExpressionAnalyzer.

View File

@ -114,7 +114,7 @@ static llvm::TargetMachine * getNativeMachine()
llvm::SubtargetFeatures features;
llvm::StringMap<bool> feature_map;
if (llvm::sys::getHostCPUFeatures(feature_map))
for (auto& f : feature_map)
for (auto & f : feature_map)
features.AddFeature(f.first(), f.second);
llvm::TargetOptions options;
return target->createTargetMachine(
@ -545,7 +545,7 @@ public:
}
};
static bool isCompilable(llvm::IRBuilderBase & builder, const IFunctionBase& function)
static bool isCompilable(llvm::IRBuilderBase & builder, const IFunctionBase & function)
{
if (!toNativeType(builder, function.getReturnType()))
return false;

View File

@ -2,11 +2,11 @@
#include <Storages/IStorage.h>
#include <Interpreters/PredicateExpressionsOptimizer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/queryToString.h>
#include <iostream>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Parsers/queryToString.h>
namespace DB
{
@ -14,18 +14,18 @@ namespace DB
static constexpr auto and_function_name = "and";
PredicateExpressionsOptimizer::PredicateExpressionsOptimizer(
ASTSelectQuery * ast_select_, const Settings & settings_)
: ast_select(ast_select_), settings(settings_)
ASTSelectQuery * ast_select_, const Settings & settings_, const Context & context_)
: ast_select(ast_select_), settings(settings_), context(context_)
{
}
bool PredicateExpressionsOptimizer::optimize()
{
if (!settings.enable_optimize_predicate_expression || !ast_select || !ast_select->tables)
if (!settings.enable_optimize_predicate_expression || !ast_select || !ast_select->tables || ast_select->tables->children.empty())
return false;
SubqueriesProjectionColumns all_subquery_projection_columns;
getAllSubqueryProjectionColumns(ast_select->tables.get(), all_subquery_projection_columns);
getAllSubqueryProjectionColumns(all_subquery_projection_columns);
bool is_rewrite_subqueries = false;
if (!all_subquery_projection_columns.empty())
@ -42,11 +42,16 @@ bool PredicateExpressionsOptimizer::optimizeImpl(
/// split predicate with `and`
PredicateExpressions outer_predicate_expressions = splitConjunctionPredicate(outer_expression);
std::vector<ASTTableExpression *> tables_expression = getSelectTablesExpression(ast_select);
std::vector<DatabaseAndTableWithAlias> database_and_table_with_aliases;
for (const auto & table_expression : tables_expression)
database_and_table_with_aliases.emplace_back(getTableNameWithAliasFromTableExpression(*table_expression, context));
bool is_rewrite_subquery = false;
for (const auto & outer_predicate : outer_predicate_expressions)
{
ASTs outer_predicate_dependent;
getExpressionDependentColumns(outer_predicate, outer_predicate_dependent);
IdentifiersWithQualifiedNameSet outer_predicate_dependencies;
getDependenciesAndQualifiedOfExpression(outer_predicate, outer_predicate_dependencies, database_and_table_with_aliases);
/// TODO: remove origin expression
for (const auto & subquery_projection_columns : subqueries_projection_columns)
@ -55,10 +60,10 @@ bool PredicateExpressionsOptimizer::optimizeImpl(
const ProjectionsWithAliases projection_columns = subquery_projection_columns.second;
OptimizeKind optimize_kind = OptimizeKind::NONE;
if (!cannotPushDownOuterPredicate(projection_columns, subquery, outer_predicate_dependent, is_prewhere, optimize_kind))
if (!cannotPushDownOuterPredicate(projection_columns, subquery, outer_predicate_dependencies, is_prewhere, optimize_kind))
{
ASTPtr inner_predicate;
cloneOuterPredicateForInnerPredicate(outer_predicate, projection_columns, outer_predicate_dependent, inner_predicate);
cloneOuterPredicateForInnerPredicate(outer_predicate, projection_columns, database_and_table_with_aliases, inner_predicate);
switch(optimize_kind)
{
@ -109,34 +114,57 @@ PredicateExpressions PredicateExpressionsOptimizer::splitConjunctionPredicate(AS
return predicate_expressions;
}
void PredicateExpressionsOptimizer::getExpressionDependentColumns(const ASTPtr & expression, ASTs & expression_dependent_columns)
void PredicateExpressionsOptimizer::getDependenciesAndQualifiedOfExpression(const ASTPtr & expression,
IdentifiersWithQualifiedNameSet & dependencies_and_qualified,
std::vector<DatabaseAndTableWithAlias> & tables_with_aliases)
{
if (!typeid_cast<ASTIdentifier *>(expression.get()))
if (const auto identifier = typeid_cast<ASTIdentifier *>(expression.get()))
{
if (!identifier->children.empty())
dependencies_and_qualified.emplace_back(std::pair(identifier, expression->getAliasOrColumnName()));
else
{
size_t best_table_pos = 0;
size_t max_num_qualifiers_to_strip = 0;
/// translate qualifiers for dependent columns
for (size_t table_pos = 0; table_pos < tables_with_aliases.size(); ++table_pos)
{
const auto & table = tables_with_aliases[table_pos];
auto num_qualifiers_to_strip = getNumComponentsToStripInOrderToTranslateQualifiedName(*identifier, table);
if (num_qualifiers_to_strip > max_num_qualifiers_to_strip)
{
max_num_qualifiers_to_strip = num_qualifiers_to_strip;
best_table_pos = table_pos;
}
}
String qualified_name = tables_with_aliases[best_table_pos].getQualifiedNamePrefix() + expression->getAliasOrColumnName();
dependencies_and_qualified.emplace_back(std::pair(identifier, qualified_name));
}
}
else
{
for (const auto & child : expression->children)
getExpressionDependentColumns(child, expression_dependent_columns);
return;
getDependenciesAndQualifiedOfExpression(child, dependencies_and_qualified, tables_with_aliases);
}
expression_dependent_columns.emplace_back(expression);
}
bool PredicateExpressionsOptimizer::cannotPushDownOuterPredicate(
const ProjectionsWithAliases & subquery_projection_columns, ASTSelectQuery * subquery,
ASTs & expression_dependent_columns, bool & is_prewhere, OptimizeKind & optimize_kind)
IdentifiersWithQualifiedNameSet & outer_predicate_dependencies, bool & is_prewhere, OptimizeKind & optimize_kind)
{
if (subquery->final() || subquery->limit_by_expression_list || subquery->limit_offset || subquery->with_expression_list)
if (subquery->final() || subquery->limit_by_expression_list || subquery->limit_length || subquery->with_expression_list)
return true;
for (auto & dependent_column : expression_dependent_columns)
for (auto & predicate_dependency : outer_predicate_dependencies)
{
bool is_found = false;
String dependent_column_name = dependent_column->getAliasOrColumnName();
for (auto projection_column : subquery_projection_columns)
{
if (projection_column.second == dependent_column_name)
if (projection_column.second == predicate_dependency.second)
{
is_found = true;
optimize_kind = isAggregateFunction(projection_column.first) ? OptimizeKind::PUSH_TO_HAVING : optimize_kind;
@ -168,39 +196,21 @@ bool PredicateExpressionsOptimizer::isAggregateFunction(ASTPtr & node)
return false;
}
void PredicateExpressionsOptimizer::getAllSubqueryProjectionColumns(IAST * node, SubqueriesProjectionColumns & all_subquery_projection_columns)
{
if (auto ast_subquery = typeid_cast<ASTSubquery *>(node))
{
ASTs output_projection;
IAST * subquery = ast_subquery->children.at(0).get();
getSubqueryProjectionColumns(subquery, all_subquery_projection_columns, output_projection);
return;
}
for (auto & child : node->children)
getAllSubqueryProjectionColumns(child.get(), all_subquery_projection_columns);
}
void PredicateExpressionsOptimizer::cloneOuterPredicateForInnerPredicate(
const ASTPtr & outer_predicate, const ProjectionsWithAliases & projection_columns, ASTs & predicate_dependent_columns,
ASTPtr & inner_predicate)
const ASTPtr & outer_predicate, const ProjectionsWithAliases & projection_columns,
std::vector<DatabaseAndTableWithAlias> & tables, ASTPtr & inner_predicate)
{
inner_predicate = outer_predicate->clone();
ASTs new_expression_require_columns;
new_expression_require_columns.reserve(predicate_dependent_columns.size());
getExpressionDependentColumns(inner_predicate, new_expression_require_columns);
IdentifiersWithQualifiedNameSet new_expression_requires;
getDependenciesAndQualifiedOfExpression(inner_predicate, new_expression_requires, tables);
for (auto & expression : new_expression_require_columns)
for (auto & require : new_expression_requires)
{
if (auto identifier = typeid_cast<ASTIdentifier *>(expression.get()))
for (auto projection : projection_columns)
{
for (auto projection : projection_columns)
{
if (identifier->name == projection.second)
identifier->name = projection.first->getAliasOrColumnName();
}
if (require.second == projection.second)
require.first->name = projection.first->getAliasOrColumnName();
}
}
}
@ -221,32 +231,159 @@ bool PredicateExpressionsOptimizer::optimizeExpression(const ASTPtr & outer_expr
return true;
}
void PredicateExpressionsOptimizer::getSubqueryProjectionColumns(IAST * subquery, SubqueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections)
void PredicateExpressionsOptimizer::getAllSubqueryProjectionColumns(SubqueriesProjectionColumns & all_subquery_projection_columns)
{
if (auto * with_union_subquery = typeid_cast<ASTSelectWithUnionQuery *>(subquery))
for (auto & select : with_union_subquery->list_of_selects->children)
getSubqueryProjectionColumns(select.get(), all_subquery_projection_columns, output_projections);
const auto tables_expression = getSelectTablesExpression(ast_select);
if (auto * without_union_subquery = typeid_cast<ASTSelectQuery *>(subquery))
for (const auto & table_expression : tables_expression)
{
const auto expression_list = without_union_subquery->select_expression_list->children;
/// use first projection as the output projection
if (output_projections.empty())
output_projections = expression_list;
if (output_projections.size() != expression_list.size())
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
ProjectionsWithAliases subquery_projections;
subquery_projections.reserve(expression_list.size());
for (size_t idx = 0; idx < expression_list.size(); idx++)
subquery_projections.emplace_back(std::pair(expression_list.at(idx), output_projections.at(idx)->getAliasOrColumnName()));
all_subquery_projection_columns.insert(std::pair(subquery, subquery_projections));
if (table_expression->subquery)
{
/// Use qualifiers to translate the columns of subqueries
const auto database_and_table_with_alias = getTableNameWithAliasFromTableExpression(*table_expression, context);
String qualified_name_prefix = database_and_table_with_alias.getQualifiedNamePrefix();
getSubqueryProjectionColumns(all_subquery_projection_columns, qualified_name_prefix,
static_cast<const ASTSubquery *>(table_expression->subquery.get())->children[0]);
}
}
}
void PredicateExpressionsOptimizer::getSubqueryProjectionColumns(SubqueriesProjectionColumns & all_subquery_projection_columns,
String & qualified_name_prefix, const ASTPtr & subquery)
{
ASTs select_with_union_projections;
auto select_with_union_query = static_cast<ASTSelectWithUnionQuery *>(subquery.get());
for (auto & select_without_union_query : select_with_union_query->list_of_selects->children)
{
ProjectionsWithAliases subquery_projections;
auto select_projection_columns = getSelectQueryProjectionColumns(select_without_union_query);
if (!select_projection_columns.empty())
{
if (select_with_union_projections.empty())
select_with_union_projections = select_projection_columns;
for (size_t i = 0; i < select_projection_columns.size(); i++)
subquery_projections.emplace_back(std::pair(select_projection_columns[i],
qualified_name_prefix + select_with_union_projections[i]->getAliasOrColumnName()));
all_subquery_projection_columns.insert(std::pair(select_without_union_query.get(), subquery_projections));
}
}
}
ASTs PredicateExpressionsOptimizer::getSelectQueryProjectionColumns(ASTPtr & ast)
{
ASTs projection_columns;
auto select_query = static_cast<ASTSelectQuery *>(ast.get());
for (const auto & projection_column : select_query->select_expression_list->children)
{
if (typeid_cast<ASTAsterisk *>(projection_column.get()) || typeid_cast<ASTQualifiedAsterisk *>(projection_column.get()))
{
ASTs evaluated_columns = evaluateAsterisk(select_query, projection_column);
for (const auto & column : evaluated_columns)
projection_columns.emplace_back(column);
continue;
}
projection_columns.emplace_back(projection_column);
}
return projection_columns;
}
ASTs PredicateExpressionsOptimizer::evaluateAsterisk(ASTSelectQuery * select_query, const ASTPtr & asterisk)
{
/// SELECT *, SELECT dummy, SELECT 1 AS id
if (!select_query->tables || select_query->tables->children.empty())
return {};
std::vector<ASTTableExpression *> tables_expression = getSelectTablesExpression(select_query);
if (const auto qualified_asterisk = typeid_cast<ASTQualifiedAsterisk *>(asterisk.get()))
{
if (qualified_asterisk->children.size() != 1)
throw Exception("Logical error: qualified asterisk must have exactly one child", ErrorCodes::LOGICAL_ERROR);
ASTIdentifier * ident = typeid_cast<ASTIdentifier *>(qualified_asterisk->children[0].get());
if (!ident)
throw Exception("Logical error: qualified asterisk must have identifier as its child", ErrorCodes::LOGICAL_ERROR);
size_t num_components = ident->children.size();
if (num_components > 2)
throw Exception("Qualified asterisk cannot have more than two qualifiers", ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
for (auto it = tables_expression.begin(); it != tables_expression.end(); ++it)
{
const ASTTableExpression * table_expression = *it;
const auto database_and_table_with_alias = getTableNameWithAliasFromTableExpression(*table_expression, context);
/// database.table.*
if (num_components == 2 && !database_and_table_with_alias.database.empty()
&& static_cast<const ASTIdentifier &>(*ident->children[0]).name == database_and_table_with_alias.database
&& static_cast<const ASTIdentifier &>(*ident->children[1]).name == database_and_table_with_alias.table)
continue;
/// table.* or alias.*
else if (num_components == 0
&& ((!database_and_table_with_alias.table.empty() && ident->name == database_and_table_with_alias.table)
|| (!database_and_table_with_alias.alias.empty() && ident->name == database_and_table_with_alias.alias)))
continue;
else
/// It's not a required table
tables_expression.erase(it);
}
}
ASTs projection_columns;
for (auto & table_expression : tables_expression)
{
if (table_expression->subquery)
{
const auto subquery = static_cast<const ASTSubquery *>(table_expression->subquery.get());
const auto select_with_union_query = static_cast<ASTSelectWithUnionQuery *>(subquery->children[0].get());
const auto subquery_projections = getSelectQueryProjectionColumns(select_with_union_query->list_of_selects->children[0]);
projection_columns.insert(projection_columns.end(), subquery_projections.begin(), subquery_projections.end());
}
else
{
StoragePtr storage;
if (table_expression->table_function)
storage = const_cast<Context &>(context).executeTableFunction(table_expression->table_function);
else if (table_expression->database_and_table_name)
{
const auto database_and_table_ast = static_cast<ASTIdentifier*>(table_expression->database_and_table_name.get());
const auto database_and_table_name = getDatabaseAndTableNameFromIdentifier(*database_and_table_ast);
storage = context.tryGetTable(database_and_table_name.first, database_and_table_name.second);
}
const auto block = storage->getSampleBlock();
for (size_t idx = 0; idx < block.columns(); idx++)
projection_columns.emplace_back(std::make_shared<ASTIdentifier>(block.getByPosition(idx).name));
}
}
return projection_columns;
}
std::vector<ASTTableExpression *> PredicateExpressionsOptimizer::getSelectTablesExpression(ASTSelectQuery * select_query)
{
if (!select_query->tables)
return {};
std::vector<ASTTableExpression *> tables_expression;
const ASTTablesInSelectQuery & tables_in_select_query = static_cast<const ASTTablesInSelectQuery &>(*select_query->tables);
for (const auto & child : tables_in_select_query.children)
{
ASTTablesInSelectQueryElement * tables_element = static_cast<ASTTablesInSelectQueryElement *>(child.get());
if (tables_element->table_expression)
tables_expression.emplace_back(static_cast<ASTTableExpression *>(tables_element->table_expression.get()));
}
return tables_expression;
}
}

View File

@ -7,6 +7,9 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/evaluateQualified.h>
namespace DB
{
@ -21,6 +24,8 @@ using PredicateExpressions = std::vector<ASTPtr>;
using ProjectionWithAlias = std::pair<ASTPtr, String>;
using ProjectionsWithAliases = std::vector<ProjectionWithAlias>;
using SubqueriesProjectionColumns = std::map<IAST *, ProjectionsWithAliases>;
using IdentifierWithQualifiedName = std::pair<ASTIdentifier *, String>;
using IdentifiersWithQualifiedNameSet = std::vector<IdentifierWithQualifiedName>;
/** This class provides functions for Push-Down predicate expressions
@ -37,13 +42,14 @@ using SubqueriesProjectionColumns = std::map<IAST *, ProjectionsWithAliases>;
class PredicateExpressionsOptimizer
{
public:
PredicateExpressionsOptimizer(ASTSelectQuery * ast_select_, const Settings & settings_);
PredicateExpressionsOptimizer(ASTSelectQuery * ast_select_, const Settings & settings_, const Context & context_);
bool optimize();
private:
ASTSelectQuery * ast_select;
const Settings & settings;
const Context & context;
enum OptimizeKind
{
@ -57,24 +63,29 @@ private:
PredicateExpressions splitConjunctionPredicate(ASTPtr & predicate_expression);
void getExpressionDependentColumns(const ASTPtr & expression, ASTs & expression_dependent_columns);
void getDependenciesAndQualifiedOfExpression(const ASTPtr & expression, IdentifiersWithQualifiedNameSet & dependencies_and_qualified,
std::vector<DatabaseAndTableWithAlias> & tables_with_aliases);
bool optimizeExpression(const ASTPtr & outer_expression, ASTPtr & subquery_expression, ASTSelectQuery * subquery);
bool optimizeImpl(ASTPtr & outer_expression, SubqueriesProjectionColumns & subqueries_projection_columns, bool is_prewhere);
bool cannotPushDownOuterPredicate(
const ProjectionsWithAliases & subquery_projection_columns, ASTSelectQuery * subquery,
ASTs & expression_dependent_columns, bool & is_prewhere, OptimizeKind & optimize_kind);
bool cannotPushDownOuterPredicate(const ProjectionsWithAliases & subquery_projection_columns, ASTSelectQuery * subquery,
IdentifiersWithQualifiedNameSet & outer_predicate_dependencies, bool & is_prewhere, OptimizeKind & optimize_kind);
void cloneOuterPredicateForInnerPredicate(
const ASTPtr & outer_predicate, const ProjectionsWithAliases & projection_columns, ASTs & predicate_dependent_columns,
ASTPtr & inner_predicate);
void cloneOuterPredicateForInnerPredicate(const ASTPtr & outer_predicate, const ProjectionsWithAliases & projection_columns,
std::vector<DatabaseAndTableWithAlias> & tables, ASTPtr & inner_predicate);
void getAllSubqueryProjectionColumns(SubqueriesProjectionColumns & all_subquery_projection_columns);
void getAllSubqueryProjectionColumns(IAST * node, SubqueriesProjectionColumns & all_subquery_projection_columns);
void getSubqueryProjectionColumns(SubqueriesProjectionColumns & all_subquery_projection_columns,
String & qualified_name_prefix, const ASTPtr & subquery);
void getSubqueryProjectionColumns(IAST * subquery, SubqueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections);
ASTs getSelectQueryProjectionColumns(ASTPtr & ast);
std::vector<ASTTableExpression *> getSelectTablesExpression(ASTSelectQuery * select_query);
ASTs evaluateAsterisk(ASTSelectQuery * select_query, const ASTPtr & asterisk);
};
}

View File

@ -283,6 +283,7 @@ struct Settings
M(SettingUInt64, max_fetch_partition_retries_count, 5, "Amount of retries while fetching partition from another host.") \
M(SettingBool, asterisk_left_columns_only, 0, "If it is set to true, the asterisk only return left of join query.") \
M(SettingUInt64, http_max_multipart_form_data_size, 1024 * 1024 * 1024, "Limit on size of multipart/form-data content. This setting cannot be parsed from URL parameters and should be set in user profile. Note that content is parsed and external tables are created in memory before start of query execution. And this is the only limit that has effect on that stage (limits on max memory usage and max execution time have no effect while reading HTTP form data).") \
M(SettingBool, calculate_text_stack_trace, 1, "Calculate text stack trace in case of exceptions during query execution. This is the default. It requires symbol lookups that may slow down fuzzing tests when huge amount of wrong queries are executed. In normal cases you should not disable this option.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \

View File

@ -51,7 +51,7 @@ namespace DB
*/
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1024
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
class Context;
class QueryLog;

View File

@ -0,0 +1,160 @@
#include <Interpreters/evaluateQualified.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
namespace DB
{
/// Checks that ast is ASTIdentifier and remove num_qualifiers_to_strip components from left.
/// Example: 'database.table.name' -> (num_qualifiers_to_strip = 2) -> 'name'.
void stripIdentifier(DB::ASTPtr & ast, size_t num_qualifiers_to_strip)
{
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(ast.get());
if (!identifier)
throw DB::Exception("ASTIdentifier expected for stripIdentifier", DB::ErrorCodes::LOGICAL_ERROR);
if (num_qualifiers_to_strip)
{
size_t num_components = identifier->children.size();
/// plain column
if (num_components - num_qualifiers_to_strip == 1)
{
DB::String node_alias = identifier->tryGetAlias();
ast = identifier->children.back();
if (!node_alias.empty())
ast->setAlias(node_alias);
}
else
/// nested column
{
identifier->children.erase(identifier->children.begin(), identifier->children.begin() + num_qualifiers_to_strip);
DB::String new_name;
for (const auto & child : identifier->children)
{
if (!new_name.empty())
new_name += '.';
new_name += static_cast<const ASTIdentifier &>(*child.get()).name;
}
identifier->name = new_name;
}
}
}
DatabaseAndTableWithAlias getTableNameWithAliasFromTableExpression(const ASTTableExpression & table_expression,
const Context & context)
{
DatabaseAndTableWithAlias database_and_table_with_alias;
if (table_expression.database_and_table_name)
{
const auto & identifier = static_cast<const ASTIdentifier &>(*table_expression.database_and_table_name);
database_and_table_with_alias.alias = identifier.tryGetAlias();
if (table_expression.database_and_table_name->children.empty())
{
database_and_table_with_alias.database = context.getCurrentDatabase();
database_and_table_with_alias.table = identifier.name;
}
else
{
if (table_expression.database_and_table_name->children.size() != 2)
throw Exception("Logical error: number of components in table expression not equal to two", ErrorCodes::LOGICAL_ERROR);
database_and_table_with_alias.database = static_cast<const ASTIdentifier &>(*identifier.children[0]).name;
database_and_table_with_alias.table = static_cast<const ASTIdentifier &>(*identifier.children[1]).name;
}
}
else if (table_expression.table_function)
{
database_and_table_with_alias.alias = table_expression.table_function->tryGetAlias();
}
else if (table_expression.subquery)
{
database_and_table_with_alias.alias = table_expression.subquery->tryGetAlias();
}
else
throw Exception("Logical error: no known elements in ASTTableExpression", ErrorCodes::LOGICAL_ERROR);
return database_and_table_with_alias;
}
/// Get the number of components of identifier which are correspond to 'alias.', 'table.' or 'databas.table.' from names.
size_t getNumComponentsToStripInOrderToTranslateQualifiedName(const ASTIdentifier & identifier,
const DatabaseAndTableWithAlias & names)
{
size_t num_qualifiers_to_strip = 0;
auto get_identifier_name = [](const ASTPtr & ast) { return static_cast<const ASTIdentifier &>(*ast).name; };
/// It is compound identifier
if (!identifier.children.empty())
{
size_t num_components = identifier.children.size();
/// database.table.column
if (num_components >= 3
&& !names.database.empty()
&& get_identifier_name(identifier.children[0]) == names.database
&& get_identifier_name(identifier.children[1]) == names.table)
{
num_qualifiers_to_strip = 2;
}
/// table.column or alias.column. If num_components > 2, it is like table.nested.column.
if (num_components >= 2
&& ((!names.table.empty() && get_identifier_name(identifier.children[0]) == names.table)
|| (!names.alias.empty() && get_identifier_name(identifier.children[0]) == names.alias)))
{
num_qualifiers_to_strip = 1;
}
}
return num_qualifiers_to_strip;
}
std::pair<String, String> getDatabaseAndTableNameFromIdentifier(const ASTIdentifier & identifier)
{
std::pair<String, String> res;
res.second = identifier.name;
if (!identifier.children.empty())
{
if (identifier.children.size() != 2)
throw Exception("Qualified table name could have only two components", ErrorCodes::LOGICAL_ERROR);
res.first = typeid_cast<const ASTIdentifier &>(*identifier.children[0]).name;
res.second = typeid_cast<const ASTIdentifier &>(*identifier.children[1]).name;
}
return res;
}
String DatabaseAndTableWithAlias::getQualifiedNamePrefix() const
{
return (!alias.empty() ? alias : (database + '.' + table)) + '.';
}
void DatabaseAndTableWithAlias::makeQualifiedName(const ASTPtr & ast) const
{
if (auto identifier = typeid_cast<ASTIdentifier *>(ast.get()))
{
String prefix = getQualifiedNamePrefix();
identifier->name.insert(identifier->name.begin(), prefix.begin(), prefix.end());
Names qualifiers;
if (!alias.empty())
qualifiers.push_back(alias);
else
{
qualifiers.push_back(database);
qualifiers.push_back(table);
}
for (const auto & qualifier : qualifiers)
identifier->children.emplace_back(std::make_shared<ASTIdentifier>(qualifier));
}
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/Context.h>
namespace DB
{
struct DatabaseAndTableWithAlias
{
String database;
String table;
String alias;
/// "alias." or "database.table." if alias is empty
String getQualifiedNamePrefix() const;
/// If ast is ASTIdentifier, prepend getQualifiedNamePrefix() to it's name.
void makeQualifiedName(const ASTPtr & ast) const;
};
void stripIdentifier(DB::ASTPtr & ast, size_t num_qualifiers_to_strip);
DatabaseAndTableWithAlias getTableNameWithAliasFromTableExpression(const ASTTableExpression & table_expression,
const Context & context);
size_t getNumComponentsToStripInOrderToTranslateQualifiedName(const ASTIdentifier & identifier,
const DatabaseAndTableWithAlias & names);
std::pair<String, String> getDatabaseAndTableNameFromIdentifier(const ASTIdentifier & identifier);
}

View File

@ -45,6 +45,7 @@ static void checkASTSizeLimits(const IAST & ast, const Settings & settings)
}
/// NOTE This is wrong in case of single-line comments and in case of multiline string literals.
static String joinLines(const String & query)
{
String res = query;
@ -99,7 +100,7 @@ static void onExceptionBeforeStart(const String & query, Context & context, time
/// Exception before the query execution.
context.getQuota().addError();
bool log_queries = context.getSettingsRef().log_queries;
const Settings & settings = context.getSettingsRef();
/// Log the start of query execution into the table if necessary.
QueryLogElement elem;
@ -109,18 +110,19 @@ static void onExceptionBeforeStart(const String & query, Context & context, time
elem.event_time = current_time;
elem.query_start_time = current_time;
elem.query = query.substr(0, context.getSettingsRef().log_queries_cut_to_length);
elem.query = query.substr(0, settings.log_queries_cut_to_length);
elem.exception = getCurrentExceptionMessage(false);
elem.client_info = context.getClientInfo();
setExceptionStackTrace(elem);
if (settings.calculate_text_stack_trace)
setExceptionStackTrace(elem);
logException(context, elem);
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
if (log_queries)
if (settings.log_queries)
if (auto query_log = context.getQueryLog())
query_log->add(elem);
}
@ -363,7 +365,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.profile_counters = std::move(info.profile_counters);
}
setExceptionStackTrace(elem);
if (settings.calculate_text_stack_trace)
setExceptionStackTrace(elem);
logException(context, elem);
/// In case of exception we log internal queries also

View File

@ -48,5 +48,7 @@ add_check(in_join_subqueries_preprocessor)
add_executable (users users.cpp)
target_link_libraries (users dbms ${Boost_FILESYSTEM_LIBRARY})
add_executable (internal_iotop internal_iotop.cpp)
target_link_libraries (internal_iotop dbms)
if (OS_LINUX)
add_executable (internal_iotop internal_iotop.cpp)
target_link_libraries (internal_iotop dbms)
endif ()

View File

@ -28,7 +28,7 @@ int main(int, char **)
"catch (const std::runtime_error & e) { std::cout << \"Caught in .so: \" << e.what() << std::endl; throw; }\n"
"}"
;
}, [](SharedLibraryPtr&){});
}, [](SharedLibraryPtr &){});
auto f = lib->template get<void (*)()>("_Z1fv");

View File

@ -6,7 +6,6 @@
#include <Poco/File.h>
#include <Common/Stopwatch.h>
#include <IO/WriteBufferFromString.h>
#include <linux/taskstats.h>
#include <sys/time.h>
#include <sys/resource.h>
@ -52,16 +51,9 @@ void do_io(size_t id)
int tid = TaskStatsInfoGetter::getCurrentTID();
TaskStatsInfoGetter get_info;
if (!get_info.tryGetStat(stat, tid))
{
std::lock_guard<std::mutex> lock(mutex);
std::cerr << "#" << id << ", tid " << tid << ". Can't get stat\n";
}
else
{
std::lock_guard<std::mutex> lock(mutex);
std::cerr << "#" << id << ", tid " << tid << ", intitial\n" << stat << "\n";
}
get_info.getStat(stat, tid);
std::lock_guard<std::mutex> lock(mutex);
std::cerr << "#" << id << ", tid " << tid << ", intitial\n" << stat << "\n";
size_t copy_size = 1048576 * (1 + id);
std::string path_dst = "test_out_" + std::to_string(id);

View File

@ -50,12 +50,18 @@ bool ParserCase::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!has_branch)
return false;
if (!s_else.ignore(pos, expected))
return false;
ASTPtr expr_else;
if (!p_expr.parse(pos, expr_else, expected))
return false;
if (s_else.ignore(pos, expected))
{
if (!p_expr.parse(pos, expr_else, expected))
return false;
}
else
{
Field field_with_null;
ASTLiteral null_literal(field_with_null);
expr_else = std::make_shared<ASTLiteral>(null_literal);
}
args.push_back(expr_else);
if (!s_end.ignore(pos, expected))

View File

@ -36,7 +36,7 @@ void BackgroundProcessingPoolTaskInfo::wake()
Poco::Timestamp current_time;
{
std::unique_lock<std::mutex> lock(pool.tasks_mutex);
std::unique_lock lock(pool.tasks_mutex);
auto next_time_to_execute = iterator->first;
auto this_task_handle = iterator->second;
@ -58,12 +58,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
{
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads");
/// Put all threads to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -77,7 +71,7 @@ BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Tas
Poco::Timestamp current_time;
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
res->iterator = tasks.emplace(current_time, res);
}
@ -93,11 +87,11 @@ void BackgroundProcessingPool::removeTask(const TaskHandle & task)
/// Wait for all executions of this task.
{
std::unique_lock<std::shared_mutex> wlock(task->rwlock);
std::unique_lock wlock(task->rwlock);
}
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
tasks.erase(task->iterator);
}
}
@ -122,10 +116,22 @@ void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
{
std::lock_guard lock(tasks_mutex);
if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
pcg64 rng(randomSeed());
@ -141,7 +147,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp min_time;
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
if (!tasks.empty())
{
@ -162,7 +168,7 @@ void BackgroundProcessingPool::threadFunction()
if (!task)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(sleep_seconds
+ std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
@ -173,12 +179,12 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp current_time;
if (min_time > current_time)
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
}
std::shared_lock<std::shared_mutex> rlock(task->rwlock);
std::shared_lock rlock(task->rwlock);
if (task->removed)
continue;
@ -202,7 +208,7 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);
{
std::unique_lock<std::mutex> lock(tasks_mutex);
std::unique_lock lock(tasks_mutex);
if (task->removed)
continue;

View File

@ -20,7 +20,7 @@
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
@ -885,7 +885,8 @@ static BlockInputStreamPtr createInputStreamWithMutatedData(
select->select_expression_list = std::make_shared<ASTExpressionList>();
select->children.push_back(select->select_expression_list);
select->select_expression_list->children.push_back(std::make_shared<ASTAsterisk>());
for (const auto & column : storage->getColumns().getAllPhysical())
select->select_expression_list->children.push_back(std::make_shared<ASTIdentifier>(column.name));
/// For all commands that are in front of the list and are DELETE commands, we can push them down
/// to the SELECT statement and remove them from commands.
@ -1026,7 +1027,7 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer
{
if (deduplicate)
return MergeAlgorithm::Horizontal;
if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
if (data.settings.enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
bool is_supported_storage =
@ -1035,9 +1036,9 @@ MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMer
data.merging_params.mode == MergeTreeData::MergingParams::Replacing ||
data.merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
bool enough_ordinary_cols = gathering_columns.size() >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_columns_to_activate;
bool enough_ordinary_cols = gathering_columns.size() >= data.settings.vertical_merge_algorithm_min_columns_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_rows_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data.settings.vertical_merge_algorithm_min_rows_to_activate;
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;

View File

@ -298,16 +298,19 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish columns serialization.
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
if (!serialization_states.empty())
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
}
if (!total_column_list)

View File

@ -23,11 +23,7 @@ class ReplicatedMergeTreeAlterThread
public:
ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
void start()
{
task->activate();
task->schedule();
}
void start() { task->activateAndSchedule(); }
void stop() { task->deactivate(); }

View File

@ -214,6 +214,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
if (!block_number_lock)
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it.");
part->is_duplicate = true;
last_block_is_duplicate = true;
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);

View File

@ -23,11 +23,7 @@ class ReplicatedMergeTreeCleanupThread
public:
ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);
void start()
{
task->activate();
task->schedule();
}
void start() { task->activateAndSchedule(); }
void wakeup() { task->schedule(); }

View File

@ -35,8 +35,7 @@ void ReplicatedMergeTreePartCheckThread::start()
{
std::lock_guard<std::mutex> lock(start_stop_mutex);
need_stop = false;
task->activate();
task->schedule();
task->activateAndSchedule();
}
void ReplicatedMergeTreePartCheckThread::stop()

View File

@ -52,34 +52,6 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
check_period_ms = storage.data.settings.check_delay_period * 1000;
task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); });
task->schedule();
}
ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
{
try
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
storage.fetcher.blocker.cancelForever();
storage.merger_mutator.actions_blocker.cancelForever();
/// Stop other tasks.
partialShutdown();
if (storage.queue_task_handle)
storage.context.getBackgroundPool().removeTask(storage.queue_task_handle);
storage.queue_task_handle.reset();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
void ReplicatedMergeTreeRestartingThread::run()
@ -207,18 +179,13 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.partial_shutdown_called = false;
storage.partial_shutdown_event.reset();
storage.queue_updating_task->activate();
storage.queue_updating_task->schedule();
storage.mutations_updating_task->activate();
storage.mutations_updating_task->schedule();
storage.queue_updating_task->activateAndSchedule();
storage.mutations_updating_task->activateAndSchedule();
storage.mutations_finalizing_task->activateAndSchedule();
storage.cleanup_thread.start();
storage.alter_thread.start();
storage.part_check_thread.start();
if (!storage.queue_task_handle)
storage.queue_task_handle = storage.context.getBackgroundPool().addTask(
std::bind(&StorageReplicatedMergeTree::queueTask, &storage));
return true;
}
catch (...)
@ -360,6 +327,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate();
storage.mutations_finalizing_task->deactivate();
storage.cleanup_thread.stop();
storage.alter_thread.stop();
@ -368,4 +336,16 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
LOG_TRACE(log, "Threads finished");
}
void ReplicatedMergeTreeRestartingThread::shutdown()
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// Stop other tasks.
partialShutdown();
}
}

View File

@ -23,10 +23,13 @@ class ReplicatedMergeTreeRestartingThread
{
public:
ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeRestartingThread();
void start() { task->activateAndSchedule(); }
void wakeup() { task->schedule(); }
void shutdown();
private:
StorageReplicatedMergeTree & storage;
String log_name;

View File

@ -89,14 +89,15 @@ BlockInputStreams StorageODBC::read(const Names & column_names,
void registerStorageODBC(StorageFactory & factory)
{
factory.registerStorage("ODBC", [](const StorageFactory::Arguments & args) {
factory.registerStorage("ODBC", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 3)
throw Exception(
"Storage ODBC requires exactly 3 parameters: ODBC('DSN', database, table).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage ODBC requires exactly 3 parameters: ODBC('DSN', database or schema, table)", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 2; ++i)
for (size_t i = 0; i < 3; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
return StorageODBC::create(args.table_name,

View File

@ -215,7 +215,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
[this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this),
fetcher(data),
cleanup_thread(*this), alter_thread(*this), part_check_thread(*this),
cleanup_thread(*this), alter_thread(*this), part_check_thread(*this), restarting_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
if (path_.empty())
@ -2063,9 +2063,7 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
{
/// Can be called before starting restarting_thread
if (restarting_thread)
restarting_thread->wakeup();
restarting_thread.wakeup();
return;
}
@ -2413,8 +2411,7 @@ void StorageReplicatedMergeTree::enterLeaderElection()
LOG_INFO(log, "Became leader");
is_leader = true;
merge_selecting_task->activate();
merge_selecting_task->schedule();
merge_selecting_task->activateAndSchedule();
};
try
@ -2787,8 +2784,10 @@ void StorageReplicatedMergeTree::startup()
data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>(
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, context.getInterserverIOHandler());
queue_task_handle = context.getBackgroundPool().addTask([this] { return queueTask(); });
/// In this thread replica will be activated.
restarting_thread = std::make_unique<ReplicatedMergeTreeRestartingThread>(*this);
restarting_thread.start();
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it
startup_event.wait();
@ -2797,15 +2796,21 @@ void StorageReplicatedMergeTree::startup()
void StorageReplicatedMergeTree::shutdown()
{
restarting_thread.reset();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.actions_blocker.cancelForever();
restarting_thread.shutdown();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
if (data_parts_exchange_endpoint_holder)
{
data_parts_exchange_endpoint_holder->getBlocker().cancelForever();
data_parts_exchange_endpoint_holder = nullptr;
}
fetcher.blocker.cancelForever();
}

View File

@ -198,7 +198,6 @@ private:
void clearOldPartsAndRemoveFromZK();
friend class ReplicatedMergeTreeBlockOutputStream;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;
@ -303,7 +302,7 @@ private:
ReplicatedMergeTreePartCheckThread part_check_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
ReplicatedMergeTreeRestartingThread restarting_thread;
/// An event that awakens `alter` method from waiting for the completion of the ALTER query.
zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>();

Some files were not shown because too many files have changed in this diff Show More