diff --git a/docs/_includes/install/deb.sh b/docs/_includes/install/deb.sh index 9dceef4c245..0daf12a132f 100644 --- a/docs/_includes/install/deb.sh +++ b/docs/_includes/install/deb.sh @@ -1,11 +1,11 @@ -sudo apt-get install apt-transport-https ca-certificates dirmngr +sudo apt-get install -y apt-transport-https ca-certificates dirmngr sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754 -echo "deb https://packages.clickhouse.com/deb stable main/" | sudo tee \ +echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \ /etc/apt/sources.list.d/clickhouse.list sudo apt-get update sudo apt-get install -y clickhouse-server clickhouse-client sudo service clickhouse-server start -clickhouse-client # or "clickhouse-client --password" if you set up a password. +clickhouse-client # or "clickhouse-client --password" if you've set up a password. diff --git a/docs/tools/webpack.config.js b/docs/tools/webpack.config.js index fcb3e7bf32d..e0dea964101 100644 --- a/docs/tools/webpack.config.js +++ b/docs/tools/webpack.config.js @@ -14,7 +14,6 @@ module.exports = { entry: [ path.resolve(scssPath, 'bootstrap.scss'), - path.resolve(scssPath, 'greenhouse.scss'), path.resolve(scssPath, 'main.scss'), path.resolve(jsPath, 'main.js'), ], diff --git a/docs/tools/website.py b/docs/tools/website.py index 11772fe7a73..de4cc14670c 100644 --- a/docs/tools/website.py +++ b/docs/tools/website.py @@ -151,6 +151,11 @@ def build_website(args): ) ) + shutil.copytree( + os.path.join(args.website_dir, 'images'), + os.path.join(args.output_dir, 'docs', 'images') + ) + # This file can be requested to check for available ClickHouse releases. shutil.copy2( os.path.join(args.src_dir, 'utils', 'list-versions', 'version_date.tsv'), @@ -231,28 +236,31 @@ def minify_file(path, css_digest, js_digest): def minify_website(args): - # Output greenhouse css separately from main bundle to be included via the greenhouse iframe - command = f"cat '{args.website_dir}/css/greenhouse.css' > '{args.output_dir}/css/greenhouse.css'" - logging.info(command) - output = subprocess.check_output(command, shell=True) - logging.debug(output) - css_in = ' '.join(get_css_in(args)) - css_out = f'{args.output_dir}/css/base.css' - if args.minify: + css_out = f'{args.output_dir}/docs/css/base.css' + os.makedirs(f'{args.output_dir}/docs/css') + + if args.minify and False: # TODO: return closure command = f"purifycss -w '*algolia*' --min {css_in} '{args.output_dir}/*.html' " \ f"'{args.output_dir}/docs/en/**/*.html' '{args.website_dir}/js/**/*.js' > {css_out}" - else: - command = f'cat {css_in} > {css_out}' + logging.info(css_in) + logging.info(command) + output = subprocess.check_output(command, shell=True) + logging.debug(output) + + else: + command = f"cat {css_in}" + output = subprocess.check_output(command, shell=True) + with open(css_out, 'wb+') as f: + f.write(output) - logging.info(command) - output = subprocess.check_output(command, shell=True) - logging.debug(output) with open(css_out, 'rb') as f: css_digest = hashlib.sha3_224(f.read()).hexdigest()[0:8] - js_in = get_js_in(args) - js_out = f'{args.output_dir}/js/base.js' + js_in = ' '.join(get_js_in(args)) + js_out = f'{args.output_dir}/docs/js/base.js' + os.makedirs(f'{args.output_dir}/docs/js') + if args.minify and False: # TODO: return closure js_in = [js[1:-1] for js in js_in] closure_args = [ @@ -271,11 +279,11 @@ def minify_website(args): f.write(js_content) else: - js_in = ' '.join(js_in) - command = f'cat {js_in} > {js_out}' - logging.info(command) + 
command = f"cat {js_in}" output = subprocess.check_output(command, shell=True) - logging.debug(output) + with open(js_out, 'wb+') as f: + f.write(output) + with open(js_out, 'rb') as f: js_digest = hashlib.sha3_224(f.read()).hexdigest()[0:8] logging.info(js_digest) diff --git a/docs/zh/sql-reference/statements/alter/settings-profile.md b/docs/zh/sql-reference/statements/alter/settings-profile.md deleted file mode 120000 index 0e71ac4e831..00000000000 --- a/docs/zh/sql-reference/statements/alter/settings-profile.md +++ /dev/null @@ -1 +0,0 @@ -../../../../en/sql-reference/statements/alter/settings-profile.md \ No newline at end of file diff --git a/docs/zh/sql-reference/statements/alter/settings-profile.md b/docs/zh/sql-reference/statements/alter/settings-profile.md new file mode 100644 index 00000000000..045b2461e8c --- /dev/null +++ b/docs/zh/sql-reference/statements/alter/settings-profile.md @@ -0,0 +1,16 @@ +--- +toc_priority: 48 +toc_title: 配置文件设置 +--- + +## 更改配置文件设置 {#alter-settings-profile-statement} + +更改配置文件设置。 + +语法: + +``` sql +ALTER SETTINGS PROFILE [IF EXISTS] TO name1 [ON CLUSTER cluster_name1] [RENAME TO new_name1] + [, name2 [ON CLUSTER cluster_name2] [RENAME TO new_name2] ...] + [SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [READONLY|WRITABLE] | INHERIT 'profile_name'] [,...] +``` diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp index 35ffb97b8e2..60e5ca92f77 100644 --- a/programs/benchmark/Benchmark.cpp +++ b/programs/benchmark/Benchmark.cpp @@ -435,6 +435,8 @@ private: Progress progress; executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); + executor.sendQuery(ClientInfo::QueryKind::INITIAL_QUERY); + ProfileInfo info; while (Block block = executor.read()) info.update(block); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b99ffd7ee18..22fe1f2ffff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -144,7 +144,6 @@ list (APPEND dbms_sources AggregateFunctions/AggregateFunctionState.cpp AggregateFunctions/AggregateFunctionCount.cpp AggregateFunctions/parseAggregateFunctionParameters.cpp) - list (APPEND dbms_headers AggregateFunctions/IAggregateFunction.h AggregateFunctions/IAggregateFunctionCombinator.h @@ -155,10 +154,25 @@ list (APPEND dbms_headers AggregateFunctions/FactoryHelpers.h AggregateFunctions/parseAggregateFunctionParameters.h) -list (APPEND dbms_sources TableFunctions/ITableFunction.cpp TableFunctions/TableFunctionFactory.cpp) -list (APPEND dbms_headers TableFunctions/ITableFunction.h TableFunctions/TableFunctionFactory.h) -list (APPEND dbms_sources Dictionaries/DictionaryFactory.cpp Dictionaries/DictionarySourceFactory.cpp Dictionaries/DictionaryStructure.cpp Dictionaries/getDictionaryConfigurationFromAST.cpp) -list (APPEND dbms_headers Dictionaries/DictionaryFactory.h Dictionaries/DictionarySourceFactory.h Dictionaries/DictionaryStructure.h Dictionaries/getDictionaryConfigurationFromAST.h) +list (APPEND dbms_sources + TableFunctions/ITableFunction.cpp + TableFunctions/TableFunctionView.cpp + TableFunctions/TableFunctionFactory.cpp) +list (APPEND dbms_headers + TableFunctions/ITableFunction.h + TableFunctions/TableFunctionView.h + TableFunctions/TableFunctionFactory.h) + +list (APPEND dbms_sources + Dictionaries/DictionaryFactory.cpp + Dictionaries/DictionarySourceFactory.cpp + Dictionaries/DictionaryStructure.cpp + Dictionaries/getDictionaryConfigurationFromAST.cpp) +list (APPEND dbms_headers + 
Dictionaries/DictionaryFactory.h + Dictionaries/DictionarySourceFactory.h + Dictionaries/DictionaryStructure.h + Dictionaries/getDictionaryConfigurationFromAST.h) if (NOT ENABLE_SSL) list (REMOVE_ITEM clickhouse_common_io_sources Common/OpenSSLHelpers.cpp) @@ -253,18 +267,16 @@ if (TARGET ch_contrib::nuraft) add_object_library(clickhouse_coordination Coordination) endif() -set (DBMS_COMMON_LIBRARIES) - if (USE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES) add_library (dbms STATIC ${dbms_headers} ${dbms_sources}) - target_link_libraries (dbms PRIVATE ch_contrib::libdivide ${DBMS_COMMON_LIBRARIES}) + target_link_libraries (dbms PRIVATE ch_contrib::libdivide) if (TARGET ch_contrib::jemalloc) target_link_libraries (dbms PRIVATE ch_contrib::jemalloc) endif() set (all_modules dbms) else() add_library (dbms SHARED ${dbms_headers} ${dbms_sources}) - target_link_libraries (dbms PUBLIC ${all_modules} ${DBMS_COMMON_LIBRARIES}) + target_link_libraries (dbms PUBLIC ${all_modules}) target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::libdivide) if (TARGET ch_contrib::jemalloc) target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::jemalloc) @@ -557,6 +569,10 @@ if (ENABLE_TESTS) clickhouse_common_zookeeper string_utils) + if (TARGET ch_contrib::yaml_cpp) + target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::yaml_cpp) + endif() + add_check(unit_tests_dbms) endif () diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index d1873ac038d..31fbc609bdc 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -133,7 +133,12 @@ void MultiplexedConnections::sendQuery( modified_settings.group_by_two_level_threshold_bytes = 0; } - if (settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas) + bool parallel_reading_from_replicas = settings.max_parallel_replicas > 1 + && settings.allow_experimental_parallel_reading_from_replicas + /// To avoid trying to coordinate with clickhouse-benchmark, + /// since it uses the same code. + && client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY; + if (parallel_reading_from_replicas) { client_info.collaborate_with_initiator = true; client_info.count_participating_replicas = replica_info.all_replicas_count; diff --git a/src/Common/Config/YAMLParser.cpp b/src/Common/Config/YAMLParser.cpp index 1d6d7166669..bb83563ecc9 100644 --- a/src/Common/Config/YAMLParser.cpp +++ b/src/Common/Config/YAMLParser.cpp @@ -3,13 +3,9 @@ #if USE_YAML_CPP #include "YAMLParser.h" -#include -#include #include #include -#include -#include #include #include #include @@ -19,8 +15,6 @@ #include -#include - using namespace Poco::XML; namespace DB @@ -74,30 +68,35 @@ void processNode(const YAML::Node & node, Poco::XML::Element & parent_xml_elemen case YAML::NodeType::Sequence: { for (const auto & child_node : node) - if (parent_xml_element.hasChildNodes()) + /// For sequences it depends how we want to process them. + /// Sequences of key-value pairs such as: + /// seq: + /// - k1: val1 + /// - k2: val2 + /// into xml like this: + /// + /// val1 + /// val2 + /// + /// + /// But, if the sequence is just a list, the root-node needs to be repeated, such as: + /// seq: + /// - val1 + /// - val2 + /// into xml like this: + /// val1 + /// val2 + /// + /// Therefore check what type the child is, for further processing. + /// Mixing types (values list or map) will lead to strange results but should not happen. 
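For illustration, a minimal standalone sketch of the dispatch described in the comment above, using only yaml-cpp (the same `IsMap()` check the patched code performs); the YAML content and printed strings are made up:

```cpp
// A sequence element that is a map is merged into the existing parent element,
// while a plain value gets a repeated (cloned) copy of the parent tag.
#include <yaml-cpp/yaml.h>
#include <iostream>

int main()
{
    const YAML::Node seq = YAML::Load("[{k1: val1}, {k2: val2}, val3]");
    for (const auto & child : seq)
    {
        if (child.IsMap())
            std::cout << "map element -> write keys into the current parent element\n";
        else
            std::cout << "plain value -> repeat the parent element for this value\n";
    }
    return 0;
}
```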
+ if (parent_xml_element.hasChildNodes() && !child_node.IsMap()) { - /// We want to process sequences like that: - /// seq: - /// - val1 - /// - k2: val2 - /// - val3 - /// - k4: val4 - /// - val5 - /// into xml like this: - /// val1 - /// - /// val2 - /// - /// val3 - /// - /// val4 - /// - /// val5 - /// So, we create a new parent node with same tag for each child node + /// Create a new parent node with same tag for each child node processNode(child_node, *createCloneNode(parent_xml_element)); } else { + /// Map, so don't recreate the parent node but add directly processNode(child_node, parent_xml_element); } break; diff --git a/src/Common/PODArray.h b/src/Common/PODArray.h index b312fbda21c..9b15782a231 100644 --- a/src/Common/PODArray.h +++ b/src/Common/PODArray.h @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -196,7 +197,7 @@ protected: /// The operation is slow and performed only for debug builds. void protectImpl(int prot) { - static constexpr size_t PROTECT_PAGE_SIZE = 4096; + static size_t PROTECT_PAGE_SIZE = ::getPageSize(); char * left_rounded_up = reinterpret_cast((reinterpret_cast(c_start) - pad_left + PROTECT_PAGE_SIZE - 1) / PROTECT_PAGE_SIZE * PROTECT_PAGE_SIZE); char * right_rounded_down = reinterpret_cast((reinterpret_cast(c_end_of_storage) + pad_right) / PROTECT_PAGE_SIZE * PROTECT_PAGE_SIZE); diff --git a/src/Common/tests/gtest_helper_functions.h b/src/Common/tests/gtest_helper_functions.h new file mode 100644 index 00000000000..9d2ec5bee41 --- /dev/null +++ b/src/Common/tests/gtest_helper_functions.h @@ -0,0 +1,75 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include "Poco/DOM/Document.h" +#include "Poco/DOM/NodeList.h" +#include "Poco/DOM/NamedNodeMap.h" + +const std::string tmp_path = "/tmp/"; + +inline std::unique_ptr getFileWithContents(const char *fileName, const char *fileContents) +{ + using namespace DB; + namespace fs = std::filesystem; + using File = Poco::File; + + + fs::create_directories(fs::path(tmp_path)); + auto config_file = std::make_unique(tmp_path + fileName); + + { + WriteBufferFromFile out(config_file->path()); + writeString(fileContents, out); + } + + return config_file; +} + +inline std::string xmlNodeAsString(Poco::XML::Node *pNode) +{ + const auto& node_name = pNode->nodeName(); + + Poco::XML::XMLString result = "<" + node_name ; + + auto *attributes = pNode->attributes(); + for (auto i = 0; ilength();i++) + { + auto *item = attributes->item(i); + auto name = item->nodeName(); + auto text = item->innerText(); + result += (" " + name + "=\"" + text + "\""); + } + + result += ">"; + if (pNode->hasChildNodes() && pNode->firstChild()->nodeType() != Poco::XML::Node::TEXT_NODE) + { + result += "\n"; + } + + attributes->release(); + + auto *list = pNode->childNodes(); + for (auto i = 0; ilength();i++) + { + auto *item = list->item(i); + auto type = item->nodeType(); + if (type == Poco::XML::Node::ELEMENT_NODE) + { + result += xmlNodeAsString(item); + } + else if (type == Poco::XML::Node::TEXT_NODE) + { + result += item->innerText(); + } + + } + list->release(); + result += ("\n"); + return Poco::XML::fromXMLString(result); +} diff --git a/src/Common/tests/gtest_yaml_parser.cpp b/src/Common/tests/gtest_yaml_parser.cpp new file mode 100644 index 00000000000..8457e6fd4e1 --- /dev/null +++ b/src/Common/tests/gtest_yaml_parser.cpp @@ -0,0 +1,78 @@ +#include + +#if USE_YAML_CPP +#include "gtest_helper_functions.h" +#include +#include +#include +#include +#include "Poco/DOM/Document.h" + +#include + + 
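Returning to the PODArray page-protection change above (the page size is now taken from the OS instead of a hard-coded 4096), here is a standalone sketch of the rounding arithmetic that protectImpl relies on; the addresses are made up for illustration:

```cpp
// Round the protected range inward to whole pages: the left edge up, the right
// edge down, so memory protection is only ever applied to complete pages.
#include <unistd.h>
#include <cstdint>
#include <iostream>

int main()
{
    const std::uintptr_t page_size = static_cast<std::uintptr_t>(::sysconf(_SC_PAGESIZE));
    const std::uintptr_t start = 0x10003;   /// illustrative, unaligned boundaries
    const std::uintptr_t end = 0x15ffd;

    const std::uintptr_t left_rounded_up = (start + page_size - 1) / page_size * page_size;
    const std::uintptr_t right_rounded_down = end / page_size * page_size;

    std::cout << std::hex << "protect [0x" << left_rounded_up
              << ", 0x" << right_rounded_down << ")\n";
    return 0;
}
```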
+using namespace DB; + +TEST(Common, YamlParserInvalidFile) +{ + ASSERT_THROW(YAMLParser::parse("some-non-existing-file.yaml"), Exception); +} + +TEST(Common, YamlParserProcessKeysList) +{ + auto yaml_file = getFileWithContents("keys-list.yaml", R"YAML( +operator: + access_management: "1" + networks: + - ip: "10.1.6.168" + - ip: "::1" + - ip: "127.0.0.1" +)YAML"); + SCOPE_EXIT({ yaml_file->remove(); }); + + Poco::AutoPtr xml = YAMLParser::parse(yaml_file->path()); + auto *p_node = xml->getNodeByPath("/clickhouse"); + EXPECT_EQ(xmlNodeAsString(p_node), R"CONFIG( + +1 + +10.1.6.168 +::1 +127.0.0.1 + + + +)CONFIG"); + +} + +TEST(Common, YamlParserProcessValuesList) +{ + auto yaml_file = getFileWithContents("values-list.yaml", R"YAML( +rules: + - apiGroups: [""] + resources: + - nodes + - nodes/proxy + - services + - endpoints + - pods +)YAML"); + SCOPE_EXIT({ yaml_file->remove(); }); + + Poco::AutoPtr xml = YAMLParser::parse(yaml_file->path()); + auto *p_node = xml->getNodeByPath("/clickhouse"); + EXPECT_EQ(xmlNodeAsString(p_node), R"CONFIG( + + +nodes +nodes/proxy +services +endpoints +pods + + +)CONFIG"); + +} +#endif diff --git a/src/Coordination/KeeperStateManager.cpp b/src/Coordination/KeeperStateManager.cpp index 52d0b0cc881..f9bfea5e69a 100644 --- a/src/Coordination/KeeperStateManager.cpp +++ b/src/Coordination/KeeperStateManager.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include namespace DB { @@ -12,6 +14,70 @@ namespace ErrorCodes extern const int RAFT_ERROR; } +namespace +{ + +bool isLoopback(const std::string & hostname) +{ + try + { + return DNSResolver::instance().resolveHost(hostname).isLoopback(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + return false; +} + +bool isLocalhost(const std::string & hostname) +{ + try + { + return isLocalAddress(DNSResolver::instance().resolveHost(hostname)); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + return false; +} + +std::unordered_map getClientPorts(const Poco::Util::AbstractConfiguration & config) +{ + static const char * config_port_names[] = { + "keeper_server.tcp_port", + "keeper_server.tcp_port_secure", + "interserver_http_port", + "interserver_https_port", + "tcp_port", + "tcp_with_proxy_port", + "tcp_port_secure", + "mysql_port", + "postgresql_port", + "grpc_port", + "prometheus.port", + }; + + std::unordered_map ports; + for (const auto & config_port_name : config_port_names) + { + if (config.has(config_port_name)) + ports[config.getUInt64(config_port_name)] = config_port_name; + } + return ports; +} + +} + +/// this function quite long because contains a lot of sanity checks in config: +/// 1. No duplicate endpoints +/// 2. No "localhost" or "127.0.0.1" or another local addresses mixed with normal addresses +/// 3. Raft internal port is not equal to any other port for client +/// 4. No duplicate IDs +/// 5. Our ID present in hostnames list KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const { KeeperConfigurationWrapper result; @@ -19,12 +85,17 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC Poco::Util::AbstractConfiguration::Keys keys; config.keys(config_prefix + ".raft_configuration", keys); + auto client_ports = getClientPorts(config); + /// Sometimes (especially in cloud envs) users can provide incorrect /// configuration with duplicated raft ids or endpoints. 
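A toy standalone version of two of the sanity checks listed above (a Raft port colliding with a client-facing port, and a duplicated endpoint); the server list and port names are made up:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main()
{
    const std::unordered_map<std::uint64_t, std::string> client_ports = {{9000, "tcp_port"}, {8123, "http_port"}};
    std::unordered_map<std::string, int> seen_endpoints;  /// endpoint -> server id

    struct Server { int id; std::string host; std::uint64_t port; };
    const std::vector<Server> servers = {{1, "node1", 9234}, {2, "node1", 9234}, {3, "node3", 9000}};

    for (const auto & s : servers)
    {
        const std::string endpoint = s.host + ":" + std::to_string(s.port);
        if (auto it = client_ports.find(s.port); it != client_ports.end())
            std::cout << "error: raft port " << s.port << " is already used as " << it->second << '\n';
        else if (auto [pos, inserted] = seen_endpoints.emplace(endpoint, s.id); !inserted)
            std::cout << "error: endpoint " << endpoint << " was already added with id " << pos->second << '\n';
        else
            std::cout << "ok: server " << s.id << " at " << endpoint << '\n';
    }
    return 0;
}
```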
We check them /// on config parsing stage and never commit to quorum. std::unordered_map check_duplicated_hostnames; size_t total_servers = 0; + std::string loopback_hostname; + std::string non_local_hostname; + size_t local_address_counter = 0; for (const auto & server_key : keys) { if (!startsWith(server_key, "server")) @@ -38,13 +109,33 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC int32_t priority = config.getInt(full_prefix + ".priority", 1); bool start_as_follower = config.getBool(full_prefix + ".start_as_follower", false); + if (client_ports.count(port) != 0) + { + throw Exception(ErrorCodes::RAFT_ERROR, "Raft configuration contains hostname '{}' with port '{}' which is equal to '{}' in server configuration", + hostname, port, client_ports[port]); + } + + if (isLoopback(hostname)) + { + loopback_hostname = hostname; + local_address_counter++; + } + else if (isLocalhost(hostname)) + { + local_address_counter++; + } + else + { + non_local_hostname = hostname; + } + if (start_as_follower) result.servers_start_as_followers.insert(new_server_id); auto endpoint = hostname + ":" + std::to_string(port); if (check_duplicated_hostnames.count(endpoint)) { - throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contain duplicate endpoints: " + throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contains duplicate endpoints: " "endpoint {} has been already added with id {}, but going to add it one more time with id {}", endpoint, check_duplicated_hostnames[endpoint], new_server_id); } @@ -54,7 +145,7 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC for (const auto & [id_endpoint, id] : check_duplicated_hostnames) { if (new_server_id == id) - throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contain duplicate ids: id {} has been already added with endpoint {}, " + throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contains duplicate ids: id {} has been already added with endpoint {}, " "but going to add it one more time with endpoint {}", id, id_endpoint, endpoint); } check_duplicated_hostnames.emplace(endpoint, new_server_id); @@ -77,6 +168,24 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC if (result.servers_start_as_followers.size() == total_servers) throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without )"); + if (!loopback_hostname.empty() && !non_local_hostname.empty()) + { + throw Exception( + ErrorCodes::RAFT_ERROR, + "Mixing loopback and non-local hostnames ('{}' and '{}') in raft_configuration is not allowed. " + "Different hosts can resolve it to themselves so it's not allowed.", + loopback_hostname, non_local_hostname); + } + + if (!non_local_hostname.empty() && local_address_counter > 1) + { + throw Exception( + ErrorCodes::RAFT_ERROR, + "Local address specified more than once ({} times) and non-local hostnames also exists ('{}') in raft_configuration. 
" + "Such configuration is not allowed because single host can vote multiple times.", + local_address_counter, non_local_hostname); + } + return result; } diff --git a/src/Dictionaries/ClickHouseDictionarySource.cpp b/src/Dictionaries/ClickHouseDictionarySource.cpp index deecc3c983e..5a18dcffb22 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.cpp +++ b/src/Dictionaries/ClickHouseDictionarySource.cpp @@ -30,7 +30,7 @@ namespace ErrorCodes static const std::unordered_set dictionary_allowed_keys = { "host", "port", "user", "password", "db", "database", "table", - "update_field", "update_tag", "invalidate_query", "query", "where", "name", "secure"}; + "update_field", "update_lag", "invalidate_query", "query", "where", "name", "secure"}; namespace { diff --git a/src/Dictionaries/MySQLDictionarySource.cpp b/src/Dictionaries/MySQLDictionarySource.cpp index 29d70f3a7c4..6578f91aa73 100644 --- a/src/Dictionaries/MySQLDictionarySource.cpp +++ b/src/Dictionaries/MySQLDictionarySource.cpp @@ -34,7 +34,7 @@ static const std::unordered_set dictionary_allowed_keys = { "host", "port", "user", "password", "db", "database", "table", "schema", "update_field", "invalidate_query", "priority", - "update_tag", "dont_check_update_time", + "update_lag", "dont_check_update_time", "query", "where", "name" /* name_collection */, "socket", "share_connection", "fail_on_connection_loss", "close_connection", "ssl_ca", "ssl_cert", "ssl_key", diff --git a/src/Dictionaries/PostgreSQLDictionarySource.cpp b/src/Dictionaries/PostgreSQLDictionarySource.cpp index 6fdf486fdbf..511d6a7288e 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.cpp +++ b/src/Dictionaries/PostgreSQLDictionarySource.cpp @@ -30,7 +30,7 @@ static const UInt64 max_block_size = 8192; static const std::unordered_set dictionary_allowed_keys = { "host", "port", "user", "password", "db", "database", "table", "schema", - "update_field", "update_tag", "invalidate_query", "query", "where", "name", "priority"}; + "update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"}; namespace { diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 57bfaf405e0..e49e9cf6726 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -325,7 +326,7 @@ DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path) void DiskLocal::moveFile(const String & from_path, const String & to_path) { - fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path); + renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path); } void DiskLocal::replaceFile(const String & from_path, const String & to_path) diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index aff4985a4f1..5d61285981b 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -286,7 +286,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, settings->s3_upload_part_size_multiply_parts_count_threshold, settings->s3_max_single_part_upload_size, std::move(object_metadata), - buf_size /*, std::move(schedule) */); + buf_size, std::move(schedule)); auto create_metadata_callback = [this, path, blob_name, mode] (size_t count) { diff --git a/src/Functions/IFunction.h b/src/Functions/IFunction.h index 8063ad77ad0..71af6149774 100644 --- a/src/Functions/IFunction.h +++ b/src/Functions/IFunction.h @@ -267,7 +267,7 @@ public: */ virtual Monotonicity getMonotonicityForRange(const IDataType & /*type*/, const Field & /*left*/, const 
Field & /*right*/) const { - throw Exception("Function " + getName() + " has no information about its monotonicity.", ErrorCodes::NOT_IMPLEMENTED); + throw Exception("Function " + getName() + " has no information about its monotonicity", ErrorCodes::NOT_IMPLEMENTED); } }; @@ -452,7 +452,7 @@ public: using Monotonicity = IFunctionBase::Monotonicity; virtual Monotonicity getMonotonicityForRange(const IDataType & /*type*/, const Field & /*left*/, const Field & /*right*/) const { - throw Exception("Function " + getName() + " has no information about its monotonicity.", ErrorCodes::NOT_IMPLEMENTED); + throw Exception("Function " + getName() + " has no information about its monotonicity", ErrorCodes::NOT_IMPLEMENTED); } /// For non-variadic functions, return number of arguments; otherwise return zero (that should be ignored). diff --git a/src/IO/HadoopSnappyReadBuffer.cpp b/src/IO/HadoopSnappyReadBuffer.cpp index 324df67e900..cac05b4827b 100644 --- a/src/IO/HadoopSnappyReadBuffer.cpp +++ b/src/IO/HadoopSnappyReadBuffer.cpp @@ -11,7 +11,6 @@ #include "HadoopSnappyReadBuffer.h" - namespace DB { namespace ErrorCodes @@ -32,11 +31,11 @@ inline bool HadoopSnappyDecoder::checkAvailIn(size_t avail_in, int min) inline void HadoopSnappyDecoder::copyToBuffer(size_t * avail_in, const char ** next_in) { - assert(*avail_in <= sizeof(buffer)); + assert(*avail_in + buffer_length <= sizeof(buffer)); - memcpy(buffer, *next_in, *avail_in); + memcpy(buffer + buffer_length, *next_in, *avail_in); - buffer_length = *avail_in; + buffer_length += *avail_in; *next_in += *avail_in; *avail_in = 0; } @@ -78,14 +77,21 @@ inline HadoopSnappyDecoder::Status HadoopSnappyDecoder::readLength(size_t * avai inline HadoopSnappyDecoder::Status HadoopSnappyDecoder::readBlockLength(size_t * avail_in, const char ** next_in) { if (block_length < 0) + { return readLength(avail_in, next_in, &block_length); + } return Status::OK; } inline HadoopSnappyDecoder::Status HadoopSnappyDecoder::readCompressedLength(size_t * avail_in, const char ** next_in) { if (compressed_length < 0) - return readLength(avail_in, next_in, &compressed_length); + { + auto status = readLength(avail_in, next_in, &compressed_length); + if (unlikely(compressed_length > 0 && static_cast(compressed_length) > sizeof(buffer))) + throw Exception(ErrorCodes::SNAPPY_UNCOMPRESS_FAILED, "Too large snappy compressed block. 
buffer size: {}, compressed block size: {}", sizeof(buffer), compressed_length); + return status; + } return Status::OK; } @@ -111,7 +117,6 @@ HadoopSnappyDecoder::readCompressedData(size_t * avail_in, const char ** next_in { compressed = const_cast(*next_in); } - size_t uncompressed_length = *avail_out; auto status = snappy_uncompress(compressed, compressed_length, *next_out, &uncompressed_length); if (status != SNAPPY_OK) @@ -154,7 +159,9 @@ HadoopSnappyDecoder::Status HadoopSnappyDecoder::readBlock(size_t * avail_in, co return status; } if (total_uncompressed_length != block_length) + { return Status::INVALID_INPUT; + } return Status::OK; } diff --git a/src/IO/examples/hadoop_snappy_read_buffer.cpp b/src/IO/examples/hadoop_snappy_read_buffer.cpp index 9cb01e6d697..eeac3db40a7 100644 --- a/src/IO/examples/hadoop_snappy_read_buffer.cpp +++ b/src/IO/examples/hadoop_snappy_read_buffer.cpp @@ -38,6 +38,11 @@ int main() return 1; } } + if (uncompress(256) != output) + { + std::cout << "test hadoop snappy read buffer failed, buf_size:" << 256 << std::endl; + return 1; + } std::cout << "test hadoop snappy read buffer success" << std::endl; return 0; } diff --git a/src/IO/tests/gtest_hadoop_snappy_decoder.cpp b/src/IO/tests/gtest_hadoop_snappy_decoder.cpp new file mode 100644 index 00000000000..f681e8e61e1 --- /dev/null +++ b/src/IO/tests/gtest_hadoop_snappy_decoder.cpp @@ -0,0 +1,66 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +using namespace DB; +TEST(HadoopSnappyDecoder, repeatNeedMoreInput) +{ + String snappy_base64_content = "AAAl6gAAB67qSzQyMDIxLTA2LTAxAXh4eGIEACQzMTkuNzQyNDMKnjEAHDQyLjgyMTcynjEAIDI5Ni4yODQwNqIxA" + "BgyNy43MjYzqpMAGDMuNTIyMzSiYgAcNjUuNDk1OTeiMQAYOTQuNTg1NaYxABg4OC40NzgyojEAHDMyMS4zOTE1os" + "QAHDM0Ni4xNTI3qjEAGDEuMjA3MTWm9QAQMi41MjamYQAcMjIuNTEyNDieYQAcMzMwLjI5MTKiGgIcMzIzLjAzNDi" + "iwwAcMzE1LjA1MDmiYgAcNDM1Ljc2ODaqxAAUMS45NDA5nvQACDAuMP4rAEorABwzMDMuMjAyNaYZARgwOC4xOTEy" + "pugAGDQ2LjQ0MjKilQMcMjc4Ljk3MTiiMQAcMzUwLjc3NTeirAGqSwEcMzI5LjkyMzGiXAAcMzMxLjc2NzamwAMUM" + "TMuNjM4pjEAGDI3NC4yMzK2MQAINDg0qrMBFDExLjgzNqbbBRgyNDkuNTI5qtsFGDUwLjE4ODmi5AGlSAgwNjWmiA" + "EUMjIuNjU4pqcCBDUzYcCqdgIYMDEuMzcxNbbPBgQ5Na5TBBA0Ljc1OaIiBMGdDDM0OTGeJwQcMjg3LjIyNTmm/AM" + "hVAAyopAAGDMxOC4wMjGmMAAB8AQ0OKpGAhgyMC42MTM4poMBFDg3LjEzOKoxABA5My4xNaZSARQ5NS41ODemTgVh" + "OQwwODg2osIAGDMyNi45NTSmMQAcMjc3LjgxNDmqjQcMNS42MqpqA0F3DDg2MDamzAPhKwQ4OKJWARgzMDYuMTc1q" + "i0EGDgwLjIwNTSihAUYMjk3LjY5NaYiBRAyOTAuM6aNBBgyMzkuMzI5pkIJwdOi7wcYMzcxLjIyNqpiBxQ0NS44Nz" + "Gq9woEODAOZAoANqJ+BRgyNzYuMjExpnYCIYIMMjIyOKKnAmVrBDc0psQAEDMwOS4xqtEJGDMwNC45MzSq8wAMNC4" + "0OKomCyG3DDE4MTGi/AMhJAQxMKqjBhgyNjEuNDQ4rqMGFDIuOTEwN6I5AwQzN7JMCQw2LjcwqqoMGDI2MC44NzOm" + "dwIOTAkMNDgzMqLSBhQyNTkuMjGmYweBiwg3MzOmyQMYNDM3Ljg1N6ZyBq5QARQzMy43MjSqKw4UMTIuNzkxpkkFD" + "mgNDDc4MzCmUAEUOTUuOTQypnoFDiQIDDI2ODmmBQMUNTEuMjc2qikEDtkJBDA1qgUDFDA3LjE1N6ZiAOGUCDMwOa" + "oxABA3NC42NqqmAhA5Ni45N6rIAxwzMDcuMjkzMaL+ChQyNzUuODau/QoANOExpugBGDI0Ny4xODSm5wEYOTEuNDE" + "3MZ7MChQzMzUuNjWquQQUNTMuODg1psMHDu8SCDIyOaYJDoFbCDk4M6aWDhwzNDEuNTcyMKK1AUF4ADSqCwoQMzg1" + "LjSujBIB9Aw0MDUwotoJDi4PCDc0N6aHARgyMjMuODMxpgYRwmcRGDIxMi4xNjWqSgIQMDkuODmuzgMYMTkuNTg0M" + "aK7CMFFADmuZQcQMDYuMzKqXwAIOS4zrl8ADu4PBDQ0qtQUGDQ3LjAzODGmFwIYMTAuOTIwMKLDAKG0DDM3MDOiYg" + "CqNgcORgkEMzeuGwWqXQAhqwg2MDWmSQUYMjY0LjE2N6aZFBIgFgQyM6aiCRQwNi41NTSm9AcYMjczLjczNqqSABg" + "0NS45OTIzpugPFDIxLjc3MqZ4EBwyODYuMDkyNKZAAhg0OS4yMjQzom8GDu0LCDEwNKaTBwAzDiUIADimGQkUMzM4" + 
"Ljc2qlITADcOmBUAOaYNBhwyNzAuODA4N6qrCQw3LjAwppkYwT4IMjYzrg0GDDMuOTmq/xEQMjIuODOqRgkEMjQOX" + "xKmQA0IMzAwDggVqjwREDY1LjYxsh8aCDQuOKrCBxgyNTQuNjQ2phMUISQENzmqsAwOLgsENTWqeAIQOTEuNTiuzR" + "EANw55CQAwpp8GEDI2My44rgsRFDI0LjMxNqZuBhIrFgAxqswDGDI4OS4zMzCqXwQANoHyADCmbAMUMzI4LjM2pps" + "DDDY1LjKBj57+Cg5PFwQ1NaoVBmFrADaqwgccMjk5LjgxMTCqdwYQMy4wODKmZwcEMzIOqBQAMaaCBRgyMjUuMTE2" + "qtkJADEOLw8AMKYwBBgyMzAuMTQyprwPGDMwMi4wMjemiAEOzQ4MODA0M6YaAhA1NC4yNKYkBWEMDsELqmEAFDIuN" + "jE4N6LNBxgyODMuNTM1qqUfFDk5Ljc4NKaaGQ5UEAgyNjSuqw2usgkYNDMuMDY0MZ5rAyHkCDMzOa6sHg6+CwAwpn" + "YGDnseCDk1MqaoAsHYDDgzNjeiLgsYMjg1LjkzMqZ1EQ67IQgyNTmmMQBB2Qg0OTamuhMUMjcxLjkzqpMWBDMyDoo" + "hADSmYgChhAg2NjimeAIQMzkxLjiqyw4IOTkuDt8bpoYBDDk0LjamaQMO4hAIOTI3qqQYFDQyLjk1M6oxAAQ4NA7G" + "HaZKIg6YCwwxNzYzpiQXFDkwLjk0OKqqAhQ5Ny4yNzSmvwQANg54GKq/CA4AIQg1MzOm/wMUNTYuNzQ2phcCHDM0N" + "S4wOTEyoswHDoAQCDA5M6rOGRA5MS42N6ZPGyQyNzUuNzExMTIK"; + String snappy_content; + Poco::MemoryInputStream istr(snappy_base64_content.data(), snappy_base64_content.size()); + Poco::Base64Decoder decoder(istr); + Poco::StreamCopier::copyToString(decoder, snappy_content); + auto file_writer = std::make_unique("./test.snappy"); + file_writer->write(snappy_content.c_str(), snappy_content.size()); + file_writer->close(); + std::unique_ptr in = std::make_unique("./test.snappy", 128); + HadoopSnappyReadBuffer read_buffer(std::move(in)); + String output; + WriteBufferFromString out(output); + copyData(read_buffer, out); + UInt128 hashcode = sipHash128(output.c_str(), output.size()); + String hashcode_str = getHexUIntLowercase(hashcode); + ASSERT_EQ(hashcode_str, "593afe14f61866915cc00b8c7bd86046"); +} diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index 3773dadaf13..248d212ebf0 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -207,7 +207,6 @@ public: using ShardsInfo = std::vector; - String getHashOfAddresses() const { return hash_of_addresses; } const ShardsInfo & getShardsInfo() const { return shards_info; } const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; } @@ -263,7 +262,6 @@ private: /// Inter-server secret String secret; - String hash_of_addresses; /// Description of the cluster shards. ShardsInfo shards_info; /// Any remote shard. diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 0db07267231..884b8445732 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -116,7 +116,7 @@ void executeQuery( const Settings & settings = context->getSettingsRef(); - if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth) + if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth) throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); std::vector plans; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 318898c02b8..1512e7b90f2 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1918,20 +1918,16 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc else if (interpreter_subquery) { /// Subquery. - /// If we need less number of columns that subquery have - update the interpreter. 
- if (required_columns.size() < source_header.columns()) - { - ASTPtr subquery = extractTableExpression(query, 0); - if (!subquery) - throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR); + ASTPtr subquery = extractTableExpression(query, 0); + if (!subquery) + throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR); - interpreter_subquery = std::make_unique( - subquery, getSubqueryContext(context), - options.copy().subquery().noModify(), required_columns); + interpreter_subquery = std::make_unique( + subquery, getSubqueryContext(context), + options.copy().subquery().noModify(), required_columns); - if (query_analyzer->hasAggregation()) - interpreter_subquery->ignoreWithTotals(); - } + if (query_analyzer->hasAggregation()) + interpreter_subquery->ignoreWithTotals(); interpreter_subquery->buildQueryPlan(query_plan); query_plan.addInterpreterContext(context); diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp index 723db59f04b..130b3aae58d 100644 --- a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp +++ b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp @@ -208,8 +208,10 @@ Block InterpreterSelectWithUnionQuery::getCurrentChildResultHeader(const ASTPtr if (ast_ptr_->as()) return InterpreterSelectWithUnionQuery(ast_ptr_, context, options.copy().analyze().noModify(), required_result_column_names) .getSampleBlock(); - else + else if (ast_ptr_->as()) return InterpreterSelectQuery(ast_ptr_, context, options.copy().analyze().noModify()).getSampleBlock(); + else + return InterpreterSelectIntersectExceptQuery(ast_ptr_, context, options.copy().analyze().noModify()).getSampleBlock(); } std::unique_ptr diff --git a/src/Interpreters/JoinedTables.cpp b/src/Interpreters/JoinedTables.cpp index 3aae3982758..482a813bfef 100644 --- a/src/Interpreters/JoinedTables.cpp +++ b/src/Interpreters/JoinedTables.cpp @@ -183,7 +183,9 @@ std::unique_ptr JoinedTables::makeLeftTableSubq { if (!isLeftTableSubquery()) return {}; - return std::make_unique(left_table_expression, context, select_options); + + /// Only build dry_run interpreter during analysis. We will reconstruct the subquery interpreter during plan building. 
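A toy version of the getCurrentChildResultHeader dispatch above: the old bare `else` treated every non-union child as a plain SELECT, while the new code gives INTERSECT/EXCEPT children their own interpreter. The class names below are simplified stand-ins, not the real AST hierarchy:

```cpp
#include <iostream>

struct IAST { virtual ~IAST() = default; };
struct ASTSelectWithUnionQuery : IAST {};
struct ASTSelectQuery : IAST {};
struct ASTSelectIntersectExceptQuery : IAST {};

/// Mirror of the fixed dispatch: check each known node type explicitly and only
/// fall back to the intersect/except interpreter at the end.
const char * interpreterFor(const IAST & ast)
{
    if (dynamic_cast<const ASTSelectWithUnionQuery *>(&ast))
        return "InterpreterSelectWithUnionQuery";
    else if (dynamic_cast<const ASTSelectQuery *>(&ast))
        return "InterpreterSelectQuery";
    else
        return "InterpreterSelectIntersectExceptQuery";
}

int main()
{
    ASTSelectIntersectExceptQuery intersect_child;
    std::cout << interpreterFor(intersect_child) << '\n';  /// previously mis-dispatched
    return 0;
}
```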
+ return std::make_unique(left_table_expression, context, select_options.copy().analyze()); } StoragePtr JoinedTables::getLeftTableStorage() diff --git a/src/Interpreters/Set.cpp b/src/Interpreters/Set.cpp index 32dac7f9e9b..7af3e23d0d4 100644 --- a/src/Interpreters/Set.cpp +++ b/src/Interpreters/Set.cpp @@ -445,7 +445,7 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector & key_ranges, const DataTypes & data_types) const +BoolMask MergeTreeSetIndex::checkInRange(const std::vector & key_ranges, const DataTypes & data_types, bool single_point) const { size_t tuple_size = indexes_mapping.size(); @@ -468,7 +468,8 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector & key_ranges, std::optional new_range = KeyCondition::applyMonotonicFunctionsChainToRange( key_ranges[indexes_mapping[i].key_index], indexes_mapping[i].functions, - data_types[indexes_mapping[i].key_index]); + data_types[indexes_mapping[i].key_index], + single_point); if (!new_range) return {true, true}; diff --git a/src/Interpreters/Set.h b/src/Interpreters/Set.h index 3146b6af03f..2eecb0211a4 100644 --- a/src/Interpreters/Set.h +++ b/src/Interpreters/Set.h @@ -214,7 +214,7 @@ public: bool hasMonotonicFunctionsChain() const; - BoolMask checkInRange(const std::vector & key_ranges, const DataTypes & data_types) const; + BoolMask checkInRange(const std::vector & key_ranges, const DataTypes & data_types, bool single_point = false) const; private: // If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element. diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index e00e0aba7b3..c51201750c5 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -442,9 +442,9 @@ namespace pattern_list_args->children = { std::make_shared("^["), to_remove, - std::make_shared("]*|["), + std::make_shared("]+|["), to_remove, - std::make_shared("]*$") + std::make_shared("]+$") }; func_name = "replaceRegexpAll"; } @@ -455,7 +455,7 @@ namespace pattern_list_args->children = { std::make_shared("^["), to_remove, - std::make_shared("]*") + std::make_shared("]+") }; } else @@ -464,7 +464,7 @@ namespace pattern_list_args->children = { std::make_shared("["), to_remove, - std::make_shared("]*$") + std::make_shared("]+$") }; } func_name = "replaceRegexpOne"; diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 142e56ceb25..d1275444b84 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -210,7 +210,7 @@ static Block adaptBlockStructure(const Block & block, const Block & header) return res; } -void RemoteQueryExecutor::sendQuery() +void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind) { if (sent_query) return; @@ -237,13 +237,7 @@ void RemoteQueryExecutor::sendQuery() auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); - modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - /// Set initial_query_id to query_id for the clickhouse-benchmark. 
- /// - /// (since first query of clickhouse-benchmark will be issued as SECONDARY_QUERY, - /// due to it executes queries via RemoteBlockInputStream) - if (modified_client_info.initial_query_id.empty()) - modified_client_info.initial_query_id = query_id; + modified_client_info.query_kind = query_kind; if (CurrentThread::isInitialized()) { modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 655bd5603de..78bc9f611ab 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -83,7 +83,13 @@ public: ~RemoteQueryExecutor(); /// Create connection and send query, external tables and scalars. - void sendQuery(); + /// + /// @param query_kind - kind of query, usually it is SECONDARY_QUERY, + /// since this is the queries between servers + /// (for which this code was written in general). + /// But clickhouse-benchmark uses the same code, + /// and it should pass INITIAL_QUERY. + void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY); /// Query is resent to a replica, the query itself can be modified. std::atomic resent_query { false }; diff --git a/src/Storages/Cache/ExternalDataSourceCache.cpp b/src/Storages/Cache/ExternalDataSourceCache.cpp index 0f78c8d3511..18607c16ffa 100644 --- a/src/Storages/Cache/ExternalDataSourceCache.cpp +++ b/src/Storages/Cache/ExternalDataSourceCache.cpp @@ -27,7 +27,8 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller) : file_cache_controller(std::move(cache_controller)) +LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller) + : file_cache_controller(std::move(cache_controller)), original_readbuffer(nullptr), thread_pool(nullptr) { file_buffer = file_cache_controller->value().allocFile(); if (!file_buffer) @@ -35,18 +36,43 @@ LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_cont ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. 
{}", file_cache_controller->value().getLocalPath().string()); } +LocalFileHolder::LocalFileHolder( + RemoteFileCacheType::MappedHolderPtr cache_controller, + std::unique_ptr original_readbuffer_, + BackgroundSchedulePool * thread_pool_) + : file_cache_controller(std::move(cache_controller)) + , file_buffer(nullptr) + , original_readbuffer(std::move(original_readbuffer_)) + , thread_pool(thread_pool_) +{ +} + +LocalFileHolder::~LocalFileHolder() +{ + if (original_readbuffer) + { + dynamic_cast(original_readbuffer.get())->seek(0, SEEK_SET); + file_cache_controller->value().startBackgroundDownload(std::move(original_readbuffer), *thread_pool); + } +} + RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory(buff_size) { } std::unique_ptr RemoteReadBuffer::create( - ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr read_buffer, size_t buff_size) + ContextPtr context, + IRemoteFileMetadataPtr remote_file_metadata, + std::unique_ptr read_buffer, + size_t buff_size, + bool is_random_accessed) + { auto remote_path = remote_file_metadata->remote_path; auto remote_read_buffer = std::make_unique(buff_size); std::tie(remote_read_buffer->local_file_holder, read_buffer) - = ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer); + = ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer, is_random_accessed); if (remote_read_buffer->local_file_holder == nullptr) return read_buffer; remote_read_buffer->remote_file_size = remote_file_metadata->file_size; @@ -55,6 +81,19 @@ std::unique_ptr RemoteReadBuffer::create( bool RemoteReadBuffer::nextImpl() { + if (local_file_holder->original_readbuffer) + { + auto status = local_file_holder->original_readbuffer->next(); + if (status) + { + BufferBase::set( + local_file_holder->original_readbuffer->buffer().begin(), + local_file_holder->original_readbuffer->buffer().size(), + local_file_holder->original_readbuffer->offset()); + } + return status; + } + auto start_offset = local_file_holder->file_buffer->getPosition(); auto end_offset = start_offset + local_file_holder->file_buffer->internalBuffer().size(); local_file_holder->file_cache_controller->value().waitMoreData(start_offset, end_offset); @@ -73,6 +112,16 @@ bool RemoteReadBuffer::nextImpl() off_t RemoteReadBuffer::seek(off_t offset, int whence) { + if (local_file_holder->original_readbuffer) + { + auto ret = dynamic_cast(local_file_holder->original_readbuffer.get())->seek(offset, whence); + BufferBase::set( + local_file_holder->original_readbuffer->buffer().begin(), + local_file_holder->original_readbuffer->buffer().size(), + local_file_holder->original_readbuffer->offset()); + return ret; + } + if (!local_file_holder->file_buffer) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot call seek() in this buffer. 
It's a bug!"); /* @@ -88,6 +137,10 @@ off_t RemoteReadBuffer::seek(off_t offset, int whence) off_t RemoteReadBuffer::getPosition() { + if (local_file_holder->original_readbuffer) + { + return dynamic_cast(local_file_holder->original_readbuffer.get())->getPosition(); + } return local_file_holder->file_buffer->getPosition(); } @@ -164,7 +217,7 @@ String ExternalDataSourceCache::calculateLocalPath(IRemoteFileMetadataPtr metada } std::pair, std::unique_ptr> ExternalDataSourceCache::createReader( - ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr & read_buffer) + ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr & read_buffer, bool is_random_accessed) { // If something is wrong on startup, rollback to read from the original ReadBuffer. if (!isInitialized()) @@ -180,6 +233,11 @@ std::pair, std::unique_ptr> Externa auto cache = lru_caches->get(local_path); if (cache) { + if (!cache->value().isEnable()) + { + return {nullptr, std::move(read_buffer)}; + } + // The remote file has been updated, need to redownload. if (!cache->value().isValid() || cache->value().isModified(remote_file_metadata)) { @@ -216,6 +274,17 @@ std::pair, std::unique_ptr> Externa lru_caches->weight()); return {nullptr, std::move(read_buffer)}; } + /* + If read_buffer is seekable, use read_buffer directly inside LocalFileHolder. And once LocalFileHolder is released, + start the download process in background. + The cache is marked disable until the download process finish. + For reading parquet files from hdfs, with this optimization, the speedup can reach 3x. + */ + if (dynamic_cast(read_buffer.get()) && is_random_accessed) + { + new_cache->value().disable(); + return {std::make_unique(std::move(new_cache), std::move(read_buffer), &context->getSchedulePool()), nullptr}; + } new_cache->value().startBackgroundDownload(std::move(read_buffer), context->getSchedulePool()); return {std::make_unique(std::move(new_cache)), nullptr}; } diff --git a/src/Storages/Cache/ExternalDataSourceCache.h b/src/Storages/Cache/ExternalDataSourceCache.h index c555198e4c4..a25686b49c1 100644 --- a/src/Storages/Cache/ExternalDataSourceCache.h +++ b/src/Storages/Cache/ExternalDataSourceCache.h @@ -34,10 +34,13 @@ class LocalFileHolder { public: explicit LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller); - ~LocalFileHolder() = default; + explicit LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller, std::unique_ptr original_readbuffer_, BackgroundSchedulePool * thread_pool_); + ~LocalFileHolder(); RemoteFileCacheType::MappedHolderPtr file_cache_controller; std::unique_ptr file_buffer; + std::unique_ptr original_readbuffer; + BackgroundSchedulePool * thread_pool; }; class RemoteReadBuffer : public BufferWithOwnMemory @@ -45,7 +48,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory public: explicit RemoteReadBuffer(size_t buff_size); ~RemoteReadBuffer() override = default; - static std::unique_ptr create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr read_buffer, size_t buff_size); + static std::unique_ptr create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr read_buffer, size_t buff_size, bool is_random_accessed = false); bool nextImpl() override; off_t seek(off_t off, int whence) override; @@ -70,7 +73,8 @@ public: inline bool isInitialized() const { return initialized; } std::pair, std::unique_ptr> - createReader(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, 
std::unique_ptr & read_buffer); + createReader(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr & read_buffer, bool is_random_accessed); + void updateTotalSize(size_t size) { total_size += size; } diff --git a/src/Storages/Cache/RemoteCacheController.cpp b/src/Storages/Cache/RemoteCacheController.cpp index b5fc38fffcd..b72f5336ea4 100644 --- a/src/Storages/Cache/RemoteCacheController.cpp +++ b/src/Storages/Cache/RemoteCacheController.cpp @@ -169,6 +169,7 @@ void RemoteCacheController::backgroundDownload(ReadBufferPtr remote_read_buffer) file_status = DOWNLOADED; flush(true); data_file_writer.reset(); + is_enable = true; lock.unlock(); more_data_signal.notify_all(); ExternalDataSourceCache::instance().updateTotalSize(file_metadata_ptr->file_size); diff --git a/src/Storages/Cache/RemoteCacheController.h b/src/Storages/Cache/RemoteCacheController.h index ca2cb837e34..5f9d92c1349 100644 --- a/src/Storages/Cache/RemoteCacheController.h +++ b/src/Storages/Cache/RemoteCacheController.h @@ -63,6 +63,22 @@ public: std::lock_guard lock(mutex); return valid; } + inline bool isEnable() + { + std::lock_guard lock(mutex); + return is_enable; + + } + inline void disable() + { + std::lock_guard lock(mutex); + is_enable = false; + } + inline void enable() + { + std::lock_guard lock(mutex); + is_enable = true; + } IRemoteFileMetadataPtr getFileMetadata() { return file_metadata_ptr; } inline size_t getFileSize() const { return file_metadata_ptr->file_size; } @@ -83,6 +99,17 @@ private: IRemoteFileMetadataPtr file_metadata_ptr; std::filesystem::path local_path; + /** + * is_enable = true, only when the remotereadbuffer has been cached at local disk. + * + * The first time to access a remotebuffer which is not cached at local disk, we use the original remotebuffer directly and mark RemoteCacheController::is_enable = false. + * When the first time access is finished, LocalFileHolder will start a background download process by reusing the same remotebuffer object. After the download process + * finish, is_enable is set true. + * + * So when is_enable=false, if there is anther thread trying to access the same remote file, it would fail to use the local file buffer and use the original remotebuffer + * instead. Avoid multi threads trying to save the same file in to disk at the same time. 
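A minimal standard-C++ sketch of the life cycle described in the comment above (all names and the file path are illustrative): the cache entry starts disabled, a background task rewinds the remote stream and copies it to the local file, and only then is the flag flipped so later readers may use the cached copy:

```cpp
#include <atomic>
#include <fstream>
#include <future>
#include <iostream>
#include <sstream>
#include <string>

struct CacheEntry
{
    std::atomic<bool> is_enable{false};   /// false until the local copy is complete
    std::string local_path;
};

/// Re-read the remote stream from the beginning and persist it; enable the entry last.
void backgroundDownload(std::istream & remote, CacheEntry & entry)
{
    remote.clear();
    remote.seekg(0);
    std::ofstream out(entry.local_path, std::ios::binary);
    out << remote.rdbuf();
    entry.is_enable = true;
}

int main()
{
    CacheEntry entry;
    entry.local_path = "/tmp/cached_blob";               /// illustrative path

    std::istringstream remote("remote file contents");   /// stand-in for the seekable remote buffer
    std::cout << remote.str() << '\n';                    /// the first reader can use the remote data directly

    auto task = std::async(std::launch::async, backgroundDownload, std::ref(remote), std::ref(entry));
    task.wait();
    std::cout << (entry.is_enable ? "cache enabled" : "cache still disabled") << '\n';
    return 0;
}
```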
+ */ + bool is_enable = true; bool valid = true; size_t local_cache_bytes_read_before_flush; size_t current_offset; diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 9951fb436b5..aa703bcbb89 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -126,7 +126,7 @@ DistributedSink::DistributedSink( , log(&Poco::Logger::get("DistributedBlockOutputStream")) { const auto & settings = context->getSettingsRef(); - if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth) + if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth) throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); context->getClientInfo().distributed_depth += 1; random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key; diff --git a/src/Storages/ExternalDataSourceConfiguration.cpp b/src/Storages/ExternalDataSourceConfiguration.cpp index 2d4b05c51b5..5549a816a06 100644 --- a/src/Storages/ExternalDataSourceConfiguration.cpp +++ b/src/Storages/ExternalDataSourceConfiguration.cpp @@ -34,7 +34,7 @@ IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS) static const std::unordered_set dictionary_allowed_keys = { "host", "port", "user", "password", "db", "database", "table", "schema", "replica", - "update_field", "update_tag", "invalidate_query", "query", + "update_field", "update_lag", "invalidate_query", "query", "where", "name", "secure", "uri", "collection"}; diff --git a/src/Storages/ExternalDataSourceConfiguration.h b/src/Storages/ExternalDataSourceConfiguration.h index 1e08b088b1d..cc3e136ba50 100644 --- a/src/Storages/ExternalDataSourceConfiguration.h +++ b/src/Storages/ExternalDataSourceConfiguration.h @@ -16,7 +16,7 @@ struct ExternalDataSourceConfiguration { String host; UInt16 port = 0; - String username; + String username = "default"; String password; String database; String table; diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 63445c4a24c..2ae7c30fd5b 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -116,13 +116,12 @@ public: , compression_method(compression_method_) , max_block_size(max_block_size_) , sample_block(std::move(sample_block_)) - , to_read_block(sample_block) , columns_description(getColumnsDescription(sample_block, source_info)) , text_input_field_names(text_input_field_names_) , format_settings(getFormatSettings(getContext())) { - /// Initialize to_read_block, which is used to read data from HDFS. to_read_block = sample_block; + /// Initialize to_read_block, which is used to read data from HDFS. 
for (const auto & name_type : source_info->partition_name_types) { to_read_block.erase(name_type.name); @@ -171,9 +170,13 @@ public: size_t buff_size = raw_read_buf->internalBuffer().size(); if (buff_size == 0) buff_size = DBMS_DEFAULT_BUFFER_SIZE; - remote_read_buf = RemoteReadBuffer::create(getContext(), - std::make_shared("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()), - std::move(raw_read_buf), buff_size); + remote_read_buf = RemoteReadBuffer::create( + getContext(), + std::make_shared( + "Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()), + std::move(raw_read_buf), + buff_size, + format == "Parquet" || format == "ORC"); } else remote_read_buf = std::move(raw_read_buf); @@ -207,11 +210,17 @@ public: /// Enrich with partition columns. auto types = source_info->partition_name_types.getTypes(); + auto names = source_info->partition_name_types.getNames(); + auto fields = source_info->hive_files[current_idx]->getPartitionValues(); for (size_t i = 0; i < types.size(); ++i) { - auto column = types[i]->createColumnConst(num_rows, source_info->hive_files[current_idx]->getPartitionValues()[i]); - auto previous_idx = sample_block.getPositionByName(source_info->partition_name_types.getNames()[i]); - columns.insert(columns.begin() + previous_idx, column->convertToFullColumnIfConst()); + // Only add the required partition columns. partition columns are not read from readbuffer + // the column must be in sample_block, otherwise sample_block.getPositionByName(names[i]) will throw an exception + if (!sample_block.has(names[i])) + continue; + auto column = types[i]->createColumnConst(num_rows, fields[i]); + auto previous_idx = sample_block.getPositionByName(names[i]); + columns.insert(columns.begin() + previous_idx, column); } /// Enrich with virtual columns. 
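A toy illustration of the enrichment step above, with made-up column names: partition column values come from the Hive partition path rather than the file, are materialized as constant columns, and are appended only when the query actually selected them:

```cpp
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

int main()
{
    const std::set<std::string> sample_block = {"id", "day"};   /// columns requested by the query
    const std::map<std::string, std::string> partition_values = {{"day", "2022-03-01"}, {"region", "eu"}};
    const size_t num_rows = 3;

    /// "id" was read from the file; partition columns are appended as constants.
    std::vector<std::pair<std::string, std::vector<std::string>>> columns = {{"id", {"1", "2", "3"}}};
    for (const auto & [name, value] : partition_values)
    {
        if (!sample_block.count(name))   /// not selected -> must not be inserted into the block
            continue;
        columns.emplace_back(name, std::vector<std::string>(num_rows, value));
    }

    for (const auto & [name, data] : columns)
        std::cout << name << ": " << data.size() << " rows, first value " << data.front() << '\n';
    return 0;
}
```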
@@ -551,7 +560,34 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded( } return hive_file; } +bool StorageHive::isColumnOriented() const +{ + return format_name == "Parquet" || format_name == "ORC"; +} +void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const +{ + if (!isColumnOriented()) + sample_block = header_block; + UInt32 erased_columns = 0; + for (const auto & column : partition_columns) + { + if (sample_block.has(column)) + erased_columns++; + } + if (erased_columns == sample_block.columns()) + { + for (size_t i = 0; i < header_block.columns(); ++i) + { + const auto & col = header_block.getByPosition(i); + if (!partition_columns.count(col.name)) + { + sample_block.insert(col); + break; + } + } + } +} Pipe StorageHive::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -617,14 +653,20 @@ Pipe StorageHive::read( sources_info->table_name = hive_table; sources_info->hive_metastore_client = hive_metastore_client; sources_info->partition_name_types = partition_name_types; + + const auto & header_block = metadata_snapshot->getSampleBlock(); + Block sample_block; for (const auto & column : column_names) { + sample_block.insert(header_block.getByName(column)); if (column == "_path") sources_info->need_path_column = true; if (column == "_file") sources_info->need_file_column = true; } + getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()}); + if (num_streams > sources_info->hive_files.size()) num_streams = sources_info->hive_files.size(); @@ -636,7 +678,7 @@ Pipe StorageHive::read( hdfs_namenode_url, format_name, compression_method, - metadata_snapshot->getSampleBlock(), + sample_block, context_, max_block_size, text_input_field_names)); diff --git a/src/Storages/Hive/StorageHive.h b/src/Storages/Hive/StorageHive.h index 40787a409e8..323293cbbe0 100644 --- a/src/Storages/Hive/StorageHive.h +++ b/src/Storages/Hive/StorageHive.h @@ -53,6 +53,8 @@ public: NamesAndTypesList getVirtuals() const override; + bool isColumnOriented() const override; + protected: friend class StorageHiveSource; StorageHive( @@ -88,6 +90,8 @@ private: HiveFilePtr createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_); + void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const; + String hive_metastore_url; /// Hive database and table diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index 323b59e2902..c17eb5a981e 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -448,7 +448,7 @@ KeyCondition::KeyCondition( { for (size_t i = 0, size = key_column_names.size(); i < size; ++i) { - std::string name = key_column_names[i]; + const auto & name = key_column_names[i]; if (!key_columns.count(name)) key_columns[name] = i; } @@ -1999,7 +1999,7 @@ BoolMask KeyCondition::checkInHyperrectangle( if (!element.set_index) throw Exception("Set for IN is not created yet", ErrorCodes::LOGICAL_ERROR); - rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types)); + rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types, single_point)); if (element.function == RPNElement::FUNCTION_NOT_IN_SET) rpn_stack.back() = !rpn_stack.back(); } diff --git a/src/Storages/MergeTree/MergeTask.cpp 
b/src/Storages/MergeTree/MergeTask.cpp index 524e39bfef7..508f510ea26 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -126,13 +126,9 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() if (ctx->disk->exists(local_new_part_tmp_path)) throw Exception("Directory " + fullPath(ctx->disk, local_new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - { - std::lock_guard lock(global_ctx->mutator->tmp_parts_lock); - global_ctx->mutator->tmp_parts.emplace(local_tmp_part_basename); - } + global_ctx->data->temporary_parts.add(local_tmp_part_basename); SCOPE_EXIT( - std::lock_guard lock(global_ctx->mutator->tmp_parts_lock); - global_ctx->mutator->tmp_parts.erase(local_tmp_part_basename); + global_ctx->data->temporary_parts.remove(local_tmp_part_basename); ); global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical(); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e28c065def7..ee9120dbceb 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1488,7 +1488,7 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa } -size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds) +size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds) { /// If the method is already called from another thread, then we don't need to do anything. std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock); @@ -1520,9 +1520,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta { if (disk->isDirectory(it->path()) && isOldPartDirectory(disk, it->path(), deadline)) { - if (merger_mutator.hasTemporaryPart(basename)) + if (temporary_parts.contains(basename)) { - LOG_WARNING(log, "{} is an active destination for one of merge/mutation (consider increasing temporary_directories_lifetime setting)", full_path); + LOG_WARNING(log, "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path); continue; } else diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index bdbe1cf7aba..e45c8fab54b 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -3,30 +3,31 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include #include #include #include #include #include #include +#include +#include +#include +#include +#include +#include #include #include -#include #include #include #include -#include -#include +#include +#include +#include +#include #include #include -#include +#include +#include #include @@ -585,7 +586,7 @@ public: /// Delete all directories which names begin with "tmp" /// Must be called with locked lockForShare() because it's using relative_data_path. 
- size_t clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds); + size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds); size_t clearEmptyParts(); @@ -929,7 +930,6 @@ public: mutable std::mutex currently_submerging_emerging_mutex; protected: - friend class IMergeTreeDataPart; friend class MergeTreeDataMergerMutator; friend struct ReplicatedMergeTreeTableMetadata; @@ -1226,6 +1226,8 @@ private: /// Create zero-copy exclusive lock for part and disk. Useful for coordination of /// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree. virtual std::optional tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; } + + TemporaryParts temporary_parts; }; /// RAII struct to record big parts that are submerging or emerging. diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index d59ac5d8f1d..d8dec4a0d24 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -797,10 +797,4 @@ ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadat } -bool MergeTreeDataMergerMutator::hasTemporaryPart(const std::string & basename) const -{ - std::lock_guard lock(tmp_parts_lock); - return tmp_parts.contains(basename); -} - } diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 0c4e697cbeb..31d61adcc11 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -200,26 +200,6 @@ private: ITTLMergeSelector::PartitionIdToTTLs next_recompress_ttl_merge_times_by_partition; /// Performing TTL merges independently for each partition guarantees that /// there is only a limited number of TTL merges and no partition stores data, that is too stale - -public: - /// Returns true if passed part name is active. - /// (is the destination for one of active mutation/merge). - /// - /// NOTE: that it accept basename (i.e. dirname), not the path, - /// since later requires canonical form. - bool hasTemporaryPart(const std::string & basename) const; - -private: - /// Set of active temporary paths that is used as the destination. - /// List of such paths is required to avoid trying to remove them during cleanup. - /// - /// NOTE: It is pretty short, so use STL is fine. - std::unordered_set tmp_parts; - /// Lock for "tmp_parts". 
- /// - /// NOTE: mutable is required to mark hasTemporaryPath() const - mutable std::mutex tmp_parts_lock; - }; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 26bfd951d3d..3b6c727cd02 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -64,7 +64,7 @@ void ReplicatedMergeTreeCleanupThread::iterate() /// Both use relative_data_path which changes during rename, so we /// do it under share lock storage.clearOldWriteAheadLogs(); - storage.clearOldTemporaryDirectories(storage.merger_mutator, storage.getSettings()->temporary_directories_lifetime.totalSeconds()); + storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds()); } /// This is loose condition: no problem if we actually had lost leadership at this moment diff --git a/src/Storages/MergeTree/TemporaryParts.cpp b/src/Storages/MergeTree/TemporaryParts.cpp new file mode 100644 index 00000000000..4239c8232e5 --- /dev/null +++ b/src/Storages/MergeTree/TemporaryParts.cpp @@ -0,0 +1,24 @@ +#include + +namespace DB +{ + +bool TemporaryParts::contains(const std::string & basename) const +{ + std::lock_guard lock(mutex); + return parts.contains(basename); +} + +void TemporaryParts::add(std::string basename) +{ + std::lock_guard lock(mutex); + parts.emplace(std::move(basename)); +} + +void TemporaryParts::remove(const std::string & basename) +{ + std::lock_guard lock(mutex); + parts.erase(basename); +} + +} diff --git a/src/Storages/MergeTree/TemporaryParts.h b/src/Storages/MergeTree/TemporaryParts.h new file mode 100644 index 00000000000..bc9d270856f --- /dev/null +++ b/src/Storages/MergeTree/TemporaryParts.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +/// Manages the set of active temporary paths that should not be cleaned up by the background thread. +class TemporaryParts : private boost::noncopyable +{ +private: + /// To add const qualifier for contains() + mutable std::mutex mutex; + + /// NOTE: The set is pretty short, so using the STL here is fine. + std::unordered_set parts; + +public: + /// Returns true if the passed part name is active + /// (i.e. it is the destination of an active merge or mutation). + /// + /// NOTE: This accepts a basename (i.e. a dirname), not a full path, + /// since the latter would require a canonical form.
+ bool contains(const std::string & basename) const; + + void add(std::string basename); + void remove(const std::string & basename); +}; + +} diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index f97c09471c3..c1f2e14da7c 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -1000,7 +1000,8 @@ void StorageBuffer::reschedule() size_t min = std::max(min_thresholds.time - time_passed, 1); size_t max = std::max(max_thresholds.time - time_passed, 1); - flush_handle->scheduleAfter(std::min(min, max) * 1000); + size_t flush = std::max(flush_thresholds.time - time_passed, 1); + flush_handle->scheduleAfter(std::min({min, max, flush}) * 1000); } void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index da648aa4e5c..5bfb3b4ce45 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -56,6 +56,8 @@ #include #include #include +#include +#include #include #include @@ -118,6 +120,7 @@ namespace ErrorCodes extern const int ALTER_OF_COLUMN_IS_FORBIDDEN; extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; extern const int ARGUMENT_OUT_OF_BOUND; + extern const int TOO_LARGE_DISTRIBUTED_DEPTH; } namespace ActionLocks @@ -705,6 +708,9 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context) { const Settings & settings = local_context->getSettingsRef(); + if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth) + throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); + std::shared_ptr storage_src; auto & select = query.select->as(); auto new_query = std::dynamic_pointer_cast(query.clone()); @@ -719,28 +725,60 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer storage_src = std::dynamic_pointer_cast(joined_tables.getLeftTableStorage()); if (storage_src) { - const auto select_with_union_query = std::make_shared(); - select_with_union_query->list_of_selects = std::make_shared(); + /// Unwrap view() function. 
+ if (storage_src->remote_table_function_ptr) + { + const TableFunctionPtr src_table_function = + TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context); + const TableFunctionView * view_function = + assert_cast(src_table_function.get()); + new_query->select = view_function->getSelectQuery().clone(); + } + else + { + const auto select_with_union_query = std::make_shared(); + select_with_union_query->list_of_selects = std::make_shared(); - auto new_select_query = std::dynamic_pointer_cast(select_query->clone()); - select_with_union_query->list_of_selects->children.push_back(new_select_query); + auto new_select_query = std::dynamic_pointer_cast(select_query->clone()); + select_with_union_query->list_of_selects->children.push_back(new_select_query); - new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName()); + new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName()); - new_query->select = select_with_union_query; + new_query->select = select_with_union_query; + } } } } - if (!storage_src || storage_src->getClusterName() != getClusterName()) + const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{}; + const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses(); + /// Compare addresses instead of the cluster name, to handle remote()/cluster() + /// (for remote()/cluster(), getClusterName() returns an empty string). + if (src_addresses != dst_addresses) { + /// The warning should be produced only for root queries: + /// with parallel_distributed_insert_select=1 it would also be emitted + /// for the rewritten INSERT, whose destination table is still Distributed. + if (local_context->getClientInfo().distributed_depth == 0) + { + LOG_WARNING(log, + "Parallel distributed INSERT SELECT is not possible " + "(source cluster={} ({} addresses), destination cluster={} ({} addresses))", + storage_src ?
storage_src->getClusterName() : "", + src_addresses.size(), + getClusterName(), + dst_addresses.size()); + } return nullptr; } if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL) { new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName()); + /// Reset table function for INSERT INTO remote()/cluster() + new_query->table_function.reset(); } const auto & cluster = getCluster(); @@ -757,12 +795,15 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer new_query_str = buf.str(); } + ContextMutablePtr query_context = Context::createCopy(local_context); + ++query_context->getClientInfo().distributed_depth; + for (size_t shard_index : collections::range(0, shards_info.size())) { const auto & shard_info = shards_info[shard_index]; if (shard_info.isLocal()) { - InterpreterInsertQuery interpreter(new_query, local_context); + InterpreterInsertQuery interpreter(new_query, query_context); pipelines.emplace_back(std::make_unique()); pipelines.back()->init(interpreter.execute().pipeline); } @@ -776,7 +817,7 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer /// INSERT SELECT query returns empty block auto remote_query_executor - = std::make_shared(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context); + = std::make_shared(shard_info.pool, std::move(connections), new_query_str, Block{}, query_context); pipelines.emplace_back(std::make_unique()); pipelines.back()->init(Pipe(std::make_shared(remote_query_executor, false, settings.async_socket_for_remote))); pipelines.back()->setSinks([](const Block & header, QueryPipelineBuilder::StreamType) -> ProcessorPtr diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index e47e0fddd6c..45b1cd640ee 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -114,8 +114,6 @@ public: /// Used by InterpreterInsertQuery std::string getRemoteDatabaseName() const { return remote_database; } std::string getRemoteTableName() const { return remote_table; } - /// Returns empty string if tables is used by TableFunctionRemote - std::string getClusterName() const { return cluster_name; } ClusterPtr getCluster() const; /// Used by InterpreterSystemQuery @@ -201,6 +199,7 @@ private: std::optional getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const; size_t getRandomShardIndex(const Cluster::ShardsInfo & shards); + std::string getClusterName() const { return cluster_name.empty() ? "" : cluster_name; } const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index e425caa0532..fc14d055d63 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -125,7 +125,7 @@ void StorageMergeTree::startup() /// Temporary directories contain incomplete results of merges (after forced restart) /// and don't allow to reinitialize them, so delete each of them immediately - clearOldTemporaryDirectories(merger_mutator, 0); + clearOldTemporaryDirectories(0); /// NOTE background task will also do the above cleanups periodically. 
time_after_previous_cleanup_parts.restart(); @@ -1163,7 +1163,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign assignee.scheduleCommonTask(ExecutableLambdaAdapter::create( [this, share_lock] () { - return clearOldTemporaryDirectories(merger_mutator, getSettings()->temporary_directories_lifetime.totalSeconds()); + return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds()); }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); scheduled = true; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 6ad14502de7..c53a4963fbe 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -451,7 +451,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } /// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart), /// don't allow to reinitialize them, delete each of them immediately. - clearOldTemporaryDirectories(merger_mutator, 0); + clearOldTemporaryDirectories(0); clearOldWriteAheadLogs(); } diff --git a/src/TableFunctions/CMakeLists.txt b/src/TableFunctions/CMakeLists.txt index c9948a4b131..c58f93e310a 100644 --- a/src/TableFunctions/CMakeLists.txt +++ b/src/TableFunctions/CMakeLists.txt @@ -4,14 +4,18 @@ if (TARGET ch_contrib::hivemetastore) add_headers_and_sources(clickhouse_table_functions Hive) endif () -list(REMOVE_ITEM clickhouse_table_functions_sources ITableFunction.cpp TableFunctionFactory.cpp) -list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFunctionFactory.h) +list(REMOVE_ITEM clickhouse_table_functions_sources + ITableFunction.cpp + TableFunctionView.cpp + TableFunctionFactory.cpp) +list(REMOVE_ITEM clickhouse_table_functions_headers + ITableFunction.h + TableFunctionView.h + TableFunctionFactory.h) add_library(clickhouse_table_functions ${clickhouse_table_functions_sources}) + +target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms) if (TARGET ch_contrib::hivemetastore) - target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms ch_contrib::hivemetastore ch_contrib::hdfs) -else () - target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms) + target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs) endif () - - diff --git a/src/TableFunctions/TableFunctionView.cpp b/src/TableFunctions/TableFunctionView.cpp index 2cab8aeca25..e9fcbb219a3 100644 --- a/src/TableFunctions/TableFunctionView.cpp +++ b/src/TableFunctions/TableFunctionView.cpp @@ -15,6 +15,12 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } + +const ASTSelectWithUnionQuery & TableFunctionView::getSelectQuery() const +{ + return *create.select; +} + void TableFunctionView::parseArguments(const ASTPtr & ast_function, ContextPtr /*context*/) { const auto * function = ast_function->as(); diff --git a/src/TableFunctions/TableFunctionView.h b/src/TableFunctions/TableFunctionView.h index c20b45e7546..4afb049e738 100644 --- a/src/TableFunctions/TableFunctionView.h +++ b/src/TableFunctions/TableFunctionView.h @@ -16,6 +16,9 @@ class TableFunctionView : public ITableFunction public: static constexpr auto name = "view"; std::string getName() const override { return name; } + + const ASTSelectWithUnionQuery & getSelectQuery() const; + private: StoragePtr 
executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override; const char * getStorageTypeName() const override { return "View"; } diff --git a/tests/ci/ci_config.py b/tests/ci/ci_config.py index 000d3d9a000..b45a4ce90c6 100644 --- a/tests/ci/ci_config.py +++ b/tests/ci/ci_config.py @@ -231,7 +231,6 @@ CI_CONFIG = { }, "Stateful tests (aarch64, actions)": { "required_build": "package_aarch64", - "force_tests": True, }, "Stateful tests (release, DatabaseOrdinary, actions)": { "required_build": "package_release", @@ -259,7 +258,6 @@ CI_CONFIG = { }, "Stateless tests (aarch64, actions)": { "required_build": "package_aarch64", - "force_tests": True, }, "Stateless tests (release, wide parts enabled, actions)": { "required_build": "package_release", diff --git a/tests/integration/test_hive_query/test.py b/tests/integration/test_hive_query/test.py index 9997457e93d..20b6a6cb8f2 100644 --- a/tests/integration/test_hive_query/test.py +++ b/tests/integration/test_hive_query/test.py @@ -27,37 +27,52 @@ def started_cluster(): def test_create_parquet_table(started_cluster): logging.info('Start testing creating hive table ...') node = started_cluster.instances['h0_0_0'] - node.query("set input_format_parquet_allow_missing_columns = true") - result = node.query(""" - DROP TABLE IF EXISTS default.demo_parquet; - CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day); + test_passed = False + for i in range(10): + node.query("set input_format_parquet_allow_missing_columns = true") + result = node.query(""" +DROP TABLE IF EXISTS default.demo_parquet; +CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day) """) - logging.info("create result {}".format(result)) - time.sleep(120) - assert result.strip() == '' + logging.info("create result {}".format(result)) + if result.strip() == '': + test_passed = True + break + time.sleep(60) + assert test_passed def test_create_parquet_table_1(started_cluster): logging.info('Start testing creating hive table ...') node = started_cluster.instances['h0_0_0'] - node.query("set input_format_parquet_allow_missing_columns = true") - result = node.query(""" - DROP TABLE IF EXISTS default.demo_parquet_parts; - CREATE TABLE default.demo_parquet_parts (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String), `hour` String) ENGINE = Hive('thrift://hivetest:9083', 'test', 'parquet_demo') PARTITION BY(day, hour); + for i in range(10): + node.query("set input_format_parquet_allow_missing_columns = true") + result = node.query(""" +DROP TABLE IF EXISTS default.demo_parquet_parts; +CREATE TABLE default.demo_parquet_parts (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String), `hour` String) ENGINE = Hive('thrift://hivetest:9083', 'test', 'parquet_demo') PARTITION BY(day, hour); """) - logging.info("create result {}".format(result)) - time.sleep(120) - assert result.strip() == '' + logging.info("create result {}".format(result)) + if result.strip() == '': + test_passed = True + break + time.sleep(60) + assert test_passed def test_create_orc_table(started_cluster): logging.info('Start testing creating hive table ...') node = started_cluster.instances['h0_0_0'] - result = node.query(""" + test_passed = False + for i in range(10): + result 
= node.query(""" DROP TABLE IF EXISTS default.demo_orc; CREATE TABLE default.demo_orc (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo_orc') PARTITION BY(day) """) - logging.info("create result {}".format(result)) + logging.info("create result {}".format(result)) + if result.strip() == '': + test_passed = True + break + time.sleep(60) - assert result.strip() == '' + assert test_passed def test_create_text_table(started_cluster): logging.info('Start testing creating hive table ...') @@ -146,25 +161,20 @@ def test_parquet_groupby_by_hive_function(started_cluster): def test_cache_read_bytes(started_cluster): node = started_cluster.instances['h0_0_0'] - node.query("set input_format_parquet_allow_missing_columns = true") result = node.query(""" - DROP TABLE IF EXISTS default.demo_parquet; - CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day) + CREATE TABLE IF NOT EXISTS default.demo_parquet_1 (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day) """) - result = node.query(""" - SELECT day, count(*) FROM default.demo_parquet group by day order by day + test_passed = False + for i in range(10): + result = node.query(""" + SELECT day, count(*) FROM default.demo_parquet_1 group by day order by day settings input_format_parquet_allow_missing_columns = true """) - result = node.query(""" - SELECT day, count(*) FROM default.demo_parquet group by day order by day - """) - expected_result = """2021-11-01 1 -2021-11-05 2 -2021-11-11 1 -2021-11-16 2 -""" - time.sleep(120) - assert result == expected_result - result = node.query("select sum(ProfileEvent_ExternalDataSourceLocalCacheReadBytes) from system.metric_log where ProfileEvent_ExternalDataSourceLocalCacheReadBytes > 0") - logging.info("Read bytes from cache:{}".format(result)) - - assert result.strip() != '0' + node.query("system flush logs") + result = node.query("select sum(ProfileEvent_ExternalDataSourceLocalCacheReadBytes) from system.metric_log where ProfileEvent_ExternalDataSourceLocalCacheReadBytes > 0") + if result.strip() == '0': + logging.info("ProfileEvent_ExternalDataSourceLocalCacheReadBytes == 0") + time.sleep(10) + continue + test_passed = True + break + assert test_passed diff --git a/tests/queries/0_stateless/01060_avro.reference b/tests/queries/0_stateless/01060_avro.reference index 224a369d993..a375ae280a9 100644 --- a/tests/queries/0_stateless/01060_avro.reference +++ b/tests/queries/0_stateless/01060_avro.reference @@ -42,6 +42,7 @@ not compatible = compression 1000 1000 +1000 = other 0 1000 diff --git a/tests/queries/0_stateless/01060_avro.sh b/tests/queries/0_stateless/01060_avro.sh index 1cfe5582d0a..3c70927db25 100755 --- a/tests/queries/0_stateless/01060_avro.sh +++ b/tests/queries/0_stateless/01060_avro.sh @@ -50,8 +50,12 @@ echo '=' compression cat "$DATA_DIR"/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' cat "$DATA_DIR"/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' -#snappy is optional -#cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' +# snappy is optional +if [ "$( 
${CLICKHOUSE_LOCAL} -q "SELECT value FROM system.build_options where name = 'USE_SNAPPY' LIMIT 1")" == "1" ]; then +cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' +else +echo 1000 +fi echo '=' other #no data diff --git a/tests/queries/0_stateless/01086_odbc_roundtrip.sh b/tests/queries/0_stateless/01086_odbc_roundtrip.sh index 705746032f8..20066c6b34c 100755 --- a/tests/queries/0_stateless/01086_odbc_roundtrip.sh +++ b/tests/queries/0_stateless/01086_odbc_roundtrip.sh @@ -1,6 +1,7 @@ #!/usr/bin/env bash -# Tags: no-asan, no-msan, no-fasttest +# Tags: no-asan, no-msan, no-fasttest, no-cpu-aarch64 # Tag no-msan: can't pass because odbc libraries are not instrumented +# Tag no-cpu-aarch64: clickhouse-odbc is not setup for arm CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh diff --git a/tests/queries/0_stateless/01275_parallel_mv.reference b/tests/queries/0_stateless/01275_parallel_mv.reference index a5987acafde..9021ae2bb1a 100644 --- a/tests/queries/0_stateless/01275_parallel_mv.reference +++ b/tests/queries/0_stateless/01275_parallel_mv.reference @@ -2,8 +2,8 @@ set parallel_view_processing=1; insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } system flush logs; -select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1'; -8 +select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1'; +1 select count() from testX; 10 select count() from testXA; @@ -15,8 +15,8 @@ select count() from testXC; set parallel_view_processing=0; insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } system flush logs; -select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0'; -5 +select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0'; +1 select count() from testX; 20 select count() from testXA; diff --git a/tests/queries/0_stateless/01275_parallel_mv.sql b/tests/queries/0_stateless/01275_parallel_mv.sql index 11e5ff41417..27b8ef96e0b 100644 --- a/tests/queries/0_stateless/01275_parallel_mv.sql +++ b/tests/queries/0_stateless/01275_parallel_mv.sql @@ -15,7 +15,7 @@ create materialized view testXC engine=MergeTree order by tuple() as select slee set parallel_view_processing=1; insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } system flush logs; -select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1'; +select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1'; select count() from testX; select 
count() from testXA; @@ -25,7 +25,7 @@ select count() from testXC; set parallel_view_processing=0; insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } system flush logs; -select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0'; +select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0'; select count() from testX; select count() from testXA; diff --git a/tests/queries/0_stateless/02100_replaceRegexpAll_bug.reference b/tests/queries/0_stateless/02100_replaceRegexpAll_bug.reference index 993dd9b1cde..4dff9ef38ef 100644 --- a/tests/queries/0_stateless/02100_replaceRegexpAll_bug.reference +++ b/tests/queries/0_stateless/02100_replaceRegexpAll_bug.reference @@ -9,3 +9,4 @@ 1 1 1 +1 diff --git a/tests/queries/0_stateless/02100_replaceRegexpAll_bug.sql b/tests/queries/0_stateless/02100_replaceRegexpAll_bug.sql index 32f7f63f6d0..66ccb044549 100644 --- a/tests/queries/0_stateless/02100_replaceRegexpAll_bug.sql +++ b/tests/queries/0_stateless/02100_replaceRegexpAll_bug.sql @@ -12,3 +12,5 @@ SELECT '1,,' == replaceRegexpOne('1,,', '^[,]*|[,]*$', '') x; SELECT '5935,5998,6014' == trim(BOTH ', ' FROM '5935,5998,6014, ') x; SELECT '5935,5998,6014' == replaceRegexpAll('5935,5998,6014, ', concat('^[', regexpQuoteMeta(', '), ']*|[', regexpQuoteMeta(', '), ']*$'), '') AS x; + +SELECT trim(BOTH '"' FROM '2') == '2' diff --git a/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.reference b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.reference new file mode 100644 index 00000000000..05fbb680c65 --- /dev/null +++ b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.reference @@ -0,0 +1,27 @@ +-- { echoOn } +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH } +select * from dst_02224; +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=2; +select * from dst_02224; +1 +1 +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; +1 +1 +truncate table dst_02224; +insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key) +select * from remote('127.{1,2}', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; +1 +1 diff --git a/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.sql b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.sql new file mode 100644 index 00000000000..023f220e930 --- /dev/null +++ 
b/tests/queries/0_stateless/02224_parallel_distributed_insert_select_cluster.sql @@ -0,0 +1,34 @@ +drop table if exists dst_02224; +drop table if exists src_02224; +create table dst_02224 (key Int) engine=Memory(); +create table src_02224 (key Int) engine=Memory(); +insert into src_02224 values (1); + +-- { echoOn } +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH } +select * from dst_02224; + +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=1, max_distributed_depth=2; +select * from dst_02224; + +truncate table dst_02224; +insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key) +select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; + +truncate table dst_02224; +insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key) +select * from remote('127.{1,2}', currentDatabase(), src_02224, key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02224; +-- { echoOff } + +drop table src_02224; +drop table dst_02224; diff --git a/tests/queries/0_stateless/02225_parallel_distributed_insert_select_view.reference b/tests/queries/0_stateless/02225_parallel_distributed_insert_select_view.reference new file mode 100644 index 00000000000..98fb6a68656 --- /dev/null +++ b/tests/queries/0_stateless/02225_parallel_distributed_insert_select_view.reference @@ -0,0 +1,4 @@ +1 +1 +1 +1 diff --git a/tests/queries/0_stateless/02225_parallel_distributed_insert_select_view.sh b/tests/queries/0_stateless/02225_parallel_distributed_insert_select_view.sh new file mode 100755 index 00000000000..376a49fd820 --- /dev/null +++ b/tests/queries/0_stateless/02225_parallel_distributed_insert_select_view.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# NOTE: sh test is required since view() does not have current database + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " +drop table if exists dst_02225; +drop table if exists src_02225; +create table dst_02225 (key Int) engine=Memory(); +create table src_02225 (key Int) engine=Memory(); +insert into src_02225 values (1); +" + +$CLICKHOUSE_CLIENT --param_database=$CLICKHOUSE_DATABASE -nm -q " +truncate table dst_02225; +insert into function remote('127.{1,2}', currentDatabase(), dst_02225, key) +select * from remote('127.{1,2}', view(select * from {database:Identifier}.src_02225), key) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02225; + +-- w/o sharding key +truncate table dst_02225; +insert into function remote('127.{1,2}', currentDatabase(), dst_02225, key) +select * from remote('127.{1,2}', view(select * from {database:Identifier}.src_02225)) +settings parallel_distributed_insert_select=2, max_distributed_depth=1; +select * from dst_02225; +" + +$CLICKHOUSE_CLIENT -nm -q " +drop table src_02225; +drop table dst_02225; +" diff --git a/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.reference b/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh b/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh new file mode 100755 index 00000000000..2a163746e20 --- /dev/null +++ b/tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " +drop table if exists data_02226; +create table data_02226 (key Int) engine=MergeTree() order by key +as select * from numbers(1); +" + +# Regression for: +# +# Logical error: 'Coordinator for parallel reading from replicas is not initialized'. +opts=( + --allow_experimental_parallel_reading_from_replicas 1 + --max_parallel_replicas 3 + + --iterations 1 +) +$CLICKHOUSE_BENCHMARK --query "select * from remote('127.1', $CLICKHOUSE_DATABASE, data_02226)" "${opts[@]}" >& /dev/null +ret=$? + +$CLICKHOUSE_CLIENT -nm -q " +drop table data_02226; +" + +exit $ret diff --git a/tests/queries/0_stateless/02232_partition_pruner_single_point.reference b/tests/queries/0_stateless/02232_partition_pruner_single_point.reference new file mode 100644 index 00000000000..1191247b6d9 --- /dev/null +++ b/tests/queries/0_stateless/02232_partition_pruner_single_point.reference @@ -0,0 +1,2 @@ +1 +2 diff --git a/tests/queries/0_stateless/02232_partition_pruner_single_point.sql b/tests/queries/0_stateless/02232_partition_pruner_single_point.sql new file mode 100644 index 00000000000..0400d0e1b59 --- /dev/null +++ b/tests/queries/0_stateless/02232_partition_pruner_single_point.sql @@ -0,0 +1,14 @@ +DROP TABLE IF EXISTS lower_test; + +CREATE TABLE lower_test ( + a Int32, + b String +) ENGINE=MergeTree +PARTITION BY b +ORDER BY a; + +INSERT INTO lower_test (a,b) VALUES (1,'A'),(2,'B'),(3,'C'); + +SELECT a FROM lower_test WHERE lower(b) IN ('a','b') order by a; + +DROP TABLE lower_test; diff --git a/website/benchmark/dbms/index.html b/website/benchmark/dbms/index.html index b4e29098ead..a856bbb0502 100644 --- a/website/benchmark/dbms/index.html +++ b/website/benchmark/dbms/index.html @@ -15,7 +15,7 @@
- ClickHouse + ClickHouse

Performance comparison of analytical DBMS

diff --git a/website/benchmark/hardware/index.html b/website/benchmark/hardware/index.html index 06878eb077c..42c87c334c0 100644 --- a/website/benchmark/hardware/index.html +++ b/website/benchmark/hardware/index.html @@ -15,7 +15,7 @@
- ClickHouse + ClickHouse

{{ title }}

diff --git a/website/css/greenhouse.css b/website/css/greenhouse.css deleted file mode 100644 index 76812a169e8..00000000000 --- a/website/css/greenhouse.css +++ /dev/null @@ -1 +0,0 @@ -#main{padding-bottom:0;padding-top:0}#wrapper{max-width:1078px;padding:0}body>#wrapper>#main>#wrapper>#content,body>#wrapper>#main>#wrapper>#logo,body>#wrapper>#main>#wrapper>h1{display:none}body>#wrapper>#main>#wrapper>#board_title{margin-top:0}body>#wrapper>#main>#logo{margin-top:80px}body>#wrapper>#main>:last-child{margin-bottom:120px} \ No newline at end of file diff --git a/website/js/base.js b/website/js/base.js index 52b801eb98f..d953d5f6a1f 100644 --- a/website/js/base.js +++ b/website/js/base.js @@ -55,7 +55,7 @@ $('pre').each(function(_, element) { $(element).prepend( - 'Copy' + 'Copy' ); }); diff --git a/website/src/scss/greenhouse.scss b/website/src/scss/greenhouse.scss deleted file mode 100644 index 710b606fa15..00000000000 --- a/website/src/scss/greenhouse.scss +++ /dev/null @@ -1,27 +0,0 @@ -#main { - padding-bottom: 0; - padding-top: 0; -} - -#wrapper { - max-width: 1078px; - padding: 0; -} - -body > #wrapper > #main > #wrapper > #logo, -body > #wrapper > #main > #wrapper > h1, -body > #wrapper > #main > #wrapper > #content { - display: none; -} - -body > #wrapper > #main > #wrapper > #board_title { - margin-top: 0; -} - -body > #wrapper > #main > #logo { - margin-top: 80px; -} - -body > #wrapper > #main > :last-child { - margin-bottom: 120px; -} diff --git a/website/templates/common_css.html b/website/templates/common_css.html index ac10b233f25..b26b2bf973e 100644 --- a/website/templates/common_css.html +++ b/website/templates/common_css.html @@ -1,4 +1,4 @@ - + {% for src in extra_css %} diff --git a/website/templates/common_js.html b/website/templates/common_js.html index 72421f00562..93e35d37918 100644 --- a/website/templates/common_js.html +++ b/website/templates/common_js.html @@ -1,4 +1,4 @@ - + {% for src in extra_js %} diff --git a/website/templates/common_meta.html b/website/templates/common_meta.html index 018d533e893..07aa05d28b1 100644 --- a/website/templates/common_meta.html +++ b/website/templates/common_meta.html @@ -7,7 +7,7 @@ {% if title %}{{ title }}{% else %}{{ _('ClickHouse - fast open-source OLAP DBMS') }}{% endif %} - + @@ -15,7 +15,7 @@ {% if page and page.meta.image %} {% else %} - + {% endif %} diff --git a/website/templates/docs/amp.html b/website/templates/docs/amp.html index 5d2777af188..dc7dd7acb49 100644 --- a/website/templates/docs/amp.html +++ b/website/templates/docs/amp.html @@ -20,7 +20,7 @@ diff --git a/website/templates/docs/nav.html b/website/templates/docs/nav.html index 4d57d282796..afac39c2fab 100644 --- a/website/templates/docs/nav.html +++ b/website/templates/docs/nav.html @@ -1,7 +1,7 @@
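A short sketch of the StorageBuffer::reschedule() change earlier in this patch: the background flush is now scheduled against the flush-time threshold as well, not only min_time/max_time. The table below is hypothetical (dst_table is assumed to exist with a matching structure); the three trailing arguments are the optional flush_time/flush_rows/flush_bytes parameters of the Buffer engine, and with this change a one-second flush_time wakes the flush task after roughly one second instead of waiting for the min/max time thresholds.

``` sql
-- Sketch only: dst_table and the chosen thresholds are illustrative.
CREATE TABLE buffer_example AS dst_table
ENGINE = Buffer(currentDatabase(), dst_table, 16,
                10, 100,              -- min_time, max_time (seconds)
                10, 1000000,          -- min_rows, max_rows
                10000, 100000000,     -- min_bytes, max_bytes
                1, 100000, 10000000); -- flush_time, flush_rows, flush_bytes
```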