Merge remote-tracking branch 'upstream/master' into nikvas0/index_fix

This commit is contained in:
Nikita Vasilev 2019-02-05 22:19:47 +03:00
commit 9eaabcbf3f
308 changed files with 7351 additions and 4300 deletions

28
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@ -0,0 +1,28 @@
---
name: Bug report
about: Create a report to help us improve ClickHouse
title: ''
labels: bug, issue
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**How to reproduce**
* Which ClickHouse server version to use
* Which interface to use, if matters
* Non-default settings, if any
* `CREATE TABLE` statements for all tables involved
* Sample data for all these tables, use [clickhouse-obfuscator](https://github.com/yandex/ClickHouse/blob/master/dbms/programs/obfuscator/Obfuscator.cpp#L42-L80) if necessary
* Queries to run that lead to unexpected result
**Expected behavior**
A clear and concise description of what you expected to happen.
**Error message and/or stacktrace**
If applicable, add screenshots to help explain your problem.
**Additional context**
Add any other context about the problem here.

View File

@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for ClickHouse
title: ''
labels: feature
assignees: ''
---
**Use case.**
A clear and concise description of what is the intended usage scenario is.
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

3
.gitmodules vendored
View File

@ -64,3 +64,6 @@
[submodule "contrib/cppkafka"]
path = contrib/cppkafka
url = https://github.com/mfontanini/cppkafka.git
[submodule "contrib/pdqsort"]
path = contrib/pdqsort
url = https://github.com/orlp/pdqsort

View File

@ -1 +0,0 @@

View File

@ -8,7 +8,7 @@
* Added functions `left`, `right`, `trim`, `ltrim`, `rtrim`, `timestampadd`, `timestampsub` for SQL standard compatibility. [#3826](https://github.com/yandex/ClickHouse/pull/3826) ([Ivan Blinkov](https://github.com/blinkov))
* Support for write in `HDFS` tables and `hdfs` table function. [#4084](https://github.com/yandex/ClickHouse/pull/4084) ([alesapin](https://github.com/alesapin))
* Added functions to search for multiple constant strings from big haystack: `multiPosition`, `multiSearch` ,`firstMatch` also with `-UTF8`, `-CaseInsensitive`, and `-CaseInsensitiveUTF8` variants. [#4053](https://github.com/yandex/ClickHouse/pull/4053) ([Danila Kutenin](https://github.com/danlark1))
* Pruning of unused shards if `SELECT` query filters by sharding key (setting `distributed_optimize_skip_select_on_unused_shards`). [#3851](https://github.com/yandex/ClickHouse/pull/3851) ([Ivan](https://github.com/abyss7))
* Pruning of unused shards if `SELECT` query filters by sharding key (setting `distributed_optimize_skip_select_on_unused_shards`). [#3851](https://github.com/yandex/ClickHouse/pull/3851) ([Gleb Kanterov](https://github.com/kanterov), [Ivan](https://github.com/abyss7))
* Allow `Kafka` engine to ignore some number of parsing errors per block. [#4094](https://github.com/yandex/ClickHouse/pull/4094) ([Ivan](https://github.com/abyss7))
* Added support for `CatBoost` multiclass models evaluation. Function `modelEvaluate` returns tuple with per-class raw predictions for multiclass models. `libcatboostmodel.so` should be built with [#607](https://github.com/catboost/catboost/pull/607). [#3959](https://github.com/yandex/ClickHouse/pull/3959) ([KochetovNicolai](https://github.com/KochetovNicolai))
* Added functions `filesystemAvailable`, `filesystemFree`, `filesystemCapacity`. [#4097](https://github.com/yandex/ClickHouse/pull/4097) ([Boris Granveaud](https://github.com/bgranvea))

View File

@ -96,7 +96,7 @@ option (ENABLE_TESTS "Enables tests" ON)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
if (OS_LINUX AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES)
if (OS_LINUX AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES AND CMAKE_VERSION VERSION_GREATER "3.9.0")
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
if (GLIBC_COMPATIBILITY)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
@ -253,6 +253,7 @@ endif()
include (cmake/find_libgsasl.cmake)
include (cmake/find_libxml2.cmake)
include (cmake/find_protobuf.cmake)
include (cmake/find_pdqsort.cmake)
include (cmake/find_hdfs3.cmake)
include (cmake/find_consistent-hashing.cmake)
include (cmake/find_base64.cmake)

View File

@ -13,4 +13,5 @@ ClickHouse is an open-source column-oriented database management system that all
## Upcoming Events
* [C++ ClickHouse and CatBoost Sprints](https://events.yandex.ru/events/ClickHouse/2-feb-2019/) in Moscow on February 2.
* [ClickHouse Community Meetup](https://www.eventbrite.com/e/meetup-clickhouse-in-the-wild-deployment-success-stories-registration-55305051899) in San Francisco on February 19.
* [ClickHouse Community Meetup](https://www.eventbrite.com/e/clickhouse-meetup-in-madrid-registration-55376746339) in Madrid on April 2.

2
cmake/find_pdqsort.cmake Normal file
View File

@ -0,0 +1,2 @@
set(PDQSORT_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/pdqsort)
message(STATUS "Using pdqsort: ${PDQSORT_INCLUDE_DIR}")

View File

@ -1,5 +1,11 @@
option(USE_INTERNAL_PROTOBUF_LIBRARY "Set to FALSE to use system protobuf instead of bundled" ${NOT_UNBUNDLED})
if(OS_FREEBSD AND SANITIZE STREQUAL "address")
# ../contrib/protobuf/src/google/protobuf/arena_impl.h:45:10: fatal error: 'sanitizer/asan_interface.h' file not found
set(MISSING_INTERNAL_PROTOBUF_LIBRARY 1)
set(USE_INTERNAL_PROTOBUF_LIBRARY 0)
endif()
if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/protobuf/cmake/CMakeLists.txt")
if(USE_INTERNAL_PROTOBUF_LIBRARY)
message(WARNING "submodule contrib/protobuf is missing. to fix try run: \n git submodule update --init --recursive")

View File

@ -5,13 +5,24 @@ if (NOT USE_INTERNAL_RE2_LIBRARY)
find_path (RE2_INCLUDE_DIR NAMES re2/re2.h PATHS ${RE2_INCLUDE_PATHS})
endif ()
string(FIND ${CMAKE_CURRENT_BINARY_DIR} " " _have_space)
if(_have_space GREATER 0)
message(WARNING "Using spaces in build path [${CMAKE_CURRENT_BINARY_DIR}] highly not recommended. Library re2st will be disabled.")
set (MISSING_INTERNAL_RE2_ST_LIBRARY 1)
endif()
if (RE2_LIBRARY AND RE2_INCLUDE_DIR)
set (RE2_ST_LIBRARY ${RE2_LIBRARY})
else ()
elseif (NOT MISSING_INTERNAL_RE2_LIBRARY)
set (USE_INTERNAL_RE2_LIBRARY 1)
set (RE2_LIBRARY re2)
set (RE2_ST_LIBRARY re2_st)
set (USE_RE2_ST 1)
set (RE2_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/re2)
if (NOT MISSING_INTERNAL_RE2_ST_LIBRARY)
set (RE2_ST_LIBRARY re2_st)
set (USE_RE2_ST 1)
else ()
set (RE2_ST_LIBRARY ${RE2_LIBRARY})
endif ()
endif ()
message (STATUS "Using re2: ${RE2_INCLUDE_DIR} : ${RE2_LIBRARY}; ${RE2_ST_INCLUDE_DIR} : ${RE2_ST_LIBRARY}")

View File

@ -8,6 +8,8 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-non-virtual-dtor -Wno-format -Wno-inconsistent-missing-override -std=c++1z")
endif ()
set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1)
if (USE_INTERNAL_BOOST_LIBRARY)
add_subdirectory (boost-cmake)
endif ()

View File

@ -39,5 +39,10 @@ add_library(base64 ${LINK_MODE}
${LIBRARY_DIR}/lib/codecs.h
${CMAKE_CURRENT_BINARY_DIR}/config.h)
target_compile_options(base64 PRIVATE ${base64_SSSE3_opt} ${base64_SSE41_opt} ${base64_SSE42_opt} ${base64_AVX_opt} ${base64_AVX2_opt})
set_source_files_properties(${LIBRARY_DIR}/lib/arch/avx/codec.c PROPERTIES COMPILE_FLAGS -mavx)
set_source_files_properties(${LIBRARY_DIR}/lib/arch/avx2/codec.c PROPERTIES COMPILE_FLAGS -mavx2)
set_source_files_properties(${LIBRARY_DIR}/lib/arch/sse41/codec.c PROPERTIES COMPILE_FLAGS -msse4.1)
set_source_files_properties(${LIBRARY_DIR}/lib/arch/sse42/codec.c PROPERTIES COMPILE_FLAGS -msse4.2)
set_source_files_properties(${LIBRARY_DIR}/lib/arch/ssse3/codec.c PROPERTIES COMPILE_FLAGS -mssse3)
target_include_directories(base64 PRIVATE ${LIBRARY_DIR}/include ${CMAKE_CURRENT_BINARY_DIR})

1
contrib/pdqsort vendored Submodule

@ -0,0 +1 @@
Subproject commit 08879029ab8dcb80a70142acb709e3df02de5d37

View File

@ -206,6 +206,10 @@ target_link_libraries (clickhouse_common_io
${CMAKE_DL_LIBS}
)
target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${PDQSORT_INCLUDE_DIR})
target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
if(CPUID_LIBRARY)
target_link_libraries(clickhouse_common_io PRIVATE ${CPUID_LIBRARY})
endif()
@ -235,9 +239,6 @@ target_link_libraries (dbms
Threads::Threads
)
if (NOT USE_INTERNAL_RE2_LIBRARY)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${RE2_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_BOOST_LIBRARY)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
@ -257,7 +258,6 @@ if (USE_POCO_SQLODBC)
endif()
endif()
#if (Poco_Data_FOUND AND NOT USE_INTERNAL_POCO_LIBRARY)
if (Poco_Data_FOUND)
target_include_directories (clickhouse_common_io SYSTEM PRIVATE ${Poco_Data_INCLUDE_DIR})
target_include_directories (dbms SYSTEM PRIVATE ${Poco_Data_INCLUDE_DIR})
@ -284,6 +284,7 @@ target_link_libraries (dbms PRIVATE ${Poco_Foundation_LIBRARY})
if (USE_ICU)
target_link_libraries (dbms PRIVATE ${ICU_LIBRARIES})
target_include_directories (dbms SYSTEM PRIVATE ${ICU_INCLUDE_DIRS})
endif ()
if (USE_CAPNP)

View File

@ -28,11 +28,18 @@ add_subdirectory (copier)
add_subdirectory (format)
add_subdirectory (clang)
add_subdirectory (obfuscator)
add_subdirectory (odbc-bridge)
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()
if (CLICKHOUSE_SPLIT_BINARY)
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test
clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-copier clickhouse-odbc-bridge)
clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-copier)
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-odbc-bridge)
endif ()
if (USE_EMBEDDED_COMPILER)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-clang clickhouse-lld)
@ -85,9 +92,6 @@ else ()
if (USE_EMBEDDED_COMPILER)
target_link_libraries (clickhouse PRIVATE clickhouse-compiler-lib)
endif ()
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
target_link_libraries (clickhouse PRIVATE clickhouse-odbc-bridge-lib)
endif()
set (CLICKHOUSE_BUNDLE)
if (ENABLE_CLICKHOUSE_SERVER)
@ -135,15 +139,14 @@ else ()
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-format DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-format)
endif ()
if (ENABLE_CLICKHOUSE_COPIER)
if (ENABLE_CLICKHOUSE_OBFUSCATOR)
add_custom_target (clickhouse-obfuscator ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-obfuscator DEPENDS clickhouse)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-obfuscator DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-obfuscator)
endif ()
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_custom_target (clickhouse-odbc-bridge ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-odbc-bridge DEPENDS clickhouse)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-odbc-bridge DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge)
# just to be able to run integration tests
add_custom_target (clickhouse-odbc-bridge-copy ALL COMMAND ${CMAKE_COMMAND} -E create_symlink ${CMAKE_CURRENT_BINARY_DIR}/odbc-bridge/clickhouse-odbc-bridge clickhouse-odbc-bridge DEPENDS clickhouse-odbc-bridge)
endif ()

View File

@ -1542,12 +1542,19 @@ public:
po::options_description main_description("Main options", line_length, min_description_length);
main_description.add_options()
("help", "produce help message")
("config-file,c", po::value<std::string>(), "config-file path")
("config-file,C", po::value<std::string>(), "config-file path")
("config,c", po::value<std::string>(), "config-file path (another shorthand)")
("host,h", po::value<std::string>()->default_value("localhost"), "server host")
("port", po::value<int>()->default_value(9000), "server port")
("secure,s", "Use TLS connection")
("user,u", po::value<std::string>()->default_value("default"), "user")
("password", po::value<std::string>(), "password")
/** If "--password [value]" is used but the value is omitted, the bad argument exception will be thrown.
* implicit_value is used to avoid this exception (to allow user to type just "--password")
* Since currently boost provides no way to check if a value has been set implicitly for an option,
* the "\n" is used to distinguish this case because there is hardly a chance an user would use "\n"
* as the password.
*/
("password", po::value<std::string>()->implicit_value("\n"), "password")
("ask-password", "ask-password")
("query_id", po::value<std::string>(), "query_id")
("query,q", po::value<std::string>(), "query")
@ -1585,13 +1592,11 @@ public:
("structure", po::value<std::string>(), "structure")
("types", po::value<std::string>(), "types")
;
/// Parse main commandline options.
po::parsed_options parsed = po::command_line_parser(
common_arguments.size(), common_arguments.data()).options(main_description).run();
po::variables_map options;
po::store(parsed, options);
if (options.count("version") || options.count("V"))
{
showClientVersion();
@ -1649,9 +1654,14 @@ public:
APPLY_FOR_SETTINGS(EXTRACT_SETTING)
#undef EXTRACT_SETTING
if (options.count("config-file") && options.count("config"))
throw Exception("Two or more configuration files referenced in arguments", ErrorCodes::BAD_ARGUMENTS);
/// Save received data into the internal config.
if (options.count("config-file"))
config().setString("config-file", options["config-file"].as<std::string>());
if (options.count("config"))
config().setString("config-file", options["config"].as<std::string>());
if (options.count("host") && !options["host"].defaulted())
config().setString("host", options["host"].as<std::string>());
if (options.count("query_id"))
@ -1710,11 +1720,11 @@ public:
int mainEntryClickHouseClient(int argc, char ** argv)
{
DB::Client client;
try
{
DB::Client client;
client.init(argc, argv);
return client.run();
}
catch (const boost::program_options::error & e)
{
@ -1726,6 +1736,4 @@ int mainEntryClickHouseClient(int argc, char ** argv)
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
}
return client.run();
}

View File

@ -8,7 +8,7 @@
#include <Common/Exception.h>
#include <IO/ConnectionTimeouts.h>
#include <common/SetTerminalEcho.h>
#include <common/setTerminalEcho.h>
#include <ext/scope_guard.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -48,27 +48,33 @@ struct ConnectionParameters
is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
default_database = config.getString("database", "");
user = config.getString("user", "");
/// changed the default value to "default" to fix the issue when the user in the prompt is blank
user = config.getString("user", "default");
bool password_prompt = false;
if (config.getBool("ask-password", false))
{
if (config.has("password"))
throw Exception("Specified both --password and --ask-password. Remove one of them", ErrorCodes::BAD_ARGUMENTS);
std::cout << "Password for user " << user << ": ";
SetTerminalEcho(false);
SCOPE_EXIT({
SetTerminalEcho(true);
});
std::getline(std::cin, password);
std::cout << std::endl;
password_prompt = true;
}
else
{
password = config.getString("password", "");
/// if the value of --password is omitted, the password will be set implicitly to "\n"
if (password == "\n")
password_prompt = true;
}
if (password_prompt)
{
std::cout << "Password for user (" << user << "): ";
setTerminalEcho(false);
SCOPE_EXIT({
setTerminalEcho(true);
});
std::getline(std::cin, password);
std::cout << std::endl;
}
compression = config.getBool("compression", true)
? Protocol::Compression::Enable
: Protocol::Compression::Disable;

View File

@ -297,7 +297,7 @@ void LocalServer::processQueries()
try
{
executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, *context, {});
executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, *context, {}, {});
}
catch (...)
{

View File

@ -56,9 +56,6 @@ int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
int mainEntryClickHouseObfuscator(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
int mainEntryClickHouseODBCBridge(int argc, char ** argv);
#endif
#if USE_EMBEDDED_COMPILER
@ -105,9 +102,6 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
{"obfuscator", mainEntryClickHouseObfuscator},
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
{"odbc-bridge", mainEntryClickHouseODBCBridge},
#endif
#if USE_EMBEDDED_COMPILER
{"clang", mainEntryClickHouseClang},

View File

@ -9,7 +9,7 @@ add_library (clickhouse-odbc-bridge-lib ${LINK_MODE}
validateODBCConnectionString.cpp
)
target_link_libraries (clickhouse-odbc-bridge-lib PRIVATE clickhouse_dictionaries daemon dbms clickhouse_common_io)
target_link_libraries (clickhouse-odbc-bridge-lib PRIVATE daemon dbms clickhouse_common_io)
target_include_directories (clickhouse-odbc-bridge-lib PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include)
if (USE_POCO_SQLODBC)
@ -33,8 +33,11 @@ if (ENABLE_TESTS)
add_subdirectory (tests)
endif ()
if (CLICKHOUSE_SPLIT_BINARY)
add_executable (clickhouse-odbc-bridge odbc-bridge.cpp)
target_link_libraries (clickhouse-odbc-bridge PRIVATE clickhouse-odbc-bridge-lib)
install (TARGETS clickhouse-odbc-bridge ${CLICKHOUSE_ALL_TARGETS} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
endif ()
# clickhouse-odbc-bridge is always a separate binary.
# Reason: it must not export symbols from SSL, mariadb-client, etc. to not break ABI compatibility with ODBC drivers.
# For this reason, we disabling -rdynamic linker flag. But we do it in strange way:
SET(CMAKE_SHARED_LIBRARY_LINK_CXX_FLAGS "")
add_executable (clickhouse-odbc-bridge odbc-bridge.cpp)
target_link_libraries (clickhouse-odbc-bridge PRIVATE clickhouse-odbc-bridge-lib)
install (TARGETS clickhouse-odbc-bridge RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

View File

@ -1,4 +1,16 @@
add_library (clickhouse-performance-test-lib ${LINK_MODE} PerformanceTest.cpp)
add_library (clickhouse-performance-test-lib ${LINK_MODE}
JSONString.cpp
StopConditionsSet.cpp
TestStopConditions.cpp
TestStats.cpp
ConfigPreprocessor.cpp
PerformanceTest.cpp
PerformanceTestInfo.cpp
executeQuery.cpp
applySubstitutions.cpp
ReportBuilder.cpp
PerformanceTestSuite.cpp
)
target_link_libraries (clickhouse-performance-test-lib PRIVATE dbms clickhouse_common_io clickhouse_common_config ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_include_directories (clickhouse-performance-test-lib SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})

View File

@ -0,0 +1,90 @@
#include "ConfigPreprocessor.h"
#include <Core/Types.h>
#include <Poco/Path.h>
#include <regex>
namespace DB
{
std::vector<XMLConfigurationPtr> ConfigPreprocessor::processConfig(
const Strings & tests_tags,
const Strings & tests_names,
const Strings & tests_names_regexp,
const Strings & skip_tags,
const Strings & skip_names,
const Strings & skip_names_regexp) const
{
std::vector<XMLConfigurationPtr> result;
for (const auto & path : paths)
{
result.emplace_back(new XMLConfiguration(path));
result.back()->setString("path", Poco::Path(path).absolute().toString());
}
/// Leave tests:
removeConfigurationsIf(result, FilterType::Tag, tests_tags, true);
removeConfigurationsIf(result, FilterType::Name, tests_names, true);
removeConfigurationsIf(result, FilterType::Name_regexp, tests_names_regexp, true);
/// Skip tests
removeConfigurationsIf(result, FilterType::Tag, skip_tags, false);
removeConfigurationsIf(result, FilterType::Name, skip_names, false);
removeConfigurationsIf(result, FilterType::Name_regexp, skip_names_regexp, false);
return result;
}
void ConfigPreprocessor::removeConfigurationsIf(
std::vector<XMLConfigurationPtr> & configs,
ConfigPreprocessor::FilterType filter_type,
const Strings & values,
bool leave) const
{
auto checker = [&filter_type, &values, &leave] (XMLConfigurationPtr & config)
{
if (values.size() == 0)
return false;
bool remove_or_not = false;
if (filter_type == FilterType::Tag)
{
Strings tags_keys;
config->keys("tags", tags_keys);
Strings tags(tags_keys.size());
for (size_t i = 0; i != tags_keys.size(); ++i)
tags[i] = config->getString("tags.tag[" + std::to_string(i) + "]");
for (const std::string & config_tag : tags)
{
if (std::find(values.begin(), values.end(), config_tag) != values.end())
remove_or_not = true;
}
}
if (filter_type == FilterType::Name)
{
remove_or_not = (std::find(values.begin(), values.end(), config->getString("name", "")) != values.end());
}
if (filter_type == FilterType::Name_regexp)
{
std::string config_name = config->getString("name", "");
auto regex_checker = [&config_name](const std::string & name_regexp)
{
std::regex pattern(name_regexp);
return std::regex_search(config_name, pattern);
};
remove_or_not = config->has("name") ? (std::find_if(values.begin(), values.end(), regex_checker) != values.end()) : false;
}
if (leave)
remove_or_not = !remove_or_not;
return remove_or_not;
};
auto new_end = std::remove_if(configs.begin(), configs.end(), checker);
configs.erase(new_end, configs.end());
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <Poco/DOM/Document.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Core/Types.h>
#include <vector>
#include <string>
namespace DB
{
using XMLConfiguration = Poco::Util::XMLConfiguration;
using XMLConfigurationPtr = Poco::AutoPtr<XMLConfiguration>;
using XMLDocumentPtr = Poco::AutoPtr<Poco::XML::Document>;
class ConfigPreprocessor
{
public:
ConfigPreprocessor(const Strings & paths_)
: paths(paths_)
{}
std::vector<XMLConfigurationPtr> processConfig(
const Strings & tests_tags,
const Strings & tests_names,
const Strings & tests_names_regexp,
const Strings & skip_tags,
const Strings & skip_names,
const Strings & skip_names_regexp) const;
private:
enum class FilterType
{
Tag,
Name,
Name_regexp
};
/// Removes configurations that has a given value.
/// If leave is true, the logic is reversed.
void removeConfigurationsIf(
std::vector<XMLConfigurationPtr> & configs,
FilterType filter_type,
const Strings & values,
bool leave = false) const;
const Strings paths;
};
}

View File

@ -0,0 +1,66 @@
#include "JSONString.h"
#include <regex>
#include <sstream>
namespace DB
{
namespace
{
std::string pad(size_t padding)
{
return std::string(padding * 4, ' ');
}
const std::regex NEW_LINE{"\n"};
}
void JSONString::set(const std::string & key, std::string value, bool wrap)
{
if (value.empty())
value = "null";
bool reserved = (value[0] == '[' || value[0] == '{' || value == "null");
if (!reserved && wrap)
value = '"' + std::regex_replace(value, NEW_LINE, "\\n") + '"';
content[key] = value;
}
void JSONString::set(const std::string & key, const std::vector<JSONString> & run_infos)
{
std::ostringstream value;
value << "[\n";
for (size_t i = 0; i < run_infos.size(); ++i)
{
value << pad(padding + 1) + run_infos[i].asString(padding + 2);
if (i != run_infos.size() - 1)
value << ',';
value << "\n";
}
value << pad(padding) << ']';
content[key] = value.str();
}
std::string JSONString::asString(size_t cur_padding) const
{
std::ostringstream repr;
repr << "{";
for (auto it = content.begin(); it != content.end(); ++it)
{
if (it != content.begin())
repr << ',';
/// construct "key": "value" string with padding
repr << "\n" << pad(cur_padding) << '"' << it->first << '"' << ": " << it->second;
}
repr << "\n" << pad(cur_padding - 1) << '}';
return repr.str();
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <Core/Types.h>
#include <sys/stat.h>
#include <type_traits>
#include <vector>
#include <map>
namespace DB
{
/// NOTE The code is totally wrong.
class JSONString
{
private:
std::map<std::string, std::string> content;
size_t padding;
public:
explicit JSONString(size_t padding_ = 1) : padding(padding_) {}
void set(const std::string & key, std::string value, bool wrap = true);
template <typename T>
std::enable_if_t<std::is_arithmetic_v<T>> set(const std::string key, T value)
{
set(key, std::to_string(value), /*wrap= */ false);
}
void set(const std::string & key, const std::vector<JSONString> & run_infos);
std::string asString() const
{
return asString(padding);
}
std::string asString(size_t cur_padding) const;
};
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,64 @@
#pragma once
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
#include <common/logger_useful.h>
#include <Poco/Util/XMLConfiguration.h>
#include "PerformanceTestInfo.h"
namespace DB
{
using XMLConfiguration = Poco::Util::XMLConfiguration;
using XMLConfigurationPtr = Poco::AutoPtr<XMLConfiguration>;
using QueriesWithIndexes = std::vector<std::pair<std::string, size_t>>;
class PerformanceTest
{
public:
PerformanceTest(
const XMLConfigurationPtr & config_,
Connection & connection_,
InterruptListener & interrupt_listener_,
const PerformanceTestInfo & test_info_,
Context & context_,
const std::vector<size_t> & queries_to_run_);
bool checkPreconditions() const;
void prepare() const;
std::vector<TestStats> execute();
void finish() const;
const PerformanceTestInfo & getTestInfo() const
{
return test_info;
}
bool checkSIGINT() const
{
return got_SIGINT;
}
private:
void runQueries(
const QueriesWithIndexes & queries_with_indexes,
std::vector<TestStats> & statistics_by_run);
UInt64 calculateMaxExecTime() const;
private:
XMLConfigurationPtr config;
Connection & connection;
InterruptListener & interrupt_listener;
PerformanceTestInfo test_info;
Context & context;
std::vector<size_t> queries_to_run;
Poco::Logger * log;
bool got_SIGINT = false;
};
}

View File

@ -0,0 +1,285 @@
#include "PerformanceTestInfo.h"
#include <Common/getMultipleKeysFromConfig.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <boost/filesystem.hpp>
#include "applySubstitutions.h"
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
void extractSettings(
const XMLConfigurationPtr & config,
const std::string & key,
const Strings & settings_list,
std::map<std::string, std::string> & settings_to_apply)
{
for (const std::string & setup : settings_list)
{
if (setup == "profile")
continue;
std::string value = config->getString(key + "." + setup);
if (value.empty())
value = "true";
settings_to_apply[setup] = value;
}
}
void checkMetricsInput(const Strings & metrics, ExecutionType exec_type)
{
Strings loop_metrics = {
"min_time", "quantiles", "total_time",
"queries_per_second", "rows_per_second",
"bytes_per_second"};
Strings non_loop_metrics = {
"max_rows_per_second", "max_bytes_per_second",
"avg_rows_per_second", "avg_bytes_per_second"};
if (exec_type == ExecutionType::Loop)
{
for (const std::string & metric : metrics)
{
auto non_loop_pos =
std::find(non_loop_metrics.begin(), non_loop_metrics.end(), metric);
if (non_loop_pos != non_loop_metrics.end())
throw Exception("Wrong type of metric for loop execution type (" + metric + ")",
ErrorCodes::BAD_ARGUMENTS);
}
}
else
{
for (const std::string & metric : metrics)
{
auto loop_pos = std::find(loop_metrics.begin(), loop_metrics.end(), metric);
if (loop_pos != loop_metrics.end())
throw Exception(
"Wrong type of metric for non-loop execution type (" + metric + ")",
ErrorCodes::BAD_ARGUMENTS);
}
}
}
}
namespace fs = boost::filesystem;
PerformanceTestInfo::PerformanceTestInfo(
XMLConfigurationPtr config,
const std::string & profiles_file_)
: profiles_file(profiles_file_)
{
test_name = config->getString("name");
path = config->getString("path");
applySettings(config);
extractQueries(config);
processSubstitutions(config);
getExecutionType(config);
getStopConditions(config);
getMetrics(config);
extractAuxiliaryQueries(config);
}
void PerformanceTestInfo::applySettings(XMLConfigurationPtr config)
{
if (config->has("settings"))
{
std::map<std::string, std::string> settings_to_apply;
Strings config_settings;
config->keys("settings", config_settings);
auto settings_contain = [&config_settings] (const std::string & setting)
{
auto position = std::find(config_settings.begin(), config_settings.end(), setting);
return position != config_settings.end();
};
/// Preprocess configuration file
if (settings_contain("profile"))
{
if (!profiles_file.empty())
{
std::string profile_name = config->getString("settings.profile");
XMLConfigurationPtr profiles_config(new XMLConfiguration(profiles_file));
Strings profile_settings;
profiles_config->keys("profiles." + profile_name, profile_settings);
extractSettings(profiles_config, "profiles." + profile_name, profile_settings, settings_to_apply);
}
}
extractSettings(config, "settings", config_settings, settings_to_apply);
/// This macro goes through all settings in the Settings.h
/// and, if found any settings in test's xml configuration
/// with the same name, sets its value to settings
std::map<std::string, std::string>::iterator it;
#define EXTRACT_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) \
it = settings_to_apply.find(#NAME); \
if (it != settings_to_apply.end()) \
settings.set(#NAME, settings_to_apply[#NAME]);
APPLY_FOR_SETTINGS(EXTRACT_SETTING)
#undef EXTRACT_SETTING
if (settings_contain("average_rows_speed_precision"))
TestStats::avg_rows_speed_precision =
config->getDouble("settings.average_rows_speed_precision");
if (settings_contain("average_bytes_speed_precision"))
TestStats::avg_bytes_speed_precision =
config->getDouble("settings.average_bytes_speed_precision");
}
}
void PerformanceTestInfo::extractQueries(XMLConfigurationPtr config)
{
if (config->has("query"))
queries = getMultipleValuesFromConfig(*config, "", "query");
if (config->has("query_file"))
{
const std::string filename = config->getString("query_file");
if (filename.empty())
throw Exception("Empty file name", ErrorCodes::BAD_ARGUMENTS);
bool tsv = fs::path(filename).extension().string() == ".tsv";
ReadBufferFromFile query_file(filename);
std::string query;
if (tsv)
{
while (!query_file.eof())
{
readEscapedString(query, query_file);
assertChar('\n', query_file);
queries.push_back(query);
}
}
else
{
readStringUntilEOF(query, query_file);
queries.push_back(query);
}
}
if (queries.empty())
throw Exception("Did not find any query to execute: " + test_name,
ErrorCodes::BAD_ARGUMENTS);
}
void PerformanceTestInfo::processSubstitutions(XMLConfigurationPtr config)
{
if (config->has("substitutions"))
{
/// Make "subconfig" of inner xml block
ConfigurationPtr substitutions_view(config->createView("substitutions"));
constructSubstitutions(substitutions_view, substitutions);
auto queries_pre_format = queries;
queries.clear();
for (const auto & query : queries_pre_format)
{
auto formatted = formatQueries(query, substitutions);
queries.insert(queries.end(), formatted.begin(), formatted.end());
}
}
}
void PerformanceTestInfo::getExecutionType(XMLConfigurationPtr config)
{
if (!config->has("type"))
throw Exception("Missing type property in config: " + test_name,
ErrorCodes::BAD_ARGUMENTS);
std::string config_exec_type = config->getString("type");
if (config_exec_type == "loop")
exec_type = ExecutionType::Loop;
else if (config_exec_type == "once")
exec_type = ExecutionType::Once;
else
throw Exception("Unknown type " + config_exec_type + " in :" + test_name,
ErrorCodes::BAD_ARGUMENTS);
}
void PerformanceTestInfo::getStopConditions(XMLConfigurationPtr config)
{
TestStopConditions stop_conditions_template;
if (config->has("stop_conditions"))
{
ConfigurationPtr stop_conditions_config(config->createView("stop_conditions"));
stop_conditions_template.loadFromConfig(stop_conditions_config);
}
if (stop_conditions_template.empty())
throw Exception("No termination conditions were found in config",
ErrorCodes::BAD_ARGUMENTS);
times_to_run = config->getUInt("times_to_run", 1);
for (size_t i = 0; i < times_to_run * queries.size(); ++i)
stop_conditions_by_run.push_back(stop_conditions_template);
}
void PerformanceTestInfo::getMetrics(XMLConfigurationPtr config)
{
ConfigurationPtr metrics_view(config->createView("metrics"));
metrics_view->keys(metrics);
if (config->has("main_metric"))
{
Strings main_metrics;
config->keys("main_metric", main_metrics);
if (main_metrics.size())
main_metric = main_metrics[0];
}
if (!main_metric.empty())
{
if (std::find(metrics.begin(), metrics.end(), main_metric) == metrics.end())
metrics.push_back(main_metric);
}
else
{
if (metrics.empty())
throw Exception("You shoud specify at least one metric",
ErrorCodes::BAD_ARGUMENTS);
main_metric = metrics[0];
}
if (metrics.size() > 0)
checkMetricsInput(metrics, exec_type);
}
void PerformanceTestInfo::extractAuxiliaryQueries(XMLConfigurationPtr config)
{
if (config->has("create_query"))
create_queries = getMultipleValuesFromConfig(*config, "", "create_query");
if (config->has("fill_query"))
fill_queries = getMultipleValuesFromConfig(*config, "", "fill_query");
if (config->has("drop_query"))
drop_queries = getMultipleValuesFromConfig(*config, "", "drop_query");
}
}

View File

@ -0,0 +1,60 @@
#pragma once
#include <string>
#include <vector>
#include <map>
#include <Interpreters/Settings.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/AutoPtr.h>
#include "StopConditionsSet.h"
#include "TestStopConditions.h"
#include "TestStats.h"
namespace DB
{
enum class ExecutionType
{
Loop,
Once
};
using XMLConfiguration = Poco::Util::XMLConfiguration;
using XMLConfigurationPtr = Poco::AutoPtr<XMLConfiguration>;
using StringToVector = std::map<std::string, Strings>;
/// Class containing all info to run performance test
class PerformanceTestInfo
{
public:
PerformanceTestInfo(XMLConfigurationPtr config, const std::string & profiles_file_);
std::string test_name;
std::string path;
std::string main_metric;
Strings queries;
Strings metrics;
Settings settings;
ExecutionType exec_type;
StringToVector substitutions;
size_t times_to_run;
std::string profiles_file;
std::vector<TestStopConditions> stop_conditions_by_run;
Strings create_queries;
Strings fill_queries;
Strings drop_queries;
private:
void applySettings(XMLConfigurationPtr config);
void extractQueries(XMLConfigurationPtr config);
void processSubstitutions(XMLConfigurationPtr config);
void getExecutionType(XMLConfigurationPtr config);
void getStopConditions(XMLConfigurationPtr config);
void getMetrics(XMLConfigurationPtr config);
void extractAuxiliaryQueries(XMLConfigurationPtr config);
};
}

View File

@ -0,0 +1,410 @@
#include <algorithm>
#include <iostream>
#include <limits>
#include <regex>
#include <thread>
#include <memory>
#include <port/unistd.h>
#include <sys/stat.h>
#include <boost/filesystem.hpp>
#include <boost/program_options.hpp>
#include <Poco/AutoPtr.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/FormattingChannel.h>
#include <Poco/Logger.h>
#include <Poco/Path.h>
#include <Poco/PatternFormatter.h>
#include <Poco/Util/XMLConfiguration.h>
#include <common/logger_useful.h>
#include <Client/Connection.h>
#include <Core/Types.h>
#include <Interpreters/Context.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/UseSSL.h>
#include <Interpreters/Settings.h>
#include <Common/Exception.h>
#include <Common/InterruptListener.h>
#include "TestStopConditions.h"
#include "TestStats.h"
#include "ConfigPreprocessor.h"
#include "PerformanceTest.h"
#include "ReportBuilder.h"
namespace fs = boost::filesystem;
namespace po = boost::program_options;
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int FILE_DOESNT_EXIST;
}
/** Tests launcher for ClickHouse.
* The tool walks through given or default folder in order to find files with
* tests' descriptions and launches it.
*/
class PerformanceTestSuite
{
public:
PerformanceTestSuite(const std::string & host_,
const UInt16 port_,
const bool secure_,
const std::string & default_database_,
const std::string & user_,
const std::string & password_,
const bool lite_output_,
const std::string & profiles_file_,
Strings && input_files_,
Strings && tests_tags_,
Strings && skip_tags_,
Strings && tests_names_,
Strings && skip_names_,
Strings && tests_names_regexp_,
Strings && skip_names_regexp_,
const std::unordered_map<std::string, std::vector<size_t>> query_indexes_,
const ConnectionTimeouts & timeouts)
: connection(host_, port_, default_database_, user_,
password_, timeouts, "performance-test", Protocol::Compression::Enable,
secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable)
, tests_tags(std::move(tests_tags_))
, tests_names(std::move(tests_names_))
, tests_names_regexp(std::move(tests_names_regexp_))
, skip_tags(std::move(skip_tags_))
, skip_names(std::move(skip_names_))
, skip_names_regexp(std::move(skip_names_regexp_))
, query_indexes(query_indexes_)
, lite_output(lite_output_)
, profiles_file(profiles_file_)
, input_files(input_files_)
, log(&Poco::Logger::get("PerformanceTestSuite"))
{
if (input_files.size() < 1)
throw Exception("No tests were specified", ErrorCodes::BAD_ARGUMENTS);
}
/// This functionality seems strange.
//void initialize(Poco::Util::Application & self [[maybe_unused]])
//{
// std::string home_path;
// const char * home_path_cstr = getenv("HOME");
// if (home_path_cstr)
// home_path = home_path_cstr;
// configReadClient(Poco::Util::Application::instance().config(), home_path);
//}
int run()
{
std::string name;
UInt64 version_major;
UInt64 version_minor;
UInt64 version_patch;
UInt64 version_revision;
connection.getServerVersion(name, version_major, version_minor, version_patch, version_revision);
std::stringstream ss;
ss << version_major << "." << version_minor << "." << version_patch;
server_version = ss.str();
report_builder = std::make_shared<ReportBuilder>(server_version);
processTestsConfigurations(input_files);
return 0;
}
private:
Connection connection;
const Strings & tests_tags;
const Strings & tests_names;
const Strings & tests_names_regexp;
const Strings & skip_tags;
const Strings & skip_names;
const Strings & skip_names_regexp;
std::unordered_map<std::string, std::vector<size_t>> query_indexes;
Context global_context = Context::createGlobal();
std::shared_ptr<ReportBuilder> report_builder;
std::string server_version;
InterruptListener interrupt_listener;
using XMLConfiguration = Poco::Util::XMLConfiguration;
using XMLConfigurationPtr = Poco::AutoPtr<XMLConfiguration>;
bool lite_output;
std::string profiles_file;
Strings input_files;
std::vector<XMLConfigurationPtr> tests_configurations;
Poco::Logger * log;
void processTestsConfigurations(const Strings & paths)
{
LOG_INFO(log, "Preparing test configurations");
ConfigPreprocessor config_prep(paths);
tests_configurations = config_prep.processConfig(
tests_tags,
tests_names,
tests_names_regexp,
skip_tags,
skip_names,
skip_names_regexp);
LOG_INFO(log, "Test configurations prepared");
if (tests_configurations.size())
{
Strings outputs;
for (auto & test_config : tests_configurations)
{
auto [output, signal] = runTest(test_config);
if (lite_output)
std::cout << output;
else
outputs.push_back(output);
if (signal)
break;
}
if (!lite_output && outputs.size())
{
std::cout << "[" << std::endl;
for (size_t i = 0; i != outputs.size(); ++i)
{
std::cout << outputs[i];
if (i != outputs.size() - 1)
std::cout << ",";
std::cout << std::endl;
}
std::cout << "]" << std::endl;
}
}
}
std::pair<std::string, bool> runTest(XMLConfigurationPtr & test_config)
{
PerformanceTestInfo info(test_config, profiles_file);
LOG_INFO(log, "Config for test '" << info.test_name << "' parsed");
PerformanceTest current(test_config, connection, interrupt_listener, info, global_context, query_indexes[info.path]);
current.checkPreconditions();
LOG_INFO(log, "Preconditions for test '" << info.test_name << "' are fullfilled");
LOG_INFO(log, "Preparing for run, have " << info.create_queries.size()
<< " create queries and " << info.fill_queries.size() << " fill queries");
current.prepare();
LOG_INFO(log, "Prepared");
LOG_INFO(log, "Running test '" << info.test_name << "'");
auto result = current.execute();
LOG_INFO(log, "Test '" << info.test_name << "' finished");
LOG_INFO(log, "Running post run queries");
current.finish();
LOG_INFO(log, "Postqueries finished");
if (lite_output)
return {report_builder->buildCompactReport(info, result, query_indexes[info.path]), current.checkSIGINT()};
else
return {report_builder->buildFullReport(info, result, query_indexes[info.path]), current.checkSIGINT()};
}
};
}
static void getFilesFromDir(const fs::path & dir, std::vector<std::string> & input_files, const bool recursive = false)
{
Poco::Logger * log = &Poco::Logger::get("PerformanceTestSuite");
if (dir.extension().string() == ".xml")
LOG_WARNING(log, dir.string() + "' is a directory, but has .xml extension");
fs::directory_iterator end;
for (fs::directory_iterator it(dir); it != end; ++it)
{
const fs::path file = (*it);
if (recursive && fs::is_directory(file))
getFilesFromDir(file, input_files, recursive);
else if (!fs::is_directory(file) && file.extension().string() == ".xml")
input_files.push_back(file.string());
}
}
static std::vector<std::string> getInputFiles(const po::variables_map & options, Poco::Logger * log)
{
std::vector<std::string> input_files;
bool recursive = options.count("recursive");
if (!options.count("input-files"))
{
LOG_INFO(log, "Trying to find test scenario files in the current folder...");
fs::path curr_dir(".");
getFilesFromDir(curr_dir, input_files, recursive);
if (input_files.empty())
throw DB::Exception("Did not find any xml files", DB::ErrorCodes::BAD_ARGUMENTS);
else
LOG_INFO(log, "Found " << input_files.size() << " files");
}
else
{
input_files = options["input-files"].as<std::vector<std::string>>();
LOG_INFO(log, "Found " + std::to_string(input_files.size()) + " input files");
std::vector<std::string> collected_files;
for (const std::string & filename : input_files)
{
fs::path file(filename);
if (!fs::exists(file))
throw DB::Exception("File '" + filename + "' does not exist", DB::ErrorCodes::FILE_DOESNT_EXIST);
if (fs::is_directory(file))
{
getFilesFromDir(file, collected_files, recursive);
}
else
{
if (file.extension().string() != ".xml")
throw DB::Exception("File '" + filename + "' does not have .xml extension", DB::ErrorCodes::BAD_ARGUMENTS);
collected_files.push_back(filename);
}
}
input_files = std::move(collected_files);
}
std::sort(input_files.begin(), input_files.end());
return input_files;
}
std::unordered_map<std::string, std::vector<std::size_t>> getTestQueryIndexes(const po::basic_parsed_options<char> & parsed_opts)
{
std::unordered_map<std::string, std::vector<std::size_t>> result;
const auto & options = parsed_opts.options;
for (size_t i = 0; i < options.size() - 1; ++i)
{
const auto & opt = options[i];
if (opt.string_key == "input-files")
{
if (options[i + 1].string_key == "query-indexes")
{
const std::string & test_path = Poco::Path(opt.value[0]).absolute().toString();
for (const auto & query_num_str : options[i + 1].value)
{
size_t query_num = std::stoul(query_num_str);
result[test_path].push_back(query_num);
}
}
}
}
return result;
}
int mainEntryClickHousePerformanceTest(int argc, char ** argv)
try
{
using po::value;
using Strings = DB::Strings;
po::options_description desc("Allowed options");
desc.add_options()
("help", "produce help message")
("lite", "use lite version of output")
("profiles-file", value<std::string>()->default_value(""), "Specify a file with global profiles")
("host,h", value<std::string>()->default_value("localhost"), "")
("port", value<UInt16>()->default_value(9000), "")
("secure,s", "Use TLS connection")
("database", value<std::string>()->default_value("default"), "")
("user", value<std::string>()->default_value("default"), "")
("password", value<std::string>()->default_value(""), "")
("log-level", value<std::string>()->default_value("information"), "Set log level")
("tags", value<Strings>()->multitoken(), "Run only tests with tag")
("skip-tags", value<Strings>()->multitoken(), "Do not run tests with tag")
("names", value<Strings>()->multitoken(), "Run tests with specific name")
("skip-names", value<Strings>()->multitoken(), "Do not run tests with name")
("names-regexp", value<Strings>()->multitoken(), "Run tests with names matching regexp")
("skip-names-regexp", value<Strings>()->multitoken(), "Do not run tests with names matching regexp")
("input-files", value<Strings>()->multitoken(), "Input .xml files")
("query-indexes", value<std::vector<size_t>>()->multitoken(), "Input query indexes")
("recursive,r", "Recurse in directories to find all xml's");
po::options_description cmdline_options;
cmdline_options.add(desc);
po::variables_map options;
po::basic_parsed_options<char> parsed = po::command_line_parser(argc, argv).options(cmdline_options).run();
auto queries_with_indexes = getTestQueryIndexes(parsed);
po::store(parsed, options);
po::notify(options);
Poco::AutoPtr<Poco::PatternFormatter> formatter(new Poco::PatternFormatter("%Y.%m.%d %H:%M:%S.%F <%p> %s: %t"));
Poco::AutoPtr<Poco::ConsoleChannel> console_chanel(new Poco::ConsoleChannel);
Poco::AutoPtr<Poco::FormattingChannel> channel(new Poco::FormattingChannel(formatter, console_chanel));
Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
Poco::Logger::root().setChannel(channel);
Poco::Logger * log = &Poco::Logger::get("PerformanceTestSuite");
if (options.count("help"))
{
std::cout << "Usage: " << argv[0] << " [options] [test_file ...] [tests_folder]\n";
std::cout << desc << "\n";
return 0;
}
Strings input_files = getInputFiles(options, log);
Strings tests_tags = options.count("tags") ? options["tags"].as<Strings>() : Strings({});
Strings skip_tags = options.count("skip-tags") ? options["skip-tags"].as<Strings>() : Strings({});
Strings tests_names = options.count("names") ? options["names"].as<Strings>() : Strings({});
Strings skip_names = options.count("skip-names") ? options["skip-names"].as<Strings>() : Strings({});
Strings tests_names_regexp = options.count("names-regexp") ? options["names-regexp"].as<Strings>() : Strings({});
Strings skip_names_regexp = options.count("skip-names-regexp") ? options["skip-names-regexp"].as<Strings>() : Strings({});
auto timeouts = DB::ConnectionTimeouts::getTCPTimeoutsWithoutFailover(DB::Settings());
DB::UseSSL use_ssl;
DB::PerformanceTestSuite performance_test_suite(
options["host"].as<std::string>(),
options["port"].as<UInt16>(),
options.count("secure"),
options["database"].as<std::string>(),
options["user"].as<std::string>(),
options["password"].as<std::string>(),
options.count("lite") > 0,
options["profiles-file"].as<std::string>(),
std::move(input_files),
std::move(tests_tags),
std::move(skip_tags),
std::move(tests_names),
std::move(skip_names),
std::move(tests_names_regexp),
std::move(skip_names_regexp),
queries_with_indexes,
timeouts);
return performance_test_suite.run();
}
catch (...)
{
std::cout << DB::getCurrentExceptionMessage(/*with stacktrace = */ true) << std::endl;
int code = DB::getCurrentExceptionCode();
return code ? code : 1;
}

View File

@ -0,0 +1,204 @@
#include "ReportBuilder.h"
#include <algorithm>
#include <regex>
#include <sstream>
#include <thread>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getFQDNOrHostName.h>
#include <common/getMemoryAmount.h>
#include "JSONString.h"
namespace DB
{
namespace
{
const std::regex QUOTE_REGEX{"\""};
}
ReportBuilder::ReportBuilder(const std::string & server_version_)
: server_version(server_version_)
, hostname(getFQDNOrHostName())
, num_cores(getNumberOfPhysicalCPUCores())
, num_threads(std::thread::hardware_concurrency())
, ram(getMemoryAmount())
{
}
std::string ReportBuilder::getCurrentTime() const
{
return DateLUT::instance().timeToString(time(nullptr));
}
std::string ReportBuilder::buildFullReport(
const PerformanceTestInfo & test_info,
std::vector<TestStats> & stats,
const std::vector<std::size_t> & queries_to_run) const
{
JSONString json_output;
json_output.set("hostname", hostname);
json_output.set("num_cores", num_cores);
json_output.set("num_threads", num_threads);
json_output.set("ram", ram);
json_output.set("server_version", server_version);
json_output.set("time", getCurrentTime());
json_output.set("test_name", test_info.test_name);
json_output.set("path", test_info.path);
json_output.set("main_metric", test_info.main_metric);
auto has_metric = [&test_info] (const std::string & metric_name)
{
return std::find(test_info.metrics.begin(),
test_info.metrics.end(), metric_name) != test_info.metrics.end();
};
if (test_info.substitutions.size())
{
JSONString json_parameters(2); /// here, 2 is the size of \t padding
for (auto it = test_info.substitutions.begin(); it != test_info.substitutions.end(); ++it)
{
std::string parameter = it->first;
Strings values = it->second;
std::ostringstream array_string;
array_string << "[";
for (size_t i = 0; i != values.size(); ++i)
{
array_string << '"' << std::regex_replace(values[i], QUOTE_REGEX, "\\\"") << '"';
if (i != values.size() - 1)
{
array_string << ", ";
}
}
array_string << ']';
json_parameters.set(parameter, array_string.str());
}
json_output.set("parameters", json_parameters.asString());
}
std::vector<JSONString> run_infos;
for (size_t query_index = 0; query_index < test_info.queries.size(); ++query_index)
{
if (!queries_to_run.empty() && std::find(queries_to_run.begin(), queries_to_run.end(), query_index) == queries_to_run.end())
continue;
for (size_t number_of_launch = 0; number_of_launch < test_info.times_to_run; ++number_of_launch)
{
size_t stat_index = number_of_launch * test_info.queries.size() + query_index;
TestStats & statistics = stats[stat_index];
if (!statistics.ready)
continue;
JSONString runJSON;
auto query = std::regex_replace(test_info.queries[query_index], QUOTE_REGEX, "\\\"");
runJSON.set("query", query);
runJSON.set("query_index", query_index);
if (!statistics.exception.empty())
runJSON.set("exception", statistics.exception);
if (test_info.exec_type == ExecutionType::Loop)
{
/// in seconds
if (has_metric("min_time"))
runJSON.set("min_time", statistics.min_time / double(1000));
if (has_metric("quantiles"))
{
JSONString quantiles(4); /// here, 4 is the size of \t padding
for (double percent = 10; percent <= 90; percent += 10)
{
std::string quantile_key = std::to_string(percent / 100.0);
while (quantile_key.back() == '0')
quantile_key.pop_back();
quantiles.set(quantile_key,
statistics.sampler.quantileInterpolated(percent / 100.0));
}
quantiles.set("0.95",
statistics.sampler.quantileInterpolated(95 / 100.0));
quantiles.set("0.99",
statistics.sampler.quantileInterpolated(99 / 100.0));
quantiles.set("0.999",
statistics.sampler.quantileInterpolated(99.9 / 100.0));
quantiles.set("0.9999",
statistics.sampler.quantileInterpolated(99.99 / 100.0));
runJSON.set("quantiles", quantiles.asString());
}
if (has_metric("total_time"))
runJSON.set("total_time", statistics.total_time);
if (has_metric("queries_per_second"))
runJSON.set("queries_per_second",
double(statistics.queries) / statistics.total_time);
if (has_metric("rows_per_second"))
runJSON.set("rows_per_second",
double(statistics.total_rows_read) / statistics.total_time);
if (has_metric("bytes_per_second"))
runJSON.set("bytes_per_second",
double(statistics.total_bytes_read) / statistics.total_time);
}
else
{
if (has_metric("max_rows_per_second"))
runJSON.set("max_rows_per_second", statistics.max_rows_speed);
if (has_metric("max_bytes_per_second"))
runJSON.set("max_bytes_per_second", statistics.max_bytes_speed);
if (has_metric("avg_rows_per_second"))
runJSON.set("avg_rows_per_second", statistics.avg_rows_speed_value);
if (has_metric("avg_bytes_per_second"))
runJSON.set("avg_bytes_per_second", statistics.avg_bytes_speed_value);
}
run_infos.push_back(runJSON);
}
}
json_output.set("runs", run_infos);
return json_output.asString();
}
std::string ReportBuilder::buildCompactReport(
const PerformanceTestInfo & test_info,
std::vector<TestStats> & stats,
const std::vector<std::size_t> & queries_to_run) const
{
std::ostringstream output;
for (size_t query_index = 0; query_index < test_info.queries.size(); ++query_index)
{
if (!queries_to_run.empty() && std::find(queries_to_run.begin(), queries_to_run.end(), query_index) == queries_to_run.end())
continue;
for (size_t number_of_launch = 0; number_of_launch < test_info.times_to_run; ++number_of_launch)
{
if (test_info.queries.size() > 1)
output << "query \"" << test_info.queries[query_index] << "\", ";
output << "run " << std::to_string(number_of_launch + 1) << ": ";
output << test_info.main_metric << " = ";
size_t index = number_of_launch * test_info.queries.size() + query_index;
output << stats[index].getStatisticByName(test_info.main_metric);
output << "\n";
}
}
return output.str();
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include "PerformanceTestInfo.h"
#include <vector>
#include <string>
namespace DB
{
class ReportBuilder
{
public:
ReportBuilder(const std::string & server_version_);
std::string buildFullReport(
const PerformanceTestInfo & test_info,
std::vector<TestStats> & stats,
const std::vector<std::size_t> & queries_to_run) const;
std::string buildCompactReport(
const PerformanceTestInfo & test_info,
std::vector<TestStats> & stats,
const std::vector<std::size_t> & queries_to_run) const;
private:
std::string server_version;
std::string hostname;
size_t num_cores;
size_t num_threads;
size_t ram;
private:
std::string getCurrentTime() const;
};
}

View File

@ -0,0 +1,63 @@
#include "StopConditionsSet.h"
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void StopConditionsSet::loadFromConfig(const ConfigurationPtr & stop_conditions_view)
{
Strings keys;
stop_conditions_view->keys(keys);
for (const std::string & key : keys)
{
if (key == "total_time_ms")
total_time_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "rows_read")
rows_read.value = stop_conditions_view->getUInt64(key);
else if (key == "bytes_read_uncompressed")
bytes_read_uncompressed.value = stop_conditions_view->getUInt64(key);
else if (key == "iterations")
iterations.value = stop_conditions_view->getUInt64(key);
else if (key == "min_time_not_changing_for_ms")
min_time_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "max_speed_not_changing_for_ms")
max_speed_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "average_speed_not_changing_for_ms")
average_speed_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else
throw Exception("Met unkown stop condition: " + key, ErrorCodes::LOGICAL_ERROR);
}
++initialized_count;
}
void StopConditionsSet::reset()
{
total_time_ms.fulfilled = false;
rows_read.fulfilled = false;
bytes_read_uncompressed.fulfilled = false;
iterations.fulfilled = false;
min_time_not_changing_for_ms.fulfilled = false;
max_speed_not_changing_for_ms.fulfilled = false;
average_speed_not_changing_for_ms.fulfilled = false;
fulfilled_count = 0;
}
void StopConditionsSet::report(UInt64 value, StopConditionsSet::StopCondition & condition)
{
if (condition.value && !condition.fulfilled && value >= condition.value)
{
condition.fulfilled = true;
++fulfilled_count;
}
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Core/Types.h>
#include <Poco/Util/XMLConfiguration.h>
namespace DB
{
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// A set of supported stop conditions.
struct StopConditionsSet
{
void loadFromConfig(const ConfigurationPtr & stop_conditions_view);
void reset();
/// Note: only conditions with UInt64 minimal thresholds are supported.
/// I.e. condition is fulfilled when value is exceeded.
struct StopCondition
{
UInt64 value = 0;
bool fulfilled = false;
};
void report(UInt64 value, StopCondition & condition);
StopCondition total_time_ms;
StopCondition rows_read;
StopCondition bytes_read_uncompressed;
StopCondition iterations;
StopCondition min_time_not_changing_for_ms;
StopCondition max_speed_not_changing_for_ms;
StopCondition average_speed_not_changing_for_ms;
size_t initialized_count = 0;
size_t fulfilled_count = 0;
};
}

View File

@ -0,0 +1,165 @@
#include "TestStats.h"
namespace DB
{
namespace
{
const std::string FOUR_SPACES = " ";
}
std::string TestStats::getStatisticByName(const std::string & statistic_name)
{
if (statistic_name == "min_time")
return std::to_string(min_time) + "ms";
if (statistic_name == "quantiles")
{
std::string result = "\n";
for (double percent = 10; percent <= 90; percent += 10)
{
result += FOUR_SPACES + std::to_string((percent / 100));
result += ": " + std::to_string(sampler.quantileInterpolated(percent / 100.0));
result += "\n";
}
result += FOUR_SPACES + "0.95: " + std::to_string(sampler.quantileInterpolated(95 / 100.0)) + "\n";
result += FOUR_SPACES + "0.99: " + std::to_string(sampler.quantileInterpolated(99 / 100.0)) + "\n";
result += FOUR_SPACES + "0.999: " + std::to_string(sampler.quantileInterpolated(99.9 / 100.)) + "\n";
result += FOUR_SPACES + "0.9999: " + std::to_string(sampler.quantileInterpolated(99.99 / 100.));
return result;
}
if (statistic_name == "total_time")
return std::to_string(total_time) + "s";
if (statistic_name == "queries_per_second")
return std::to_string(queries / total_time);
if (statistic_name == "rows_per_second")
return std::to_string(total_rows_read / total_time);
if (statistic_name == "bytes_per_second")
return std::to_string(total_bytes_read / total_time);
if (statistic_name == "max_rows_per_second")
return std::to_string(max_rows_speed);
if (statistic_name == "max_bytes_per_second")
return std::to_string(max_bytes_speed);
if (statistic_name == "avg_rows_per_second")
return std::to_string(avg_rows_speed_value);
if (statistic_name == "avg_bytes_per_second")
return std::to_string(avg_bytes_speed_value);
return "";
}
void TestStats::update_min_time(UInt64 min_time_candidate)
{
if (min_time_candidate < min_time)
{
min_time = min_time_candidate;
min_time_watch.restart();
}
}
void TestStats::update_max_speed(
size_t max_speed_candidate,
Stopwatch & max_speed_watch,
UInt64 & max_speed)
{
if (max_speed_candidate > max_speed)
{
max_speed = max_speed_candidate;
max_speed_watch.restart();
}
}
void TestStats::update_average_speed(
double new_speed_info,
Stopwatch & avg_speed_watch,
size_t & number_of_info_batches,
double precision,
double & avg_speed_first,
double & avg_speed_value)
{
avg_speed_value = ((avg_speed_value * number_of_info_batches) + new_speed_info);
++number_of_info_batches;
avg_speed_value /= number_of_info_batches;
if (avg_speed_first == 0)
{
avg_speed_first = avg_speed_value;
}
if (std::abs(avg_speed_value - avg_speed_first) >= precision)
{
avg_speed_first = avg_speed_value;
avg_speed_watch.restart();
}
}
void TestStats::add(size_t rows_read_inc, size_t bytes_read_inc)
{
total_rows_read += rows_read_inc;
total_bytes_read += bytes_read_inc;
last_query_rows_read += rows_read_inc;
last_query_bytes_read += bytes_read_inc;
double new_rows_speed = last_query_rows_read / watch_per_query.elapsedSeconds();
double new_bytes_speed = last_query_bytes_read / watch_per_query.elapsedSeconds();
/// Update rows speed
update_max_speed(new_rows_speed, max_rows_speed_watch, max_rows_speed);
update_average_speed(new_rows_speed,
avg_rows_speed_watch,
number_of_rows_speed_info_batches,
avg_rows_speed_precision,
avg_rows_speed_first,
avg_rows_speed_value);
/// Update bytes speed
update_max_speed(new_bytes_speed, max_bytes_speed_watch, max_bytes_speed);
update_average_speed(new_bytes_speed,
avg_bytes_speed_watch,
number_of_bytes_speed_info_batches,
avg_bytes_speed_precision,
avg_bytes_speed_first,
avg_bytes_speed_value);
}
void TestStats::updateQueryInfo()
{
++queries;
sampler.insert(watch_per_query.elapsedSeconds());
update_min_time(watch_per_query.elapsed() / (1000 * 1000)); /// ns to ms
}
TestStats::TestStats()
{
watch.reset();
watch_per_query.reset();
min_time_watch.reset();
max_rows_speed_watch.reset();
max_bytes_speed_watch.reset();
avg_rows_speed_watch.reset();
avg_bytes_speed_watch.reset();
}
void TestStats::startWatches()
{
watch.start();
watch_per_query.start();
min_time_watch.start();
max_rows_speed_watch.start();
max_bytes_speed_watch.start();
avg_rows_speed_watch.start();
avg_bytes_speed_watch.start();
}
}

View File

@ -0,0 +1,87 @@
#pragma once
#include <Core/Types.h>
#include <limits>
#include <Common/Stopwatch.h>
#include <AggregateFunctions/ReservoirSampler.h>
namespace DB
{
struct TestStats
{
TestStats();
Stopwatch watch;
Stopwatch watch_per_query;
Stopwatch min_time_watch;
Stopwatch max_rows_speed_watch;
Stopwatch max_bytes_speed_watch;
Stopwatch avg_rows_speed_watch;
Stopwatch avg_bytes_speed_watch;
bool last_query_was_cancelled = false;
size_t queries = 0;
size_t total_rows_read = 0;
size_t total_bytes_read = 0;
size_t last_query_rows_read = 0;
size_t last_query_bytes_read = 0;
using Sampler = ReservoirSampler<double>;
Sampler sampler{1 << 16};
/// min_time in ms
UInt64 min_time = std::numeric_limits<UInt64>::max();
double total_time = 0;
UInt64 max_rows_speed = 0;
UInt64 max_bytes_speed = 0;
double avg_rows_speed_value = 0;
double avg_rows_speed_first = 0;
static inline double avg_rows_speed_precision = 0.001;
double avg_bytes_speed_value = 0;
double avg_bytes_speed_first = 0;
static inline double avg_bytes_speed_precision = 0.001;
size_t number_of_rows_speed_info_batches = 0;
size_t number_of_bytes_speed_info_batches = 0;
bool ready = false; // check if a query wasn't interrupted by SIGINT
std::string exception;
/// Hack, actually this field doesn't required for statistics
bool got_SIGINT = false;
std::string getStatisticByName(const std::string & statistic_name);
void update_min_time(UInt64 min_time_candidate);
void update_average_speed(
double new_speed_info,
Stopwatch & avg_speed_watch,
size_t & number_of_info_batches,
double precision,
double & avg_speed_first,
double & avg_speed_value);
void update_max_speed(
size_t max_speed_candidate,
Stopwatch & max_speed_watch,
UInt64 & max_speed);
void add(size_t rows_read_inc, size_t bytes_read_inc);
void updateQueryInfo();
void setTotalTime()
{
total_time = watch.elapsedSeconds();
}
void startWatches();
};
}

View File

@ -0,0 +1,38 @@
#include "TestStopConditions.h"
namespace DB
{
void TestStopConditions::loadFromConfig(ConfigurationPtr & stop_conditions_config)
{
if (stop_conditions_config->has("all_of"))
{
ConfigurationPtr config_all_of(stop_conditions_config->createView("all_of"));
conditions_all_of.loadFromConfig(config_all_of);
}
if (stop_conditions_config->has("any_of"))
{
ConfigurationPtr config_any_of(stop_conditions_config->createView("any_of"));
conditions_any_of.loadFromConfig(config_any_of);
}
}
bool TestStopConditions::areFulfilled() const
{
return (conditions_all_of.initialized_count && conditions_all_of.fulfilled_count >= conditions_all_of.initialized_count)
|| (conditions_any_of.initialized_count && conditions_any_of.fulfilled_count);
}
UInt64 TestStopConditions::getMaxExecTime() const
{
UInt64 all_of_time = conditions_all_of.total_time_ms.value;
if (all_of_time == 0 && conditions_all_of.initialized_count != 0) /// max time is not set in all conditions
return 0;
else if(all_of_time != 0 && conditions_all_of.initialized_count > 1) /// max time is set, but we have other conditions
return 0;
UInt64 any_of_time = conditions_any_of.total_time_ms.value;
return std::max(all_of_time, any_of_time);
}
}

View File

@ -0,0 +1,57 @@
#pragma once
#include "StopConditionsSet.h"
#include <Poco/Util/XMLConfiguration.h>
namespace DB
{
/// Stop conditions for a test run. The running test will be terminated in either of two conditions:
/// 1. All conditions marked 'all_of' are fulfilled
/// or
/// 2. Any condition marked 'any_of' is fulfilled
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
class TestStopConditions
{
public:
void loadFromConfig(ConfigurationPtr & stop_conditions_config);
inline bool empty() const
{
return !conditions_all_of.initialized_count && !conditions_any_of.initialized_count;
}
#define DEFINE_REPORT_FUNC(FUNC_NAME, CONDITION) \
void FUNC_NAME(UInt64 value) \
{ \
conditions_all_of.report(value, conditions_all_of.CONDITION); \
conditions_any_of.report(value, conditions_any_of.CONDITION); \
}
DEFINE_REPORT_FUNC(reportTotalTime, total_time_ms)
DEFINE_REPORT_FUNC(reportRowsRead, rows_read)
DEFINE_REPORT_FUNC(reportBytesReadUncompressed, bytes_read_uncompressed)
DEFINE_REPORT_FUNC(reportIterations, iterations)
DEFINE_REPORT_FUNC(reportMinTimeNotChangingFor, min_time_not_changing_for_ms)
DEFINE_REPORT_FUNC(reportMaxSpeedNotChangingFor, max_speed_not_changing_for_ms)
DEFINE_REPORT_FUNC(reportAverageSpeedNotChangingFor, average_speed_not_changing_for_ms)
#undef REPORT
bool areFulfilled() const;
void reset()
{
conditions_all_of.reset();
conditions_any_of.reset();
}
/// Return max exec time for these conditions
/// Return zero if max time cannot be determined
UInt64 getMaxExecTime() const;
private:
StopConditionsSet conditions_all_of;
StopConditionsSet conditions_any_of;
};
}

View File

@ -0,0 +1,82 @@
#include "applySubstitutions.h"
#include <algorithm>
#include <vector>
namespace DB
{
void constructSubstitutions(ConfigurationPtr & substitutions_view, StringToVector & out_substitutions)
{
Strings xml_substitutions;
substitutions_view->keys(xml_substitutions);
for (size_t i = 0; i != xml_substitutions.size(); ++i)
{
const ConfigurationPtr xml_substitution(substitutions_view->createView("substitution[" + std::to_string(i) + "]"));
/// Property values for substitution will be stored in a vector
/// accessible by property name
Strings xml_values;
xml_substitution->keys("values", xml_values);
std::string name = xml_substitution->getString("name");
for (size_t j = 0; j != xml_values.size(); ++j)
{
out_substitutions[name].push_back(xml_substitution->getString("values.value[" + std::to_string(j) + "]"));
}
}
}
/// Recursive method which goes through all substitution blocks in xml
/// and replaces property {names} by their values
void runThroughAllOptionsAndPush(StringToVector::iterator substitutions_left,
StringToVector::iterator substitutions_right,
const std::string & template_query,
Strings & out_queries)
{
if (substitutions_left == substitutions_right)
{
out_queries.push_back(template_query); /// completely substituted query
return;
}
std::string substitution_mask = "{" + substitutions_left->first + "}";
if (template_query.find(substitution_mask) == std::string::npos) /// nothing to substitute here
{
runThroughAllOptionsAndPush(std::next(substitutions_left), substitutions_right, template_query, out_queries);
return;
}
for (const std::string & value : substitutions_left->second)
{
/// Copy query string for each unique permutation
std::string query = template_query;
size_t substr_pos = 0;
while (substr_pos != std::string::npos)
{
substr_pos = query.find(substitution_mask);
if (substr_pos != std::string::npos)
query.replace(substr_pos, substitution_mask.length(), value);
}
runThroughAllOptionsAndPush(std::next(substitutions_left), substitutions_right, query, out_queries);
}
}
Strings formatQueries(const std::string & query, StringToVector substitutions_to_generate)
{
Strings queries_res;
runThroughAllOptionsAndPush(
substitutions_to_generate.begin(),
substitutions_to_generate.end(),
query,
queries_res);
return queries_res;
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <Poco/Util/XMLConfiguration.h>
#include <Core/Types.h>
#include <vector>
#include <string>
#include <map>
namespace DB
{
using StringToVector = std::map<std::string, Strings>;
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
void constructSubstitutions(ConfigurationPtr & substitutions_view, StringToVector & out_substitutions);
Strings formatQueries(const std::string & query, StringToVector substitutions_to_generate);
}

View File

@ -0,0 +1,73 @@
#include "executeQuery.h"
#include <IO/Progress.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Core/Block.h>
namespace DB
{
namespace
{
void checkFulfilledConditionsAndUpdate(
const Progress & progress, RemoteBlockInputStream & stream,
TestStats & statistics, TestStopConditions & stop_conditions,
InterruptListener & interrupt_listener)
{
statistics.add(progress.rows, progress.bytes);
stop_conditions.reportRowsRead(statistics.total_rows_read);
stop_conditions.reportBytesReadUncompressed(statistics.total_bytes_read);
stop_conditions.reportTotalTime(statistics.watch.elapsed() / (1000 * 1000));
stop_conditions.reportMinTimeNotChangingFor(statistics.min_time_watch.elapsed() / (1000 * 1000));
stop_conditions.reportMaxSpeedNotChangingFor(statistics.max_rows_speed_watch.elapsed() / (1000 * 1000));
stop_conditions.reportAverageSpeedNotChangingFor(statistics.avg_rows_speed_watch.elapsed() / (1000 * 1000));
if (stop_conditions.areFulfilled())
{
statistics.last_query_was_cancelled = true;
stream.cancel(false);
}
if (interrupt_listener.check())
{
statistics.got_SIGINT = true;
statistics.last_query_was_cancelled = true;
stream.cancel(false);
}
}
}
void executeQuery(
Connection & connection,
const std::string & query,
TestStats & statistics,
TestStopConditions & stop_conditions,
InterruptListener & interrupt_listener,
Context & context)
{
statistics.watch_per_query.restart();
statistics.last_query_was_cancelled = false;
statistics.last_query_rows_read = 0;
statistics.last_query_bytes_read = 0;
Settings settings;
RemoteBlockInputStream stream(connection, query, {}, context, &settings);
stream.setProgressCallback(
[&](const Progress & value)
{
checkFulfilledConditionsAndUpdate(
value, stream, statistics,
stop_conditions, interrupt_listener);
});
stream.readPrefix();
while (Block block = stream.read());
stream.readSuffix();
if (!statistics.last_query_was_cancelled)
statistics.updateQueryInfo();
statistics.setTotalTime();
}
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <string>
#include "TestStats.h"
#include "TestStopConditions.h"
#include <Common/InterruptListener.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
namespace DB
{
void executeQuery(
Connection & connection,
const std::string & query,
TestStats & statistics,
TestStopConditions & stop_conditions,
InterruptListener & interrupt_listener,
Context & context);
}

View File

@ -4,6 +4,7 @@
#include <Poco/File.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/NetException.h>
@ -558,12 +559,51 @@ void HTTPHandler::processQuery(
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
auto appendCallback = [&context] (ProgressCallback callback)
{
auto prev = context.getProgressCallback();
context.setProgressCallback([prev, callback] (const Progress & progress)
{
if (prev)
prev(progress);
callback(progress);
});
};
/// While still no data has been sent, we will report about query execution progress by sending HTTP headers.
if (settings.send_progress_in_http_headers)
context.setProgressCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
appendCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
{
Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket();
appendCallback([&context, &socket](const Progress &)
{
/// Assume that at the point this method is called no one is reading data from the socket any more.
/// True for read-only queries.
try
{
char b;
int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK);
if (status == 0)
context.killCurrentQuery();
}
catch (Poco::TimeoutException &)
{
}
catch (...)
{
context.killCurrentQuery();
}
});
}
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); });
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("Query-Id", current_query_id); });
if (used_output.hasDelayed())
{

View File

@ -11,6 +11,7 @@
#include <Poco/DirectoryIterator.h>
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
#include <Poco/Util/HelpFormatter.h>
#include <ext/scope_guard.h>
#include <common/logger_useful.h>
#include <common/ErrorHandlers.h>
@ -47,6 +48,7 @@
#include "MetricsTransmitter.h"
#include <Common/StatusFile.h>
#include "TCPHandlerFactory.h"
#include "Common/config_version.h"
#if defined(__linux__)
#include <Common/hasLinuxCapability.h>
@ -116,6 +118,26 @@ void Server::uninitialize()
BaseDaemon::uninitialize();
}
int Server::run()
{
if (config().hasOption("help"))
{
Poco::Util::HelpFormatter helpFormatter(Server::options());
std::stringstream header;
header << commandName() << " [OPTION] [-- [ARG]...]\n";
header << "positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010";
helpFormatter.setHeader(header.str());
helpFormatter.format(std::cout);
return 0;
}
if (config().hasOption("version"))
{
std::cout << DBMS_NAME << " server version " << VERSION_STRING << "." << std::endl;
return 0;
}
return Application::run();
}
void Server::initialize(Poco::Util::Application & self)
{
BaseDaemon::initialize(self);
@ -127,6 +149,21 @@ std::string Server::getDefaultCorePath() const
return getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH)) + "cores";
}
void Server::defineOptions(Poco::Util::OptionSet & _options)
{
_options.addOption(
Poco::Util::Option("help", "h", "show help and exit")
.required(false)
.repeatable(false)
.binding("help"));
_options.addOption(
Poco::Util::Option("version", "V", "show version and exit")
.required(false)
.repeatable(false)
.binding("version"));
BaseDaemon::defineOptions(_options);
}
int Server::main(const std::vector<std::string> & /*args*/)
{
Logger * log = &logger();

View File

@ -21,6 +21,8 @@ namespace DB
class Server : public BaseDaemon, public IServer
{
public:
using ServerApplication::run;
Poco::Util::LayeredConfiguration & config() const override
{
return BaseDaemon::config();
@ -41,7 +43,10 @@ public:
return BaseDaemon::isCancelled();
}
void defineOptions(Poco::Util::OptionSet & _options) override;
protected:
int run() override;
void initialize(Application & self) override;
void uninitialize() override;

View File

@ -1 +0,0 @@
<yandex><listen_host>0.0.0.0</listen_host></yandex>

View File

@ -1,16 +1,8 @@
<yandex>
<zookeeper>
<!-- <zookeeper>
<node>
<host>localhost</host>
<port>2181</port>
</node>
<node>
<host>yandex.ru</host>
<port>2181</port>
</node>
<node>
<host>111.0.1.2</host>
<port>2181</port>
</node>
</zookeeper>
</zookeeper>-->
</yandex>

View File

@ -0,0 +1,58 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionEntropy.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
AggregateFunctionPtr createAggregateFunctionEntropy(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertNoParameters(name, parameters);
if (argument_types.empty())
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
WhichDataType which(argument_types[0]);
if (isNumber(argument_types[0]))
{
if (which.isUInt64())
{
return std::make_shared<AggregateFunctionEntropy<UInt64>>();
}
else if (which.isInt64())
{
return std::make_shared<AggregateFunctionEntropy<Int64>>();
}
else if (which.isInt32())
{
return std::make_shared<AggregateFunctionEntropy<Int32>>();
}
else if (which.isUInt32())
{
return std::make_shared<AggregateFunctionEntropy<UInt32>>();
}
else if (which.isUInt128())
{
return std::make_shared<AggregateFunctionEntropy<UInt128, true>>();
}
}
return std::make_shared<AggregateFunctionEntropy<UInt128>>();
}
}
void registerAggregateFunctionEntropy(AggregateFunctionFactory & factory)
{
factory.registerFunction("entropy", createAggregateFunctionEntropy);
}
}

View File

@ -0,0 +1,152 @@
#pragma once
#include <AggregateFunctions/FactoryHelpers.h>
#include <Common/HashTable/HashMap.h>
#include <Common/NaNUtils.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/UniqVariadicHash.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <cmath>
namespace DB
{
/** Calculates Shannon Entropy, using HashMap and computing empirical distribution function
*/
template <typename Value, bool is_hashed>
struct EntropyData
{
using Weight = UInt64;
using HashingMap = HashMap <
Value, Weight,
HashCRC32<Value>,
HashTableGrower<4>,
HashTableAllocatorWithStackMemory<sizeof(std::pair<Value, Weight>) * (1 << 3)>
>;
using TrivialMap = HashMap <
Value, Weight,
UInt128TrivialHash,
HashTableGrower<4>,
HashTableAllocatorWithStackMemory<sizeof(std::pair<Value, Weight>) * (1 << 3)>
>;
/// If column value is UInt128 then there is no need to hash values
using Map = std::conditional_t<is_hashed, TrivialMap, HashingMap>;
Map map;
void add(const Value & x)
{
if (!isNaN(x))
++map[x];
}
void add(const Value & x, const Weight & weight)
{
if (!isNaN(x))
map[x] += weight;
}
void merge(const EntropyData & rhs)
{
for (const auto & pair : rhs.map)
map[pair.first] += pair.second;
}
void serialize(WriteBuffer & buf) const
{
map.write(buf);
}
void deserialize(ReadBuffer & buf)
{
typename Map::Reader reader(buf);
while (reader.next())
{
const auto &pair = reader.get();
map[pair.first] = pair.second;
}
}
Float64 get() const
{
Float64 shannon_entropy = 0;
UInt64 total_value = 0;
for (const auto & pair : map)
{
total_value += pair.second;
}
Float64 cur_proba;
Float64 log2e = 1 / std::log(2);
for (const auto & pair : map)
{
cur_proba = Float64(pair.second) / total_value;
shannon_entropy -= cur_proba * std::log(cur_proba) * log2e;
}
return shannon_entropy;
}
};
template <typename Value, bool is_hashed = false>
class AggregateFunctionEntropy final : public IAggregateFunctionDataHelper<EntropyData<Value, is_hashed>,
AggregateFunctionEntropy<Value>>
{
public:
AggregateFunctionEntropy()
{}
String getName() const override { return "entropy"; }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeNumber<Float64>>();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
if constexpr (!std::is_same_v<UInt128, Value>)
{
/// Here we manage only with numerical types
const auto &column = static_cast<const ColumnVector <Value> &>(*columns[0]);
this->data(place).add(column.getData()[row_num]);
}
else
{
this->data(place).add(UniqVariadicHash<true, false>::apply(1, columns, row_num));
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(const_cast<AggregateDataPtr>(place)).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto &column = dynamic_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }
};
}

View File

@ -128,7 +128,11 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
}
throw Exception("Unknown aggregate function " + name, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
auto hints = this->getHints(name);
if (!hints.empty())
throw Exception("Unknown aggregate function " + name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
else
throw Exception("Unknown aggregate function " + name, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
}

View File

@ -19,7 +19,7 @@ namespace ErrorCodes
/** Calculates quantile by collecting all values into array
* and applying n-th element (introselect) algorithm for the resulting array.
*
* It use O(N) memory and it is very inefficient in case of high amount of identical values.
* It uses O(N) memory and it is very inefficient in case of high amount of identical values.
* But it is very CPU efficient for not large datasets.
*/
template <typename Value>

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
/** Calculates quantile by counting number of occurrences for each value in a hash map.
*
* It use O(distinct(N)) memory. Can be naturally applied for values with weight.
* It uses O(distinct(N)) memory. Can be naturally applied for values with weight.
* In case of many identical values, it can be more efficient than QuantileExact even when weight is not used.
*/
template <typename Value>

View File

@ -27,6 +27,7 @@ void registerAggregateFunctionUniqUpTo(AggregateFunctionFactory &);
void registerAggregateFunctionTopK(AggregateFunctionFactory &);
void registerAggregateFunctionsBitwise(AggregateFunctionFactory &);
void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &);
void registerAggregateFunctionEntropy(AggregateFunctionFactory &);
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorArray(AggregateFunctionCombinatorFactory &);
@ -65,6 +66,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsMaxIntersections(factory);
registerAggregateFunctionHistogram(factory);
registerAggregateFunctionRetention(factory);
registerAggregateFunctionEntropy(factory);
}
{

View File

@ -12,6 +12,7 @@
#include <Columns/ColumnsCommon.h>
#include <DataStreams/ColumnGathererStream.h>
#include <ext/bit_cast.h>
#include <pdqsort.h>
#ifdef __SSE2__
#include <emmintrin.h>
@ -90,9 +91,9 @@ void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_directi
else
{
if (reverse)
std::sort(res.begin(), res.end(), greater(*this, nan_direction_hint));
pdqsort(res.begin(), res.end(), greater(*this, nan_direction_hint));
else
std::sort(res.begin(), res.end(), less(*this, nan_direction_hint));
pdqsort(res.begin(), res.end(), less(*this, nan_direction_hint));
}
}

View File

@ -69,7 +69,7 @@ public:
static void finalizePerformanceCounters();
/// Returns a non-empty string if the thread is attached to a query
static std::string getCurrentQueryID();
static const std::string & getQueryId();
/// Non-master threads call this method in destructor automatically
static void detachQuery();

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/Exception.h>
#include <Common/NamePrompter.h>
#include <Core/Types.h>
#include <Poco/String.h>
@ -105,6 +106,12 @@ public:
return aliases.count(name) || case_insensitive_aliases.count(name);
}
std::vector<String> getHints(const String & name) const
{
static const auto registered_names = getAllRegisteredNames();
return prompter.getHints(name, registered_names);
}
virtual ~IFactoryWithAliases() {}
private:
@ -120,6 +127,12 @@ private:
/// Case insensitive aliases
AliasMap case_insensitive_aliases;
/**
* prompter for names, if a person makes a typo for some function or type, it
* helps to find best possible match (in particular, edit distance is one or two symbols)
*/
NamePrompter</*MistakeFactor=*/2, /*MaxNumHints=*/2> prompter;
};
}

View File

@ -0,0 +1,83 @@
#pragma once
#include <Core/Types.h>
#include <algorithm>
#include <cctype>
#include <queue>
#include <utility>
namespace DB
{
template <size_t MistakeFactor, size_t MaxNumHints>
class NamePrompter
{
public:
using DistanceIndex = std::pair<size_t, size_t>;
using DistanceIndexQueue = std::priority_queue<DistanceIndex>;
static std::vector<String> getHints(const String & name, const std::vector<String> & prompting_strings)
{
DistanceIndexQueue queue;
for (size_t i = 0; i < prompting_strings.size(); ++i)
appendToQueue(i, name, queue, prompting_strings);
return release(queue, prompting_strings);
}
private:
static size_t levenshteinDistance(const String & lhs, const String & rhs)
{
size_t n = lhs.size();
size_t m = rhs.size();
std::vector<std::vector<size_t>> dp(n + 1, std::vector<size_t>(m + 1));
for (size_t i = 1; i <= n; ++i)
dp[i][0] = i;
for (size_t i = 1; i <= m; ++i)
dp[0][i] = i;
for (size_t j = 1; j <= m; ++j)
{
for (size_t i = 1; i <= n; ++i)
{
if (std::tolower(lhs[i - 1]) == std::tolower(rhs[j - 1]))
dp[i][j] = dp[i - 1][j - 1];
else
dp[i][j] = std::min(dp[i - 1][j] + 1, std::min(dp[i][j - 1] + 1, dp[i - 1][j - 1] + 1));
}
}
return dp[n][m];
}
static void appendToQueue(size_t ind, const String & name, DistanceIndexQueue & queue, const std::vector<String> & prompting_strings)
{
if (prompting_strings[ind].size() <= name.size() + MistakeFactor && prompting_strings[ind].size() + MistakeFactor >= name.size())
{
size_t distance = levenshteinDistance(prompting_strings[ind], name);
if (distance <= MistakeFactor)
{
queue.emplace(distance, ind);
if (queue.size() > MaxNumHints)
queue.pop();
}
}
}
static std::vector<String> release(DistanceIndexQueue & queue, const std::vector<String> & prompting_strings)
{
std::vector<String> ans;
ans.reserve(queue.size());
while (!queue.empty())
{
auto top = queue.top();
queue.pop();
ans.push_back(prompting_strings[top.second]);
}
std::reverse(ans.begin(), ans.end());
return ans;
}
};
}

View File

@ -1,9 +1,9 @@
#include "SharedLibrary.h"
#include <string>
#include <dlfcn.h>
#include <boost/core/noncopyable.hpp>
#include "Exception.h"
namespace DB
{
namespace ErrorCodes
@ -12,9 +12,9 @@ namespace ErrorCodes
extern const int CANNOT_DLSYM;
}
SharedLibrary::SharedLibrary(const std::string & path)
SharedLibrary::SharedLibrary(const std::string & path, int flags)
{
handle = dlopen(path.c_str(), RTLD_LAZY);
handle = dlopen(path.c_str(), flags);
if (!handle)
throw Exception(std::string("Cannot dlopen: ") + dlerror(), ErrorCodes::CANNOT_DLOPEN);
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <dlfcn.h>
#include <memory>
#include <string>
#include <boost/noncopyable.hpp>
@ -8,12 +9,12 @@
namespace DB
{
/** Allows you to open a dynamic library and get a pointer to a function from it.
/** Allows you to open a dynamic library and get a pointer to a function from it.
*/
class SharedLibrary : private boost::noncopyable
{
public:
explicit SharedLibrary(const std::string & path);
explicit SharedLibrary(const std::string & path, int flags = RTLD_LAZY);
~SharedLibrary();

View File

@ -116,7 +116,7 @@ public:
return thread_state.load(std::memory_order_relaxed);
}
String getQueryID();
const std::string & getQueryId() const;
/// Starts new query and create new thread group for it, current thread becomes master thread of the query
void initializeQuery();
@ -160,6 +160,8 @@ protected:
/// Use it only from current thread
Context * query_context = nullptr;
String query_id;
/// A logs queue used by TCPHandler to pass logs to a client
InternalTextLogsQueueWeakPtr logs_queue_ptr;

View File

@ -262,13 +262,7 @@ struct ODBCBridgeMixin
std::vector<std::string> cmd_args;
path.setFileName(
#if CLICKHOUSE_SPLIT_BINARY
"clickhouse-odbc-bridge"
#else
"clickhouse"
#endif
);
path.setFileName("clickhouse-odbc-bridge");
std::stringstream command;

View File

@ -20,7 +20,7 @@ void CachedCompressedReadBuffer::initInput()
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
compressed_in = &*file_in;
compressed_in = file_in.get();
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
@ -30,11 +30,12 @@ void CachedCompressedReadBuffer::initInput()
bool CachedCompressedReadBuffer::nextImpl()
{
/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
UInt128 key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
if (!owned_cell)
if (!owned_cell || !codec)
{
/// If not, read it from the file.
initInput();
@ -42,7 +43,6 @@ bool CachedCompressedReadBuffer::nextImpl()
owned_cell = std::make_shared<UncompressedCacheCell>();
size_t size_decompressed;
size_t size_compressed_without_checksum;
owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum);
@ -50,7 +50,7 @@ bool CachedCompressedReadBuffer::nextImpl()
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
decompress(owned_cell->data.data(), size_decompressed, owned_cell->compressed_size);
decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
/// Put data into cache.
cache->set(key, owned_cell);

View File

@ -23,7 +23,7 @@ bool CompressedReadBufferFromFile::nextImpl()
if (!size_compressed)
return false;
memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
memory.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
@ -91,7 +91,7 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
return bytes_read;
/// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER <= n - bytes_read)
if (size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer() <= n - bytes_read)
{
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
@ -101,7 +101,7 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
{
size_compressed = new_size_compressed;
bytes += offset();
memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
memory.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
pos = working_buffer.begin();

View File

@ -153,6 +153,4 @@ private:
void attachToThreadGroup();
};
using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;
}

View File

@ -120,17 +120,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (!done_with_join)
{
for (const auto & name_with_alias : subquery.joined_block_aliases)
{
if (block.has(name_with_alias.first))
{
auto pos = block.getPositionByName(name_with_alias.first);
auto column = block.getByPosition(pos);
block.erase(pos);
column.name = name_with_alias.second;
block.insert(std::move(column));
}
}
subquery.renameColumns(block);
if (subquery.joined_block_actions)
subquery.joined_block_actions->execute(block);

View File

@ -183,7 +183,8 @@ private:
try
{
setThreadName("ParalInputsProc");
CurrentThread::attachTo(thread_group);
if (thread_group)
CurrentThread::attachTo(thread_group);
while (!finish)
{

View File

@ -7,7 +7,7 @@
#include <Common/typeid_cast.h>
#include <Poco/String.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/WriteHelpers.h>
namespace DB
{
@ -87,7 +87,11 @@ DataTypePtr DataTypeFactory::get(const String & family_name_param, const ASTPtr
return it->second(parameters);
}
throw Exception("Unknown data type family: " + family_name, ErrorCodes::UNKNOWN_TYPE);
auto hints = this->getHints(family_name);
if (!hints.empty())
throw Exception("Unknown data type family: " + family_name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_TYPE);
else
throw Exception("Unknown data type family: " + family_name, ErrorCodes::UNKNOWN_TYPE);
}

View File

@ -20,9 +20,8 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context)
DatabaseDictionary::DatabaseDictionary(const String & name_)
: name(name_),
external_dictionaries(context.getExternalDictionaries()),
log(&Logger::get("DatabaseDictionary(" + name + ")"))
{
}
@ -31,23 +30,21 @@ void DatabaseDictionary::loadTables(Context &, ThreadPool *, bool)
{
}
Tables DatabaseDictionary::loadTables()
Tables DatabaseDictionary::listTables(const Context & context)
{
auto objects_map = external_dictionaries.getObjectsMap();
auto objects_map = context.getExternalDictionaries().getObjectsMap();
const auto & dictionaries = objects_map.get();
Tables tables;
for (const auto & pair : dictionaries)
{
const std::string & dict_name = pair.first;
if (deleted_tables.count(dict_name))
continue;
auto dict_ptr = std::static_pointer_cast<IDictionaryBase>(pair.second.loadable);
if (dict_ptr)
{
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
tables[dict_name] = StorageDictionary::create(dict_name, ColumnsDescription{columns}, dictionary_structure, dict_name);
const std::string & dict_name = pair.first;
tables[dict_name] = StorageDictionary::create(dict_name, ColumnsDescription{columns}, context, true, dict_name);
}
}
@ -55,23 +52,21 @@ Tables DatabaseDictionary::loadTables()
}
bool DatabaseDictionary::isTableExist(
const Context & /*context*/,
const Context & context,
const String & table_name) const
{
auto objects_map = external_dictionaries.getObjectsMap();
auto objects_map = context.getExternalDictionaries().getObjectsMap();
const auto & dictionaries = objects_map.get();
return dictionaries.count(table_name) && !deleted_tables.count(table_name);
return dictionaries.count(table_name);
}
StoragePtr DatabaseDictionary::tryGetTable(
const Context & /*context*/,
const Context & context,
const String & table_name) const
{
auto objects_map = external_dictionaries.getObjectsMap();
auto objects_map = context.getExternalDictionaries().getObjectsMap();
const auto & dictionaries = objects_map.get();
if (deleted_tables.count(table_name))
return {};
{
auto it = dictionaries.find(table_name);
if (it != dictionaries.end())
@ -81,7 +76,7 @@ StoragePtr DatabaseDictionary::tryGetTable(
{
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
return StorageDictionary::create(table_name, ColumnsDescription{columns}, dictionary_structure, table_name);
return StorageDictionary::create(table_name, ColumnsDescription{columns}, context, true, table_name);
}
}
}
@ -89,17 +84,17 @@ StoragePtr DatabaseDictionary::tryGetTable(
return {};
}
DatabaseIteratorPtr DatabaseDictionary::getIterator(const Context & /*context*/)
DatabaseIteratorPtr DatabaseDictionary::getIterator(const Context & context)
{
return std::make_unique<DatabaseSnapshotIterator>(loadTables());
return std::make_unique<DatabaseSnapshotIterator>(listTables(context));
}
bool DatabaseDictionary::empty(const Context & /*context*/) const
bool DatabaseDictionary::empty(const Context & context) const
{
auto objects_map = external_dictionaries.getObjectsMap();
auto objects_map = context.getExternalDictionaries().getObjectsMap();
const auto & dictionaries = objects_map.get();
for (const auto & pair : dictionaries)
if (pair.second.loadable && !deleted_tables.count(pair.first))
if (pair.second.loadable)
return false;
return true;
}
@ -115,23 +110,19 @@ void DatabaseDictionary::attachTable(const String & /*table_name*/, const Storag
}
void DatabaseDictionary::createTable(
const Context & /*context*/,
const String & /*table_name*/,
const StoragePtr & /*table*/,
const ASTPtr & /*query*/)
const Context &,
const String &,
const StoragePtr &,
const ASTPtr &)
{
throw Exception("DatabaseDictionary: createTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::removeTable(
const Context & context,
const String & table_name)
const Context &,
const String &)
{
if (!isTableExist(context, table_name))
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
auto objects_map = external_dictionaries.getObjectsMap();
deleted_tables.insert(table_name);
throw Exception("DatabaseDictionary: removeTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::renameTable(

View File

@ -15,7 +15,6 @@ namespace Poco
namespace DB
{
class ExternalDictionaries;
/* Database to store StorageDictionary tables
* automatically creates tables for all dictionaries
@ -23,7 +22,7 @@ class ExternalDictionaries;
class DatabaseDictionary : public IDatabase
{
public:
DatabaseDictionary(const String & name_, const Context & context);
DatabaseDictionary(const String & name_);
String getDatabaseName() const override;
@ -94,13 +93,10 @@ public:
private:
const String name;
mutable std::mutex mutex;
const ExternalDictionaries & external_dictionaries;
std::unordered_set<String> deleted_tables;
Poco::Logger * log;
Tables loadTables();
Tables listTables(const Context & context);
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const;
};

View File

@ -23,7 +23,7 @@ DatabasePtr DatabaseFactory::get(
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name);
else if (engine_name == "Dictionary")
return std::make_shared<DatabaseDictionary>(database_name, context);
return std::make_shared<DatabaseDictionary>(database_name);
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}

View File

@ -54,7 +54,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Block & sample_block,
Context & context)
Context & context_)
: update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, host{config.getString(config_prefix + ".host")}
@ -69,11 +69,13 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block}
, context(context)
, context(context_)
, is_local{isLocalAddress({host, port}, context.getTCPPort())}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)}
, load_all_query{query_builder.composeLoadAllQuery()}
{
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
}
@ -182,7 +184,8 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
{
if (is_local)
{
auto input_block = executeQuery(request, context, true).in;
Context query_context = context;
auto input_block = executeQuery(request, query_context, true).in;
return readInvalidateQuery(*input_block);
}
else
@ -201,7 +204,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
Context & context) -> DictionarySourcePtr {
Context & context) -> DictionarySourcePtr
{
return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse", sample_block, context);
};
factory.registerSource("clickhouse", createTableSource);

View File

@ -2,6 +2,7 @@
#include <memory>
#include <Client/ConnectionPoolWithFailover.h>
#include <Interpreters/Context.h>
#include "DictionaryStructure.h"
#include "ExternalQueryBuilder.h"
#include "IDictionarySource.h"
@ -65,7 +66,7 @@ private:
mutable std::string invalidate_query_response;
ExternalQueryBuilder query_builder;
Block sample_block;
Context & context;
Context context;
const bool is_local;
ConnectionPoolWithFailoverPtr pool;
const std::string load_all_query;

View File

@ -14,7 +14,6 @@ namespace ErrorCodes
void DictionaryFactory::registerLayout(const std::string & layout_type, Creator create_layout)
{
//LOG_DEBUG(log, "Register dictionary layout type `" + layout_type + "`");
if (!registered_layouts.emplace(layout_type, std::move(create_layout)).second)
throw Exception("DictionaryFactory: the layout name '" + layout_type + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -234,7 +234,8 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context) -> DictionarySourcePtr {
Context & context) -> DictionarySourcePtr
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};

View File

@ -56,7 +56,8 @@ void registerDictionarySourceFile(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context) -> DictionarySourcePtr {
Context & context) -> DictionarySourcePtr
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `file` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};

View File

@ -157,7 +157,8 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context) -> DictionarySourcePtr {
Context & context) -> DictionarySourcePtr
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `http` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};

View File

@ -121,21 +121,23 @@ LibraryDictionarySource::LibraryDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context)
Block & sample_block)
: log(&Logger::get("LibraryDictionarySource"))
, dict_struct{dict_struct_}
, config_prefix{config_prefix}
, path{config.getString(config_prefix + ".path", "")}
, sample_block{sample_block}
, context(context)
{
if (!Poco::File(path).exists())
throw Exception(
"LibraryDictionarySource: Can't load lib " + toString() + ": " + Poco::File(path).path() + " - File doesn't exist",
ErrorCodes::FILE_DOESNT_EXIST);
description.init(sample_block);
library = std::make_shared<SharedLibrary>(path);
library = std::make_shared<SharedLibrary>(path, RTLD_LAZY
#if defined(RTLD_DEEPBIND) // Does not exists in freebsd
| RTLD_DEEPBIND
#endif
);
settings = std::make_shared<CStringsHolder>(getLibSettings(config, config_prefix + lib_config_settings));
if (auto libNew = library->tryGet<decltype(lib_data) (*)(decltype(&settings->strings), decltype(&ClickHouseLibrary::log))>(
"ClickHouseDictionary_v3_libNew"))
@ -148,7 +150,6 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
, config_prefix{other.config_prefix}
, path{other.path}
, sample_block{other.sample_block}
, context(other.context)
, library{other.library}
, description{other.description}
, settings{other.settings}
@ -284,8 +285,9 @@ void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context) -> DictionarySourcePtr {
return std::make_unique<LibraryDictionarySource>(dict_struct, config, config_prefix + ".library", sample_block, context);
const Context &) -> DictionarySourcePtr
{
return std::make_unique<LibraryDictionarySource>(dict_struct, config, config_prefix + ".library", sample_block);
};
factory.registerSource("library", createTableSource);
}

View File

@ -32,8 +32,7 @@ public:
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context);
Block & sample_block);
LibraryDictionarySource(const LibraryDictionarySource & other);
@ -70,7 +69,6 @@ private:
const std::string config_prefix;
const std::string path;
Block sample_block;
const Context & context;
SharedLibraryPtr library;
ExternalResultDescription description;
std::shared_ptr<CStringsHolder> settings;

View File

@ -36,6 +36,7 @@ endif ()
if (USE_ICU)
target_link_libraries (clickhouse_functions PRIVATE ${ICU_LIBRARIES})
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${ICU_INCLUDE_DIRS})
endif ()
if (USE_VECTORCLASS)

View File

@ -6,6 +6,8 @@
#include <Poco/String.h>
#include <IO/WriteHelpers.h>
namespace DB
{
@ -43,7 +45,13 @@ FunctionBuilderPtr FunctionFactory::get(
{
auto res = tryGet(name, context);
if (!res)
throw Exception("Unknown function " + name, ErrorCodes::UNKNOWN_FUNCTION);
{
auto hints = this->getHints(name);
if (!hints.empty())
throw Exception("Unknown function " + name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_FUNCTION);
else
throw Exception("Unknown function " + name, ErrorCodes::UNKNOWN_FUNCTION);
}
return res;
}

View File

@ -15,9 +15,26 @@ class FunctionIfBase : public IFunction
public:
bool isCompilableImpl(const DataTypes & types) const override
{
/// It's difficult to compare Date and DateTime - cannot use JIT compilation.
bool has_date = false;
bool has_datetime = false;
for (const auto & type : types)
if (!isCompilableType(removeNullable(type)))
{
auto type_removed_nullable = removeNullable(type);
WhichDataType which(type_removed_nullable);
if (which.isDate())
has_date = true;
if (which.isDateTime())
has_datetime = true;
if (has_date && has_datetime)
return false;
if (!isCompilableType(type_removed_nullable))
return false;
}
return true;
}

View File

@ -1146,10 +1146,16 @@ public:
const DataTypePtr & left_type = col_with_type_and_name_left.type;
const DataTypePtr & right_type = col_with_type_and_name_right.type;
WhichDataType which_left{left_type};
WhichDataType which_right{right_type};
const bool left_is_num = col_left_untyped->isNumeric();
const bool right_is_num = col_right_untyped->isNumeric();
if (left_is_num && right_is_num)
bool date_and_datetime = (left_type != right_type) &&
which_left.isDateOrDateTime() && which_right.isDateOrDateTime();
if (left_is_num && right_is_num && !date_and_datetime)
{
if (!(executeNumLeftType<UInt8>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<UInt16>(block, result, col_left_untyped, col_right_untyped)
@ -1203,7 +1209,10 @@ public:
{
auto isBigInteger = &typeIsEither<DataTypeInt64, DataTypeUInt64, DataTypeUUID>;
auto isFloatingPoint = &typeIsEither<DataTypeFloat32, DataTypeFloat64>;
if ((isBigInteger(*types[0]) && isFloatingPoint(*types[1])) || (isBigInteger(*types[1]) && isFloatingPoint(*types[0])))
if ((isBigInteger(*types[0]) && isFloatingPoint(*types[1]))
|| (isBigInteger(*types[1]) && isFloatingPoint(*types[0]))
|| (WhichDataType(types[0]).isDate() && WhichDataType(types[1]).isDateTime())
|| (WhichDataType(types[1]).isDate() && WhichDataType(types[0]).isDateTime()))
return false; /// TODO: implement (double, int_N where N > double's mantissa width)
return isCompilableType(types[0]) && isCompilableType(types[1]);
}

View File

@ -186,7 +186,7 @@ public:
: owned_dict(owned_dict_)
{
if (!owned_dict)
throw Exception("Dictionaries was not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
throw Exception("Embedded dictionaries were not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
}
String getName() const override
@ -280,7 +280,7 @@ public:
: owned_dict(owned_dict_)
{
if (!owned_dict)
throw Exception("Dictionaries was not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
throw Exception("Embedded dictionaries were not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
}
String getName() const override
@ -418,7 +418,7 @@ public:
: owned_dict(owned_dict_)
{
if (!owned_dict)
throw Exception("Dictionaries was not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
throw Exception("Embedded dictionaries were not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
}
String getName() const override
@ -690,7 +690,7 @@ public:
: owned_dict(owned_dict_)
{
if (!owned_dict)
throw Exception("Dictionaries was not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
throw Exception("Embedded dictionaries were not loaded. You need to check configuration file.", ErrorCodes::DICTIONARIES_WAS_NOT_LOADED);
}
String getName() const override

View File

@ -151,6 +151,8 @@ public:
#endif
virtual bool isStateful() const { return false; }
/** Should we evaluate this function while constant folding, if arguments are constants?
* Usually this is true. Notable counterexample is function 'sleep'.
* If we will call it during query analysis, we will sleep extra amount of time.
@ -230,6 +232,9 @@ public:
/// Get the main function name.
virtual String getName() const = 0;
/// Override and return true if function needs to depend on the state of the data.
virtual bool isStateful() const { return false; }
/// Override and return true if function could take different number of arguments.
virtual bool isVariadic() const { return false; }
@ -322,6 +327,9 @@ class IFunction : public std::enable_shared_from_this<IFunction>,
{
public:
String getName() const override = 0;
bool isStateful() const override { return false; }
/// TODO: make const
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override = 0;
@ -478,6 +486,7 @@ public:
}
String getName() const override { return function->getName(); }
bool isStateful() const override { return function->isStateful(); }
bool isVariadic() const override { return function->isVariadic(); }
size_t getNumberOfArguments() const override { return function->getNumberOfArguments(); }

View File

@ -27,6 +27,11 @@ public:
return name;
}
bool isStateful() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 0;

View File

@ -33,6 +33,11 @@ public:
return name;
}
bool isStateful() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 1;

View File

@ -22,6 +22,7 @@
#include <Functions/GatherUtils/Algorithms.h>
#include <Functions/FunctionIfBase.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/castColumn.h>
namespace DB
@ -168,7 +169,8 @@ class FunctionIf : public FunctionIfBase</*null_is_false=*/false>
{
public:
static constexpr auto name = "if";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionIf>(); }
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionIf>(context); }
FunctionIf(const Context & context) : context(context) {}
private:
template <typename T0, typename T1>
@ -588,6 +590,72 @@ private:
return true;
}
void executeGeneric(const ColumnUInt8 * cond_col, Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
/// Convert both columns to the common type (if needed).
const ColumnWithTypeAndName & arg1 = block.getByPosition(arguments[1]);
const ColumnWithTypeAndName & arg2 = block.getByPosition(arguments[2]);
DataTypePtr common_type = getLeastSupertype({arg1.type, arg2.type});
ColumnPtr col_then = castColumn(arg1, common_type, context);
ColumnPtr col_else = castColumn(arg2, common_type, context);
MutableColumnPtr result_column = common_type->createColumn();
result_column->reserve(input_rows_count);
bool then_is_const = col_then->isColumnConst();
bool else_is_const = col_else->isColumnConst();
const auto & cond_array = cond_col->getData();
if (then_is_const && else_is_const)
{
const IColumn & then_nested_column = static_cast<const ColumnConst &>(*col_then).getDataColumn();
const IColumn & else_nested_column = static_cast<const ColumnConst &>(*col_else).getDataColumn();
for (size_t i = 0; i < input_rows_count; ++i)
{
if (cond_array[i])
result_column->insertFrom(then_nested_column, 0);
else
result_column->insertFrom(else_nested_column, 0);
}
}
else if (then_is_const)
{
const IColumn & then_nested_column = static_cast<const ColumnConst &>(*col_then).getDataColumn();
for (size_t i = 0; i < input_rows_count; ++i)
{
if (cond_array[i])
result_column->insertFrom(then_nested_column, 0);
else
result_column->insertFrom(*col_else, i);
}
}
else if (else_is_const)
{
const IColumn & else_nested_column = static_cast<const ColumnConst &>(*col_else).getDataColumn();
for (size_t i = 0; i < input_rows_count; ++i)
{
if (cond_array[i])
result_column->insertFrom(*col_then, i);
else
result_column->insertFrom(else_nested_column, 0);
}
}
else
{
for (size_t i = 0; i < input_rows_count; ++i)
result_column->insertFrom(cond_array[i] ? *col_then : *col_else, i);
}
block.getByPosition(result).column = std::move(result_column);
}
bool executeForNullableCondition(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
const ColumnWithTypeAndName & arg_cond = block.getByPosition(arguments[0]);
@ -873,6 +941,14 @@ public:
const ColumnWithTypeAndName & arg_then = block.getByPosition(arguments[1]);
const ColumnWithTypeAndName & arg_else = block.getByPosition(arguments[2]);
/// A case for identical then and else (pointers are the same).
if (arg_then.column.get() == arg_else.column.get())
{
/// Just point result to them.
block.getByPosition(result).column = arg_then.column;
return;
}
const ColumnUInt8 * cond_col = typeid_cast<const ColumnUInt8 *>(arg_cond.column.get());
const ColumnConst * cond_const_col = checkAndGetColumnConst<ColumnVector<UInt8>>(arg_cond.column.get());
ColumnPtr materialized_cond_col;
@ -919,17 +995,17 @@ public:
if (auto rigth_array = checkAndGetDataType<DataTypeArray>(arg_else.type.get()))
right_id = rigth_array->getNestedType()->getTypeId();
bool executed_with_nums = callOnBasicTypes<true, true, true, true>(left_id, right_id, call);
if (!(executed_with_nums
if (!(callOnBasicTypes<true, true, true, false>(left_id, right_id, call)
|| executeTyped<UInt128, UInt128>(cond_col, block, arguments, result, input_rows_count)
|| executeString(cond_col, block, arguments, result)
|| executeGenericArray(cond_col, block, arguments, result)
|| executeTuple(block, arguments, result, input_rows_count)))
throw Exception("Illegal columns " + arg_then.column->getName() + " and " + arg_else.column->getName()
+ " of second (then) and third (else) arguments of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
{
executeGeneric(cond_col, block, arguments, result, input_rows_count);
}
}
const Context & context;
};
void registerFunctionIf(FunctionFactory & factory)

View File

@ -27,6 +27,11 @@ public:
return name;
}
bool isStateful() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 0;

View File

@ -22,6 +22,11 @@ public:
return name;
}
bool isStateful() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 0;

View File

@ -41,6 +41,11 @@ public:
return name;
}
bool isStateful() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 1;

View File

@ -130,6 +130,11 @@ public:
return name;
}
bool isStateful() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 1;

View File

@ -1,111 +0,0 @@
#include <IO/InterserverWriteBuffer.h>
#include <IO/WriteBufferFromOStream.h>
#include <Poco/Version.h>
#include <Poco/URI.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
}
InterserverWriteBuffer::InterserverWriteBuffer(const std::string & host_, int port_,
const std::string & endpoint_,
const std::string & path_,
bool compress_,
size_t buffer_size_,
const Poco::Timespan & connection_timeout,
const Poco::Timespan & send_timeout,
const Poco::Timespan & receive_timeout)
: WriteBuffer(nullptr, 0), host(host_), port(port_), path(path_)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
std::string encoded_endpoint;
Poco::URI::encode(endpoint_, "&#", encoded_endpoint);
std::string compress_str = compress_ ? "true" : "false";
std::string encoded_compress;
Poco::URI::encode(compress_str, "&#", encoded_compress);
std::stringstream uri;
uri << "http://" << host << ":" << port
<< "/?endpoint=" << encoded_endpoint
<< "&compress=" << encoded_compress
<< "&path=" << encoded_path;
std::string uri_str = Poco::URI(uri.str()).getPathAndQuery();
session.setHost(host);
session.setPort(port);
session.setKeepAlive(true);
/// set the timeout
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
session.setTimeout(connection_timeout, send_timeout, receive_timeout);
#else
session.setTimeout(connection_timeout);
static_cast <void> (send_timeout);
static_cast <void> (receive_timeout);
#endif
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri_str, Poco::Net::HTTPRequest::HTTP_1_1);
request.setChunkedTransferEncoding(true);
ostr = &session.sendRequest(request);
impl = std::make_unique<WriteBufferFromOStream>(*ostr, buffer_size_);
set(impl->buffer().begin(), impl->buffer().size());
}
InterserverWriteBuffer::~InterserverWriteBuffer()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void InterserverWriteBuffer::nextImpl()
{
if (!offset() || finalized)
return;
/// For correct work with AsynchronousWriteBuffer, which replaces buffers.
impl->set(buffer().begin(), buffer().size());
impl->position() = pos;
impl->next();
}
void InterserverWriteBuffer::finalize()
{
if (finalized)
return;
next();
finalized = true;
}
void InterserverWriteBuffer::cancel()
{
finalized = true;
}
}

View File

@ -1,54 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Poco/Net/HTTPClientSession.h>
namespace DB
{
namespace
{
constexpr auto DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT = 1;
constexpr auto DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT = 1800;
constexpr auto DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT = 1800;
}
/** Allows you to write a file to a remote server.
*/
class InterserverWriteBuffer final : public WriteBuffer
{
public:
InterserverWriteBuffer(const std::string & host_, int port_,
const std::string & endpoint_,
const std::string & path_,
bool compress_ = false,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT, 0),
const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT, 0),
const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT, 0));
~InterserverWriteBuffer() override;
void finalize();
void cancel();
private:
void nextImpl() override;
private:
std::string host;
int port;
std::string path;
Poco::Net::HTTPClientSession session;
std::ostream * ostr; /// this is owned by session
std::unique_ptr<WriteBuffer> impl;
/// Sent all the data and renamed the file
bool finalized = false;
};
}

