diff --git a/.gitignore b/.gitignore index c24b17d2a44..61c389b6577 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,10 @@ CTestTestfile.cmake *.a *.o +# Python cache +*.pyc +__pycache__ + # ignore generated files *-metrika-yandex diff --git a/contrib/libboost/boost_1_62_0/boost/type_traits.hpp b/contrib/libboost/boost_1_62_0/boost/type_traits.hpp new file mode 100644 index 00000000000..7d651ce28d5 --- /dev/null +++ b/contrib/libboost/boost_1_62_0/boost/type_traits.hpp @@ -0,0 +1,150 @@ +// (C) Copyright John Maddock 2000. +// Use, modification and distribution are subject to the Boost Software License, +// Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt). +// +// See http://www.boost.org/libs/type_traits for most recent version including documentation. + +// See boost/type_traits/*.hpp for full copyright notices. + +#ifndef BOOST_TYPE_TRAITS_HPP +#define BOOST_TYPE_TRAITS_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#if !defined(__BORLANDC__) && !defined(__CUDACC__) +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include 
+#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if !(defined(__sgi) && defined(__EDG_VERSION__) && (__EDG_VERSION__ == 238)) +#include +#include +#endif + +#endif // BOOST_TYPE_TRAITS_HPP diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index c178dac5c01..1e58b28674b 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -67,8 +67,8 @@ add_headers_only(dbms src/Server) list (APPEND dbms_sources ${CONFIG_BUILD}) list (APPEND dbms_headers ${CONFIG_VERSION} ${CONFIG_COMMON}) -list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/DataTypeTraits.cpp) -list (APPEND dbms_headers src/Functions/IFunction.h src/Functions/FunctionFactory.h src/Functions/DataTypeTraits.h) +list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp) +list (APPEND dbms_headers src/Functions/IFunction.h src/Functions/FunctionFactory.h) list (APPEND dbms_sources src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -98,6 +98,7 @@ list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctions list(REMOVE_ITEM dbms_sources src/Client/Client.cpp src/Client/Benchmark.cpp + src/Client/PerformanceTest.cpp src/Storages/StorageCloud.cpp src/Databases/DatabaseCloud.cpp src/Common/StringUtils.cpp) diff --git a/dbms/src/Client/CMakeLists.txt b/dbms/src/Client/CMakeLists.txt index da5dc27cdb5..1439b738ea6 100644 --- a/dbms/src/Client/CMakeLists.txt +++ b/dbms/src/Client/CMakeLists.txt @@ -5,6 +5,9 @@ install (FILES config.xml DESTINATION ${CLICKHOUSE_ETC_DIR}/clickhouse-client CO add_library (clickhouse-benchmark Benchmark.cpp) target_link_libraries (clickhouse-benchmark dbms ${Boost_PROGRAM_OPTIONS_LIBRARY}) +add_library (clickhouse-performance-test PerformanceTest.cpp) 
+target_link_libraries (clickhouse-performance-test dbms ${Boost_PROGRAM_OPTIONS_LIBRARY}) + if (ENABLE_TESTS) add_subdirectory (tests) endif () diff --git a/dbms/src/Client/ConnectionPool.h b/dbms/src/Client/ConnectionPool.h index 8ecc21693ce..056c315089f 100644 --- a/dbms/src/Client/ConnectionPool.h +++ b/dbms/src/Client/ConnectionPool.h @@ -28,8 +28,9 @@ public: public: virtual ~IConnectionPool() {} - /** Selects the connection to work. */ - virtual Entry get(const Settings * settings = nullptr) = 0; + /// Selects the connection to work. + /// If force_connected is false, the client must manually ensure that returned connection is good. + virtual Entry get(const Settings * settings = nullptr, bool force_connected = true) = 0; }; using ConnectionPoolPtr = std::shared_ptr; @@ -77,12 +78,18 @@ public: { } - Entry get(const Settings * settings = nullptr) override + Entry get(const Settings * settings = nullptr, bool force_connected = true) override { + Entry entry; if (settings) - return Base::get(settings->queue_max_wait_ms.totalMilliseconds()); + entry = Base::get(settings->queue_max_wait_ms.totalMilliseconds()); else - return Base::get(-1); + entry = Base::get(-1); + + if (force_connected) + entry->forceConnected(); + + return entry; } const std::string & getHost() const diff --git a/dbms/src/Client/ConnectionPoolWithFailover.cpp b/dbms/src/Client/ConnectionPoolWithFailover.cpp index 3ffd36b7521..0e38f141494 100644 --- a/dbms/src/Client/ConnectionPoolWithFailover.cpp +++ b/dbms/src/Client/ConnectionPoolWithFailover.cpp @@ -42,7 +42,7 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover( } } -IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings) +IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings, bool force_connected) { TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) { @@ -131,7 +131,7 @@ ConnectionPoolWithFailover::tryGetEntry( TryResult result; try { - result.entry 
= pool.get(settings); + result.entry = pool.get(settings, /* force_connected = */ false); String server_name; UInt64 server_version_major; diff --git a/dbms/src/Client/ConnectionPoolWithFailover.h b/dbms/src/Client/ConnectionPoolWithFailover.h index fd8a20fcbdb..bf2155c04fd 100644 --- a/dbms/src/Client/ConnectionPoolWithFailover.h +++ b/dbms/src/Client/ConnectionPoolWithFailover.h @@ -47,7 +47,7 @@ public: using Entry = IConnectionPool::Entry; /** Allocates connection to work. */ - Entry get(const Settings * settings = nullptr) override; /// From IConnectionPool + Entry get(const Settings * settings = nullptr, bool force_connected = true) override; /// From IConnectionPool /** Allocates up to the specified number of connections to work. * Connections provide access to different replicas of one shard. diff --git a/dbms/src/Client/PerformanceTest.cpp b/dbms/src/Client/PerformanceTest.cpp new file mode 100644 index 00000000000..46f7821b3b4 --- /dev/null +++ b/dbms/src/Client/PerformanceTest.cpp @@ -0,0 +1,1567 @@ +#include +#include +#include +#include +#if __has_include() + #include +#endif +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include +#include +#include +#include +#include + +#include "InterruptListener.h" + +/** Tests launcher for ClickHouse. + * The tool walks through given or default folder in order to find files with + * tests' descriptions and launches it. 
+ */ +namespace FS = boost::filesystem; +using String = std::string; +const String FOUR_SPACES = " "; + +namespace DB +{ +namespace ErrorCodes +{ + extern const int POCO_EXCEPTION; + extern const int STD_EXCEPTION; + extern const int UNKNOWN_EXCEPTION; + extern const int NOT_IMPLEMENTED; +} + +static String pad(size_t padding) { + return String(padding * 4, ' '); +} + +class JSONString +{ +private: + std::map content; + size_t padding; + +public: + JSONString(size_t padding_ = 1) : padding(padding_){}; + + void set(const String key, String value, bool wrap = true) + { + if (value.empty()) + value = "null"; + + bool reserved = (value[0] == '[' || value[0] == '{' || value == "null"); + if (!reserved && wrap) + value = '\"' + value + '\"'; + + content[key] = value; + } + + template + typename std::enable_if::value>::type set(const String key, T value) + { + set(key, std::to_string(value), /*wrap= */false); + } + + void set(const String key, const std::vector & run_infos) + { + String value = "[\n"; + + for (size_t i = 0; i < run_infos.size(); ++i) + { + value += pad(padding + 1) + run_infos[i].asString(padding + 2); + if (i != run_infos.size() - 1) + value += ','; + + value += "\n"; + } + + value += pad(padding) + ']'; + content[key] = value; + } + + String asString() const { return asString(padding); } + String asString(size_t padding) const + { + String repr = "{"; + + for (auto it = content.begin(); it != content.end(); ++it) + { + if (it != content.begin()) + repr += ','; + /// construct "key": "value" string with padding + repr += "\n" + pad(padding) + '\"' + it->first + '\"' + ": " + it->second; + } + + repr += "\n" + pad(padding - 1) + '}'; + return repr; + } +}; + +enum class PriorityType +{ + Min, + Max +}; + +struct CriterionWithPriority +{ + PriorityType priority; + size_t value; + bool fulfilled; + + CriterionWithPriority() : value(0), fulfilled(false) + { + } + CriterionWithPriority(const CriterionWithPriority &) = default; +}; + +/// Termination 
criterions. The running test will be terminated in either of two conditions: +/// 1. All criterions marked 'min' are fulfilled +/// or +/// 2. Any criterion marked 'max' is fulfilled +class StopCriterions +{ +private: + using AbstractConfiguration = Poco::AutoPtr; + using Keys = std::vector; + + void initializeStruct(const String & priority, const AbstractConfiguration & stop_criterions_view) + { + Keys keys; + stop_criterions_view->keys(priority, keys); + + PriorityType priority_type = (priority == "min" ? PriorityType::Min : PriorityType::Max); + + for (const String & key : keys) + { + if (key == "timeout_ms") + { + timeout_ms.value = stop_criterions_view->getUInt64(priority + ".timeout_ms"); + timeout_ms.priority = priority_type; + } + else if (key == "rows_read") + { + rows_read.value = stop_criterions_view->getUInt64(priority + ".rows_read"); + rows_read.priority = priority_type; + } + else if (key == "bytes_read_uncompressed") + { + bytes_read_uncompressed.value = stop_criterions_view->getUInt64(priority + ".bytes_read_uncompressed"); + bytes_read_uncompressed.priority = priority_type; + } + else if (key == "iterations") + { + iterations.value = stop_criterions_view->getUInt64(priority + ".iterations"); + iterations.priority = priority_type; + } + else if (key == "min_time_not_changing_for_ms") + { + min_time_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".min_time_not_changing_for_ms"); + min_time_not_changing_for_ms.priority = priority_type; + } + else if (key == "max_speed_not_changing_for_ms") + { + max_speed_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".max_speed_not_changing_for_ms"); + max_speed_not_changing_for_ms.priority = priority_type; + } + else if (key == "average_speed_not_changing_for_ms") + { + average_speed_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".average_speed_not_changing_for_ms"); + average_speed_not_changing_for_ms.priority = priority_type; + } + else + { 
+ throw DB::Exception("Met unkown stop criterion: " + key, 1); + } + + if (priority == "min") + { + ++number_of_initialized_min; + }; + if (priority == "max") + { + ++number_of_initialized_max; + }; + } + } + +public: + StopCriterions() : number_of_initialized_min(0), number_of_initialized_max(0), fulfilled_criterions_min(0), fulfilled_criterions_max(0) + { + } + + StopCriterions(const StopCriterions & another_criterions) + : timeout_ms(another_criterions.timeout_ms), + rows_read(another_criterions.rows_read), + bytes_read_uncompressed(another_criterions.bytes_read_uncompressed), + iterations(another_criterions.iterations), + min_time_not_changing_for_ms(another_criterions.min_time_not_changing_for_ms), + max_speed_not_changing_for_ms(another_criterions.max_speed_not_changing_for_ms), + average_speed_not_changing_for_ms(another_criterions.average_speed_not_changing_for_ms), + + number_of_initialized_min(another_criterions.number_of_initialized_min), + number_of_initialized_max(another_criterions.number_of_initialized_max), + fulfilled_criterions_min(another_criterions.fulfilled_criterions_min), + fulfilled_criterions_max(another_criterions.fulfilled_criterions_max) + { + } + + void loadFromConfig(const AbstractConfiguration & stop_criterions_view) + { + if (stop_criterions_view->has("min")) + { + initializeStruct("min", stop_criterions_view); + } + + if (stop_criterions_view->has("max")) + { + initializeStruct("max", stop_criterions_view); + } + } + + void reset() + { + timeout_ms.fulfilled = false; + rows_read.fulfilled = false; + bytes_read_uncompressed.fulfilled = false; + iterations.fulfilled = false; + min_time_not_changing_for_ms.fulfilled = false; + max_speed_not_changing_for_ms.fulfilled = false; + average_speed_not_changing_for_ms.fulfilled = false; + + fulfilled_criterions_min = 0; + fulfilled_criterions_max = 0; + } + + CriterionWithPriority timeout_ms; + CriterionWithPriority rows_read; + CriterionWithPriority bytes_read_uncompressed; + 
CriterionWithPriority iterations; + CriterionWithPriority min_time_not_changing_for_ms; + CriterionWithPriority max_speed_not_changing_for_ms; + CriterionWithPriority average_speed_not_changing_for_ms; + + /// Hereafter 'min' and 'max', in context of critetions, mean a level of importance + /// Number of initialized properties met in configuration + size_t number_of_initialized_min; + size_t number_of_initialized_max; + + size_t fulfilled_criterions_min; + size_t fulfilled_criterions_max; +}; + +struct Stats +{ + Stopwatch watch; + Stopwatch watch_per_query; + Stopwatch min_time_watch; + Stopwatch max_rows_speed_watch; + Stopwatch max_bytes_speed_watch; + Stopwatch avg_rows_speed_watch; + Stopwatch avg_bytes_speed_watch; + size_t queries; + size_t rows_read; + size_t bytes_read; + + using Sampler = ReservoirSampler; + Sampler sampler{1 << 16}; + + /// min_time in ms + UInt64 min_time = std::numeric_limits::max(); + double total_time = 0; + + double max_rows_speed = 0; + double max_bytes_speed = 0; + + double avg_rows_speed_value = 0; + double avg_rows_speed_first = 0; + static double avg_rows_speed_precision; + + double avg_bytes_speed_value = 0; + double avg_bytes_speed_first = 0; + static double avg_bytes_speed_precision; + + size_t number_of_rows_speed_info_batches = 0; + size_t number_of_bytes_speed_info_batches = 0; + + bool ready = false; // check if a query wasn't interrupted by SIGINT + + String getStatisticByName(const String & statistic_name) + { + if (statistic_name == "min_time") + { + return std::to_string(min_time) + "ms"; + } + if (statistic_name == "quantiles") + { + String result = "\n"; + + for (double percent = 10; percent <= 90; percent += 10) + { + result += FOUR_SPACES + std::to_string((percent / 100)); + result += ": " + std::to_string(sampler.quantileInterpolated(percent / 100.0)); + result += "\n"; + } + result += FOUR_SPACES + "0.95: " + std::to_string(sampler.quantileInterpolated(95 / 100.0)) + "\n"; + result += FOUR_SPACES + "0.99: " + 
std::to_string(sampler.quantileInterpolated(99 / 100.0)) + "\n"; + result += FOUR_SPACES + "0.999: " + std::to_string(sampler.quantileInterpolated(99.9 / 100.)) + "\n"; + result += FOUR_SPACES + "0.9999: " + std::to_string(sampler.quantileInterpolated(99.99 / 100.)); + + return result; + } + if (statistic_name == "total_time") + { + return std::to_string(total_time) + "s"; + } + if (statistic_name == "queries_per_second") + { + return std::to_string(queries / total_time); + } + if (statistic_name == "rows_per_second") + { + return std::to_string(rows_read / total_time); + } + if (statistic_name == "bytes_per_second") + { + return std::to_string(bytes_read / total_time); + } + + if (statistic_name == "max_rows_per_second") + { + return std::to_string(max_rows_speed); + } + if (statistic_name == "max_bytes_per_second") + { + return std::to_string(max_bytes_speed); + } + if (statistic_name == "avg_rows_per_second") + { + return std::to_string(avg_rows_speed_value); + } + if (statistic_name == "avg_bytes_per_second") + { + return std::to_string(avg_bytes_speed_value); + } + + return ""; + } + + void update_min_time(const UInt64 min_time_candidate) + { + if (min_time_candidate < min_time) + { + min_time = min_time_candidate; + min_time_watch.restart(); + } + } + + void update_average_speed(const double new_speed_info, + Stopwatch & avg_speed_watch, + size_t & number_of_info_batches, + double precision, + double & avg_speed_first, + double & avg_speed_value) + { + avg_speed_value = ((avg_speed_value * number_of_info_batches) + new_speed_info); + avg_speed_value /= (++number_of_info_batches); + + if (avg_speed_first == 0) + { + avg_speed_first = avg_speed_value; + } + + if (abs(avg_speed_value - avg_speed_first) >= precision) + { + avg_speed_first = avg_speed_value; + avg_speed_watch.restart(); + } + } + + void update_max_speed(const size_t max_speed_candidate, Stopwatch & max_speed_watch, double & max_speed) + { + if (max_speed_candidate > max_speed) + { + max_speed = 
max_speed_candidate; + max_speed_watch.restart(); + } + } + + void add(size_t rows_read_inc, size_t bytes_read_inc) + { + rows_read += rows_read_inc; + bytes_read += bytes_read_inc; + + double new_rows_speed = rows_read_inc / watch_per_query.elapsedSeconds(); + double new_bytes_speed = bytes_read_inc / watch_per_query.elapsedSeconds(); + + /// Update rows speed + update_max_speed(new_rows_speed, max_rows_speed_watch, max_rows_speed); + update_average_speed(new_rows_speed, + avg_rows_speed_watch, + number_of_rows_speed_info_batches, + avg_rows_speed_precision, + avg_rows_speed_first, + avg_rows_speed_value); + /// Update bytes speed + update_max_speed(new_bytes_speed, max_bytes_speed_watch, max_bytes_speed); + update_average_speed(new_bytes_speed, + avg_bytes_speed_watch, + number_of_bytes_speed_info_batches, + avg_bytes_speed_precision, + avg_bytes_speed_first, + avg_bytes_speed_value); + } + + void updateQueryInfo() + { + ++queries; + sampler.insert(watch_per_query.elapsedSeconds()); + update_min_time(watch_per_query.elapsed() / (1000 * 1000)); /// ns to ms + } + + void setTotalTime() + { + total_time = watch.elapsedSeconds(); + } + + void clear() + { + watch.restart(); + watch_per_query.restart(); + min_time_watch.restart(); + max_rows_speed_watch.restart(); + max_bytes_speed_watch.restart(); + avg_rows_speed_watch.restart(); + avg_bytes_speed_watch.restart(); + + sampler.clear(); + + queries = 0; + rows_read = 0; + bytes_read = 0; + + min_time = std::numeric_limits::max(); + total_time = 0; + max_rows_speed = 0; + max_bytes_speed = 0; + avg_rows_speed_value = 0; + avg_bytes_speed_value = 0; + avg_rows_speed_first = 0; + avg_bytes_speed_first = 0; + avg_rows_speed_precision = 0.001; + avg_bytes_speed_precision = 0.001; + number_of_rows_speed_info_batches = 0; + number_of_bytes_speed_info_batches = 0; + } +}; + +double Stats::avg_rows_speed_precision = 0.001; +double Stats::avg_bytes_speed_precision = 0.001; + +class PerformanceTest +{ +public: + using Strings = 
std::vector; + + PerformanceTest( + const String & host_, + const UInt16 port_, + const String & default_database_, + const String & user_, + const String & password_, + const bool & lite_output_, + const String & profiles_file_, + Strings && input_files_, + Strings && tests_tags_, + Strings && skip_tags_, + Strings && tests_names_, + Strings && skip_names_, + Strings && tests_names_regexp_, + Strings && skip_names_regexp_ + ) + : connection(host_, port_, default_database_, user_, password_), + gotSIGINT(false), + lite_output(lite_output_), + profiles_file(profiles_file_), + input_files(input_files_), + tests_tags(std::move(tests_tags_)), + skip_tags(std::move(skip_tags_)), + tests_names(std::move(tests_names_)), + skip_names(std::move(skip_names_)), + tests_names_regexp(std::move(tests_names_regexp_)), + skip_names_regexp(std::move(skip_names_regexp_)) + { + if (input_files.size() < 1) + { + throw DB::Exception("No tests were specified", 0); + } + + std::cerr << std::fixed << std::setprecision(3); + std::cout << std::fixed << std::setprecision(3); + processTestsConfigurations(input_files); + } + +private: + String test_name; + + using Query = String; + using Queries = std::vector; + using QueriesWithIndexes = std::vector>; + Queries queries; + + Connection connection; + + using Keys = std::vector; + + Settings settings; + + InterruptListener interrupt_listener; + + using XMLConfiguration = Poco::Util::XMLConfiguration; + using AbstractConfig = Poco::AutoPtr; + using Config = Poco::AutoPtr; + + using Paths = std::vector; + using StringToVector = std::map>; + StringToVector substitutions; + + using StringKeyValue = std::map; + std::vector substitutions_maps; + + bool gotSIGINT; + std::vector stop_criterions; + String main_metric; + bool lite_output; + String profiles_file; + + Strings input_files; + std::vector tests_configurations; + + Strings tests_tags; + Strings skip_tags; + Strings tests_names; + Strings skip_names; + Strings tests_names_regexp; + Strings 
skip_names_regexp; + + #define incFulfilledCriterions(index, CRITERION) \ + if (!stop_criterions[index].CRITERION.fulfilled) \ + { \ + stop_criterions[index].CRITERION.priority == PriorityType::Min ? ++stop_criterions[index].fulfilled_criterions_min \ + : ++stop_criterions[index].fulfilled_criterions_max; \ + stop_criterions[index].CRITERION.fulfilled = true; \ + } + + enum class ExecutionType + { + Loop, + Once + }; + ExecutionType exec_type; + + enum class FilterType + { + Tag, + Name, + Name_regexp + }; + + size_t times_to_run = 1; + std::vector statistics; + + /// Removes configurations that has a given value. If leave is true, the logic is reversed. + void removeConfigurationsIf(std::vector & configs, FilterType filter_type, const Strings & values, bool leave = false) + { + auto checker = [&filter_type, &values, &leave](Config & config) { + if (values.size() == 0) + return false; + + bool remove_or_not = false; + + if (filter_type == FilterType::Tag) + { + Keys tags_keys; + config->keys("tags", tags_keys); + + Strings tags(tags_keys.size()); + for (size_t i = 0; i != tags_keys.size(); ++i) + tags[i] = config->getString("tags.tag[" + std::to_string(i) + "]"); + + for (const String & config_tag : tags) { + if (std::find(values.begin(), values.end(), config_tag) != values.end()) + remove_or_not = true; + } + } + + if (filter_type == FilterType::Name) + { + remove_or_not = (std::find(values.begin(), values.end(), config->getString("name", "")) != values.end()); + } + + if (filter_type == FilterType::Name_regexp) + { + String config_name = config->getString("name", ""); + auto regex_checker = [&config_name](const String & name_regexp) { + std::regex pattern(name_regexp); + return std::regex_search(config_name, pattern); + }; + + remove_or_not = config->has("name") ? 
(std::find_if(values.begin(), values.end(), regex_checker) != values.end()) + : false; + } + + if (leave) + remove_or_not = !remove_or_not; + return remove_or_not; + }; + + std::vector::iterator new_end = std::remove_if(configs.begin(), configs.end(), checker); + configs.erase(new_end, configs.end()); + } + + /// Filter tests by tags, names, regexp matching, etc. + void filterConfigurations() + { + /// Leave tests: + removeConfigurationsIf(tests_configurations, FilterType::Tag, tests_tags, true); + removeConfigurationsIf(tests_configurations, FilterType::Name, tests_names, true); + removeConfigurationsIf(tests_configurations, FilterType::Name_regexp, tests_names_regexp, true); + + + /// Skip tests + removeConfigurationsIf(tests_configurations, FilterType::Tag, skip_tags, false); + removeConfigurationsIf(tests_configurations, FilterType::Name, skip_names, false); + removeConfigurationsIf(tests_configurations, FilterType::Name_regexp, skip_names_regexp, false); + } + + /// Checks specified preconditions per test (process cache, table existence, etc.) 
+ bool checkPreconditions(const Config & config) + { + if (!config->has("preconditions")) + return true; + + Keys preconditions; + config->keys("preconditions", preconditions); + size_t table_precondition_index = 0; + + for (const String & precondition : preconditions) + { + if (precondition == "reset_cpu_cache") + if (system("(>&2 echo 'Flushing cache...') && (sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches') && (>&2 echo 'Flushed.')")) { + std::cerr << "Failed to flush cache" << std::endl; + return false; + } + + if (precondition == "ram_size") + { +#if __has_include() + struct sysinfo *system_information = new struct sysinfo(); + if (sysinfo(system_information)) + { + std::cerr << "Failed to check system RAM size" << std::endl; + delete system_information; + } + else + { + size_t ram_size_needed = config->getUInt64("preconditions.ram_size"); + size_t actual_ram = system_information->totalram / 1024 / 1024; + if (ram_size_needed > actual_ram) + { + std::cerr << "Not enough RAM" << std::endl; + delete system_information; + return false; + } + } +#else + throw DB::Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); +#endif + } + + if (precondition == "table_exists") + { + String precondition_key = "preconditions.table_exists[" + std::to_string(table_precondition_index++) + "]"; + String table_to_check = config->getString(precondition_key); + String query = "EXISTS TABLE " + table_to_check + ";"; + + size_t exist = 0; + + connection.sendQuery(query, "", QueryProcessingStage::Complete, &settings, nullptr, false); + + while (true) + { + Connection::Packet packet = connection.receivePacket(); + + if (packet.type == Protocol::Server::Data) { + for (const ColumnWithTypeAndName & column : packet.block.getColumns()) + { + if (column.name == "result" && column.column->getDataAt(0).data != nullptr) { + exist = column.column->get64(0); + } + } + } + + if (packet.type == Protocol::Server::Exception || packet.type == Protocol::Server::EndOfStream) + break; + } + + if 
(exist == 0) { + std::cerr << "Table " + table_to_check + " doesn't exist" << std::endl; + return false; + } + } + } + + return true; + } + + void processTestsConfigurations(const Paths & input_files) + { + tests_configurations.resize(input_files.size()); + + for (size_t i = 0; i != input_files.size(); ++i) + { + const String path = input_files[i]; + tests_configurations[i] = Config(new XMLConfiguration(path)); + } + + filterConfigurations(); + + if (tests_configurations.size()) + { + Strings outputs; + + for (auto & test_config : tests_configurations) + { + if (!checkPreconditions(test_config)) + { + std::cerr << "Preconditions are not fulfilled for test \"" + test_config->getString("name", "") + "\""; + continue; + } + + String output = runTest(test_config); + if (lite_output) + std::cout << output << std::endl; + else + outputs.push_back(output); + } + + if (!lite_output && outputs.size()) + { + std::cout << "[" << std::endl; + + for (size_t i = 0; i != outputs.size(); ++i) + { + std::cout << outputs[i]; + if (i != outputs.size() - 1) + std::cout << ","; + + std::cout << std::endl; + } + + std::cout << "]" << std::endl; + } + } + } + + void extractSettings(const Config & config, const String & key, + const Strings & settings_list, + std::map settings_to_apply) + { + for (const String & setup : settings_list) + { + if (setup == "profile") + continue; + + String value = config->getString(key + "." 
+ setup); + if (value.empty()) + value = "true"; + + settings_to_apply[setup] = value; + } + } + + String runTest(Config & test_config) + { + queries.clear(); + + test_name = test_config->getString("name"); + std::cerr << "Running: " << test_name << "\n"; + + if (test_config->has("settings")) + { + std::map settings_to_apply; + Keys config_settings; + test_config->keys("settings", config_settings); + + /// Preprocess configuration file + if (std::find(config_settings.begin(), config_settings.end(), "profile") != config_settings.end()) + { + if (!profiles_file.empty()) + { + String profile_name = test_config->getString("settings.profile"); + Config profiles_config(new XMLConfiguration(profiles_file)); + + Keys profile_settings; + profiles_config->keys("profiles." + profile_name, profile_settings); + + extractSettings(profiles_config, "profiles." + profile_name, profile_settings, settings_to_apply); + } + } + + extractSettings(test_config, "settings", config_settings, settings_to_apply); + + /// This macro goes through all settings in the Settings.h + /// and, if found any settings in test's xml configuration + /// with the same name, sets its value to settings + std::map::iterator it; + #define EXTRACT_SETTING(TYPE, NAME, DEFAULT) \ + it = settings_to_apply.find(#NAME); \ + if (it != settings_to_apply.end()) \ + settings.set(#NAME, settings_to_apply[#NAME]); + + APPLY_FOR_SETTINGS(EXTRACT_SETTING) + APPLY_FOR_LIMITS(EXTRACT_SETTING) + + #undef EXTRACT_SETTING + + if (std::find(config_settings.begin(), config_settings.end(), "average_rows_speed_precision") != config_settings.end()) + { + Stats::avg_rows_speed_precision = test_config->getDouble("settings.average_rows_speed_precision"); + } + + if (std::find(config_settings.begin(), config_settings.end(), "average_bytes_speed_precision") != config_settings.end()) + { + Stats::avg_bytes_speed_precision = test_config->getDouble("settings.average_bytes_speed_precision"); + } + } + + Query query; + + if 
(!test_config->has("query") && !test_config->has("query_file")) + { + throw DB::Exception("Missing query fields in test's config: " + test_name, 1); + } + + if (test_config->has("query") && test_config->has("query_file")) + { + throw DB::Exception("Found both query and query_file fields. Choose only one", 1); + } + + if (test_config->has("query")) + queries.push_back(test_config->getString("query")); + + if (test_config->has("query_file")) + { + const String filename = test_config->getString("query_file"); + if (filename.empty()) + throw DB::Exception("Empty file name", 1); + + bool tsv = FS::path(filename).extension().string() == ".tsv"; + + ReadBufferFromFile query_file(filename); + + if (tsv) + { + while (!query_file.eof()) + { + readEscapedString(query, query_file); + assertChar('\n', query_file); + queries.push_back(query); + } + } + else + { + readStringUntilEOF(query, query_file); + queries.push_back(query); + } + } + + if (queries.empty()) + { + throw DB::Exception("Did not find any query to execute: " + test_name, 1); + } + + if (test_config->has("substitutions")) + { + if (queries.size() > 1) + throw DB::Exception("Only one query is allowed when using substitutions", 1); + + /// Make "subconfig" of inner xml block + AbstractConfig substitutions_view(test_config->createView("substitutions")); + constructSubstitutions(substitutions_view, substitutions); + + queries = formatQueries(queries[0], substitutions); + } + + if (!test_config->has("type")) + { + throw DB::Exception("Missing type property in config: " + test_name, 1); + } + + String config_exec_type = test_config->getString("type"); + if (config_exec_type == "loop") + exec_type = ExecutionType::Loop; + else if (config_exec_type == "once") + exec_type = ExecutionType::Once; + else + throw DB::Exception("Unknown type " + config_exec_type + " in :" + test_name, 1); + + if (test_config->has("times_to_run")) + { + times_to_run = test_config->getUInt("times_to_run"); + } + + 
stop_criterions.resize(times_to_run * queries.size()); + + if (test_config->has("stop")) + { + AbstractConfig stop_criterions_view(test_config->createView("stop")); + for (StopCriterions & stop_criterion : stop_criterions) + { + stop_criterion.loadFromConfig(stop_criterions_view); + } + } + else + { + throw DB::Exception("No termination conditions were found in config", 1); + } + + AbstractConfig metrics_view(test_config->createView("metrics")); + Keys metrics; + metrics_view->keys(metrics); + + if (test_config->has("main_metric")) + { + Keys main_metrics; + test_config->keys("main_metric", main_metrics); + if (main_metrics.size()) + main_metric = main_metrics[0]; + } + + if (!main_metric.empty()) + { + if (std::find(metrics.begin(), metrics.end(), main_metric) == metrics.end()) + metrics.push_back(main_metric); + } + else + { + if (lite_output) + throw DB::Exception("Specify main_metric for lite output", 1); + } + + if (metrics.size() > 0) + checkMetricsInput(metrics); + + statistics.resize(times_to_run * queries.size()); + for (size_t number_of_launch = 0; number_of_launch < times_to_run; ++number_of_launch) + { + QueriesWithIndexes queries_with_indexes; + + for (size_t query_index = 0; query_index < queries.size(); ++query_index) + { + size_t statistic_index = number_of_launch * queries.size() + query_index; + stop_criterions[statistic_index].reset(); + + queries_with_indexes.push_back({queries[query_index], statistic_index}); + } + + if (interrupt_listener.check()) + gotSIGINT = true; + + if (gotSIGINT) + break; + + runQueries(queries_with_indexes); + } + + if (lite_output) + return minOutput(main_metric); + else + return constructTotalInfo(metrics); + } + + void checkMetricsInput(const Strings & metrics) const + { + std::vector loop_metrics + = {"min_time", "quantiles", "total_time", "queries_per_second", "rows_per_second", "bytes_per_second"}; + + std::vector non_loop_metrics + = {"max_rows_per_second", "max_bytes_per_second", "avg_rows_per_second", 
"avg_bytes_per_second"}; + + if (exec_type == ExecutionType::Loop) + { + for (const String & metric : metrics) + { + if (std::find(non_loop_metrics.begin(), non_loop_metrics.end(), metric) != non_loop_metrics.end()) + { + throw DB::Exception("Wrong type of metric for loop execution type (" + metric + ")", 1); + } + } + } + else + { + for (const String & metric : metrics) + { + if (std::find(loop_metrics.begin(), loop_metrics.end(), metric) != loop_metrics.end()) + { + throw DB::Exception("Wrong type of metric for non-loop execution type (" + metric + ")", 1); + } + } + } + } + + void runQueries(const QueriesWithIndexes & queries_with_indexes) + { + for (const std::pair & query_and_index : queries_with_indexes) + { + Query query = query_and_index.first; + const size_t statistic_index = query_and_index.second; + + size_t max_iterations = stop_criterions[statistic_index].iterations.value; + size_t iteration = 0; + + statistics[statistic_index].clear(); + execute(query, statistic_index); + + if (exec_type == ExecutionType::Loop) + { + while (!gotSIGINT) + { + ++iteration; + + /// check stop criterions + if (max_iterations && iteration >= max_iterations) + { + incFulfilledCriterions(statistic_index, iterations); + } + + if (stop_criterions[statistic_index].number_of_initialized_min + && (stop_criterions[statistic_index].fulfilled_criterions_min + >= stop_criterions[statistic_index].number_of_initialized_min)) + { + /// All 'min' criterions are fulfilled + break; + } + + if (stop_criterions[statistic_index].number_of_initialized_max && stop_criterions[statistic_index].fulfilled_criterions_max) + { + /// Some 'max' criterions are fulfilled + break; + } + + execute(query, statistic_index); + } + } + + if (!gotSIGINT) + { + statistics[statistic_index].ready = true; + } + } + } + + void execute(const Query & query, const size_t statistic_index) + { + statistics[statistic_index].watch_per_query.restart(); + + RemoteBlockInputStream stream(connection, query, &settings, 
nullptr, Tables() /*, query_processing_stage*/); + + Progress progress; + stream.setProgressCallback([&progress, &stream, statistic_index, this](const Progress & value) { + progress.incrementPiecewiseAtomically(value); + + this->checkFulfilledCriterionsAndUpdate(progress, stream, statistic_index); + }); + + stream.readPrefix(); + while (Block block = stream.read()) + ; + stream.readSuffix(); + + statistics[statistic_index].updateQueryInfo(); + statistics[statistic_index].setTotalTime(); + } + + void checkFulfilledCriterionsAndUpdate(const Progress & progress, + RemoteBlockInputStream & stream, + const size_t statistic_index) + { + statistics[statistic_index].add(progress.rows, progress.bytes); + + size_t max_rows_to_read = stop_criterions[statistic_index].rows_read.value; + if (max_rows_to_read && statistics[statistic_index].rows_read >= max_rows_to_read) + { + incFulfilledCriterions(statistic_index, rows_read); + } + + size_t max_bytes_to_read = stop_criterions[statistic_index].bytes_read_uncompressed.value; + if (max_bytes_to_read && statistics[statistic_index].bytes_read >= max_bytes_to_read) + { + incFulfilledCriterions(statistic_index, bytes_read_uncompressed); + } + + if (UInt64 max_timeout_ms = stop_criterions[statistic_index].timeout_ms.value) + { + /// cast nanoseconds to ms + if ((statistics[statistic_index].watch.elapsed() / (1000 * 1000)) > max_timeout_ms) + { + incFulfilledCriterions(statistic_index, timeout_ms); + } + } + + size_t min_time_not_changing_for_ms = stop_criterions[statistic_index].min_time_not_changing_for_ms.value; + if (min_time_not_changing_for_ms) + { + size_t min_time_did_not_change_for = statistics[statistic_index].min_time_watch.elapsed() / (1000 * 1000); + + if (min_time_did_not_change_for >= min_time_not_changing_for_ms) + { + incFulfilledCriterions(statistic_index, min_time_not_changing_for_ms); + } + } + + size_t max_speed_not_changing_for_ms = stop_criterions[statistic_index].max_speed_not_changing_for_ms.value; + if 
(max_speed_not_changing_for_ms) + { + UInt64 speed_not_changing_time = statistics[statistic_index].max_rows_speed_watch.elapsed() / (1000 * 1000); + if (speed_not_changing_time >= max_speed_not_changing_for_ms) + { + incFulfilledCriterions(statistic_index, max_speed_not_changing_for_ms); + } + } + + size_t average_speed_not_changing_for_ms = stop_criterions[statistic_index].average_speed_not_changing_for_ms.value; + if (average_speed_not_changing_for_ms) + { + UInt64 speed_not_changing_time = statistics[statistic_index].avg_rows_speed_watch.elapsed() / (1000 * 1000); + if (speed_not_changing_time >= average_speed_not_changing_for_ms) + { + incFulfilledCriterions(statistic_index, average_speed_not_changing_for_ms); + } + } + + if (stop_criterions[statistic_index].number_of_initialized_min + && (stop_criterions[statistic_index].fulfilled_criterions_min >= stop_criterions[statistic_index].number_of_initialized_min)) + { + /// All 'min' criterions are fulfilled + stream.cancel(); + } + + if (stop_criterions[statistic_index].number_of_initialized_max && stop_criterions[statistic_index].fulfilled_criterions_max) + { + /// Some 'max' criterions are fulfilled + stream.cancel(); + } + + if (interrupt_listener.check()) + { + gotSIGINT = true; + stream.cancel(); + } + } + + void constructSubstitutions(AbstractConfig & substitutions_view, StringToVector & substitutions) + { + Keys xml_substitutions; + substitutions_view->keys(xml_substitutions); + + for (size_t i = 0; i != xml_substitutions.size(); ++i) + { + const AbstractConfig xml_substitution(substitutions_view->createView("substitution[" + std::to_string(i) + "]")); + + /// Property values for substitution will be stored in a vector + /// accessible by property name + std::vector xml_values; + xml_substitution->keys("values", xml_values); + + String name = xml_substitution->getString("name"); + + for (size_t j = 0; j != xml_values.size(); ++j) + { + substitutions[name].push_back(xml_substitution->getString("values.value[" 
+ std::to_string(j) + "]")); + } + } + } + + std::vector formatQueries(const String & query, StringToVector substitutions) + { + std::vector queries; + + StringToVector::iterator substitutions_first = substitutions.begin(); + StringToVector::iterator substitutions_last = substitutions.end(); + --substitutions_last; + + std::map substitutions_map; + + runThroughAllOptionsAndPush(substitutions_first, substitutions_last, query, queries, substitutions_map); + + return queries; + } + + /// Recursive method which goes through all substitution blocks in xml + /// and replaces property {names} by their values + void runThroughAllOptionsAndPush(StringToVector::iterator substitutions_left, + StringToVector::iterator substitutions_right, + const String & template_query, + std::vector & queries, + const StringKeyValue & template_substitutions_map = StringKeyValue()) + { + String name = substitutions_left->first; + std::vector values = substitutions_left->second; + + for (const String & value : values) + { + /// Copy query string for each unique permutation + Query query = template_query; + StringKeyValue substitutions_map = template_substitutions_map; + size_t substr_pos = 0; + + while (substr_pos != String::npos) + { + substr_pos = query.find("{" + name + "}"); + + if (substr_pos != String::npos) + { + query.replace(substr_pos, 1 + name.length() + 1, value); + } + } + + substitutions_map[name] = value; + + /// If we've reached the end of substitution chain + if (substitutions_left == substitutions_right) + { + queries.push_back(query); + substitutions_maps.push_back(substitutions_map); + } + else + { + StringToVector::iterator next_it = substitutions_left; + ++next_it; + + runThroughAllOptionsAndPush(next_it, substitutions_right, query, queries, substitutions_map); + } + } + } + +public: + String constructTotalInfo(Strings metrics) + { + JSONString json_output; + String hostname; + + char hostname_buffer[256]; + if (gethostname(hostname_buffer, 256) == 0) + { + hostname = 
String(hostname_buffer); + } + + json_output.set("hostname", hostname); + json_output.set("cpu_num", sysconf(_SC_NPROCESSORS_ONLN)); + json_output.set("test_name", test_name); + json_output.set("main_metric", main_metric); + + if (substitutions.size()) + { + JSONString json_parameters(2); /// here, 2 is the size of \t padding + + for (auto it = substitutions.begin(); it != substitutions.end(); ++it) + { + String parameter = it->first; + std::vector values = it->second; + + String array_string = "["; + for (size_t i = 0; i != values.size(); ++i) + { + array_string += '\"' + values[i] + '\"'; + if (i != values.size() - 1) + { + array_string += ", "; + } + } + array_string += ']'; + + json_parameters.set(parameter, array_string); + } + + json_output.set("parameters", json_parameters.asString()); + } + + std::vector run_infos; + for (size_t query_index = 0; query_index < queries.size(); ++query_index) + { + for (size_t number_of_launch = 0; number_of_launch < statistics.size(); ++number_of_launch) + { + if (!statistics[number_of_launch].ready) + continue; + + JSONString runJSON; + + if (substitutions_maps.size()) + { + JSONString parameters(4); + + for (auto it = substitutions_maps[query_index].begin(); it != substitutions_maps[query_index].end(); ++it) + { + parameters.set(it->first, it->second); + } + + runJSON.set("parameters", parameters.asString()); + } + + if (exec_type == ExecutionType::Loop) + { + /// in seconds + if (std::find(metrics.begin(), metrics.end(), "min_time") != metrics.end()) + runJSON.set("min_time", statistics[number_of_launch].min_time / double(1000)); + + if (std::find(metrics.begin(), metrics.end(), "quantiles") != metrics.end()) + { + JSONString quantiles(4); /// here, 4 is the size of \t padding + for (double percent = 10; percent <= 90; percent += 10) + { + String quantile_key = std::to_string(percent / 100.0); + while (quantile_key.back() == '0') + quantile_key.pop_back(); + + quantiles.set(quantile_key, 
statistics[number_of_launch].sampler.quantileInterpolated(percent / 100.0)); + } + quantiles.set("0.95", statistics[number_of_launch].sampler.quantileInterpolated(95 / 100.0)); + quantiles.set("0.99", statistics[number_of_launch].sampler.quantileInterpolated(99 / 100.0)); + quantiles.set("0.999", statistics[number_of_launch].sampler.quantileInterpolated(99.9 / 100.0)); + quantiles.set("0.9999", statistics[number_of_launch].sampler.quantileInterpolated(99.99 / 100.0)); + + runJSON.set("quantiles", quantiles.asString()); + } + + if (std::find(metrics.begin(), metrics.end(), "total_time") != metrics.end()) + runJSON.set("total_time", statistics[number_of_launch].total_time); + + if (std::find(metrics.begin(), metrics.end(), "queries_per_second") != metrics.end()) + runJSON.set("queries_per_second", double(statistics[number_of_launch].queries) / + statistics[number_of_launch].total_time); + + if (std::find(metrics.begin(), metrics.end(), "rows_per_second") != metrics.end()) + runJSON.set("rows_per_second", double(statistics[number_of_launch].rows_read) / + statistics[number_of_launch].total_time); + + if (std::find(metrics.begin(), metrics.end(), "bytes_per_second") != metrics.end()) + runJSON.set("bytes_per_second", double(statistics[number_of_launch].bytes_read) / + statistics[number_of_launch].total_time); + } + else + { + if (std::find(metrics.begin(), metrics.end(), "max_rows_per_second") != metrics.end()) + runJSON.set("max_rows_per_second", statistics[number_of_launch].max_rows_speed); + + if (std::find(metrics.begin(), metrics.end(), "max_bytes_per_second") != metrics.end()) + runJSON.set("max_bytes_per_second", statistics[number_of_launch].max_bytes_speed); + + if (std::find(metrics.begin(), metrics.end(), "avg_rows_per_second") != metrics.end()) + runJSON.set("avg_rows_per_second", statistics[number_of_launch].avg_rows_speed_value); + + if (std::find(metrics.begin(), metrics.end(), "avg_bytes_per_second") != metrics.end()) + 
runJSON.set("avg_bytes_per_second", statistics[number_of_launch].avg_bytes_speed_value); + } + + run_infos.push_back(runJSON); + } + } + + json_output.set("runs", run_infos); + + return json_output.asString(); + } + + String minOutput(const String & main_metric) + { + String output; + + for (size_t query_index = 0; query_index < queries.size(); ++query_index) + { + for (size_t number_of_launch = 0; number_of_launch < times_to_run; ++number_of_launch) + { + output += test_name + ", "; + + if (substitutions_maps.size()) + { + for (auto it = substitutions_maps[query_index].begin(); it != substitutions_maps[query_index].end(); ++it) + { + output += it->first + " = " + it->second + ", "; + } + } + + output += "run " + std::to_string(number_of_launch + 1) + ": "; + output += main_metric + " = "; + output += statistics[number_of_launch * queries.size() + query_index].getStatisticByName(main_metric); + output += "\n"; + } + } + + return output; + } +}; +} + +static void getFilesFromDir(const FS::path & dir, std::vector & input_files) +{ + if (dir.extension().string() == ".xml") + std::cerr << "Warning: \"" + dir.string() + "\" is a directory, but has .xml extension" << std::endl; + + FS::directory_iterator end; + for (FS::directory_iterator it(dir); it != end; ++it) + { + const FS::path file = (*it); + if (!FS::is_directory(file) && file.extension().string() == ".xml") + input_files.push_back(file.string()); + } +} + +int mainEntryClickhousePerformanceTest(int argc, char ** argv) +{ + using namespace DB; + + try + { + using boost::program_options::value; + using Strings = std::vector; + + boost::program_options::options_description desc("Allowed options"); + desc.add_options() + ("help", "produce help message") + ("lite", "use lite version of output") + ("profiles-file", value()->default_value(""), "Specify a file with global profiles") + ("host,h", value()->default_value("localhost"), "") + ("port", value()->default_value(9000), "") + ("database", 
value()->default_value("default"), "") + ("user", value()->default_value("default"), "") + ("password", value()->default_value(""), "") + ("tags", value()->multitoken(), "Run only tests with tag") + ("skip-tags", value()->multitoken(), "Do not run tests with tag") + ("names", value()->multitoken(), "Run tests with specific name") + ("skip-names", value()->multitoken(), "Do not run tests with name") + ("names-regexp", value()->multitoken(), "Run tests with names matching regexp") + ("skip-names-regexp", value()->multitoken(), "Do not run tests with names matching regexp"); + + /// These options will not be displayed in --help + boost::program_options::options_description hidden("Hidden options"); + hidden.add_options()("input-files", value>(), ""); + + /// But they will be legit, though. And they must be given without name + boost::program_options::positional_options_description positional; + positional.add("input-files", -1); + + boost::program_options::options_description cmdline_options; + cmdline_options.add(desc).add(hidden); + + boost::program_options::variables_map options; + boost::program_options::store( + boost::program_options::command_line_parser(argc, argv).options(cmdline_options).positional(positional).run(), options); + boost::program_options::notify(options); + + if (options.count("help")) + { + std::cout << "Usage: " << argv[0] << " [options] [test_file ...] 
[tests_folder]\n"; + std::cout << desc << "\n"; + return 0; + } + + Strings input_files; + + if (!options.count("input-files")) + { + std::cerr << "Trying to find tests in current folder" << std::endl; + FS::path curr_dir("."); + + getFilesFromDir(curr_dir, input_files); + + if (input_files.empty()) + throw DB::Exception("Did not find any xml files", 1); + } + else + { + input_files = options["input-files"].as(); + + for (const String filename : input_files) + { + FS::path file(filename); + + if (!FS::exists(file)) + throw DB::Exception("File \"" + filename + "\" does not exist", 1); + + if (FS::is_directory(file)) + { + input_files.erase( std::remove(input_files.begin(), input_files.end(), filename) , input_files.end() ); + getFilesFromDir(file, input_files); + } + else + { + if (file.extension().string() != ".xml") + throw DB::Exception("File \"" + filename + "\" does not have .xml extension", 1); + } + } + } + + Strings tests_tags = options.count("tags") + ? options["tags"].as() + : Strings({}); + Strings skip_tags = options.count("skip-tags") + ? options["skip-tags"].as() + : Strings({}); + Strings tests_names = options.count("names") + ? options["names"].as() + : Strings({}); + Strings skip_names = options.count("skip-names") + ? options["skip-names"].as() + : Strings({}); + Strings tests_names_regexp = options.count("names-regexp") + ? options["names-regexp"].as() + : Strings({}); + Strings skip_names_regexp = options.count("skip-names-regexp") + ? options["skip-names-regexp"].as() + : Strings({}); + + PerformanceTest performanceTest( + options["host"].as(), + options["port"].as(), + options["database"].as(), + options["user"].as(), + options["password"].as(), + options.count("lite") > 0, + options["profiles-file"].as(), + std::move(input_files), + std::move(tests_tags), + std::move(skip_tags), + std::move(tests_names), + std::move(skip_names), + std::move(tests_names_regexp), + std::move(skip_names_regexp) + ); + } + catch (...) 
+ { + std::cout << getCurrentExceptionMessage(/*with stacktrace = */true) << std::endl; + return getCurrentExceptionCode(); + } + + return 0; +} diff --git a/dbms/src/Common/Exception.cpp b/dbms/src/Common/Exception.cpp index 2a94cb0b3b3..3a50dc19ea4 100644 --- a/dbms/src/Common/Exception.cpp +++ b/dbms/src/Common/Exception.cpp @@ -7,6 +7,8 @@ #include #include +#include +#include #include @@ -83,28 +85,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded } catch (const Exception & e) { - try - { - std::string text = e.displayText(); - - bool has_embedded_stack_trace = false; - if (check_embedded_stacktrace) - { - auto embedded_stack_trace_pos = text.find("Stack trace"); - has_embedded_stack_trace = embedded_stack_trace_pos != std::string::npos; - if (!with_stacktrace && has_embedded_stack_trace) - { - text.resize(embedded_stack_trace_pos); - Poco::trimRightInPlace(text); - } - } - - stream << "Code: " << e.code() << ", e.displayText() = " << text << ", e.what() = " << e.what(); - - if (with_stacktrace && !has_embedded_stack_trace) - stream << ", Stack trace:\n\n" << e.getStackTrace().toString(); - } - catch (...) 
{} + stream << getExceptionMessage(e, with_stacktrace, check_embedded_stacktrace); } catch (const Poco::Exception & e) { @@ -230,6 +211,36 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str } } +std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace) +{ + std::stringstream stream; + + try + { + std::string text = e.displayText(); + + bool has_embedded_stack_trace = false; + if (check_embedded_stacktrace) + { + auto embedded_stack_trace_pos = text.find("Stack trace"); + has_embedded_stack_trace = embedded_stack_trace_pos != std::string::npos; + if (!with_stacktrace && has_embedded_stack_trace) + { + text.resize(embedded_stack_trace_pos); + Poco::trimRightInPlace(text); + } + } + + stream << "Code: " << e.code() << ", e.displayText() = " << text << ", e.what() = " << e.what(); + + if (with_stacktrace && !has_embedded_stack_trace) + stream << ", Stack trace:\n\n" << e.getStackTrace().toString(); + } + catch (...) 
{} + + return stream.str(); +} + std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace) { try @@ -243,4 +254,26 @@ std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace) } +std::string ExecutionStatus::serializeText() const +{ + std::string res; + { + WriteBufferFromString wb(res); + wb << code << "\n" << escape << message; + } + return res; +} + +void ExecutionStatus::deserializeText(const std::string & data) +{ + ReadBufferFromString rb(data); + rb >> code >> "\n" >> escape >> message; +} + +ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_of_message) +{ + return ExecutionStatus(getCurrentExceptionCode(), start_of_message + ": " + getCurrentExceptionMessage(false, true)); +} + + } diff --git a/dbms/src/Common/Exception.h b/dbms/src/Common/Exception.h index 05a40479308..4465a407ddc 100644 --- a/dbms/src/Common/Exception.h +++ b/dbms/src/Common/Exception.h @@ -79,6 +79,7 @@ void throwFromErrno(const std::string & s, int code = 0, int the_errno = errno); void tryLogCurrentException(const char * log_name, const std::string & start_of_message = ""); void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_message = ""); + /** Prints current exception in canonical format. * with_stacktrace - prints stack trace for DB::Exception. 
* check_embedded_stacktrace - if DB::Exception has embedded stacktrace then @@ -89,9 +90,30 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded /// Returns error code from ErrorCodes int getCurrentExceptionCode(); + +/// An execution status of any piece of code, contains return code and optional error +struct ExecutionStatus +{ + int code = 0; + std::string message; + + ExecutionStatus() = default; + + explicit ExecutionStatus(int return_code, const std::string & exception_message = "") + : code(return_code), message(exception_message) {} + + static ExecutionStatus fromCurrentException(const std::string & start_of_message = ""); + + std::string serializeText() const; + + void deserializeText(const std::string & data); +}; + + void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message = ""); void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::string & start_of_message = ""); +std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false); std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace); diff --git a/dbms/src/Common/PoolWithFailoverBase.h b/dbms/src/Common/PoolWithFailoverBase.h index 43a2d728c4d..34d74f50354 100644 --- a/dbms/src/Common/PoolWithFailoverBase.h +++ b/dbms/src/Common/PoolWithFailoverBase.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Common/formatIPv6.cpp b/dbms/src/Common/formatIPv6.cpp new file mode 100644 index 00000000000..54617b6aa4d --- /dev/null +++ b/dbms/src/Common/formatIPv6.cpp @@ -0,0 +1,125 @@ +#include +#include +#include +#include + + +namespace DB +{ + +/// integer logarithm, return ceil(log(value, base)) (the smallest integer greater or equal than log(value, base) +static constexpr uint32_t int_log(const uint32_t value, const uint32_t base, const bool carry = false) +{ + return value >= base ? 
1 + int_log(value / base, base, value % base || carry) : value % base > 1 || carry; +} + +/// print integer in desired base, faster than sprintf +template +static void print_integer(char *& out, T value) +{ + if (value == 0) + *out++ = '0'; + else + { + char buf[buffer_size]; + auto ptr = buf; + + while (value > 0) + { + *ptr++ = hexLowercase(value % base); + value /= base; + } + + while (ptr != buf) + *out++ = *--ptr; + } +} + +/// print IPv4 address as %u.%u.%u.%u +static void formatIPv4(const unsigned char * src, char *& dst, UInt8 zeroed_tail_bytes_count) +{ + const auto limit = IPV4_BINARY_LENGTH - zeroed_tail_bytes_count; + + for (const auto i : ext::range(0, IPV4_BINARY_LENGTH)) + { + UInt8 byte = (i < limit) ? src[i] : 0; + print_integer<10, UInt8>(dst, byte); + + if (i != IPV4_BINARY_LENGTH - 1) + *dst++ = '.'; + } +} + + +void formatIPv6(const unsigned char * src, char *& dst, UInt8 zeroed_tail_bytes_count) +{ + struct { int base, len; } best{-1}, cur{-1}; + std::array words{}; + + /** Preprocess: + * Copy the input (bytewise) array into a wordwise array. + * Find the longest run of 0x00's in src[] for :: shorthanding. */ + for (const auto i : ext::range(0, IPV6_BINARY_LENGTH - zeroed_tail_bytes_count)) + words[i / 2] |= src[i] << ((1 - (i % 2)) << 3); + + for (const auto i : ext::range(0, words.size())) + { + if (words[i] == 0) { + if (cur.base == -1) + cur.base = i, cur.len = 1; + else + cur.len++; + } + else + { + if (cur.base != -1) + { + if (best.base == -1 || cur.len > best.len) + best = cur; + cur.base = -1; + } + } + } + + if (cur.base != -1) + { + if (best.base == -1 || cur.len > best.len) + best = cur; + } + + if (best.base != -1 && best.len < 2) + best.base = -1; + + /// Format the result. + for (const int i : ext::range(0, words.size())) + { + /// Are we inside the best run of 0x00's? 
+ if (best.base != -1 && i >= best.base && i < (best.base + best.len)) + { + if (i == best.base) + *dst++ = ':'; + continue; + } + + /// Are we following an initial run of 0x00s or any real hex? + if (i != 0) + *dst++ = ':'; + + /// Is this address an encapsulated IPv4? + if (i == 6 && best.base == 0 && (best.len == 6 || (best.len == 5 && words[5] == 0xffffu))) + { + formatIPv4(src + 12, dst, std::min(zeroed_tail_bytes_count, static_cast(IPV4_BINARY_LENGTH))); + break; + } + + print_integer<16>(dst, words[i]); + } + + /// Was it a trailing run of 0x00's? + if (best.base != -1 && (best.base + best.len) == words.size()) + *dst++ = ':'; + + *dst++ = '\0'; +} + +} diff --git a/dbms/src/Common/formatIPv6.h b/dbms/src/Common/formatIPv6.h new file mode 100644 index 00000000000..6baf5544f02 --- /dev/null +++ b/dbms/src/Common/formatIPv6.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +#define IPV4_BINARY_LENGTH 4 +#define IPV6_BINARY_LENGTH 16 +#define IPV4_MAX_TEXT_LENGTH 15 /// Does not count tail zero byte. +#define IPV6_MAX_TEXT_LENGTH 39 + + +namespace DB +{ + + +/** Rewritten inet_ntop6 from http://svn.apache.org/repos/asf/apr/apr/trunk/network_io/unix/inet_pton.c + * performs significantly faster than the reference implementation due to the absence of sprintf calls, + * bounds checking, unnecessary string copying and length calculation. 
+ */ +void formatIPv6(const unsigned char * src, char *& dst, UInt8 zeroed_tail_bytes_count = 0); + +} diff --git a/dbms/src/Core/ColumnWithTypeAndName.h b/dbms/src/Core/ColumnWithTypeAndName.h index 88c6fffadb6..09da42ccf30 100644 --- a/dbms/src/Core/ColumnWithTypeAndName.h +++ b/dbms/src/Core/ColumnWithTypeAndName.h @@ -22,6 +22,10 @@ struct ColumnWithTypeAndName ColumnWithTypeAndName(const ColumnPtr & column_, const DataTypePtr & type_, const String name_) : column(column_), type(type_), name(name_) {} + /// Uses type->createColumn() to create column + ColumnWithTypeAndName(const DataTypePtr & type_, const String name_) + : column(type_->createColumn()), type(type_), name(name_) {} + ColumnWithTypeAndName cloneEmpty() const; bool operator==(const ColumnWithTypeAndName & other) const; String prettyPrint() const; diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index b86955af26c..0d097792922 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -373,9 +373,10 @@ namespace ErrorCodes extern const int BAD_CAST = 368; extern const int ALL_REPLICAS_ARE_STALE = 369; extern const int DATA_TYPE_CANNOT_BE_USED_IN_TABLES = 370; - extern const int SESSION_NOT_FOUND = 371; - extern const int SESSION_IS_LOCKED = 372; - extern const int INVALID_SESSION_TIMEOUT = 373; + extern const int INCONSISTENT_CLUSTER_DEFINITION = 371; + extern const int SESSION_NOT_FOUND = 372; + extern const int SESSION_IS_LOCKED = 373; + extern const int INVALID_SESSION_TIMEOUT = 374; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Core/toField.h b/dbms/src/Core/toField.h index b11dbdcae24..477a22ccdf3 100644 --- a/dbms/src/Core/toField.h +++ b/dbms/src/Core/toField.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include diff --git a/dbms/src/DataStreams/BlockIO.h b/dbms/src/DataStreams/BlockIO.h index 0d704993ea3..5ac609f8a6f 100644 --- a/dbms/src/DataStreams/BlockIO.h +++ 
b/dbms/src/DataStreams/BlockIO.h @@ -25,7 +25,7 @@ struct BlockIO Block out_sample; /// Example of a block to be written to `out`. /// Callbacks for query logging could be set here. - std::function finish_callback; + std::function finish_callback; std::function exception_callback; /// Call these functions if you want to log the request. @@ -44,18 +44,18 @@ struct BlockIO BlockIO & operator= (const BlockIO & rhs) { /// We provide the correct order of destruction. - out = nullptr; - in = nullptr; - process_list_entry = nullptr; + out = nullptr; + in = nullptr; + process_list_entry = nullptr; - process_list_entry = rhs.process_list_entry; - in = rhs.in; - out = rhs.out; - in_sample = rhs.in_sample; - out_sample = rhs.out_sample; + process_list_entry = rhs.process_list_entry; + in = rhs.in; + out = rhs.out; + in_sample = rhs.in_sample; + out_sample = rhs.out_sample; - finish_callback = rhs.finish_callback; - exception_callback = rhs.exception_callback; + finish_callback = rhs.finish_callback; + exception_callback = rhs.exception_callback; return *this; } diff --git a/dbms/src/DataStreams/PrettyBlockOutputStream.cpp b/dbms/src/DataStreams/PrettyBlockOutputStream.cpp index 3c1ac93d4fa..88b06f31b55 100644 --- a/dbms/src/DataStreams/PrettyBlockOutputStream.cpp +++ b/dbms/src/DataStreams/PrettyBlockOutputStream.cpp @@ -15,6 +15,12 @@ namespace DB { +namespace ErrorCodes +{ + extern const int ILLEGAL_COLUMN; +} + + PrettyBlockOutputStream::PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_) : ostr(ostr_), max_rows(max_rows_), no_escapes(no_escapes_), context(context_) { diff --git a/dbms/src/DataTypes/DataTypeNullable.cpp b/dbms/src/DataTypes/DataTypeNullable.cpp index 194c3a55e1b..ccce484d10a 100644 --- a/dbms/src/DataTypes/DataTypeNullable.cpp +++ b/dbms/src/DataTypes/DataTypeNullable.cpp @@ -101,7 +101,7 @@ void DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer & ist if (*istr.position() != '\\') { 
safeDeserialize(column, - [&istr] { return false; }, + [] { return false; }, [this, &istr] (IColumn & nested) { nested_data_type->deserializeTextEscaped(nested, istr); } ); } else diff --git a/dbms/src/Functions/DataTypeTraits.cpp b/dbms/src/DataTypes/DataTypeTraits.cpp similarity index 89% rename from dbms/src/Functions/DataTypeTraits.cpp rename to dbms/src/DataTypes/DataTypeTraits.cpp index f22a49e270a..fd1b179fb35 100644 --- a/dbms/src/Functions/DataTypeTraits.cpp +++ b/dbms/src/DataTypes/DataTypeTraits.cpp @@ -1,4 +1,4 @@ -#include +#include namespace DB { namespace DataTypeTraits { diff --git a/dbms/src/Functions/DataTypeTraits.h b/dbms/src/DataTypes/DataTypeTraits.h similarity index 99% rename from dbms/src/Functions/DataTypeTraits.h rename to dbms/src/DataTypes/DataTypeTraits.h index 5c12d57fa44..d083e67793d 100644 --- a/dbms/src/Functions/DataTypeTraits.h +++ b/dbms/src/DataTypes/DataTypeTraits.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include diff --git a/dbms/src/Functions/NumberTraits.h b/dbms/src/DataTypes/NumberTraits.h similarity index 100% rename from dbms/src/Functions/NumberTraits.h rename to dbms/src/DataTypes/NumberTraits.h diff --git a/dbms/src/Dictionaries/CacheDictionary.cpp b/dbms/src/Dictionaries/CacheDictionary.cpp index 42c8919a5b6..3a5c245e8dc 100644 --- a/dbms/src/Dictionaries/CacheDictionary.cpp +++ b/dbms/src/Dictionaries/CacheDictionary.cpp @@ -1,6 +1,9 @@ #include +#include +#include #include -#include +#include +#include #include #include #include @@ -8,6 +11,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -418,11 +424,11 @@ void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray void CacheDictionary::createAttributes() { - const auto size = dict_struct.attributes.size(); - attributes.reserve(size); + const auto attributes_size = dict_struct.attributes.size(); + attributes.reserve(attributes_size); bytes_allocated += size * sizeof(CellMetadata); - 
bytes_allocated += size * sizeof(attributes.front()); + bytes_allocated += attributes_size * sizeof(attributes.front()); for (const auto & attribute : dict_struct.attributes) { @@ -957,4 +963,33 @@ CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & a return attributes[it->second]; } +bool CacheDictionary::isEmptyCell(const UInt64 idx) const +{ + return (idx != zero_cell_idx && cells[idx].id == 0) || (cells[idx].data + == ext::safe_bit_cast(CellMetadata::time_point_t())); +} + +PaddedPODArray CacheDictionary::getCachedIds() const +{ + const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; + + PaddedPODArray array; + for (size_t idx = 0; idx < cells.size(); ++idx) + { + auto & cell = cells[idx]; + if (!isEmptyCell(idx) && !cells[idx].isDefault()) + { + array.push_back(cell.id); + } + } + return array; +} + +BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const +{ + using BlockInputStreamType = DictionaryBlockInputStream; + return std::make_shared(shared_from_this(), max_block_size, getCachedIds(), column_names); +} + + } diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h index 13bf196fc35..8cc258eeedb 100644 --- a/dbms/src/Dictionaries/CacheDictionary.h +++ b/dbms/src/Dictionaries/CacheDictionary.h @@ -137,6 +137,8 @@ public: void has(const PaddedPODArray & ids, PaddedPODArray & out) const override; + BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; + private: template using ContainerType = Value[]; template using ContainerPtrType = std::unique_ptr>; @@ -208,6 +210,10 @@ private: const std::vector & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const; + PaddedPODArray getCachedIds() const; + + bool isEmptyCell(const UInt64 idx) const; + UInt64 getCellIdx(const Key id) const; void 
setDefaultAttributeValue(Attribute & attribute, const Key idx) const; diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp index f235be8107b..cc69d5ff93a 100644 --- a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -265,7 +266,7 @@ void ComplexKeyCacheDictionary::has(const Columns & key_columns, const DataTypes /// fetch up-to-date values, decide which ones require update for (const auto row : ext::range(0, rows_num)) { - const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); + const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); keys_array[row] = key; const auto find_result = findCellIdx(key, now); const auto & cell_idx = find_result.cell_idx; @@ -320,11 +321,11 @@ void ComplexKeyCacheDictionary::has(const Columns & key_columns, const DataTypes void ComplexKeyCacheDictionary::createAttributes() { - const auto size = dict_struct.attributes.size(); - attributes.reserve(size); + const auto attributes_size = dict_struct.attributes.size(); + attributes.reserve(attributes_size); bytes_allocated += size * sizeof(CellMetadata); - bytes_allocated += size * sizeof(attributes.front()); + bytes_allocated += attributes_size * sizeof(attributes.front()); for (const auto & attribute : dict_struct.attributes) { @@ -457,7 +458,7 @@ void ComplexKeyCacheDictionary::getItemsNumberImpl( /// fetch up-to-date values, decide which ones require update for (const auto row : ext::range(0, rows_num)) { - const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); + const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); keys_array[row] = key; const auto find_result = findCellIdx(key, now); @@ -536,7 +537,7 @@ void ComplexKeyCacheDictionary::getItemsString( 
/// fetch up-to-date values, discard on fail for (const auto row : ext::range(0, rows_num)) { - const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); + const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); SCOPE_EXIT(temporary_keys_pool.rollback(key.size)); const auto find_result = findCellIdx(key, now); @@ -581,7 +582,7 @@ void ComplexKeyCacheDictionary::getItemsString( const auto now = std::chrono::system_clock::now(); for (const auto row : ext::range(0, rows_num)) { - const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); + const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); keys_array[row] = key; const auto find_result = findCellIdx(key, now); @@ -899,7 +900,7 @@ StringRef ComplexKeyCacheDictionary::allocKey(const size_t row, const Columns & if (key_size_is_fixed) return placeKeysInFixedSizePool(row, key_columns); - return placeKeysInPool(row, key_columns, keys, *keys_pool); + return placeKeysInPool(row, key_columns, keys, *dict_struct.key, *keys_pool); } void ComplexKeyCacheDictionary::freeKey(const StringRef key) const @@ -910,28 +911,49 @@ void ComplexKeyCacheDictionary::freeKey(const StringRef key) const keys_pool->free(const_cast(key.data), key.size); } -template +template StringRef ComplexKeyCacheDictionary::placeKeysInPool( - const size_t row, const Columns & key_columns, StringRefs & keys, Arena & pool) + const size_t row, const Columns & key_columns, StringRefs & keys, + const std::vector & key_attributes, Pool & pool) { const auto keys_size = key_columns.size(); size_t sum_keys_size{}; - for (const auto i : ext::range(0, keys_size)) - { - keys[i] = key_columns[i]->getDataAtWithTerminatingZero(row); - sum_keys_size += keys[i].size; - } - - const auto res = pool.alloc(sum_keys_size); - auto place = res; for (size_t j = 0; j < keys_size; ++j) { - memcpy(place, keys[j].data, keys[j].size); - place += 
keys[j].size; + keys[j] = key_columns[j]->getDataAt(row); + sum_keys_size += keys[j].size; + if (key_attributes[j].underlying_type == AttributeUnderlyingType::String) + sum_keys_size += sizeof(size_t) + 1; } - return { res, sum_keys_size }; + auto place = pool.alloc(sum_keys_size); + + auto key_start = place; + for (size_t j = 0; j < keys_size; ++j) + { + if (key_attributes[j].underlying_type == AttributeUnderlyingType::String) + { + auto start = key_start; + auto key_size = keys[j].size + 1; + memcpy(key_start, &key_size, sizeof(size_t)); + key_start += sizeof(size_t); + memcpy(key_start, keys[j].data, keys[j].size); + key_start += keys[j].size; + *key_start = '\0'; + ++key_start; + keys[j].data = start; + keys[j].size += sizeof(size_t) + 1; + } + else + { + memcpy(key_start, keys[j].data, keys[j].size); + keys[j].data = key_start; + key_start += keys[j].size; + } + } + + return { place, sum_keys_size }; } StringRef ComplexKeyCacheDictionary::placeKeysInFixedSizePool( @@ -965,4 +987,26 @@ StringRef ComplexKeyCacheDictionary::copyKey(const StringRef key) const return { res, key.size }; } +bool ComplexKeyCacheDictionary::isEmptyCell(const UInt64 idx) const +{ + return (cells[idx].key == StringRef{} && (idx != zero_cell_idx + || cells[idx].data == ext::safe_bit_cast(CellMetadata::time_point_t()))); +} + +BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const +{ + std::vector keys; + { + const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; + + for (auto idx : ext::range(0, cells.size())) + if (!isEmptyCell(idx) + && !cells[idx].isDefault()) + keys.push_back(cells[idx].key); + } + + using BlockInputStreamType = DictionaryBlockInputStream; + return std::make_shared(shared_from_this(), max_block_size, keys, column_names); +} + } diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h index 
ad7b3a3e33a..f4f1c9a5c9b 100644 --- a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h @@ -147,6 +147,8 @@ public: void has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray & out) const; + BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; + private: template using MapType = HashMapWithSavedHash; template using ContainerType = Value[]; @@ -233,7 +235,8 @@ private: template static StringRef placeKeysInPool( - const std::size_t row, const Columns & key_columns, StringRefs & keys, Arena & pool); + const std::size_t row, const Columns & key_columns, StringRefs & keys, + const std::vector & key_attributes, Arena & pool); StringRef placeKeysInFixedSizePool( const std::size_t row, const Columns & key_columns) const; @@ -255,6 +258,8 @@ private: return findCellIdx(key, now, hash); }; + bool isEmptyCell(const UInt64 idx) const; + const std::string name; const DictionaryStructure dict_struct; const DictionarySourcePtr source_ptr; diff --git a/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp b/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp index ea522f2d743..c4acabea515 100644 --- a/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp +++ b/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace DB @@ -460,22 +461,22 @@ StringRef ComplexKeyHashedDictionary::placeKeysInPool( { const auto keys_size = key_columns.size(); size_t sum_keys_size{}; - for (const auto i : ext::range(0, keys_size)) - { - keys[i] = key_columns[i]->getDataAtWithTerminatingZero(row); - sum_keys_size += keys[i].size; - } - - const auto res = pool.alloc(sum_keys_size); - auto place = res; + const char * block_start = nullptr; for (size_t j = 0; j < keys_size; ++j) { - memcpy(place, keys[j].data, keys[j].size); - place += keys[j].size; + keys[j] = key_columns[j]->serializeValueIntoArena(row, pool, 
block_start); + sum_keys_size += keys[j].size; } - return { res, sum_keys_size }; + auto key_start = block_start; + for (size_t j = 0; j < keys_size; ++j) + { + keys[j].data = key_start; + key_start += keys[j].size; + } + + return { block_start, sum_keys_size }; } template @@ -502,4 +503,44 @@ void ComplexKeyHashedDictionary::has(const Attribute & attribute, const Columns query_count.fetch_add(rows, std::memory_order_relaxed); } +std::vector ComplexKeyHashedDictionary::getKeys() const +{ + const Attribute & attribute = attributes.front(); + + switch (attribute.type) + { + case AttributeUnderlyingType::UInt8: return getKeys(attribute); break; + case AttributeUnderlyingType::UInt16: return getKeys(attribute); break; + case AttributeUnderlyingType::UInt32: return getKeys(attribute); break; + case AttributeUnderlyingType::UInt64: return getKeys(attribute); break; + case AttributeUnderlyingType::Int8: return getKeys(attribute); break; + case AttributeUnderlyingType::Int16: return getKeys(attribute); break; + case AttributeUnderlyingType::Int32: return getKeys(attribute); break; + case AttributeUnderlyingType::Int64: return getKeys(attribute); break; + case AttributeUnderlyingType::Float32: return getKeys(attribute); break; + case AttributeUnderlyingType::Float64: return getKeys(attribute); break; + case AttributeUnderlyingType::String: return getKeys(attribute); break; + } + return {}; +} + +template +std::vector ComplexKeyHashedDictionary::getKeys(const Attribute & attribute) const +{ + const ContainerType & attr = *std::get>(attribute.maps); + std::vector keys; + keys.reserve(attr.size()); + for (const auto & key : attr) + keys.push_back(key.first); + + return keys; +} + +BlockInputStreamPtr ComplexKeyHashedDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const +{ + using BlockInputStreamType = DictionaryBlockInputStream; + return std::make_shared(shared_from_this(), max_block_size, getKeys(), column_names); +} + + } diff --git 
a/dbms/src/Dictionaries/ComplexKeyHashedDictionary.h b/dbms/src/Dictionaries/ComplexKeyHashedDictionary.h index 493897766c4..b269810df18 100644 --- a/dbms/src/Dictionaries/ComplexKeyHashedDictionary.h +++ b/dbms/src/Dictionaries/ComplexKeyHashedDictionary.h @@ -16,6 +16,7 @@ namespace DB { + class ComplexKeyHashedDictionary final : public IDictionaryBase { public: @@ -125,6 +126,8 @@ public: void has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray & out) const; + BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; + private: template using ContainerType = HashMapWithSavedHash; template using ContainerPtrType = std::unique_ptr>; @@ -188,6 +191,11 @@ private: template void has(const Attribute & attribute, const Columns & key_columns, PaddedPODArray & out) const; + std::vector getKeys() const; + + template + std::vector getKeys(const Attribute & attribute) const; + const std::string name; const DictionaryStructure dict_struct; const DictionarySourcePtr source_ptr; diff --git a/dbms/src/Dictionaries/DictionaryBlockInputStream.h b/dbms/src/Dictionaries/DictionaryBlockInputStream.h new file mode 100644 index 00000000000..1d80b32c65a --- /dev/null +++ b/dbms/src/Dictionaries/DictionaryBlockInputStream.h @@ -0,0 +1,417 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +/* + * BlockInputStream implementation for external dictionaries + * read() returns single block consisting of the in-memory contents of the dictionaries + */ +template +class DictionaryBlockInputStream : public DictionaryBlockInputStreamBase +{ +public: + using DictionatyPtr = std::shared_ptr; + + DictionaryBlockInputStream(std::shared_ptr dictionary, size_t max_block_size, + PaddedPODArray && ids, const Names & column_names); + DictionaryBlockInputStream(std::shared_ptr dictionary, size_t max_block_size, + const 
std::vector & keys, const Names & column_names); + + using GetColumnsFunction = + std::function& attributes)>; + // Used to separate key columns format for storage and view. + // Calls get_key_columns_function to get key column for dictionary get function call + // and get_view_columns_function to get key representation. + // Now used in trie dictionary, where columns are stored as ip and mask, and are shown as strings + DictionaryBlockInputStream(std::shared_ptr dictionary, size_t max_block_size, + const Columns & data_columns, const Names & column_names, + GetColumnsFunction && get_key_columns_function, + GetColumnsFunction && get_view_columns_function); + + String getName() const override { + return "DictionaryBlockInputStream"; + } + +protected: + Block getBlock(size_t start, size_t size) const override; + +private: + // pointer types to getXXX functions + // for single key dictionaries + template + using DictionaryGetter = void (DictionaryType::*)( + const std::string &, const PaddedPODArray &, PaddedPODArray &) const; + using DictionaryStringGetter = void (DictionaryType::*)( + const std::string &, const PaddedPODArray &, ColumnString *) const; + // for complex key dictionaries + template + using GetterByKey = void (DictionaryType::*)( + const std::string &, const Columns &, const DataTypes &, PaddedPODArray & out) const; + using StringGetterByKey = void (DictionaryType::*)( + const std::string &, const Columns &, const DataTypes &, ColumnString * out) const; + + // call getXXX + // for single key dictionaries + template + void callGetter(DictionaryGetter getter, const PaddedPODArray & ids, + const Columns & keys, const DataTypes & data_types, + Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const; + template + void callGetter(DictionaryStringGetter getter, const PaddedPODArray & ids, + const Columns & keys, const DataTypes & data_types, + Container & container, const DictionaryAttribute & attribute, 
const DictionaryType & dictionary) const; + // for complex key dictionaries + template + void callGetter(GetterByKey getter, const PaddedPODArray & ids, + const Columns & keys, const DataTypes & data_types, + Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const; + template + void callGetter(StringGetterByKey getter, const PaddedPODArray & ids, + const Columns & keys, const DataTypes & data_types, + Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const; + + template