View File

@ -357,7 +357,18 @@ void ActionsVisitor::visit(const ASTPtr & ast)
? context.getQueryContext()
: context;
const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(node->name, function_context);
FunctionBuilderPtr function_builder;
try
{
function_builder = FunctionFactory::instance().get(node->name, function_context);
}
catch (DB::Exception & e)
{
auto hints = AggregateFunctionFactory::instance().getHints(node->name);
if (!hints.empty())
e.addMessage("Or unknown aggregate function " + node->name + ". Maybe you meant: " + toString(hints));
e.rethrow();
}
Names argument_names;
DataTypes argument_types;

View File

@ -3,6 +3,7 @@
#include <Parsers/IAST.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/SubqueryForSet.h>
namespace DB
@ -11,32 +12,6 @@ namespace DB
class Context;
class ASTFunction;
class Join;
using JoinPtr = std::shared_ptr<Join>;
/// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
struct SubqueryForSet
{
/// The source is obtained using the InterpreterSelectQuery subquery.
BlockInputStreamPtr source;
/// If set, build it from result.
SetPtr set;
JoinPtr join;
/// Apply this actions to joined block.
ExpressionActionsPtr joined_block_actions;
/// Rename column from joined block from this list.
NamesWithAliases joined_block_aliases;
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
};
/// ID of subquery -> what to do with it.
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
/// The case of an explicit enumeration of values.
SetPtr makeExplicitSet(
const ASTFunction * node, const Block & sample_block, bool create_ordered_set,

View File

@ -16,8 +16,7 @@ namespace DB
ExpressionActionsPtr AnalyzedJoin::createJoinedBlockActions(
const JoinedColumnsList & columns_added_by_join,
const ASTSelectQuery * select_query_with_join,
const Context & context,
NameSet & required_columns_from_joined_table) const
const Context & context) const
{
if (!select_query_with_join)
return nullptr;
@ -48,8 +47,14 @@ ExpressionActionsPtr AnalyzedJoin::createJoinedBlockActions(
ASTPtr query = expression_list;
auto syntax_result = SyntaxAnalyzer(context).analyze(query, source_column_names, required_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context, {}, required_columns);
auto joined_block_actions = analyzer.getActions(false);
ExpressionAnalyzer analyzer(query, syntax_result, context, {}, required_columns_set);
return analyzer.getActions(false);
}
NameSet AnalyzedJoin::getRequiredColumnsFromJoinedTable(const JoinedColumnsList & columns_added_by_join,
const ExpressionActionsPtr & joined_block_actions) const
{
NameSet required_columns_from_joined_table;
auto required_action_columns = joined_block_actions->getRequiredColumns();
required_columns_from_joined_table.insert(required_action_columns.begin(), required_action_columns.end());
@ -63,7 +68,7 @@ ExpressionActionsPtr AnalyzedJoin::createJoinedBlockActions(
if (!sample.has(column.name_and_type.name))
required_columns_from_joined_table.insert(column.name_and_type.name);
return joined_block_actions;
return required_columns_from_joined_table;
}
const JoinedColumnsList & AnalyzedJoin::getColumnsFromJoinedTable(

View File

@ -64,9 +64,11 @@ struct AnalyzedJoin
ExpressionActionsPtr createJoinedBlockActions(
const JoinedColumnsList & columns_added_by_join, /// Subset of available_joined_columns.
const ASTSelectQuery * select_query_with_join,
const Context & context,
NameSet & required_columns_from_joined_table /// Columns which will be used in query from joined table.
) const;
const Context & context) const;
/// Columns which will be used in query from joined table.
NameSet getRequiredColumnsFromJoinedTable(const JoinedColumnsList & columns_added_by_join,
const ExpressionActionsPtr & joined_block_actions) const;
const JoinedColumnsList & getColumnsFromJoinedTable(const NameSet & source_columns,
const Context & context,

View File

@ -1,8 +1,8 @@
#include <map>
#include <set>
#include <boost/functional/hash/hash.hpp>
#include <optional>
#include <memory>
#include <Poco/Mutex.h>
#include <Poco/File.h>
#include <Poco/UUID.h>
#include <Poco/Net/IPAddress.h>
#include <common/logger_useful.h>
@ -98,7 +98,7 @@ struct ContextShared
{
Logger * log = &Logger::get("Context");
std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory;
std::unique_ptr<IRuntimeComponentsFactory> runtime_components_factory;
/// For access of most of shared objects. Recursive mutex.
mutable std::recursive_mutex mutex;
@ -124,12 +124,12 @@ struct ContextShared
ConfigurationPtr config; /// Global configuration settings.
Databases databases; /// List of databases and tables in them.
mutable std::shared_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::shared_ptr<ExternalDictionaries> external_dictionaries;
mutable std::shared_ptr<ExternalModels> external_models;
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionaries> external_dictionaries;
mutable std::optional<ExternalModels> external_models;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
std::shared_ptr<ISecurityManager> security_manager; /// Known users.
std::unique_ptr<ISecurityManager> security_manager; /// Known users.
Quotas quotas; /// Known quotas for resource use.
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
@ -138,18 +138,19 @@ struct ContextShared
ViewDependencies view_dependencies; /// Current dependencies
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
BackgroundProcessingPoolPtr background_pool; /// The thread pool for the background work performed by the tables.
BackgroundSchedulePoolPtr schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
std::optional<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
std::shared_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
SystemLogsPtr system_logs; /// Used to log queries and operations on parts
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
@ -206,7 +207,7 @@ struct ContextShared
Context::ConfigReloadCallback config_reload_callback;
ContextShared(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
ContextShared(std::unique_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
: runtime_components_factory(std::move(runtime_components_factory_)), macros(std::make_unique<Macros>())
{
/// TODO: make it singleton (?)
@ -243,6 +244,8 @@ struct ContextShared
return;
shutdown_called = true;
system_logs.reset();
/** At this point, some tables may have threads that block our mutex.
* To complete them correctly, we will copy the current list of tables,
* and ask them all to finish their work.
@ -263,6 +266,15 @@ struct ContextShared
std::lock_guard lock(mutex);
databases.clear();
}
/// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
/// TODO: Get rid of this.
embedded_dictionaries.reset();
external_dictionaries.reset();
external_models.reset();
background_pool.reset();
schedule_pool.reset();
}
private:
@ -276,11 +288,10 @@ private:
Context::Context() = default;
Context Context::createGlobal(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory)
Context Context::createGlobal(std::unique_ptr<IRuntimeComponentsFactory> runtime_components_factory)
{
Context res;
res.runtime_components_factory = runtime_components_factory;
res.shared = std::make_shared<ContextShared>(runtime_components_factory);
res.shared = std::make_shared<ContextShared>(std::move(runtime_components_factory));
res.quota = std::make_shared<QuotaForIntervals>();
return res;
}
@ -290,18 +301,7 @@ Context Context::createGlobal()
return createGlobal(std::make_unique<RuntimeComponentsFactory>());
}
Context::~Context()
{
try
{
/// Destroy system logs while at least one Context is alive
system_logs.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
Context::~Context() = default;
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
@ -1077,6 +1077,13 @@ void Context::setCurrentQueryId(const String & query_id)
client_info.current_query_id = query_id_to_set;
}
void Context::killCurrentQuery()
{
if (process_list_elem)
{
process_list_elem->cancelQuery(true);
}
};
String Context::getDefaultFormat() const
{
@ -1181,9 +1188,9 @@ EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_
if (!shared->embedded_dictionaries)
{
auto geo_dictionaries_loader = runtime_components_factory->createGeoDictionariesLoader();
auto geo_dictionaries_loader = shared->runtime_components_factory->createGeoDictionariesLoader();
shared->embedded_dictionaries = std::make_shared<EmbeddedDictionaries>(
shared->embedded_dictionaries.emplace(
std::move(geo_dictionaries_loader),
*this->global_context,
throw_on_error);
@ -1202,9 +1209,9 @@ ExternalDictionaries & Context::getExternalDictionariesImpl(const bool throw_on_
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
auto config_repository = runtime_components_factory->createExternalDictionariesConfigRepository();
auto config_repository = shared->runtime_components_factory->createExternalDictionariesConfigRepository();
shared->external_dictionaries = std::make_shared<ExternalDictionaries>(
shared->external_dictionaries.emplace(
std::move(config_repository),
*this->global_context,
throw_on_error);
@ -1222,9 +1229,9 @@ ExternalModels & Context::getExternalModelsImpl(bool throw_on_error) const
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
auto config_repository = runtime_components_factory->createExternalModelsConfigRepository();
auto config_repository = shared->runtime_components_factory->createExternalModelsConfigRepository();
shared->external_models = std::make_shared<ExternalModels>(
shared->external_models.emplace(
std::move(config_repository),
*this->global_context,
throw_on_error);
@ -1342,7 +1349,7 @@ BackgroundProcessingPool & Context::getBackgroundPool()
{
auto lock = getLock();
if (!shared->background_pool)
shared->background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
shared->background_pool.emplace(settings.background_pool_size);
return *shared->background_pool;
}
@ -1350,7 +1357,7 @@ BackgroundSchedulePool & Context::getSchedulePool()
{
auto lock = getLock();
if (!shared->schedule_pool)
shared->schedule_pool = std::make_shared<BackgroundSchedulePool>(settings.background_schedule_pool_size);
shared->schedule_pool.emplace(settings.background_schedule_pool_size);
return *shared->schedule_pool;
}
@ -1529,7 +1536,7 @@ Compiler & Context::getCompiler()
auto lock = getLock();
if (!shared->compiler)
shared->compiler = std::make_unique<Compiler>(shared->path + "build/", 1);
shared->compiler.emplace(shared->path + "build/", 1);
return *shared->compiler;
}
@ -1542,7 +1549,7 @@ void Context::initializeSystemLogs()
if (!global_context)
throw Exception("Logical error: no global context for system logs", ErrorCodes::LOGICAL_ERROR);
system_logs = std::make_shared<SystemLogs>(*global_context, getConfigRef());
shared->system_logs = std::make_shared<SystemLogs>(*global_context, getConfigRef());
}
@ -1550,10 +1557,10 @@ QueryLog * Context::getQueryLog()
{
auto lock = getLock();
if (!system_logs || !system_logs->query_log)
if (!shared->system_logs || !shared->system_logs->query_log)
return nullptr;
return system_logs->query_log.get();
return shared->system_logs->query_log.get();
}
@ -1561,10 +1568,10 @@ QueryThreadLog * Context::getQueryThreadLog()
{
auto lock = getLock();
if (!system_logs || !system_logs->query_thread_log)
if (!shared->system_logs || !shared->system_logs->query_thread_log)
return nullptr;
return system_logs->query_thread_log.get();
return shared->system_logs->query_thread_log.get();
}
@ -1573,16 +1580,16 @@ PartLog * Context::getPartLog(const String & part_database)
auto lock = getLock();
/// System logs are shutting down.
if (!system_logs || !system_logs->part_log)
if (!shared->system_logs || !shared->system_logs->part_log)
return nullptr;
/// Will not log operations on system tables (including part_log itself).
/// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing,
/// and also make troubles on startup.
if (part_database == system_logs->part_log_database)
if (part_database == shared->system_logs->part_log_database)
return nullptr;
return system_logs->part_log.get();
return shared->system_logs->part_log.get();
}
@ -1612,7 +1619,7 @@ const MergeTreeSettings & Context::getMergeTreeSettings() const
if (!shared->merge_tree_settings)
{
auto & config = getConfigRef();
shared->merge_tree_settings = std::make_unique<MergeTreeSettings>();
shared->merge_tree_settings.emplace();
shared->merge_tree_settings->loadFromConfig("merge_tree", config);
}
@ -1727,7 +1734,6 @@ void Context::reloadConfig() const
void Context::shutdown()
{
system_logs.reset();
shared->shutdown();
}

View File

@ -113,8 +113,6 @@ private:
using Shared = std::shared_ptr<ContextShared>;
Shared shared;
std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory;
ClientInfo client_info;
ExternalTablesInitializer external_tables_initializer_callback;
@ -133,7 +131,6 @@ private:
Context * query_context = nullptr;
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context or nullptr. Could be equal to this.
SystemLogsPtr system_logs; /// Used to log queries and operations on parts
UInt64 session_close_cycle = 0;
bool session_is_used = false;
@ -149,7 +146,7 @@ private:
public:
/// Create initial Context with ContextShared and etc.
static Context createGlobal(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory);
static Context createGlobal(std::unique_ptr<IRuntimeComponentsFactory> runtime_components_factory);
static Context createGlobal();
Context(const Context &) = default;
@ -236,6 +233,8 @@ public:
void setCurrentDatabase(const String & name);
void setCurrentQueryId(const String & query_id);
void killCurrentQuery();
void setInsertionTable(std::pair<String, String> && db_and_table) { insertion_table = db_and_table; }
const std::pair<String, String> & getInsertionTable() const { return insertion_table; }

View File

@ -0,0 +1,225 @@
#include <Common/typeid_cast.h>
#include <Interpreters/CrossToInnerJoinVisitor.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ParserTablesInSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// It checks if where expression could be moved to JOIN ON expression partially or entirely.
class CheckExpressionVisitorData
{
public:
using TypeToVisit = const ASTFunction;
CheckExpressionVisitorData(const std::vector<DatabaseAndTableWithAlias> & tables_)
: tables(tables_)
, save_where(false)
, flat_ands(true)
{}
void visit(const ASTFunction & node, ASTPtr & ast)
{
if (node.name == "and")
{
if (!node.arguments || node.arguments->children.empty())
throw Exception("Logical error: function requires argiment", ErrorCodes::LOGICAL_ERROR);
for (auto & child : node.arguments->children)
{
if (auto func = typeid_cast<const ASTFunction *>(child.get()))
{
if (func->name == "and")
flat_ands = false;
visit(*func, child);
}
else
save_where = true;
}
}
else if (node.name == "equals")
{
if (checkEquals(node))
asts_to_join_on.push_back(ast);
else
save_where = true;
}
else
save_where = true;
}
bool matchAny() const { return !asts_to_join_on.empty(); }
bool matchAll() const { return matchAny() && !save_where; }
bool canReuseWhere() const { return matchAll() && flat_ands; }
ASTPtr makeOnExpression()
{
if (asts_to_join_on.size() == 1)
return asts_to_join_on[0]->clone();
std::vector<ASTPtr> arguments;
arguments.reserve(asts_to_join_on.size());
for (auto & ast : asts_to_join_on)
arguments.emplace_back(ast->clone());
return makeASTFunction("and", std::move(arguments));
}
private:
const std::vector<DatabaseAndTableWithAlias> & tables;
std::vector<ASTPtr> asts_to_join_on;
bool save_where;
bool flat_ands;
bool checkEquals(const ASTFunction & node)
{
if (!node.arguments)
throw Exception("Logical error: function requires argiment", ErrorCodes::LOGICAL_ERROR);
if (node.arguments->children.size() != 2)
return false;
auto left = typeid_cast<const ASTIdentifier *>(node.arguments->children[0].get());
auto right = typeid_cast<const ASTIdentifier *>(node.arguments->children[1].get());
if (!left || !right)
return false;
return checkIdentifiers(*left, *right);
}
/// Check if the identifiers are from different joined tables. If it's a self joint, tables should have aliases.
/// select * from t1 a cross join t2 b where a.x = b.x
bool checkIdentifiers(const ASTIdentifier & left, const ASTIdentifier & right)
{
/// {best_match, berst_table_pos}
std::pair<size_t, size_t> left_best{0, 0};
std::pair<size_t, size_t> right_best{0, 0};
for (size_t i = 0; i < tables.size(); ++i)
{
size_t match = IdentifierSemantic::canReferColumnToTable(left, tables[i]);
if (match > left_best.first)
{
left_best.first = match;
left_best.second = i;
}
match = IdentifierSemantic::canReferColumnToTable(right, tables[i]);
if (match > right_best.first)
{
right_best.first = match;
right_best.second = i;
}
}
return left_best.first && right_best.first && (left_best.second != right_best.second);
}
};
static bool extractTableName(const ASTTableExpression & expr, std::vector<DatabaseAndTableWithAlias> & names)
{
/// Subselects are not supported.
if (!expr.database_and_table_name)
return false;
names.emplace_back(DatabaseAndTableWithAlias(expr));
return true;
}
static ASTPtr getCrossJoin(ASTSelectQuery & select, std::vector<DatabaseAndTableWithAlias> & table_names)
{
if (!select.tables)
return {};
auto tables = typeid_cast<const ASTTablesInSelectQuery *>(select.tables.get());
if (!tables)
return {};
size_t num_tables = tables->children.size();
if (num_tables != 2)
return {};
auto left = typeid_cast<const ASTTablesInSelectQueryElement *>(tables->children[0].get());
auto right = typeid_cast<const ASTTablesInSelectQueryElement *>(tables->children[1].get());
if (!left || !right || !right->table_join)
return {};
if (auto join = typeid_cast<const ASTTableJoin *>(right->table_join.get()))
{
if (join->kind == ASTTableJoin::Kind::Cross ||
join->kind == ASTTableJoin::Kind::Comma)
{
if (!join->children.empty())
throw Exception("Logical error: CROSS JOIN has expressions", ErrorCodes::LOGICAL_ERROR);
auto & left_expr = typeid_cast<const ASTTableExpression &>(*left->table_expression);
auto & right_expr = typeid_cast<const ASTTableExpression &>(*right->table_expression);
table_names.reserve(2);
if (extractTableName(left_expr, table_names) &&
extractTableName(right_expr, table_names))
return right->table_join;
}
}
return {};
}
std::vector<ASTPtr *> CrossToInnerJoinMatcher::visit(ASTPtr & ast, Data & data)
{
if (auto * t = typeid_cast<ASTSelectQuery *>(ast.get()))
visit(*t, ast, data);
return {};
}
void CrossToInnerJoinMatcher::visit(ASTSelectQuery & select, ASTPtr & ast, Data & data)
{
using CheckExpressionMatcher = OneTypeMatcher<CheckExpressionVisitorData, false>;
using CheckExpressionVisitor = InDepthNodeVisitor<CheckExpressionMatcher, true>;
std::vector<DatabaseAndTableWithAlias> table_names;
ASTPtr ast_join = getCrossJoin(select, table_names);
if (!ast_join)
return;
CheckExpressionVisitor::Data visitor_data{table_names};
CheckExpressionVisitor(visitor_data).visit(select.where_expression);
if (visitor_data.matchAny())
{
auto & join = typeid_cast<ASTTableJoin &>(*ast_join);
join.kind = ASTTableJoin::Kind::Inner;
join.strictness = ASTTableJoin::Strictness::All;
if (visitor_data.canReuseWhere())
join.on_expression.swap(select.where_expression);
else
join.on_expression = visitor_data.makeOnExpression();
if (visitor_data.matchAll())
select.where_expression.reset();
join.children.push_back(join.on_expression);
}
ast = ast->clone(); /// rewrite AST in right manner
data.done = true;
}
}

Some files were not shown because too many files have changed in this diff Show More